package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

import com.fasterxml.jackson.databind.deser.std.StdKeyDeserializer;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionMarkerChannelManager;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerRequestCompletionHandler.class */
public class TransactionMarkerRequestCompletionHandler {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) TransactionMarkerRequestCompletionHandler.class);
    private Integer brokerId;
    private TransactionStateManager txnStateManager;
    private TransactionMarkerChannelManager txnMarkerChannelManager;
    private List<TransactionMarkerChannelManager.TxnIdAndMarkerEntry> txnIdAndMarkerEntries;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerRequestCompletionHandler$AbortSendingRetryPartitions.class */
    public static class AbortSendingRetryPartitions {
        private AtomicBoolean abortSending;
        private Set<TopicPartition> retryPartitions;

        private AbortSendingRetryPartitions() {
            this.abortSending = new AtomicBoolean(false);
            this.retryPartitions = new HashSet();
        }
    }

    public void onComplete(WriteTxnMarkersResponse writeTxnMarkersResponse) {
        log.info("Received WriteTxnMarker response from node with correlation id $correlationId");
        for (TransactionMarkerChannelManager.TxnIdAndMarkerEntry txnIdAndMarkerEntry : this.txnIdAndMarkerEntries) {
            String transactionalId = txnIdAndMarkerEntry.getTransactionalId();
            WriteTxnMarkersRequest.TxnMarkerEntry entry = txnIdAndMarkerEntry.getEntry();
            Map<TopicPartition, Errors> errors = writeTxnMarkersResponse.errors(entry.producerId());
            if (errors == null) {
                throw new IllegalStateException("WriteTxnMarkerResponse does not contain expected error map for producer id " + entry.producerId());
            }
            ErrorsAndData<Optional<TransactionStateManager.CoordinatorEpochAndTxnMetadata>> transactionState = this.txnStateManager.getTransactionState(transactionalId);
            if (transactionState.hasErrors()) {
                switch (transactionState.getErrors()) {
                    case NOT_COORDINATOR:
                        log.info("I am no longer the coordinator for {}; cancel sending transaction markers {} to the brokers", transactionalId, entry);
                        this.txnMarkerChannelManager.removeMarkersForTxnId(transactionalId);
                        return;
                    case COORDINATOR_LOAD_IN_PROGRESS:
                        log.info("I am loading the transaction partition that contains {} which means the current markers have to be obsoleted; cancel sending transaction markers {} to the brokers", transactionalId, entry);
                        this.txnMarkerChannelManager.removeMarkersForTxnId(transactionalId);
                        return;
                    default:
                        throw new IllegalStateException("Unhandled error " + transactionState.getErrors() + " when fetching current transaction state");
                }
            }
            if (!transactionState.getData().isPresent()) {
                throw new IllegalStateException("The coordinator still owns the transaction partition for " + transactionalId + ", but there is no metadata in the cache; this is not expected");
            }
            AbortSendingRetryPartitions hasAbortSendOrRetryPartitions = hasAbortSendOrRetryPartitions(transactionalId, entry, transactionState, errors);
            if (hasAbortSendOrRetryPartitions.abortSending.get()) {
                return;
            }
            if (hasAbortSendOrRetryPartitions.retryPartitions.isEmpty()) {
                this.txnMarkerChannelManager.maybeWriteTxnCompletion(transactionalId);
                return;
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("Re-enqueuing {} transaction markers for transactional id {} under coordinator epoch {}", entry.transactionResult(), transactionalId, Integer.valueOf(entry.coordinatorEpoch()));
                }
                this.txnMarkerChannelManager.addTxnMarkersToBrokerQueue(transactionalId, Long.valueOf(entry.producerId()), Short.valueOf(entry.producerEpoch()), entry.transactionResult(), Integer.valueOf(entry.coordinatorEpoch()), hasAbortSendOrRetryPartitions.retryPartitions);
            }
        }
    }

    private AbortSendingRetryPartitions hasAbortSendOrRetryPartitions(String str, WriteTxnMarkersRequest.TxnMarkerEntry txnMarkerEntry, ErrorsAndData<Optional<TransactionStateManager.CoordinatorEpochAndTxnMetadata>> errorsAndData, Map<TopicPartition, Errors> map) {
        TransactionStateManager.CoordinatorEpochAndTxnMetadata coordinatorEpochAndTxnMetadata = errorsAndData.getData().get();
        TransactionMetadata transactionMetadata = coordinatorEpochAndTxnMetadata.getTransactionMetadata();
        AbortSendingRetryPartitions abortSendingRetryPartitions = new AbortSendingRetryPartitions();
        if (coordinatorEpochAndTxnMetadata.getCoordinatorEpoch().intValue() != txnMarkerEntry.coordinatorEpoch()) {
            log.info("Transaction coordinator epoch for {} has changed from {} to {}; cancel sending transaction markers {} to the brokers", str, Integer.valueOf(txnMarkerEntry.coordinatorEpoch()), coordinatorEpochAndTxnMetadata.getCoordinatorEpoch(), txnMarkerEntry);
            this.txnMarkerChannelManager.removeMarkersForTxnId(str);
            abortSendingRetryPartitions.abortSending.set(true);
        } else {
            transactionMetadata.inLock(() -> {
                for (Map.Entry entry : map.entrySet()) {
                    TopicPartition topicPartition = (TopicPartition) entry.getKey();
                    Errors errors = (Errors) entry.getValue();
                    switch (AnonymousClass1.$SwitchMap$org$apache$kafka$common$protocol$Errors[errors.ordinal()]) {
                        case 3:
                            transactionMetadata.removePartition(topicPartition);
                            break;
                        case 4:
                        case 5:
                        case 6:
                        case 7:
                            throw new IllegalStateException("Received fatal error " + errors.exceptionName() + " while sending txn marker for " + str);
                        case 8:
                        case 9:
                        case 10:
                        case 11:
                        case 12:
                            log.info("Sending {}'s transaction marker for partition {} has failed with error {}, retrying with current coordinator epoch {}", str, topicPartition, errors.exceptionName(), coordinatorEpochAndTxnMetadata.getCoordinatorEpoch());
                            abortSendingRetryPartitions.retryPartitions.add(topicPartition);
                            break;
                        case 13:
                        case 14:
                            log.info("Sending {}'s transaction marker for partition {} has permanently failed with error {} with the current coordinator epoch {}; cancel sending any more transaction markers {} to the brokers", str, topicPartition, errors.exceptionName(), coordinatorEpochAndTxnMetadata.getCoordinatorEpoch(), txnMarkerEntry);
                            this.txnMarkerChannelManager.removeMarkersForTxnId(str);
                            abortSendingRetryPartitions.abortSending.set(true);
                            break;
                        case StdKeyDeserializer.TYPE_CLASS /* 15 */:
                        case 16:
                            log.info("Sending {}'s transaction marker from partition {} has failed with  {}. This partition will be removed from the set of partitions waiting for completion", str, topicPartition, errors.name());
                            transactionMetadata.removePartition(topicPartition);
                            break;
                        default:
                            throw new IllegalStateException("Unexpected error " + errors.exceptionName() + " while sending txn marker for $transactionalId");
                    }
                }
                return null;
            });
        }
        return abortSendingRetryPartitions;
    }

    public TransactionMarkerRequestCompletionHandler(Integer num, TransactionStateManager transactionStateManager, TransactionMarkerChannelManager transactionMarkerChannelManager, List<TransactionMarkerChannelManager.TxnIdAndMarkerEntry> list) {
        this.brokerId = num;
        this.txnStateManager = transactionStateManager;
        this.txnMarkerChannelManager = transactionMarkerChannelManager;
        this.txnIdAndMarkerEntries = list;
    }
}
