package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestUtils;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.class */
public class TransactionMarkerChannelHandler extends ChannelInboundHandlerAdapter {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) TransactionMarkerChannelHandler.class);
    private final CompletableFuture<ChannelHandlerContext> cnx = new CompletableFuture<>();
    private final ConcurrentLongHashMap<InFlightRequest> inFlightRequestMap = new ConcurrentLongHashMap<>();
    private final AtomicInteger correlationId = new AtomicInteger(0);

    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler$InFlightRequest.class */
    private class InFlightRequest {
        private final long requestId;
        private final WriteTxnMarkersRequest request;
        private final TransactionMarkerRequestCompletionHandler requestCompletionHandler;

        public InFlightRequest(WriteTxnMarkersRequest writeTxnMarkersRequest, TransactionMarkerRequestCompletionHandler transactionMarkerRequestCompletionHandler) {
            this.request = writeTxnMarkersRequest;
            this.requestCompletionHandler = transactionMarkerRequestCompletionHandler;
            this.requestId = TransactionMarkerChannelHandler.this.correlationId.incrementAndGet();
        }

        public ByteBuf getRequestData() {
            return RequestUtils.serializeRequest(this.request.version(), new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, this.request.version(), "", (int) this.requestId), this.request);
        }

        public void onComplete(ByteBuffer byteBuffer) {
            this.requestCompletionHandler.onComplete(WriteTxnMarkersResponse.parse(byteBuffer, ApiKeys.WRITE_TXN_MARKERS.latestVersion()));
        }
    }

    public void enqueueRequest(WriteTxnMarkersRequest writeTxnMarkersRequest, TransactionMarkerRequestCompletionHandler transactionMarkerRequestCompletionHandler) {
        this.cnx.thenAccept(channelHandlerContext -> {
            InFlightRequest inFlightRequest = new InFlightRequest(writeTxnMarkersRequest, transactionMarkerRequestCompletionHandler);
            this.inFlightRequestMap.put(inFlightRequest.requestId, inFlightRequest);
            channelHandlerContext.writeAndFlush(inFlightRequest.getRequestData());
        });
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("channelActive");
        }
        this.cnx.complete(channelHandlerContext);
        super.channelActive(channelHandlerContext);
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        log.info("[TransactionMarkerChannelHandler] channelInactive");
        super.channelInactive(channelHandlerContext);
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        ByteBuffer nioBuffer = ((ByteBuf) obj).nioBuffer();
        ResponseHeader parse = ResponseHeader.parse(nioBuffer);
        InFlightRequest inFlightRequest = (InFlightRequest) this.inFlightRequestMap.get(parse.correlationId());
        if (inFlightRequest == null) {
            log.error("Miss the inFlightRequest with correlationId {}.", Integer.valueOf(parse.correlationId()));
        } else {
            inFlightRequest.onComplete(nioBuffer);
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        log.error("Transaction marker channel handler caught exception.", th);
        super.exceptionCaught(channelHandlerContext, th);
    }
}
