package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/ProducerIdManager.class */
public class ProducerIdManager {
    public static final String KOP_PID_BLOCK_ZNODE = "/kop_latest_producer_id_block";
    private final Integer brokerId;
    private final ZooKeeper zkClient;
    private ProducerIdBlock currentProducerIdBlock;
    private Long nextProducerId = -1L;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ProducerIdManager.class);
    private static final Long currentVersion = 1L;
    public static final Long PID_BLOCK_SIZE = 1000L;
    private static final ObjectMapper objectMapper = new ObjectMapper();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/ProducerIdManager$DataAndVersion.class */
    public static class DataAndVersion {
        private byte[] data;
        private int zkVersion;

        public DataAndVersion(byte[] bArr, int i) {
            this.data = bArr;
            this.zkVersion = i;
        }
    }

    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/ProducerIdManager$ProducerIdBlock.class */
    public static class ProducerIdBlock {
        private Integer brokerId;
        private Long blockStartId;
        private Long blockEndId;

        /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/ProducerIdManager$ProducerIdBlock$ProducerIdBlockBuilder.class */
        public static class ProducerIdBlockBuilder {
            private Integer brokerId;
            private Long blockStartId;
            private Long blockEndId;

            ProducerIdBlockBuilder() {
            }

            public ProducerIdBlockBuilder brokerId(Integer num) {
                this.brokerId = num;
                return this;
            }

            public ProducerIdBlockBuilder blockStartId(Long l) {
                this.blockStartId = l;
                return this;
            }

            public ProducerIdBlockBuilder blockEndId(Long l) {
                this.blockEndId = l;
                return this;
            }

            public ProducerIdBlock build() {
                return new ProducerIdBlock(this.brokerId, this.blockStartId, this.blockEndId);
            }

            public String toString() {
                return "ProducerIdManager.ProducerIdBlock.ProducerIdBlockBuilder(brokerId=" + this.brokerId + ", blockStartId=" + this.blockStartId + ", blockEndId=" + this.blockEndId + ")";
            }
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ProducerIdBlock producerIdBlock = (ProducerIdBlock) obj;
            return Objects.equals(this.brokerId, producerIdBlock.brokerId) && Objects.equals(this.blockStartId, producerIdBlock.blockStartId) && Objects.equals(this.blockEndId, producerIdBlock.blockEndId);
        }

        public int hashCode() {
            return Objects.hash(this.brokerId, this.blockStartId, this.blockEndId);
        }

        public static ProducerIdBlockBuilder builder() {
            return new ProducerIdBlockBuilder();
        }

        public Integer getBrokerId() {
            return this.brokerId;
        }

        public Long getBlockStartId() {
            return this.blockStartId;
        }

        public Long getBlockEndId() {
            return this.blockEndId;
        }

        public void setBrokerId(Integer num) {
            this.brokerId = num;
        }

        public void setBlockStartId(Long l) {
            this.blockStartId = l;
        }

        public void setBlockEndId(Long l) {
            this.blockEndId = l;
        }

        public ProducerIdBlock(Integer num, Long l, Long l2) {
            this.brokerId = num;
            this.blockStartId = l;
            this.blockEndId = l2;
        }

        public String toString() {
            return "ProducerIdManager.ProducerIdBlock(brokerId=" + getBrokerId() + ", blockStartId=" + getBlockStartId() + ", blockEndId=" + getBlockEndId() + ")";
        }
    }

    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/ProducerIdManager$SetDataResult.class */
    private static class SetDataResult {
        private int zkVersion;

        public SetDataResult(int i) {
            this.zkVersion = i;
        }
    }

    public ProducerIdManager(Integer num, ZooKeeper zooKeeper) {
        this.brokerId = num;
        this.zkClient = zooKeeper;
    }

    public static byte[] generateProducerIdBlockJson(ProducerIdBlock producerIdBlock) throws JsonProcessingException {
        HashMap hashMap = new HashMap();
        hashMap.put(ConsumerProtocol.VERSION_KEY_NAME, currentVersion);
        hashMap.put("broker", producerIdBlock.brokerId);
        hashMap.put("block_start", producerIdBlock.blockStartId);
        hashMap.put("block_end", producerIdBlock.blockEndId);
        return objectMapper.writeValueAsBytes(hashMap);
    }

    public static ProducerIdBlock parseProducerIdBlockData(byte[] bArr) throws IOException {
        JsonNode readTree = objectMapper.readTree(bArr);
        return ProducerIdBlock.builder().brokerId(Integer.valueOf(readTree.get("broker").asInt())).blockStartId(Long.valueOf(readTree.get("block_start").asLong())).blockEndId(Long.valueOf(readTree.get("block_end").asLong())).build();
    }

    private CompletableFuture<Void> getNewProducerIdBlock() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        getDataAndVersion().whenComplete((dataAndVersion, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
                return;
            }
            if (dataAndVersion == null) {
                this.currentProducerIdBlock = new ProducerIdBlock(this.brokerId, 0L, Long.valueOf(PID_BLOCK_SIZE.longValue() - 1));
            } else {
                try {
                    ProducerIdBlock parseProducerIdBlockData = parseProducerIdBlockData(dataAndVersion.data);
                    if (parseProducerIdBlockData.blockEndId.longValue() > Long.MAX_VALUE - PID_BLOCK_SIZE.longValue()) {
                        log.error("Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is {})", parseProducerIdBlockData.blockEndId);
                        completableFuture.completeExceptionally(new Exception("Have exhausted all producerIds."));
                        return;
                    }
                    this.currentProducerIdBlock = new ProducerIdBlock(this.brokerId, Long.valueOf(parseProducerIdBlockData.blockEndId.longValue() + 1), Long.valueOf(parseProducerIdBlockData.blockEndId.longValue() + PID_BLOCK_SIZE.longValue()));
                } catch (Exception e) {
                    log.error("Failed to parse producerIdBlock data.", (Throwable) e);
                    completableFuture.completeExceptionally(e);
                    return;
                }
            }
            try {
                conditionalSetData(generateProducerIdBlockJson(this.currentProducerIdBlock), dataAndVersion == null ? -1 : dataAndVersion.zkVersion).whenComplete((setDataResult, th) -> {
                    if (th != null) {
                        log.error("Failed to set new producerId block.", th);
                        completableFuture.completeExceptionally(th);
                    } else {
                        log.info("Acquire new producerId block {} by writing to Zk with path version {}", this.currentProducerIdBlock, Integer.valueOf(setDataResult.zkVersion));
                        completableFuture.complete(null);
                    }
                });
            } catch (JsonProcessingException e2) {
                log.error("Failed to generate producerIdBlock json bytes data. pidBlock: {}", this.currentProducerIdBlock, e2);
            }
        });
        return completableFuture;
    }

    public CompletableFuture<DataAndVersion> getDataAndVersion() {
        final CompletableFuture<DataAndVersion> completableFuture = new CompletableFuture<>();
        try {
            this.zkClient.getData(KOP_PID_BLOCK_ZNODE, (Watcher) null, new AsyncCallback.DataCallback() { // from class: io.streamnative.pulsar.handlers.kop.coordinator.transaction.ProducerIdManager.1
                public void processResult(int i, String str, Object obj, byte[] bArr, Stat stat) {
                    if (i != KeeperException.Code.OK.intValue() || bArr == null) {
                        completableFuture.complete(null);
                    } else {
                        completableFuture.complete(new DataAndVersion(bArr, stat.getVersion()));
                    }
                }
            }, (Object) null);
        } catch (Exception e) {
            log.error("Failed to get producerId block data.", (Throwable) e);
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    private CompletableFuture<SetDataResult> conditionalSetData(final byte[] bArr, int i) {
        final CompletableFuture<SetDataResult> completableFuture = new CompletableFuture<>();
        this.zkClient.setData(KOP_PID_BLOCK_ZNODE, bArr, i, new AsyncCallback.StatCallback() { // from class: io.streamnative.pulsar.handlers.kop.coordinator.transaction.ProducerIdManager.2
            public void processResult(int i2, String str, Object obj, Stat stat) {
                if (i2 == KeeperException.Code.OK.intValue()) {
                    completableFuture.complete(new SetDataResult(stat.getVersion()));
                    return;
                }
                if (i2 == KeeperException.Code.BADVERSION.intValue()) {
                    CompletableFuture checkProducerIdBlockZkData = ProducerIdManager.this.checkProducerIdBlockZkData(ProducerIdManager.this.zkClient, bArr);
                    CompletableFuture completableFuture2 = completableFuture;
                    checkProducerIdBlockZkData.whenComplete((setDataResult, th) -> {
                        if (th != null) {
                            completableFuture2.completeExceptionally(th);
                        } else {
                            completableFuture2.complete(setDataResult);
                        }
                    });
                } else if (i2 == KeeperException.Code.NONODE.intValue()) {
                    ProducerIdManager.log.error("Update of path {} with data {} and expected version {} failed due to {}", ProducerIdManager.KOP_PID_BLOCK_ZNODE, ProducerIdManager.this.getProducerIdBlockStr(bArr), Integer.valueOf(stat.getVersion()), "NoNode for path /kop_latest_producer_id_block");
                    completableFuture.completeExceptionally(new Exception("NoNode for path /kop_latest_producer_id_block"));
                } else {
                    ProducerIdManager.log.error("Update of path {} with data {} and expected version {} keeperException code {}", ProducerIdManager.KOP_PID_BLOCK_ZNODE, ProducerIdManager.this.getProducerIdBlockStr(bArr), Integer.valueOf(stat.getVersion()), Integer.valueOf(i2));
                    completableFuture.completeExceptionally(new Exception("KeeperException code " + i2));
                }
            }
        }, (Object) null);
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public CompletableFuture<SetDataResult> checkProducerIdBlockZkData(ZooKeeper zooKeeper, byte[] bArr) {
        final CompletableFuture<SetDataResult> completableFuture = new CompletableFuture<>();
        try {
            final ProducerIdBlock parseProducerIdBlockData = parseProducerIdBlockData(bArr);
            zooKeeper.getData(KOP_PID_BLOCK_ZNODE, (Watcher) null, new AsyncCallback.DataCallback() { // from class: io.streamnative.pulsar.handlers.kop.coordinator.transaction.ProducerIdManager.3
                public void processResult(int i, String str, Object obj, byte[] bArr2, Stat stat) {
                    try {
                        if (parseProducerIdBlockData.equals(ProducerIdManager.parseProducerIdBlockData(bArr2))) {
                            completableFuture.complete(new SetDataResult(stat.getVersion()));
                        } else {
                            completableFuture.completeExceptionally(new Exception(""));
                        }
                    } catch (IOException e) {
                        ProducerIdManager.log.error("Failed to parse producerIdBlock data {}.", ProducerIdManager.this.getProducerIdBlockStr(bArr2), e);
                        completableFuture.completeExceptionally(e);
                    }
                }
            }, (Object) null);
        } catch (Exception e) {
            log.error("Error while checking for producerId block Zk data on path {}: expected data {}", KOP_PID_BLOCK_ZNODE, getProducerIdBlockStr(bArr), e);
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    private CompletableFuture<Void> makeSurePathExists() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.zkClient.create(KOP_PID_BLOCK_ZNODE, (byte[]) null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (i, str, obj, str2) -> {
            if (i == KeeperException.Code.OK.intValue() || i == KeeperException.Code.NODEEXISTS.intValue()) {
                completableFuture.complete(null);
            } else {
                completableFuture.completeExceptionally(new Exception("Failed to create path /kop_latest_producer_id_block keeperException code " + i));
            }
        }, (Object) null);
        return completableFuture;
    }

    public CompletableFuture<Void> initialize() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        makeSurePathExists().thenCompose(r3 -> {
            return getNewProducerIdBlock();
        }).thenAccept((Consumer<? super U>) r5 -> {
            this.nextProducerId = this.currentProducerIdBlock.blockStartId;
            completableFuture.complete(null);
        }).exceptionally(th -> {
            completableFuture.completeExceptionally(th);
            return null;
        });
        return completableFuture;
    }

    public synchronized CompletableFuture<Long> generateProducerId() {
        CompletableFuture<Long> completableFuture = new CompletableFuture<>();
        if (this.nextProducerId.longValue() > this.currentProducerIdBlock.blockEndId.longValue()) {
            getNewProducerIdBlock().whenComplete((r8, th) -> {
                if (th != null) {
                    completableFuture.completeExceptionally(th);
                } else {
                    this.nextProducerId = Long.valueOf(this.currentProducerIdBlock.blockStartId.longValue() + 1);
                    completableFuture.complete(Long.valueOf(this.nextProducerId.longValue() - 1));
                }
            });
        } else {
            this.nextProducerId = Long.valueOf(this.nextProducerId.longValue() + 1);
            completableFuture.complete(Long.valueOf(this.nextProducerId.longValue() - 1));
        }
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getProducerIdBlockStr(byte[] bArr) {
        return new String(bArr, StandardCharsets.UTF_8);
    }
}
