package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.KopBrokerLookupManager;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionMetadata;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.ssl.SSLUtils;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.ChannelFutures;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.class */
public class TransactionMarkerChannelManager {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger((Class<?>) TransactionMarkerChannelManager.class);
    /** Service configuration supplied to the constructor. */
    private final KafkaServiceConfiguration kafkaConfig;
    /** Netty event loop group created in the constructor and attached to {@link #bootstrap}. */
    private final EventLoopGroup eventLoopGroup;
    /** TLS flag supplied to the constructor; decides whether {@link #sslContextFactory} is created. */
    private final boolean enableTls;
    /** SSL context factory built from the configuration when {@link #enableTls} is set, otherwise {@code null}. */
    private final SslContextFactory sslContextFactory;
    /** Resolves the broker address that serves a given topic partition. */
    private final KopBrokerLookupManager kopBrokerLookupManager;
    /** Netty client bootstrap used by {@code getChannel} to open connections. */
    private final Bootstrap bootstrap;
    /** Transaction state manager: partition mapping, cached state lookup and transaction log appends. */
    private TransactionStateManager txnStateManager;
    /**
     * Channel-handler futures keyed by broker address, filled lazily by {@code getChannel}.
     * NOTE(review): this is a plain HashMap while the sibling maps are concurrent -- confirm it is
     * only ever touched from a single thread.
     */
    private Map<InetSocketAddress, CompletableFuture<TransactionMarkerChannelHandler>> handlerMap = new HashMap();
    /** Transactions whose completion is still waiting on markers, keyed by transactional id. */
    private ConcurrentHashMap<String, PendingCompleteTxn> transactionsWithPendingMarkers = new ConcurrentHashMap<>();
    /** Marker queues waiting to be sent, one per resolved broker address. */
    private Map<InetSocketAddress, TxnMarkerQueue> markersQueuePerBroker = new ConcurrentHashMap();
    /** Markers whose broker lookup failed; the queue is created with a {@code null} address. */
    private TxnMarkerQueue markersQueueForUnknownBroker = new TxnMarkerQueue(null);
    /** Completions whose transaction log append must be retried; drained by {@code retryLogAppends}. */
    private BlockingQueue<PendingCompleteTxn> txnLogAppendRetryQueue = new LinkedBlockingQueue();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager$PendingCompleteTxn.class */
    /**
     * Immutable record of a transaction whose completion is pending: the transactional id, the
     * coordinator epoch it was started under, the cached metadata and the metadata to transit to.
     *
     * <p>Instances are passed as {@code {}} arguments to several log statements in the enclosing
     * class, so {@link #toString()} renders the fields instead of the default identity hash.
     */
    public static class PendingCompleteTxn {
        private final String transactionalId;
        private final Integer coordinatorEpoch;
        private final TransactionMetadata txnMetadata;
        private final TransactionMetadata.TxnTransitMetadata newMetadata;

        /**
         * @param str the transactional id
         * @param num the coordinator epoch
         * @param transactionMetadata the cached transaction metadata
         * @param txnTransitMetadata the metadata the transaction should transit to
         */
        public PendingCompleteTxn(String str, Integer num, TransactionMetadata transactionMetadata, TransactionMetadata.TxnTransitMetadata txnTransitMetadata) {
            this.transactionalId = str;
            this.coordinatorEpoch = num;
            this.txnMetadata = transactionMetadata;
            this.newMetadata = txnTransitMetadata;
        }

