package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionMetadata;
import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestUtils;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.class */
public class TransactionStateManager {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) TransactionStateManager.class);
    private final TransactionConfig transactionConfig;
    private int transactionTopicPartitionCount;
    private ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
    private Set<TransactionPartitionAndLeaderEpoch> loadingPartitions = new HashSet();
    private Map<Integer, TxnMetadataCacheEntry> transactionMetadataCache = new HashMap();
    private final Map<String, TransactionMetadata> transactionStateMap = new ConcurrentHashMap();

    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager$CoordinatorEpochAndTxnMetadata.class */
    /**
     * Pairs a transaction's metadata with the coordinator epoch of the cache entry it was
     * resolved from.
     *
     * <p>Equality, hash code and string form are derived from both properties. The hash code
     * keeps the previous formula (multiplier 59, 43 for {@code null}) so hash values do not
     * change.
     */
    public static class CoordinatorEpochAndTxnMetadata {
        private Integer coordinatorEpoch;
        private TransactionMetadata transactionMetadata;

        /**
         * @param num                 coordinator epoch of the owning cache entry
         * @param transactionMetadata metadata of the transaction
         */
        public CoordinatorEpochAndTxnMetadata(Integer num, TransactionMetadata transactionMetadata) {
            this.coordinatorEpoch = num;
            this.transactionMetadata = transactionMetadata;
        }

        /** @return the coordinator epoch, possibly {@code null} */
        public Integer getCoordinatorEpoch() {
            return this.coordinatorEpoch;
        }

        /** @return the transaction metadata, possibly {@code null} */
        public TransactionMetadata getTransactionMetadata() {
            return this.transactionMetadata;
        }

        /** @param num the new coordinator epoch */
        public void setCoordinatorEpoch(Integer num) {
            this.coordinatorEpoch = num;
        }

        /** @param transactionMetadata the new transaction metadata */
        public void setTransactionMetadata(TransactionMetadata transactionMetadata) {
            this.transactionMetadata = transactionMetadata;
        }

        /**
         * Two instances are equal when both the coordinator epoch and the transaction metadata
         * are equal (or both {@code null}) and the other side agrees via {@link #canEqual(Object)}.
         */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof CoordinatorEpochAndTxnMetadata)) {
                return false;
            }
            CoordinatorEpochAndTxnMetadata other = (CoordinatorEpochAndTxnMetadata) obj;
            // Lets a subclass with extra state refuse equality with a plain instance.
            if (!other.canEqual(this)) {
                return false;
            }
            return Objects.equals(getCoordinatorEpoch(), other.getCoordinatorEpoch())
                    && Objects.equals(getTransactionMetadata(), other.getTransactionMetadata());
        }

        /**
         * @param obj candidate for equality
         * @return {@code true} if {@code obj} is of a type this class may be equal to
         */
        protected boolean canEqual(Object obj) {
            return obj instanceof CoordinatorEpochAndTxnMetadata;
        }

        @Override
        public int hashCode() {
            final int prime = 59;
            final int nullHash = 43;
            Integer epoch = getCoordinatorEpoch();
            TransactionMetadata metadata = getTransactionMetadata();
            int result = 1;
            result = result * prime + (epoch == null ? nullHash : epoch.hashCode());
            result = result * prime + (metadata == null ? nullHash : metadata.hashCode());
            return result;
        }

        @Override
        public String toString() {
            return "TransactionStateManager.CoordinatorEpochAndTxnMetadata(coordinatorEpoch=" + getCoordinatorEpoch() + ", transactionMetadata=" + getTransactionMetadata() + ")";
        }
    }

    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager$ResponseCallback.class */
    /**
     * Completion hooks for {@code appendTransactionToLog}: exactly one of the two methods is
     * invoked per append attempt by the code in this class.
     */
    public interface ResponseCallback {
        /** Invoked when the state transition finished without an error. */
        void complete();

        /**
         * Invoked when the append or the subsequent cache update was rejected.
         *
         * @param errors the error reported for the failed attempt
         */
        void fail(Errors errors);
    }

    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager$RetryOnError.class */
    /**
     * Decides, for a failed transaction-log append, whether the caller intends to retry.
     *
     * <p>When {@link #retry(Errors)} returns {@code true} the pending state of the transaction
     * metadata is left untouched; when it returns {@code false} the pending state is reset.
     */
    @FunctionalInterface
    public interface RetryOnError {
        /**
         * @param errors the error the append failed with
         * @return {@code true} if the caller will retry the append for this error
         */
        boolean retry(Errors errors);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager$SendTxnMarkersCallback.class */
    /**
     * Hook used after loading a transaction-log partition to send markers for transactions that
     * were found in a prepare-commit or prepare-abort state.
     */
    @FunctionalInterface
    public interface SendTxnMarkersCallback {
        /**
         * @param i                   coordinator epoch the partition was loaded at
         * @param transactionResult   whether the transaction is being committed or aborted
         * @param transactionMetadata metadata of the transaction to complete
         * @param txnTransitMetadata  the prepared transition produced by {@code prepareComplete}
         */
        void send(int i, TransactionResult transactionResult, TransactionMetadata transactionMetadata, TransactionMetadata.TxnTransitMetadata txnTransitMetadata);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager$TransactionPartitionAndLeaderEpoch.class */
    /**
     * Key identifying one load of a transaction-log partition at a given coordinator epoch.
     *
     * <p>Instances are stored in a {@code HashSet} and looked up with {@code contains} /
     * {@code remove}, so the class is immutable and compares by value: two keys for the same
     * partition and epoch are the same set element.
     */
    public static class TransactionPartitionAndLeaderEpoch {
        private final Integer txnPartitionId;
        private final Integer coordinatorEpoch;

        /**
         * @param num  id of the transaction-log partition
         * @param num2 coordinator epoch the partition is being loaded at
         */
        public TransactionPartitionAndLeaderEpoch(Integer num, Integer num2) {
            this.txnPartitionId = num;
            this.coordinatorEpoch = num2;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof TransactionPartitionAndLeaderEpoch)) {
                return false;
            }
            TransactionPartitionAndLeaderEpoch other = (TransactionPartitionAndLeaderEpoch) obj;
            return Objects.equals(this.txnPartitionId, other.txnPartitionId)
                    && Objects.equals(this.coordinatorEpoch, other.coordinatorEpoch);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.txnPartitionId, this.coordinatorEpoch);
        }

        @Override
        public String toString() {
            return "TransactionPartitionAndLeaderEpoch{txnPartitionId=" + this.txnPartitionId
                    + ", coordinatorEpoch=" + this.coordinatorEpoch + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager$TransactionalIdCoordinatorEpochAndTransitMetadata.class */
    /**
     * Immutable record of a transaction that was found in a prepare state while loading a
     * partition and still has to be completed by sending transaction markers.
     */
    public static class TransactionalIdCoordinatorEpochAndTransitMetadata {
        private final String transactionalId;
        private final int coordinatorEpoch;
        private final TransactionResult result;
        private final TransactionMetadata txnMetadata;
        private final TransactionMetadata.TxnTransitMetadata transitMetadata;

        /**
         * @param str                 transactional id of the pending transaction
         * @param i                   coordinator epoch the partition was loaded at
         * @param transactionResult   outcome to complete the transaction with
         * @param transactionMetadata metadata of the pending transaction
         * @param txnTransitMetadata  prepared transition to the completed state
         */
        public TransactionalIdCoordinatorEpochAndTransitMetadata(String str, int i, TransactionResult transactionResult, TransactionMetadata transactionMetadata, TransactionMetadata.TxnTransitMetadata txnTransitMetadata) {
            this.transactionalId = str;
            this.coordinatorEpoch = i;
            this.result = transactionResult;
            this.txnMetadata = transactionMetadata;
            this.transitMetadata = txnTransitMetadata;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager$TxnMetadataCacheEntry.class */
    /**
     * Cache entry for one transaction-log partition: the coordinator epoch it was loaded at and
     * the transaction metadata keyed by transactional id.
     *
     * <p>The references are fixed at construction; the map itself is mutated by the enclosing
     * class when new transactional ids are registered.
     */
    public static class TxnMetadataCacheEntry {
        private final Integer coordinatorEpoch;
        private final Map<String, TransactionMetadata> metadataPerTransactionalId;

        /**
         * @param num coordinator epoch of the loaded partition
         * @param map transaction metadata keyed by transactional id
         */
        public TxnMetadataCacheEntry(Integer num, Map<String, TransactionMetadata> map) {
            this.coordinatorEpoch = num;
            this.metadataPerTransactionalId = map;
        }

        /** Prints the epoch and the number of entries rather than the entries themselves. */
        @Override
        public String toString() {
            return "TxnMetadataCacheEntry{coordinatorEpoch=" + this.coordinatorEpoch + ", numTransactionalEntries=" + this.metadataPerTransactionalId.size() + '}';
        }
    }

    public TransactionStateManager(TransactionConfig transactionConfig) {
        this.transactionConfig = transactionConfig;
        this.transactionTopicPartitionCount = transactionConfig.getTransactionLogNumPartitions();
    }

    /**
     * Applies a prepared state transition for a transactional id and reports the outcome
     * through {@code responseCallback}.
     *
     * <p>The whole operation runs under the state read lock. The transaction must still be
     * cached and its cached coordinator epoch must equal {@code i}; otherwise the callback fails
     * with the lookup error or {@link Errors#NOT_COORDINATOR}. No record is written here: a
     * successful append status for the transaction-log partition is synthesised and handed
     * straight to the cache update.
     *
     * @param str                transactional id
     * @param i                  coordinator epoch the caller believes is current
     * @param txnTransitMetadata the transition to apply
     * @param responseCallback   receives the outcome
     * @param retryOnError       consulted when the append status is an error
     */
    public void appendTransactionToLog(String str, int i, TransactionMetadata.TxnTransitMetadata txnTransitMetadata, ResponseCallback responseCallback, RetryOnError retryOnError) {
        final TopicPartition txnLogPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionFor(str));
        CoreUtils.inReadLock(this.stateLock, () -> {
            ErrorsAndData<Optional<CoordinatorEpochAndTxnMetadata>> lookup = getTransactionState(str);
            if (lookup.hasErrors()) {
                responseCallback.fail(lookup.getErrors());
            } else if (lookup.getData().isPresent()) {
                final CoordinatorEpochAndTxnMetadata cached = lookup.getData().get();
                cached.getTransactionMetadata().inLock(() -> {
                    if (cached.getCoordinatorEpoch().intValue() == i) {
                        Map<TopicPartition, ProduceResponse.PartitionResponse> appendStatus = new HashMap<>();
                        appendStatus.put(txnLogPartition, new ProduceResponse.PartitionResponse(Errors.NONE));
                        updateCacheCallback(str, txnTransitMetadata, txnLogPartition, i, appendStatus, responseCallback, retryOnError);
                        log.info("Appending new metadata {} for transaction id {} with coordinator epoch {} to the local transaction log", txnTransitMetadata, str, i);
                    } else {
                        // The cache entry belongs to a different coordinator epoch.
                        responseCallback.fail(Errors.NOT_COORDINATOR);
                    }
                    return null;
                });
            } else {
                // No metadata cached for this transactional id.
                responseCallback.fail(Errors.NOT_COORDINATOR);
            }
            return null;
        });
    }

    /**
     * Handles the append status for one transaction-log partition: updates the cached metadata
     * and then notifies {@code responseCallback}.
     *
     * @param str                transactional id
     * @param txnTransitMetadata the transition that was appended
     * @param topicPartition     the transaction-log partition the append targeted
     * @param i                  coordinator epoch at append time
     * @param map                append status; must contain exactly {@code topicPartition}
     * @param responseCallback   receives the final outcome
     * @param retryOnError       consulted when the append failed
     * @throws IllegalStateException if {@code map} is not a single entry for {@code topicPartition}
     */
    private void updateCacheCallback(String str, TransactionMetadata.TxnTransitMetadata txnTransitMetadata, TopicPartition topicPartition, int i, Map<TopicPartition, ProduceResponse.PartitionResponse> map, ResponseCallback responseCallback, RetryOnError retryOnError) {
        boolean onlyExpectedPartition = map.size() == 1 && map.containsKey(topicPartition);
        if (!onlyExpectedPartition) {
            throw new IllegalStateException(String.format("Append status %s should only have one partition %s", map, topicPartition));
        }

        ErrorsAndData<Void> outcome = statusCheck(str, txnTransitMetadata, map.get(topicPartition));
        if (!outcome.hasErrors()) {
            validStatus(str, txnTransitMetadata, outcome, i);
        } else {
            invalidStatus(str, txnTransitMetadata, outcome, i, retryOnError);
        }

        // Re-evaluated on purpose: validStatus may have recorded an error on the outcome.
        if (!outcome.hasErrors()) {
            responseCallback.complete();
        } else {
            responseCallback.fail(outcome.getErrors());
        }
    }

    /**
     * Translates the error of a transaction-log append into the error reported to the caller.
     *
     * @param str                transactional id, used for logging only
     * @param txnTransitMetadata the transition that was appended, used for logging only
     * @param partitionResponse  append status of the transaction-log partition
     * @return a result whose error is {@link Errors#NONE} for a successful append, otherwise the
     *         translated coordinator error
     */
    private ErrorsAndData<Void> statusCheck(String str, TransactionMetadata.TxnTransitMetadata txnTransitMetadata, ProduceResponse.PartitionResponse partitionResponse) {
        ErrorsAndData<Void> result = new ErrorsAndData<>();
        Errors appendError = partitionResponse.error;
        if (appendError == Errors.NONE) {
            result.setErrors(Errors.NONE);
            return result;
        }

        if (log.isDebugEnabled()) {
            log.debug("Appending {}'s new metadata {} failed due to {}", str, txnTransitMetadata, appendError.exceptionName());
        }

        Errors translated;
        switch (appendError) {
            case UNKNOWN_TOPIC_OR_PARTITION:
            case NOT_ENOUGH_REPLICAS:
            case NOT_ENOUGH_REPLICAS_AFTER_APPEND:
            case REQUEST_TIMED_OUT:
                translated = Errors.COORDINATOR_NOT_AVAILABLE;
                break;
            case KAFKA_STORAGE_ERROR:
                translated = Errors.NOT_COORDINATOR;
                break;
            case MESSAGE_TOO_LARGE:
            case RECORD_LIST_TOO_LARGE:
            default:
                // Oversized records and anything unrecognised are reported as a server error.
                translated = Errors.UNKNOWN_SERVER_ERROR;
                break;
        }
        result.setErrors(translated);
        return result;
    }

    /**
     * Completes the state transition in the cache after a successful append.
     *
     * <p>If the metadata can no longer be resolved, or its cached coordinator epoch differs from
     * {@code i}, the transition is abandoned and the reason is recorded on
     * {@code errorsAndData}, which the caller inspects afterwards.
     *
     * @param str                transactional id
     * @param txnTransitMetadata the transition to complete
     * @param errorsAndData      outcome holder; receives an error if the transition is abandoned
     * @param i                  coordinator epoch at append time
     */
    private void validStatus(String str, TransactionMetadata.TxnTransitMetadata txnTransitMetadata, ErrorsAndData<Void> errorsAndData, int i) {
        ErrorsAndData<Optional<CoordinatorEpochAndTxnMetadata>> transactionState = getTransactionState(str);
        if (transactionState.hasErrors()) {
            log.info("Accessing the cached transaction metadata for {} returns {} error; aborting transition to the new metadata and setting the error in the callback", str, transactionState.getErrors());
            errorsAndData.setErrors(transactionState.getErrors());
            return;
        }
        if (!transactionState.getData().isPresent()) {
            log.info("The cached coordinator metadata does not exist in the cache anymore for {} after appended its new metadata {} to the transaction log (txn topic partition {}) while it was {} before appending; aborting transition to the new metadata and returning {} in the callback", str, txnTransitMetadata, partitionFor(str), i, Errors.NOT_COORDINATOR);
            errorsAndData.setErrors(Errors.NOT_COORDINATOR);
            return;
        }

        final CoordinatorEpochAndTxnMetadata cached = transactionState.getData().get();
        final TransactionMetadata transactionMetadata = cached.getTransactionMetadata();
        transactionMetadata.inLock(() -> {
            Integer cachedEpoch = cached.getCoordinatorEpoch();
            if (cachedEpoch.intValue() != i) {
                // Report the epoch now in the cache as "changed to" and the append-time epoch as
                // "was", and name the error that is actually set below.
                log.info("The cached coordinator epoch for {} has changed to {} after appended its new metadata {} to the transaction log (txn topic partition {}) while it was {} before appending; aborting transition to the new metadata and returning {} in the callback", str, cachedEpoch, txnTransitMetadata, partitionFor(str), i, Errors.NOT_COORDINATOR);
                errorsAndData.setErrors(Errors.NOT_COORDINATOR);
                return null;
            }
            transactionMetadata.completeTransitionTo(txnTransitMetadata);
            if (log.isDebugEnabled()) {
                log.debug("Updating {}'s transaction state to {} with coordinator epoch {} for {} succeeded", str, txnTransitMetadata, i, str);
            }
            return null;
        });
    }

    /**
     * Handles a failed append: logs why the transition is abandoned and, unless the caller will
     * retry, clears the pending state of the cached metadata.
     *
     * <p>The pending state is only cleared when the metadata is still cached, its coordinator
     * epoch equals {@code i}, and {@code retryOnError} declines to retry the error.
     *
     * @param str                transactional id
     * @param txnTransitMetadata the transition whose append failed
     * @param errorsAndData      holder of the append error
     * @param i                  coordinator epoch at append time
     * @param retryOnError       decides whether the pending state is kept for a retry
     */
    private void invalidStatus(String str, TransactionMetadata.TxnTransitMetadata txnTransitMetadata, ErrorsAndData<Void> errorsAndData, int i, RetryOnError retryOnError) {
        ErrorsAndData<Optional<CoordinatorEpochAndTxnMetadata>> lookup = getTransactionState(str);
        if (lookup.hasErrors()) {
            log.info("TransactionalId {} append transaction log for {} transition failed due to {}, aborting state transition and returning the error in the callback since retrieving metadata returned {}", str, txnTransitMetadata, errorsAndData.getErrors(), lookup.getErrors());
            return;
        }
        if (!lookup.getData().isPresent()) {
            log.info("TransactionalId {} append transaction log for {} transition failed due to {}, aborting state transition and returning the error in the callback since metadata is not available in the cache anymore", str, txnTransitMetadata, errorsAndData.getErrors());
            return;
        }

        final CoordinatorEpochAndTxnMetadata cached = lookup.getData().get();
        final TransactionMetadata metadata = cached.getTransactionMetadata();
        metadata.inLock(() -> {
            Integer cachedEpoch = cached.getCoordinatorEpoch();
            if (cachedEpoch.intValue() != i) {
                log.info("TransactionalId {} append transaction log for {} transition failed due to {}, aborting state transition and returning the error in the callback since the coordinator epoch has changed from {} to {}", metadata.getTransactionalId(), txnTransitMetadata, errorsAndData.getErrors(), cachedEpoch, i);
            } else if (retryOnError.retry(errorsAndData.getErrors())) {
                log.info("TransactionalId {} append transaction log for {} transition failed due to {}, not resetting pending state {} but just returning the error in the callback to let the caller retry", metadata.getTransactionalId(), txnTransitMetadata, errorsAndData.getErrors(), metadata.getPendingState());
            } else {
                log.info("TransactionalId {} append transaction log for {} transition failed due to {}, resetting pending state from {}, aborting state transition and returning {} in the callback", metadata.getTransactionalId(), txnTransitMetadata, errorsAndData.getErrors(), metadata.getPendingState(), errorsAndData.getErrors());
                metadata.setPendingState(Optional.empty());
            }
            return null;
        });
    }

    /**
     * Builds and serializes a {@code WriteTxnMarkers} request carrying a single COMMIT marker
     * (coordinator epoch {@code 1}) for all topic partitions of the given transaction.
     *
     * <p>NOTE(review): the metadata is looked up in {@code transactionStateMap}, and nothing in
     * this class adds entries to that map. Confirm where it is meant to be populated. Until
     * then the lookup misses, which is reported explicitly rather than as a bare null
     * dereference.
     *
     * @param str transactional id
     * @return the serialized request
     * @throws IllegalStateException if no metadata is registered for {@code str}
     */
    public ByteBuf getWriteMarker(String str) {
        TransactionMetadata metadata = this.transactionStateMap.get(str);
        if (metadata == null) {
            throw new IllegalStateException("No transaction metadata registered for transactionalId " + str
                    + "; cannot build the WriteTxnMarkers request");
        }
        WriteTxnMarkersRequest.TxnMarkerEntry commitMarker = new WriteTxnMarkersRequest.TxnMarkerEntry(
                metadata.getProducerId(),
                metadata.getProducerEpoch(),
                1,
                TransactionResult.COMMIT,
                new ArrayList<>(metadata.getTopicPartitions()));
        WriteTxnMarkersRequest request = new WriteTxnMarkersRequest.Builder(Lists.newArrayList(commitMarker)).build();
        return RequestUtils.serializeRequest(request.version(), new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, request.version(), "", -1), request);
    }

    /**
     * Looks up the cached metadata of a transactional id without creating an entry.
     *
     * @param str transactional id
     * @return an error result if the owning partition is loading or not cached; otherwise the
     *         metadata with its coordinator epoch, or an empty optional if the id is unknown
     */
    public ErrorsAndData<Optional<CoordinatorEpochAndTxnMetadata>> getTransactionState(String str) {
        Optional<TransactionMetadata> nothingToAdd = Optional.empty();
        return getAndMaybeAddTransactionState(str, nothingToAdd);
    }

    /**
     * Registers the given metadata under its transactional id unless an entry already exists,
     * and returns whichever metadata ends up cached.
     *
     * <p>An error from the lookup (partition loading or not cached) is returned to the caller
     * unchanged, as the other call sites of the lookup do, instead of being dereferenced as if
     * it carried data.
     *
     * @param transactionMetadata metadata to register
     * @return the error result of the lookup, or a result holding the cached metadata
     * @throws IllegalStateException if the lookup succeeded but produced no metadata
     */
    public ErrorsAndData<Optional<CoordinatorEpochAndTxnMetadata>> putTransactionStateIfNotExists(TransactionMetadata transactionMetadata) {
        ErrorsAndData<Optional<CoordinatorEpochAndTxnMetadata>> result = getAndMaybeAddTransactionState(transactionMetadata.getTransactionalId(), Optional.of(transactionMetadata));
        if (result.hasErrors()) {
            return result;
        }
        Optional<CoordinatorEpochAndTxnMetadata> stored = result.getData();
        if (stored == null || !stored.isPresent()) {
            throw new IllegalStateException("Unexpected empty transaction metadata returned while putting " + transactionMetadata);
        }
        return result;
    }

    /**
     * Checks that a requested transaction timeout is positive and does not exceed the
     * configured maximum.
     *
     * @param i requested timeout in milliseconds
     * @return {@code true} if {@code 0 < i <= } the configured maximum timeout
     */
    public boolean validateTransactionTimeoutMs(int i) {
        final long maxTimeoutMs = this.transactionConfig.getTransactionMaxTimeoutMs();
        if (i <= 0) {
            return false;
        }
        return i <= maxTimeoutMs;
    }

    /**
     * Resolves the cached metadata for a transactional id, optionally registering new metadata
     * when none is cached yet. Runs under the state read lock.
     *
     * <p>Registration uses {@link Map#putIfAbsent(Object, Object)} so the lookup and the insert
     * are one map operation: if an entry already exists it is returned and the supplied metadata
     * is discarded. The read lock is shared, so a separate check followed by {@code put} would
     * let a second caller replace an entry another caller has just been handed.
     *
     * <p>NOTE(review): a partition that is still loading is reported as
     * {@code CONCURRENT_TRANSACTIONS}; Kafka's coordinator uses
     * {@code COORDINATOR_LOAD_IN_PROGRESS} for this case — confirm this is intentional.
     *
     * @param str      transactional id
     * @param optional metadata to register if the id is not cached; empty for a pure lookup
     * @return {@code CONCURRENT_TRANSACTIONS} while the owning partition is loading,
     *         {@code NOT_COORDINATOR} if the partition is not cached, otherwise the resolved
     *         metadata with the entry's coordinator epoch (empty if unknown and nothing was
     *         supplied)
     */
    private ErrorsAndData<Optional<CoordinatorEpochAndTxnMetadata>> getAndMaybeAddTransactionState(String str, Optional<TransactionMetadata> optional) {
        return CoreUtils.inReadLock(this.stateLock, () -> {
            final int partitionId = partitionFor(str);
            boolean stillLoading = this.loadingPartitions.stream()
                    .anyMatch(loading -> loading.txnPartitionId.intValue() == partitionId);
            if (stillLoading) {
                return new ErrorsAndData<>(Errors.CONCURRENT_TRANSACTIONS);
            }

            TxnMetadataCacheEntry cacheEntry = this.transactionMetadataCache.get(partitionId);
            if (cacheEntry == null) {
                return new ErrorsAndData<>(Errors.NOT_COORDINATOR);
            }

            Map<String, TransactionMetadata> perTransactionalId = cacheEntry.metadataPerTransactionalId;
            TransactionMetadata resolved = perTransactionalId.get(str);
            if (resolved == null && optional.isPresent()) {
                TransactionMetadata created = optional.get();
                TransactionMetadata alreadyThere = perTransactionalId.putIfAbsent(str, created);
                resolved = alreadyThere != null ? alreadyThere : created;
            }

            Optional<CoordinatorEpochAndTxnMetadata> found = resolved == null
                    ? Optional.empty()
                    : Optional.of(new CoordinatorEpochAndTxnMetadata(cacheEntry.coordinatorEpoch, resolved));
            return new ErrorsAndData<>(found);
        });
    }

    /**
     * Maps a transactional id to its transaction-log partition.
     *
     * @param str transactional id
     * @return {@code Utils.abs} of the id's hash code, modulo the configured partition count
     */
    public int partitionFor(String str) {
        final int nonNegativeHash = Utils.abs(str.hashCode());
        return nonNegativeHash % this.transactionTopicPartitionCount;
    }

    /**
     * Installs the metadata loaded for a transaction-log partition as that partition's cache
     * entry, replacing any entry already present.
     *
     * <p>A warning is logged only when an existing entry was actually replaced; a first-time
     * load is silent.
     *
     * @param i   transaction-log partition id
     * @param i2  coordinator epoch the partition was loaded at
     * @param map loaded metadata keyed by transactional id
     */
    private void addLoadedTransactionsToCache(int i, int i2, Map<String, TransactionMetadata> map) {
        TxnMetadataCacheEntry previous = this.transactionMetadataCache.put(i, new TxnMetadataCacheEntry(i2, map));
        if (previous != null) {
            log.warn("Unloaded transaction metadata {} from {} as part of loading metadata at epoch {}", previous, i, i2);
        }
    }

    /**
     * Produces the metadata map for a transaction-log partition. Nothing is read from the log
     * here: the call is logged and a new empty map is returned.
     *
     * <p>The returned map becomes the partition's cache entry and is later mutated by callers
     * that hold only the shared state read lock, so it is a {@link ConcurrentHashMap} rather
     * than a plain {@code HashMap}.
     *
     * @param topicPartition the transaction-log partition
     * @param i              coordinator epoch of the load
     * @return a new, empty, thread-safe map keyed by transactional id
     */
    private Map<String, TransactionMetadata> loadTransactionMetadata(TopicPartition topicPartition, int i) {
        log.info("load transaction metadata for topicPartition: {}, coordinatorEpoch: {}", topicPartition, i);
        return new ConcurrentHashMap<>();
    }

    /**
     * Marks a transaction-log partition as loading, loads it on the calling thread, and returns
     * an already-completed future.
     *
     * @param i                      transaction-log partition id
     * @param i2                     coordinator epoch to load at
     * @param sendTxnMarkersCallback invoked for transactions found in a prepare state
     * @return a future that is already completed when this method returns
     */
    public CompletableFuture<Void> loadTransactionsForTxnTopicPartition(int i, int i2, SendTxnMarkersCallback sendTxnMarkersCallback) {
        final TopicPartition txnLogPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, i);
        final TransactionPartitionAndLeaderEpoch loadingKey = new TransactionPartitionAndLeaderEpoch(i, i2);

        // While the key is registered, lookups for this partition are answered with an error.
        CoreUtils.inWriteLock(this.stateLock, () -> {
            this.loadingPartitions.add(loadingKey);
            return null;
        });

        final long requestedAtMs = SystemTime.SYSTEM.milliseconds();
        loadTransactions(requestedAtMs, txnLogPartition, i2, loadingKey, sendTxnMarkersCallback);
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Loads the metadata of one transaction-log partition and, if the partition is still marked
     * as loading, publishes it to the cache and triggers completion of prepared transactions.
     *
     * <p>Under the state write lock: the loaded map is installed in the cache, every transaction
     * in {@code PREPARE_ABORT} / {@code PREPARE_COMMIT} is prepared for completion, the partition
     * is removed from the loading set, and only then is {@code sendTxnMarkersCallback} invoked
     * for each prepared transaction.
     *
     * @param j                                  time in milliseconds at which the load was requested
     * @param topicPartition                     the transaction-log partition to load
     * @param i                                  coordinator epoch of the load
     * @param transactionPartitionAndLeaderEpoch key under which the partition was marked as loading
     * @param sendTxnMarkersCallback             invoked for each transaction found in a prepare state
     */
    private void loadTransactions(long j, TopicPartition topicPartition, int i, TransactionPartitionAndLeaderEpoch transactionPartitionAndLeaderEpoch, SendTxnMarkersCallback sendTxnMarkersCallback) {
        final long schedulerDelayMs = SystemTime.SYSTEM.milliseconds() - j;
        log.info("Loading transaction metadata from {} at epoch {}", topicPartition, i);
        final Map<String, TransactionMetadata> loaded = loadTransactionMetadata(topicPartition, i);
        final long totalElapsedMs = SystemTime.SYSTEM.milliseconds() - j;
        log.info("Finished loading {} transaction metadata from {} in {} milliseconds, of which {} milliseconds was spent in the scheduler.", loaded.size(), topicPartition, totalElapsedMs, schedulerDelayMs);

        CoreUtils.inWriteLock(this.stateLock, () -> {
            if (this.loadingPartitions.contains(transactionPartitionAndLeaderEpoch)) {
                addLoadedTransactionsToCache(topicPartition.partition(), i, loaded);

                final ArrayList<TransactionalIdCoordinatorEpochAndTransitMetadata> pendingCompletion = new ArrayList<>();
                for (Map.Entry<String, TransactionMetadata> entry : loaded.entrySet()) {
                    final TransactionMetadata metadata = entry.getValue();
                    metadata.inLock(() -> {
                        TransactionResult outcome;
                        switch (metadata.getState()) {
                            case PREPARE_ABORT:
                                outcome = TransactionResult.ABORT;
                                break;
                            case PREPARE_COMMIT:
                                outcome = TransactionResult.COMMIT;
                                break;
                            default:
                                // Not in a prepare state: nothing to complete.
                                return null;
                        }
                        pendingCompletion.add(new TransactionalIdCoordinatorEpochAndTransitMetadata(entry.getKey(), i, outcome, metadata, metadata.prepareComplete(SystemTime.SYSTEM.milliseconds())));
                        return null;
                    });
                }

                // Unmark the partition first, then send the markers for the prepared transactions.
                this.loadingPartitions.remove(transactionPartitionAndLeaderEpoch);
                for (TransactionalIdCoordinatorEpochAndTransitMetadata pending : pendingCompletion) {
                    sendTxnMarkersCallback.send(pending.coordinatorEpoch, pending.result, pending.txnMetadata, pending.transitMetadata);
                }
            }
            return null;
        });
        log.info("Completed loading transaction metadata from {} for coordinator epoch {}", topicPartition, i);
    }
}
