package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

import io.streamnative.pulsar.handlers.kop.KafkaRequestHandler;
import io.streamnative.pulsar.handlers.kop.KopBrokerLookupManager;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionMetadata;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager;
import io.streamnative.pulsar.handlers.kop.utils.ProducerIdAndEpoch;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.class */
public class TransactionCoordinator {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) TransactionCoordinator.class);
    private final TransactionConfig transactionConfig;
    private final ProducerIdManager producerIdManager;
    private final TransactionStateManager txnManager;
    private final TransactionMarkerChannelManager transactionMarkerChannelManager;
    private final Map<TopicName, NavigableMap<Long, Long>> activeOffsetPidMap = new HashMap();
    private final Map<TopicName, ConcurrentHashMap<Long, Long>> activePidOffsetMap = new HashMap();
    private final List<AbortedIndexEntry> abortedIndexList = new ArrayList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator$EndTxnCallback.class */
    public interface EndTxnCallback {
        void complete(Errors errors);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator$EpochAndTxnTransitMetadata.class */
    public static class EpochAndTxnTransitMetadata {
        private final int coordinatorEpoch;
        private final TransactionMetadata.TxnTransitMetadata txnTransitMetadata;

        public EpochAndTxnTransitMetadata(int i, TransactionMetadata.TxnTransitMetadata txnTransitMetadata) {
            this.coordinatorEpoch = i;
            this.txnTransitMetadata = txnTransitMetadata;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator$PreSendResult.class */
    public static class PreSendResult {
        private TransactionMetadata transactionMetadata;
        private TransactionMetadata.TxnTransitMetadata txnTransitMetadata;

        public PreSendResult(TransactionMetadata transactionMetadata, TransactionMetadata.TxnTransitMetadata txnTransitMetadata) {
            this.transactionMetadata = transactionMetadata;
            this.txnTransitMetadata = txnTransitMetadata;
        }

        public TransactionMetadata getTransactionMetadata() {
            return this.transactionMetadata;
        }

        public TransactionMetadata.TxnTransitMetadata getTxnTransitMetadata() {
            return this.txnTransitMetadata;
        }

        public void setTransactionMetadata(TransactionMetadata transactionMetadata) {
            this.transactionMetadata = transactionMetadata;
        }

        public void setTxnTransitMetadata(TransactionMetadata.TxnTransitMetadata txnTransitMetadata) {
            this.txnTransitMetadata = txnTransitMetadata;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof PreSendResult)) {
                return false;
            }
            PreSendResult preSendResult = (PreSendResult) obj;
            if (!preSendResult.canEqual(this)) {
                return false;
            }
            TransactionMetadata transactionMetadata = getTransactionMetadata();
            TransactionMetadata transactionMetadata2 = preSendResult.getTransactionMetadata();
            if (transactionMetadata == null) {
                if (transactionMetadata2 != null) {
                    return false;
                }
            } else if (!transactionMetadata.equals(transactionMetadata2)) {
                return false;
            }
            TransactionMetadata.TxnTransitMetadata txnTransitMetadata = getTxnTransitMetadata();
            TransactionMetadata.TxnTransitMetadata txnTransitMetadata2 = preSendResult.getTxnTransitMetadata();
            return txnTransitMetadata == null ? txnTransitMetadata2 == null : txnTransitMetadata.equals(txnTransitMetadata2);
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof PreSendResult;
        }

        public int hashCode() {
            TransactionMetadata transactionMetadata = getTransactionMetadata();
            int hashCode = (1 * 59) + (transactionMetadata == null ? 43 : transactionMetadata.hashCode());
            TransactionMetadata.TxnTransitMetadata txnTransitMetadata = getTxnTransitMetadata();
            return (hashCode * 59) + (txnTransitMetadata == null ? 43 : txnTransitMetadata.hashCode());
        }

        public String toString() {
            return "TransactionCoordinator.PreSendResult(transactionMetadata=" + getTransactionMetadata() + ", txnTransitMetadata=" + getTxnTransitMetadata() + ")";
        }
    }

    private TransactionCoordinator(TransactionConfig transactionConfig, Integer num, ZooKeeper zooKeeper, KopBrokerLookupManager kopBrokerLookupManager) {
        this.transactionConfig = transactionConfig;
        this.txnManager = new TransactionStateManager(transactionConfig);
        this.producerIdManager = new ProducerIdManager(num, zooKeeper);
        this.transactionMarkerChannelManager = new TransactionMarkerChannelManager(null, this.txnManager, kopBrokerLookupManager, false);
    }

    public static TransactionCoordinator of(TransactionConfig transactionConfig, Integer num, ZooKeeper zooKeeper, KopBrokerLookupManager kopBrokerLookupManager) {
        return new TransactionCoordinator(transactionConfig, num, zooKeeper, kopBrokerLookupManager);
    }

    public CompletableFuture<Void> loadTransactionMetadata(int i) {
        return this.txnManager.loadTransactionsForTxnTopicPartition(i, -1, (i2, transactionResult, transactionMetadata, txnTransitMetadata) -> {
        });
    }

    public CompletableFuture<Void> startup() {
        return this.producerIdManager.initialize();
    }

    public int partitionFor(String str) {
        return partitionFor(str, this.transactionConfig.getTransactionLogNumPartitions());
    }

    public static int partitionFor(String str, int i) {
        return MathUtils.signSafeMod(str.hashCode(), i);
    }

    public String getTopicPartitionName() {
        return this.transactionConfig.getTransactionMetadataTopicName();
    }

    public String getTopicPartitionName(int i) {
        return getTopicPartitionName(getTopicPartitionName(), i);
    }

    public static String getTopicPartitionName(String str, int i) {
        return str + "-partition-" + i;
    }

    public void handleInitProducerId(String str, int i, Optional<ProducerIdAndEpoch> optional, KafkaRequestHandler kafkaRequestHandler, CompletableFuture<AbstractResponse> completableFuture) {
        if (str == null) {
            this.producerIdManager.generateProducerId().whenComplete((l, th) -> {
                if (th != null) {
                    completableFuture.complete(new InitProducerIdResponse(0, Errors.UNKNOWN_SERVER_ERROR, l.longValue(), (short) 0));
                } else {
                    completableFuture.complete(new InitProducerIdResponse(0, Errors.NONE, l.longValue(), (short) 0));
                }
            });
            return;
        }
        if (StringUtils.isEmpty(str)) {
            completableFuture.complete(new InitProducerIdResponse(0, Errors.INVALID_REQUEST));
            return;
        }
        if (!this.txnManager.validateTransactionTimeoutMs(i)) {
            completableFuture.complete(new InitProducerIdResponse(0, Errors.INVALID_TRANSACTION_TIMEOUT));
            return;
        }
        ErrorsAndData<Optional<TransactionStateManager.CoordinatorEpochAndTxnMetadata>> transactionState = this.txnManager.getTransactionState(str);
        CompletableFuture completableFuture2 = new CompletableFuture();
        if (transactionState.getData().isPresent()) {
            completableFuture2.complete(transactionState);
        } else {
            this.producerIdManager.generateProducerId().whenComplete((l2, th2) -> {
                if (th2 != null) {
                    completableFuture.complete(new InitProducerIdResponse(0, Errors.UNKNOWN_SERVER_ERROR, -1L, (short) 0));
                } else {
                    completableFuture2.complete(this.txnManager.putTransactionStateIfNotExists(TransactionMetadata.builder().transactionalId(str).producerId(l2.longValue()).producerEpoch((short) 0).state(TransactionState.EMPTY).topicPartitions(new HashSet()).build()));
                }
            });
        }
        completableFuture2.whenComplete((errorsAndData, th3) -> {
            int intValue = ((TransactionStateManager.CoordinatorEpochAndTxnMetadata) ((Optional) errorsAndData.getData()).get()).getCoordinatorEpoch().intValue();
            TransactionMetadata transactionMetadata = ((TransactionStateManager.CoordinatorEpochAndTxnMetadata) ((Optional) errorsAndData.getData()).get()).getTransactionMetadata();
            transactionMetadata.inLock(() -> {
                prepareInitProducerIdTransit(str, Integer.valueOf(i), Integer.valueOf(intValue), transactionMetadata, optional).whenComplete((errorsAndData, th3) -> {
                    completeInitProducer(str, intValue, errorsAndData, th3, kafkaRequestHandler, completableFuture);
                });
                return null;
            });
        });
    }

    private void completeInitProducer(final String str, int i, ErrorsAndData<EpochAndTxnTransitMetadata> errorsAndData, Throwable th, KafkaRequestHandler kafkaRequestHandler, final CompletableFuture<AbstractResponse> completableFuture) {
        if (errorsAndData.hasErrors()) {
            initTransactionError(completableFuture, errorsAndData.getErrors());
            return;
        }
        if (th != null) {
            log.error("Failed to init producerId.", th);
            initTransactionError(completableFuture, Errors.forException(th));
            return;
        }
        final TransactionMetadata.TxnTransitMetadata txnTransitMetadata = errorsAndData.getData().txnTransitMetadata;
        if (errorsAndData.getData().txnTransitMetadata.getTxnState() == TransactionState.PREPARE_EPOCH_FENCE) {
            endTransaction(str, Long.valueOf(txnTransitMetadata.getProducerId()), Short.valueOf(txnTransitMetadata.getProducerEpoch()), TransactionResult.ABORT, false, kafkaRequestHandler, errors -> {
                if (errors != Errors.NONE) {
                    initTransactionError(completableFuture, errors);
                } else {
                    initTransactionError(completableFuture, Errors.CONCURRENT_TRANSACTIONS);
                }
            });
        } else {
            this.txnManager.appendTransactionToLog(str, i, txnTransitMetadata, new TransactionStateManager.ResponseCallback() { // from class: io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator.1
                @Override // io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager.ResponseCallback
                public void complete() {
                    TransactionCoordinator.log.info("Initialized transactionalId {} with producerId {} and producer epoch {} on partition {}-{}", str, Long.valueOf(txnTransitMetadata.getProducerId()), Short.valueOf(txnTransitMetadata.getProducerEpoch()), Topic.TRANSACTION_STATE_TOPIC_NAME, Integer.valueOf(TransactionCoordinator.this.txnManager.partitionFor(str)));
                    completableFuture.complete(new InitProducerIdResponse(0, Errors.NONE, txnTransitMetadata.getProducerId(), txnTransitMetadata.getProducerEpoch()));
                }

                @Override // io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager.ResponseCallback
                public void fail(Errors errors2) {
                    TransactionCoordinator.log.info("Returning {} error code to client for {}'s InitProducerId request", errors2, str);
                    TransactionCoordinator.this.initTransactionError(completableFuture, errors2);
                }
            }, errors2 -> {
                return true;
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void initTransactionError(CompletableFuture<AbstractResponse> completableFuture, Errors errors) {
        completableFuture.complete(new InitProducerIdResponse(0, errors, -1L, (short) -1));
    }

    private boolean isValidProducerId(TransactionMetadata transactionMetadata, ProducerIdAndEpoch producerIdAndEpoch) {
        return transactionMetadata.getProducerEpoch() == -1 || producerIdAndEpoch.producerId == transactionMetadata.getProducerId() || (producerIdAndEpoch.producerId == ((long) transactionMetadata.getLastProducerEpoch()) && transactionMetadata.isEpochExhausted(producerIdAndEpoch.epoch));
    }

    private CompletableFuture<ErrorsAndData<EpochAndTxnTransitMetadata>> prepareInitProducerIdTransit(String str, Integer num, Integer num2, TransactionMetadata transactionMetadata, Optional<ProducerIdAndEpoch> optional) {
        CompletableFuture<ErrorsAndData<EpochAndTxnTransitMetadata>> completableFuture = new CompletableFuture<>();
        if (transactionMetadata.getPendingState().isPresent()) {
            completableFuture.complete(new ErrorsAndData<>(Errors.CONCURRENT_TRANSACTIONS));
            return completableFuture;
        }
        if (optional.isPresent() && !isValidProducerId(transactionMetadata, optional.get())) {
            completableFuture.complete(new ErrorsAndData<>(producerEpochFenceErrors()));
            return completableFuture;
        }
        switch (transactionMetadata.getState()) {
            case PREPARE_ABORT:
            case PREPARE_COMMIT:
                completableFuture.complete(new ErrorsAndData<>(Errors.CONCURRENT_TRANSACTIONS));
                break;
            case COMPLETE_ABORT:
            case COMPLETE_COMMIT:
            case EMPTY:
                CompletableFuture completableFuture2 = new CompletableFuture();
                if (transactionMetadata.isProducerEpochExhausted()) {
                    this.producerIdManager.generateProducerId().thenAccept(l -> {
                        completableFuture2.complete(transactionMetadata.prepareProducerIdRotation(l, num, Long.valueOf(SystemTime.SYSTEM.milliseconds()), Boolean.valueOf(optional.isPresent())));
                    });
                } else {
                    completableFuture2.complete(transactionMetadata.prepareIncrementProducerEpoch(num, optional.map((v0) -> {
                        return v0.getEpoch();
                    }), Long.valueOf(SystemTime.SYSTEM.milliseconds())).getData());
                }
                completableFuture2.whenComplete((txnTransitMetadata, th) -> {
                    completableFuture.complete(new ErrorsAndData(new EpochAndTxnTransitMetadata(num2.intValue(), txnTransitMetadata)));
                });
                break;
            case ONGOING:
                completableFuture.complete(new ErrorsAndData<>(new EpochAndTxnTransitMetadata(num2.intValue(), transactionMetadata.prepareFenceProducerEpoch())));
                break;
            case DEAD:
            case PREPARE_EPOCH_FENCE:
                completableFuture.completeExceptionally(new IllegalStateException(String.format("Found transactionalId %s with state %s. This is illegal as we should never have transitioned to this state.", str, transactionMetadata.getState())));
                break;
        }
        return completableFuture;
    }

    public void handleAddPartitionsToTransaction(String str, long j, short s, final List<TopicPartition> list, final CompletableFuture<AbstractResponse> completableFuture) {
        if (str == null || str.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("Returning {} error code to client for {}'s AddPartitions request", Errors.INVALID_REQUEST, str);
            }
            completableFuture.complete(new AddPartitionsToTxnResponse(0, addPartitionError(list, Errors.INVALID_REQUEST)));
            return;
        }
        ErrorsAndData<Optional<TransactionStateManager.CoordinatorEpochAndTxnMetadata>> transactionState = this.txnManager.getTransactionState(str);
        ErrorsAndData errorsAndData = new ErrorsAndData();
        if (transactionState.getData().isPresent()) {
            TransactionStateManager.CoordinatorEpochAndTxnMetadata coordinatorEpochAndTxnMetadata = transactionState.getData().get();
            int intValue = coordinatorEpochAndTxnMetadata.getCoordinatorEpoch().intValue();
            TransactionMetadata transactionMetadata = coordinatorEpochAndTxnMetadata.getTransactionMetadata();
            transactionMetadata.inLock(() -> {
                if (transactionMetadata.getProducerId() != j) {
                    errorsAndData.setErrors(Errors.INVALID_PRODUCER_ID_MAPPING);
                    return null;
                }
                if (transactionMetadata.getProducerEpoch() != s) {
                    errorsAndData.setErrors(producerEpochFenceErrors());
                    return null;
                }
                if (transactionMetadata.getPendingState().isPresent()) {
                    errorsAndData.setErrors(Errors.CONCURRENT_TRANSACTIONS);
                    return null;
                }
                if (transactionMetadata.getState() == TransactionState.PREPARE_COMMIT || transactionMetadata.getState() == TransactionState.PREPARE_ABORT) {
                    errorsAndData.setErrors(Errors.CONCURRENT_TRANSACTIONS);
                    return null;
                }
                if (transactionMetadata.getState() == TransactionState.ONGOING && transactionMetadata.getTopicPartitions().containsAll(list)) {
                    errorsAndData.setErrors(Errors.NONE);
                    return null;
                }
                errorsAndData.setData(new EpochAndTxnTransitMetadata(intValue, transactionMetadata.prepareAddPartitions(new HashSet(list), Long.valueOf(SystemTime.SYSTEM.milliseconds()))));
                return null;
            });
        } else {
            completableFuture.complete(new AddPartitionsToTxnResponse(0, addPartitionError(list, Errors.INVALID_PRODUCER_ID_MAPPING)));
        }
        if (errorsAndData.getErrors() != null) {
            completableFuture.complete(new AddPartitionsToTxnResponse(0, addPartitionError(list, errorsAndData.getErrors())));
        } else {
            this.txnManager.appendTransactionToLog(str, ((EpochAndTxnTransitMetadata) errorsAndData.getData()).coordinatorEpoch, ((EpochAndTxnTransitMetadata) errorsAndData.getData()).txnTransitMetadata, new TransactionStateManager.ResponseCallback() { // from class: io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator.2
                @Override // io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager.ResponseCallback
                public void complete() {
                    completableFuture.complete(new AddPartitionsToTxnResponse(0, TransactionCoordinator.this.addPartitionError(list, Errors.NONE)));
                }

                @Override // io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager.ResponseCallback
                public void fail(Errors errors) {
                    completableFuture.complete(new AddPartitionsToTxnResponse(0, TransactionCoordinator.this.addPartitionError(list, errors)));
                }
            }, errors -> {
                return true;
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Map<TopicPartition, Errors> addPartitionError(List<TopicPartition> list, Errors errors) {
        HashMap hashMap = new HashMap();
        Iterator<TopicPartition> it = list.iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), errors);
        }
        return hashMap;
    }

    private Errors producerEpochFenceErrors() {
        return Errors.forException(new Throwable("There is a newer producer with the same transactionalId which fences the current one."));
    }

    public void handleEndTransaction(String str, long j, short s, TransactionResult transactionResult, KafkaRequestHandler kafkaRequestHandler, CompletableFuture<AbstractResponse> completableFuture) {
        endTransaction(str, Long.valueOf(j), Short.valueOf(s), transactionResult, true, kafkaRequestHandler, errors -> {
            completableFuture.complete(new EndTxnResponse(0, errors));
        });
    }

    private void endTransaction(final String str, final Long l, final Short sh, final TransactionResult transactionResult, Boolean bool, final KafkaRequestHandler kafkaRequestHandler, final EndTxnCallback endTxnCallback) {
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        if (str == null || str.isEmpty()) {
            endTxnCallback.complete(Errors.INVALID_REQUEST);
            return;
        }
        final Optional<TransactionStateManager.CoordinatorEpochAndTxnMetadata> data = this.txnManager.getTransactionState(str).getData();
        if (!data.isPresent()) {
            endTxnCallback.complete(Errors.INVALID_PRODUCER_ID_MAPPING);
            return;
        }
        final ErrorsAndData<TransactionMetadata.TxnTransitMetadata> endTxnPreAppend = endTxnPreAppend(data, str, l.longValue(), bool.booleanValue(), sh.shortValue(), transactionResult, atomicBoolean);
        if (endTxnPreAppend.hasErrors()) {
            log.error("Aborting append of {} to transaction log with coordinator and returning {} error to client for {}'s EndTransaction request", transactionResult, endTxnPreAppend.getErrors(), str);
            endTxnCallback.complete(endTxnPreAppend.getErrors());
        } else {
            final int intValue = data.get().getCoordinatorEpoch().intValue();
            this.txnManager.appendTransactionToLog(str, intValue, endTxnPreAppend.getData(), new TransactionStateManager.ResponseCallback() { // from class: io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator.3
                @Override // io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager.ResponseCallback
                public void complete() {
                    TransactionCoordinator.this.completeEndTxn(str, intValue, l.longValue(), sh.shortValue(), transactionResult, kafkaRequestHandler, endTxnCallback);
                }

                @Override // io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager.ResponseCallback
                public void fail(Errors errors) {
                    TransactionCoordinator.log.info("Aborting sending of transaction markers and returning {} error to client for {}'s EndTransaction request of {}, since appending {} to transaction log with coordinator epoch {} failed", errors, str, transactionResult, endTxnPreAppend.getData(), Integer.valueOf(intValue));
                    if (atomicBoolean.get()) {
                        ErrorsAndData<Optional<TransactionStateManager.CoordinatorEpochAndTxnMetadata>> transactionState = TransactionCoordinator.this.txnManager.getTransactionState(str);
                        if (!transactionState.getData().isPresent()) {
                            TransactionCoordinator.log.warn("The coordinator still owns the transaction partition for {}, but there is no metadata in the cache; this is not expected", str);
                        } else if (transactionState.getData().isPresent() && ((TransactionStateManager.CoordinatorEpochAndTxnMetadata) data.get()).getCoordinatorEpoch().intValue() == intValue) {
                            ((TransactionStateManager.CoordinatorEpochAndTxnMetadata) data.get()).getTransactionMetadata().setHasFailedEpochFence(true);
                            TransactionCoordinator.log.warn("The coordinator failed to write an epoch fence transition for producer {} to the transaction log with error {}. The epoch was increased to ${} but not returned to the client", str, errors, Short.valueOf(((TransactionMetadata.TxnTransitMetadata) endTxnPreAppend.getData()).getProducerEpoch()));
                        }
                    }
                    endTxnCallback.complete(errors);
                }
            }, errors -> {
                return true;
            });
        }
    }

    private ErrorsAndData<TransactionMetadata.TxnTransitMetadata> endTxnPreAppend(Optional<TransactionStateManager.CoordinatorEpochAndTxnMetadata> optional, String str, long j, boolean z, short s, TransactionResult transactionResult, AtomicBoolean atomicBoolean) {
        TransactionMetadata transactionMetadata = optional.get().getTransactionMetadata();
        ErrorsAndData<TransactionMetadata.TxnTransitMetadata> errorsAndData = new ErrorsAndData<>();
        transactionMetadata.inLock(() -> {
            if (transactionMetadata.getProducerId() != j) {
                errorsAndData.setErrors(Errors.INVALID_PRODUCER_ID_MAPPING);
                return null;
            }
            if ((z && s != transactionMetadata.getProducerEpoch()) || s < transactionMetadata.getProducerEpoch()) {
                errorsAndData.setErrors(producerEpochFenceErrors());
                return null;
            }
            if (!transactionMetadata.getPendingState().isPresent() || transactionMetadata.getPendingState().get() == TransactionState.PREPARE_EPOCH_FENCE) {
                endTxnByStatus(str, transactionResult, transactionMetadata, atomicBoolean, s, errorsAndData);
                return null;
            }
            errorsAndData.setErrors(Errors.CONCURRENT_TRANSACTIONS);
            return null;
        });
        return errorsAndData;
    }

    private void endTxnByStatus(String str, TransactionResult transactionResult, TransactionMetadata transactionMetadata, AtomicBoolean atomicBoolean, short s, ErrorsAndData<TransactionMetadata.TxnTransitMetadata> errorsAndData) {
        switch (transactionMetadata.getState()) {
            case PREPARE_ABORT:
                setPreEndTxnErrors(transactionResult, TransactionResult.ABORT, Errors.CONCURRENT_TRANSACTIONS, errorsAndData, str, transactionMetadata);
                return;
            case PREPARE_COMMIT:
                setPreEndTxnErrors(transactionResult, TransactionResult.COMMIT, Errors.CONCURRENT_TRANSACTIONS, errorsAndData, str, transactionMetadata);
                return;
            case COMPLETE_ABORT:
                setPreEndTxnErrors(transactionResult, TransactionResult.ABORT, Errors.NONE, errorsAndData, str, transactionMetadata);
                return;
            case COMPLETE_COMMIT:
                setPreEndTxnErrors(transactionResult, TransactionResult.COMMIT, Errors.NONE, errorsAndData, str, transactionMetadata);
                return;
            case EMPTY:
                errorsAndData.setErrors(logInvalidStateTransitionAndReturnError(str, transactionMetadata.getState(), transactionResult));
                return;
            case ONGOING:
                endTxnOnGoingResult(transactionResult, transactionMetadata, atomicBoolean, s, errorsAndData);
                return;
            case DEAD:
            case PREPARE_EPOCH_FENCE:
                String format = String.format("Found transactionalId %s with state %s. This is illegal as we should never have transitioned to this state.", str, transactionMetadata.getState());
                log.error(format);
                throw new IllegalStateException(format);
            default:
                return;
        }
    }

    private void endTxnOnGoingResult(TransactionResult transactionResult, TransactionMetadata transactionMetadata, AtomicBoolean atomicBoolean, short s, ErrorsAndData<TransactionMetadata.TxnTransitMetadata> errorsAndData) {
        TransactionState transactionState = transactionResult == TransactionResult.COMMIT ? TransactionState.PREPARE_COMMIT : TransactionState.PREPARE_ABORT;
        if (transactionState == TransactionState.PREPARE_ABORT && transactionMetadata.getPendingState().isPresent() && transactionMetadata.getPendingState().get().equals(TransactionState.PREPARE_EPOCH_FENCE)) {
            atomicBoolean.set(true);
            transactionMetadata.setPendingState(Optional.empty());
            transactionMetadata.setProducerEpoch(s);
            transactionMetadata.setLastProducerEpoch((short) -1);
        }
        errorsAndData.setData(transactionMetadata.prepareAbortOrCommit(transactionState, Long.valueOf(SystemTime.SYSTEM.milliseconds())));
    }

    private void setPreEndTxnErrors(TransactionResult transactionResult, TransactionResult transactionResult2, Errors errors, ErrorsAndData<TransactionMetadata.TxnTransitMetadata> errorsAndData, String str, TransactionMetadata transactionMetadata) {
        if (transactionResult.equals(transactionResult2)) {
            errorsAndData.setErrors(errors);
        } else {
            errorsAndData.setErrors(logInvalidStateTransitionAndReturnError(str, transactionMetadata.getState(), transactionResult));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void completeEndTxn(String str, int i, long j, int i2, TransactionResult transactionResult, KafkaRequestHandler kafkaRequestHandler, EndTxnCallback endTxnCallback) {
        ErrorsAndData<Optional<TransactionStateManager.CoordinatorEpochAndTxnMetadata>> transactionState = this.txnManager.getTransactionState(str);
        if (!transactionState.getData().isPresent()) {
            String format = String.format("The coordinator still owns the transaction partition for %s, but there is no metadata in the cache; this is not expected", str);
            log.error(format);
            throw new IllegalStateException(format);
        }
        ErrorsAndData errorsAndData = new ErrorsAndData();
        TransactionStateManager.CoordinatorEpochAndTxnMetadata coordinatorEpochAndTxnMetadata = transactionState.getData().get();
        if (coordinatorEpochAndTxnMetadata.getCoordinatorEpoch().intValue() == i) {
            TransactionMetadata transactionMetadata = coordinatorEpochAndTxnMetadata.getTransactionMetadata();
            transactionMetadata.inLock(() -> {
                if (transactionMetadata.getProducerId() != j) {
                    errorsAndData.setErrors(Errors.INVALID_PRODUCER_ID_MAPPING);
                    return null;
                }
                if (transactionMetadata.getProducerEpoch() != i2) {
                    errorsAndData.setErrors(producerEpochFenceErrors());
                    return null;
                }
                if (transactionMetadata.getPendingState().isPresent()) {
                    errorsAndData.setErrors(Errors.CONCURRENT_TRANSACTIONS);
                    return null;
                }
                switch (transactionMetadata.getState()) {
                    case PREPARE_ABORT:
                        if (transactionResult != TransactionResult.ABORT) {
                            errorsAndData.setErrors(logInvalidStateTransitionAndReturnError(str, transactionMetadata.getState(), transactionResult));
                            return null;
                        }
                        errorsAndData.setData(new PreSendResult(transactionMetadata, transactionMetadata.prepareComplete(Long.valueOf(SystemTime.SYSTEM.milliseconds()))));
                        return null;
                    case PREPARE_COMMIT:
                        if (transactionResult != TransactionResult.COMMIT) {
                            errorsAndData.setErrors(logInvalidStateTransitionAndReturnError(str, transactionMetadata.getState(), transactionResult));
                            return null;
                        }
                        errorsAndData.setData(new PreSendResult(transactionMetadata, transactionMetadata.prepareComplete(Long.valueOf(SystemTime.SYSTEM.milliseconds()))));
                        return null;
                    case COMPLETE_ABORT:
                    case COMPLETE_COMMIT:
                    case EMPTY:
                    case ONGOING:
                        errorsAndData.setErrors(logInvalidStateTransitionAndReturnError(str, transactionMetadata.getState(), transactionResult));
                        return null;
                    case DEAD:
                    case PREPARE_EPOCH_FENCE:
                        String format2 = String.format("Found transactionalId %s with state %s. This is illegal as we should never have transitioned to this state.", str, transactionMetadata.getState());
                        log.error(format2);
                        throw new IllegalStateException(format2);
                    default:
                        return null;
                }
            });
        } else {
            if (log.isDebugEnabled()) {
                log.debug("The transaction coordinator epoch has changed to {} after {} was successfully appended to the log for {} with old epoch {}", coordinatorEpochAndTxnMetadata.getCoordinatorEpoch(), transactionResult, str, Integer.valueOf(i));
            }
            errorsAndData.setErrors(Errors.NOT_COORDINATOR);
        }
        if (errorsAndData.hasErrors()) {
            log.info("Aborting sending of transaction markers after appended {} to transaction log and returning {} error to client for $transactionalId's EndTransaction request", transactionResult, errorsAndData.getErrors());
            endTxnCallback.complete(errorsAndData.getErrors());
        } else {
            endTxnCallback.complete(Errors.NONE);
            this.transactionMarkerChannelManager.addTxnMarkersToSend(Integer.valueOf(i), transactionResult, coordinatorEpochAndTxnMetadata.getTransactionMetadata(), ((PreSendResult) errorsAndData.getData()).getTxnTransitMetadata());
        }
    }

    private Errors logInvalidStateTransitionAndReturnError(String str, TransactionState transactionState, TransactionResult transactionResult) {
        log.debug("TransactionalId: {}'s state is {}, but received transaction marker result to send: {}", str, transactionState, transactionResult);
        return Errors.INVALID_TXN_STATE;
    }

    public void addActivePidOffset(TopicName topicName, long j, long j2) {
        ConcurrentHashMap<Long, Long> computeIfAbsent = this.activePidOffsetMap.computeIfAbsent(topicName, topicName2 -> {
            return new ConcurrentHashMap();
        });
        NavigableMap<Long, Long> computeIfAbsent2 = this.activeOffsetPidMap.computeIfAbsent(topicName, topicName3 -> {
            return new ConcurrentSkipListMap();
        });
        computeIfAbsent.computeIfAbsent(Long.valueOf(j), l -> {
            computeIfAbsent2.computeIfAbsent(Long.valueOf(j2), l -> {
                return Long.valueOf(j);
            });
            return Long.valueOf(j2);
        });
    }

    public long removeActivePidOffset(TopicName topicName, long j) {
        ConcurrentHashMap<Long, Long> orDefault = this.activePidOffsetMap.getOrDefault(topicName, null);
        if (orDefault == null) {
            return -1L;
        }
        NavigableMap<Long, Long> orDefault2 = this.activeOffsetPidMap.getOrDefault(topicName, null);
        if (orDefault2 == null) {
            log.warn("[removeActivePidOffset] offsetPidMap is null");
            return -1L;
        }
        Long remove = orDefault.remove(Long.valueOf(j));
        if (remove == null) {
            log.warn("[removeActivePidOffset] pidOffsetMap is not contains pid {}.", Long.valueOf(j));
            return -1L;
        }
        if (orDefault2.containsKey(remove)) {
            orDefault2.remove(remove);
        }
        return remove.longValue();
    }

    public long getLastStableOffset(TopicName topicName, long j) {
        NavigableMap<Long, Long> orDefault = this.activeOffsetPidMap.getOrDefault(topicName, null);
        return (orDefault == null || orDefault.isEmpty()) ? j : orDefault.firstKey().longValue();
    }

    public void addAbortedIndex(AbortedIndexEntry abortedIndexEntry) {
        this.abortedIndexList.add(abortedIndexEntry);
    }

    public List<FetchResponse.AbortedTransaction> getAbortedIndexList(long j) {
        ArrayList arrayList = new ArrayList(this.abortedIndexList.size());
        for (AbortedIndexEntry abortedIndexEntry : this.abortedIndexList) {
            if (abortedIndexEntry.getLastOffset() >= j) {
                arrayList.add(new FetchResponse.AbortedTransaction(abortedIndexEntry.getPid(), abortedIndexEntry.getFirstOffset()));
            }
        }
        return arrayList;
    }
}