        /** Renders every field so that log lines mentioning this object are readable. */
        @Override
        public String toString() {
            return "TransactionMarkerChannelManager.PendingCompleteTxn(transactionalId=" + this.transactionalId
                    + ", coordinatorEpoch=" + this.coordinatorEpoch
                    + ", txnMetadata=" + this.txnMetadata
                    + ", newMetadata=" + this.newMetadata + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager$TxnIdAndMarkerEntry.class */
    /**
     * Pairs a transactional id with the marker entry that has to be written for it.
     * Equality, hashing and the string form are value-based over both fields.
     */
    public static class TxnIdAndMarkerEntry {
        private final String transactionalId;
        private final WriteTxnMarkersRequest.TxnMarkerEntry entry;

        /**
         * @param str the transactional id
         * @param txnMarkerEntry the marker entry belonging to that transaction
         */
        public TxnIdAndMarkerEntry(String str, WriteTxnMarkersRequest.TxnMarkerEntry txnMarkerEntry) {
            this.transactionalId = str;
            this.entry = txnMarkerEntry;
        }

        /** @return the transactional id */
        public String getTransactionalId() {
            return this.transactionalId;
        }

        /** @return the marker entry */
        public WriteTxnMarkersRequest.TxnMarkerEntry getEntry() {
            return this.entry;
        }

        /** Two instances are equal when both the transactional id and the marker entry are equal. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof TxnIdAndMarkerEntry)) {
                return false;
            }
            final TxnIdAndMarkerEntry other = (TxnIdAndMarkerEntry) obj;
            if (!other.canEqual(this)) {
                return false;
            }
            final String ownId = getTransactionalId();
            final String otherId = other.getTransactionalId();
            final boolean sameId = ownId == null ? otherId == null : ownId.equals(otherId);
            if (!sameId) {
                return false;
            }
            final WriteTxnMarkersRequest.TxnMarkerEntry ownEntry = getEntry();
            final WriteTxnMarkersRequest.TxnMarkerEntry otherEntry = other.getEntry();
            if (ownEntry == null) {
                return otherEntry == null;
            }
            return ownEntry.equals(otherEntry);
        }

        /** Hook that lets subclasses veto equality with instances of this class. */
        protected boolean canEqual(Object obj) {
            return obj instanceof TxnIdAndMarkerEntry;
        }

        /** Combines both field hashes with multiplier 59, using 43 for a {@code null} field. */
        @Override
        public int hashCode() {
            final String id = getTransactionalId();
            int result = 59 + (id == null ? 43 : id.hashCode());
            final WriteTxnMarkersRequest.TxnMarkerEntry marker = getEntry();
            result = result * 59 + (marker == null ? 43 : marker.hashCode());
            return result;
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("TransactionMarkerChannelManager.TxnIdAndMarkerEntry(transactionalId=");
            text.append(getTransactionalId());
            text.append(", entry=");
            text.append(getEntry());
            text.append(")");
            return text.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager$TxnMarkerQueue.class */
    /**
     * Marker entries waiting to be sent to one broker address, grouped by the integer key passed
     * to {@link #addMarkers} (callers in this file pass the transaction topic partition).
     */
    public static class TxnMarkerQueue {
        private final InetSocketAddress address;
        private final Map<Integer, BlockingQueue<TxnIdAndMarkerEntry>> markersPerPartition = new ConcurrentHashMap<>();

        /**
         * @param inetSocketAddress the broker address this queue belongs to; may be {@code null}
         */
        public TxnMarkerQueue(InetSocketAddress inetSocketAddress) {
            this.address = inetSocketAddress;
        }

        /**
         * Detaches and returns the whole queue stored under the given key.
         *
         * @param num the partition key
         * @return the removed queue, or {@code null} when nothing was stored under the key
         */
        public BlockingQueue<TxnIdAndMarkerEntry> removeMarkersForTxnTopicPartition(Integer num) {
            return markersPerPartition.remove(num);
        }

        /**
         * Appends an entry to the queue of the given key, creating that queue on first use.
         *
         * @param num the partition key
         * @param txnIdAndMarkerEntry the entry to enqueue
         */
        public void addMarkers(Integer num, TxnIdAndMarkerEntry txnIdAndMarkerEntry) {
            BlockingQueue<TxnIdAndMarkerEntry> partitionQueue =
                    markersPerPartition.computeIfAbsent(num, ignored -> new BlockingArrayQueue<>());
            partitionQueue.add(txnIdAndMarkerEntry);
        }

        /**
         * Moves every queued entry, across all keys, into the supplied list.
         *
         * @param list the destination that receives the drained entries
         */
        public void getMarkerEntries(List<TxnIdAndMarkerEntry> list) {
            for (BlockingQueue<TxnIdAndMarkerEntry> partitionQueue : markersPerPartition.values()) {
                partitionQueue.drainTo(list);
            }
        }
    }

    /**
     * Wires up the manager, prepares the Netty client bootstrap and starts the background thread
     * that repeatedly calls {@code drainQueuedTransactionMarkers()} with a 1 ms pause in between.
     *
     * <p>The background thread stops when it is interrupted: the interrupt flag is restored and
     * the loop exits, instead of printing a stack trace and carrying on forever.
     *
     * @param kafkaServiceConfiguration the service configuration
     * @param transactionStateManager the transaction state manager
     * @param kopBrokerLookupManager the broker lookup helper
     * @param z whether TLS is enabled; when {@code true} an SSL context factory is created
     */
    public TransactionMarkerChannelManager(KafkaServiceConfiguration kafkaServiceConfiguration, TransactionStateManager transactionStateManager, KopBrokerLookupManager kopBrokerLookupManager, boolean z) {
        this.kafkaConfig = kafkaServiceConfiguration;
        this.txnStateManager = transactionStateManager;
        this.kopBrokerLookupManager = kopBrokerLookupManager;
        this.enableTls = z;
        if (this.enableTls) {
            this.sslContextFactory = SSLUtils.createSslContextFactory(kafkaServiceConfiguration);
        } else {
            this.sslContextFactory = null;
        }
        this.eventLoopGroup = new NioEventLoopGroup();
        this.bootstrap = new Bootstrap();
        this.bootstrap.group(this.eventLoopGroup);
        this.bootstrap.channel(NioSocketChannel.class);
        this.bootstrap.handler(new TransactionMarkerChannelInitializer(kafkaServiceConfiguration, z));
        new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                drainQueuedTransactionMarkers();
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    log.info("Transaction marker drain thread interrupted; stopping", e);
                    // Restore the flag so the loop condition (and anything above us) observes it.
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }

    /**
     * Returns the (possibly still pending) channel handler for a broker address, connecting on
     * first use and caching the future in {@code handlerMap}.
     *
     * <p>The handler is looked up in the channel pipeline under the name {@code "txnHandler"}.
     * When the pipeline has no such handler the future now fails with an
     * {@link IllegalStateException} instead of completing with {@code null}.
     *
     * <p>NOTE(review): a failed future stays cached, so later calls for the same address get the
     * same failure -- confirm whether eviction/reconnect is handled elsewhere.
     *
     * @param inetSocketAddress the broker address to connect to
     * @return a future completed with the channel's transaction marker handler, or completed
     *         exceptionally when connecting or resolving the handler fails
     */
    public CompletableFuture<TransactionMarkerChannelHandler> getChannel(InetSocketAddress inetSocketAddress) {
        return this.handlerMap.computeIfAbsent(inetSocketAddress, address -> {
            final CompletableFuture<TransactionMarkerChannelHandler> handlerFuture = new CompletableFuture<>();
            ChannelFutures.toCompletableFuture(this.bootstrap.connect(address)).thenAccept(channel -> {
                TransactionMarkerChannelHandler handler =
                        (TransactionMarkerChannelHandler) channel.pipeline().get("txnHandler");
                if (handler == null) {
                    handlerFuture.completeExceptionally(new IllegalStateException(
                            "Channel to " + address + " has no handler named txnHandler"));
                } else {
                    handlerFuture.complete(handler);
                }
            }).exceptionally(th -> {
                // Covers connect failures as well as anything thrown while resolving the handler.
                handlerFuture.completeExceptionally(th);
                return null;
            });
            return handlerFuture;
        });
    }

    /**
     * Registers a transaction as waiting for markers, queues its markers for the owning brokers
     * and then checks whether the completion can already be written.
     *
     * @param num the coordinator epoch
     * @param transactionResult the transaction result the markers carry
     * @param transactionMetadata the metadata of the transaction being completed
     * @param txnTransitMetadata the metadata to transit to once all markers are written
     */
    public void addTxnMarkersToSend(Integer num, TransactionResult transactionResult, TransactionMetadata transactionMetadata, TransactionMetadata.TxnTransitMetadata txnTransitMetadata) {
        final String txnId = transactionMetadata.getTransactionalId();

        PendingCompleteTxn pending = new PendingCompleteTxn(txnId, num, transactionMetadata, txnTransitMetadata);
        transactionsWithPendingMarkers.put(txnId, pending);

        addTxnMarkersToBrokerQueue(
                txnId,
                Long.valueOf(transactionMetadata.getProducerId()),
                Short.valueOf(transactionMetadata.getProducerEpoch()),
                transactionResult,
                num,
                transactionMetadata.getTopicPartitions());

        maybeWriteTxnCompletion(txnId);
    }

    /**
     * Reports whether the transaction still has topic partitions attached, reading the partition
     * set through the metadata's {@code inLock} helper.
     *
     * @param transactionMetadata the metadata to inspect
     * @return {@code true} when the partition set is non-empty; also {@code true} if the callback
     *         handed to {@code inLock} never runs, because that is the initial value
     */
    private boolean hasPendingMarkersToWrite(TransactionMetadata transactionMetadata) {
        final AtomicBoolean stillPending = new AtomicBoolean(true);
        transactionMetadata.inLock(() -> {
            boolean noPartitionsLeft = transactionMetadata.getTopicPartitions().isEmpty();
            stillPending.set(!noPartitionsLeft);
            return null;
        });
        return stillPending.get();
    }

    /**
     * Writes the completion of a transaction once none of its markers are pending any more.
     *
     * <p>Nothing happens when the id is not (or no longer) registered as pending -- for example
     * after {@code removeMarkersForTxnId} or when another caller already claimed the completion.
     * Previously that case dereferenced a {@code null} entry and threw a
     * {@link NullPointerException}.
     *
     * @param str the transactional id
     */
    public void maybeWriteTxnCompletion(String str) {
        final PendingCompleteTxn pending = this.transactionsWithPendingMarkers.get(str);
        if (pending == null) {
            return;
        }
        if (hasPendingMarkersToWrite(pending.txnMetadata)) {
            return;
        }
        // remove(key, value) succeeds for exactly one caller, so the completion is written once.
        if (this.transactionsWithPendingMarkers.remove(str, pending)) {
            writeTxnCompletion(pending);
        }
    }

    /**
     * Looks up the broker of every topic partition and, once all lookups have finished, enqueues
     * one marker entry per broker. Partitions whose lookup failed (or returned no address) are
     * collected into a single entry on the unknown-broker queue.
     *
     * <p>Lookup callbacks may complete on different threads, so the per-broker grouping is done
     * inside {@code ConcurrentHashMap.compute} and the unresolved partitions are gathered in a
     * concurrent set rather than a plain list.
     *
     * @param str the transactional id
     * @param l the producer id
     * @param sh the producer epoch
     * @param transactionResult the transaction result the markers carry
     * @param num the coordinator epoch
     * @param set the topic partitions that need a marker
     */
    public void addTxnMarkersToBrokerQueue(String str, Long l, Short sh, TransactionResult transactionResult, Integer num, Set<TopicPartition> set) {
        final Integer txnTopicPartition = Integer.valueOf(this.txnStateManager.partitionFor(str));
        final ConcurrentHashMap<InetSocketAddress, List<TopicPartition>> partitionsByBroker = new ConcurrentHashMap<>();
        final Set<TopicPartition> unresolvedPartitions = ConcurrentHashMap.newKeySet();
        final List<CompletableFuture<Void>> lookups = new ArrayList<>();
        for (TopicPartition topicPartition : set) {
            CompletableFuture<InetSocketAddress> findBroker = this.kopBrokerLookupManager.findBroker(
                    new KopTopic(topicPartition.topic()).getPartitionName(topicPartition.partition()));
            final CompletableFuture<Void> lookupDone = new CompletableFuture<>();
            lookups.add(lookupDone);
            findBroker.whenComplete((brokerAddress, lookupError) -> {
                if (lookupError != null) {
                    log.warn("Failed to find broker for topic partition {}", topicPartition, lookupError);
                    unresolvedPartitions.add(topicPartition);
                    lookupDone.completeExceptionally(lookupError);
                } else if (brokerAddress == null) {
                    // A null key would make the concurrent map throw and leave lookupDone hanging.
                    log.warn("Broker lookup for topic partition {} returned no address", topicPartition);
                    unresolvedPartitions.add(topicPartition);
                    lookupDone.complete(null);
                } else {
                    partitionsByBroker.compute(brokerAddress, (ignoredKey, existing) -> {
                        List<TopicPartition> target = existing == null ? new ArrayList<>() : existing;
                        target.add(topicPartition);
                        return target;
                    });
                    lookupDone.complete(null);
                }
            });
        }
        FutureUtil.waitForAll(lookups).whenComplete((ignored, aggregateError) -> {
            // Runs for success and failure alike: resolved partitions are still queued.
            partitionsByBroker.forEach((brokerAddress, partitions) ->
                    this.markersQueuePerBroker
                            .computeIfAbsent(brokerAddress, TxnMarkerQueue::new)
                            .addMarkers(txnTopicPartition, buildMarkerEntry(str, l, sh, num, transactionResult, partitions)));
            if (!unresolvedPartitions.isEmpty()) {
                this.markersQueueForUnknownBroker.addMarkers(txnTopicPartition,
                        buildMarkerEntry(str, l, sh, num, transactionResult, new ArrayList<>(unresolvedPartitions)));
            }
        });
    }

    /** Builds the queue entry that carries one marker for the given partitions. */
    private static TxnIdAndMarkerEntry buildMarkerEntry(String transactionalId, long producerId, short producerEpoch, int coordinatorEpoch, TransactionResult result, List<TopicPartition> partitions) {
        return new TxnIdAndMarkerEntry(transactionalId,
                new WriteTxnMarkersRequest.TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, result, partitions));
    }

    /**
     * Called once all markers of a transaction were sent: re-reads the cached transaction state
     * and, when this coordinator still owns it under the same epoch, appends the new metadata to
     * the transaction log.
     *
     * <p>The append is skipped (with an info log) when the state lookup reports
     * {@code NOT_COORDINATOR} or {@code COORDINATOR_LOAD_IN_PROGRESS}, or when the cached
     * coordinator epoch differs from the one the markers were sent with.
     *
     * @param pendingCompleteTxn the transaction whose markers have all been written
     * @throws IllegalStateException on any other lookup error, or when no metadata is cached
     */
    private void writeTxnCompletion(PendingCompleteTxn pendingCompleteTxn) {
        String str = pendingCompleteTxn.transactionalId;
        TransactionMetadata transactionMetadata = pendingCompleteTxn.txnMetadata;
        TransactionMetadata.TxnTransitMetadata txnTransitMetadata = pendingCompleteTxn.newMetadata;
        int intValue = pendingCompleteTxn.coordinatorEpoch.intValue();
        log.info("Completed sending transaction markers for {}; begin transition to {}", str, txnTransitMetadata.getTxnState());
        ErrorsAndData<Optional<TransactionStateManager.CoordinatorEpochAndTxnMetadata>> transactionState = this.txnStateManager.getTransactionState(str);
        if (transactionState.hasErrors()) {
            switch (transactionState.getErrors()) {
                case NOT_COORDINATOR:
                    log.info("No longer the coordinator for {} with coordinator epoch {}; cancel appending {} to transaction log", str, Integer.valueOf(intValue), txnTransitMetadata);
                    return;
                case COORDINATOR_LOAD_IN_PROGRESS:
                    log.info("Loading the transaction partition that contains {} while my current coordinator epoch is {}; so cancel appending {} to transaction log since the loading process will continue the remaining work", str, Integer.valueOf(intValue), txnTransitMetadata);
                    return;
                default:
                    // Exception messages are not SLF4J templates, so the error is formatted in explicitly.
                    throw new IllegalStateException(
                            String.format("Unhandled error %s when fetching current transaction state", transactionState.getErrors()),
                            transactionState.getErrors().exception());
            }
        }
        if (!transactionState.getData().isPresent()) {
            throw new IllegalStateException(String.format("The coordinator still owns the transaction partition for %s, but there is no metadata in the cache; this is not expected", str));
        }
        TransactionStateManager.CoordinatorEpochAndTxnMetadata coordinatorEpochAndTxnMetadata = transactionState.getData().get();
        if (coordinatorEpochAndTxnMetadata.getCoordinatorEpoch().intValue() != intValue) {
            log.info("The cached metadata {} has changed to {} after completed sending the markers with coordinator epoch {}; abort transiting the metadata to {} as it may have been updated by another process", transactionMetadata, coordinatorEpochAndTxnMetadata, Integer.valueOf(intValue), txnTransitMetadata);
        } else {
            log.debug("Sending {}'s transaction markers for {} with coordinator epoch {} succeeded, trying to append complete transaction log now", str, transactionMetadata, Integer.valueOf(intValue));
            tryAppendToLog(new PendingCompleteTxn(str, Integer.valueOf(intValue), transactionMetadata, txnTransitMetadata));
        }
    }

    /**
     * Appends the completed transaction's new metadata to the transaction log.
     *
     * <p>On failure the callback skips the write for {@code NOT_COORDINATOR} and
     * {@code COORDINATOR_LOAD_IN_PROGRESS}, puts the transaction on the retry queue for
     * {@code COORDINATOR_NOT_AVAILABLE}, and throws {@link IllegalStateException} for anything else.
     *
     * <p>The switch is written against the {@link Errors} constants directly. The decompiled
     * source dispatched through a synthetic ordinal table that is not part of this file; the
     * constants were recovered from the log messages and from the switch in
     * {@code writeTxnCompletion}. NOTE(review): confirm the mapping against the original build.
     *
     * @param pendingCompleteTxn the transaction whose completion should be logged
     */
    private void tryAppendToLog(final PendingCompleteTxn pendingCompleteTxn) {
        this.txnStateManager.appendTransactionToLog(pendingCompleteTxn.transactionalId, pendingCompleteTxn.coordinatorEpoch.intValue(), pendingCompleteTxn.newMetadata, new TransactionStateManager.ResponseCallback() {
            @Override
            public void complete() {
                TransactionMarkerChannelManager.log.info("Completed transaction for {} with coordinator epoch {}, final state after commit: {}", pendingCompleteTxn.transactionalId, pendingCompleteTxn.coordinatorEpoch, pendingCompleteTxn.txnMetadata.getState());
            }

            @Override
            public void fail(Errors errors) {
                switch (errors) {
                    case NOT_COORDINATOR:
                        TransactionMarkerChannelManager.log.info("No longer the coordinator for transactionalId: {} while trying to append to transaction log, skip writing to transaction log", pendingCompleteTxn.transactionalId);
                        return;
                    case COORDINATOR_LOAD_IN_PROGRESS:
                        TransactionMarkerChannelManager.log.info("Coordinator is loading the partition {} and hence cannot complete append of {}; skip writing to transaction log as the loading process should complete it", Integer.valueOf(TransactionMarkerChannelManager.this.txnStateManager.partitionFor(pendingCompleteTxn.transactionalId)), pendingCompleteTxn);
                        return;
                    case COORDINATOR_NOT_AVAILABLE:
                        TransactionMarkerChannelManager.log.info("Not available to append {}: possible causes include {}, {}, {} and {}; retry appending", pendingCompleteTxn, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NOT_ENOUGH_REPLICAS, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.REQUEST_TIMED_OUT);
                        // Picked up again by retryLogAppends() on a later drain pass.
                        TransactionMarkerChannelManager.this.txnLogAppendRetryQueue.add(pendingCompleteTxn);
                        return;
                    default:
                        throw new IllegalStateException(String.format("Unexpected error %s while appending to transaction log for %s", errors.exceptionName(), pendingCompleteTxn.transactionalId));
                }
            }
        }, null);
    }

    /**
     * Forgets the pending-completion record of a transaction, if one is registered.
     *
     * @param str the transactional id
     */
    public void removeMarkersForTxnId(String str) {
        transactionsWithPendingMarkers.remove(str);
    }

    /**
     * Drains the retry queue into a local batch and re-attempts the transaction log append for
     * each drained transaction.
     */
    private void retryLogAppends() {
        final List<PendingCompleteTxn> batch = new ArrayList<>();
        txnLogAppendRetryQueue.drainTo(batch);
        batch.forEach(retry -> {
            if (log.isDebugEnabled()) {
                log.debug("Retry appending {} transaction log", retry);
            }
            tryAppendToLog(retry);
        });
    }

    /**
     * One drain pass: retries pending log appends, re-runs the broker lookup for markers parked
     * on the unknown-broker queue, then sends every per-broker queue through that broker's
     * channel.
     *
     * <p>Each broker gets its own entry list. The channel future may complete after this method
     * has moved on to the next broker, so sharing (and clearing) a single list would let a late
     * callback see another broker's entries or none at all.
     *
     * <p>When the channel cannot be obtained the failure is logged; the drained entries for that
     * broker are not re-queued by this method.
     */
    private void drainQueuedTransactionMarkers() {
        retryLogAppends();

        final List<TxnIdAndMarkerEntry> unknownBrokerEntries = new ArrayList<>();
        this.markersQueueForUnknownBroker.getMarkerEntries(unknownBrokerEntries);
        for (TxnIdAndMarkerEntry parked : unknownBrokerEntries) {
            final WriteTxnMarkersRequest.TxnMarkerEntry marker = parked.getEntry();
            addTxnMarkersToBrokerQueue(
                    parked.getTransactionalId(),
                    Long.valueOf(marker.producerId()),
                    Short.valueOf(marker.producerEpoch()),
                    marker.transactionResult(),
                    Integer.valueOf(marker.coordinatorEpoch()),
                    new HashSet<>(marker.partitions()));
        }

        for (TxnMarkerQueue brokerQueue : this.markersQueuePerBroker.values()) {
            // Fresh list per broker: it is captured by the asynchronous callback below.
            final List<TxnIdAndMarkerEntry> brokerEntries = new ArrayList<>();
            brokerQueue.getMarkerEntries(brokerEntries);
            if (brokerEntries.isEmpty()) {
                continue;
            }
            getChannel(brokerQueue.address).whenComplete((handler, connectError) -> {
                if (connectError != null || handler == null) {
                    log.error("Failed to obtain transaction marker channel to {}; {} queued marker entries were not sent",
                            brokerQueue.address, Integer.valueOf(brokerEntries.size()), connectError);
                    return;
                }
                final List<WriteTxnMarkersRequest.TxnMarkerEntry> markers = new ArrayList<>(brokerEntries.size());
                for (TxnIdAndMarkerEntry queued : brokerEntries) {
                    markers.add(queued.getEntry());
                }
                handler.enqueueRequest(
                        new WriteTxnMarkersRequest.Builder(markers).build(),
                        new TransactionMarkerRequestCompletionHandler(0, this.txnStateManager, this, brokerEntries));
            });
        }
    }
}
