package io.streamnative.pulsar.handlers.kop;

import com.fasterxml.jackson.databind.deser.std.StdKeyDeserializer;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.streamnative.pulsar.handlers.kop.stats.StatsLogger;
import java.io.Closeable;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream;
import org.apache.kafka.common.record.LegacyRecord;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseCallbackWrapper;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.requests.ResponseUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.class */
public abstract class KafkaCommandDecoder extends ChannelInboundHandlerAdapter {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaCommandDecoder.class);
    protected ChannelHandlerContext ctx;
    protected SocketAddress remoteAddress;
    protected AtomicBoolean isActive = new AtomicBoolean(false);
    private final LinkedBlockingQueue<ResponseAndRequest> requestQueue;
    protected final RequestStats requestStats;
    protected final KafkaServiceConfiguration kafkaConfig;

    /* renamed from: io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder$1, reason: invalid class name */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$common$protocol$ApiKeys = new int[ApiKeys.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.API_VERSIONS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.METADATA.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.PRODUCE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.FIND_COORDINATOR.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.LIST_OFFSETS.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.OFFSET_FETCH.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.OFFSET_COMMIT.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.FETCH.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.JOIN_GROUP.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.SYNC_GROUP.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.HEARTBEAT.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.LEAVE_GROUP.ordinal()] = 12;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.DESCRIBE_GROUPS.ordinal()] = 13;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.LIST_GROUPS.ordinal()] = 14;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.DELETE_GROUPS.ordinal()] = 15;
            } catch (NoSuchFieldError e15) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.SASL_HANDSHAKE.ordinal()] = 16;
            } catch (NoSuchFieldError e16) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.SASL_AUTHENTICATE.ordinal()] = 17;
            } catch (NoSuchFieldError e17) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.CREATE_TOPICS.ordinal()] = 18;
            } catch (NoSuchFieldError e18) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.INIT_PRODUCER_ID.ordinal()] = 19;
            } catch (NoSuchFieldError e19) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.ADD_PARTITIONS_TO_TXN.ordinal()] = 20;
            } catch (NoSuchFieldError e20) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.ADD_OFFSETS_TO_TXN.ordinal()] = 21;
            } catch (NoSuchFieldError e21) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.TXN_OFFSET_COMMIT.ordinal()] = 22;
            } catch (NoSuchFieldError e22) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.END_TXN.ordinal()] = 23;
            } catch (NoSuchFieldError e23) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.WRITE_TXN_MARKERS.ordinal()] = 24;
            } catch (NoSuchFieldError e24) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.DESCRIBE_CONFIGS.ordinal()] = 25;
            } catch (NoSuchFieldError e25) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.DELETE_TOPICS.ordinal()] = 26;
            } catch (NoSuchFieldError e26) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder$KafkaHeaderAndRequest.class */
    public static class KafkaHeaderAndRequest implements Closeable {
        private static final String DEFAULT_CLIENT_HOST = "";
        private final RequestHeader header;
        private final AbstractRequest request;
        private final ByteBuf buffer;
        private final SocketAddress remoteAddress;

        KafkaHeaderAndRequest(RequestHeader requestHeader, AbstractRequest abstractRequest, ByteBuf byteBuf, SocketAddress socketAddress) {
            this.header = requestHeader;
            this.request = abstractRequest;
            this.buffer = byteBuf.retain();
            this.remoteAddress = socketAddress;
        }

        public ByteBuf getBuffer() {
            return this.buffer;
        }

        public RequestHeader getHeader() {
            return this.header;
        }

        public AbstractRequest getRequest() {
            return this.request;
        }

        public SocketAddress getRemoteAddress() {
            return this.remoteAddress;
        }

        public String getClientHost() {
            return this.remoteAddress == null ? "" : this.remoteAddress.toString();
        }

        public ByteBuf createErrorResponse(Throwable th) {
            return KafkaCommandDecoder.responseToByteBuf(this.request.getErrorResponse(th), this);
        }

        public String toString() {
            return String.format("KafkaHeaderAndRequest(header=%s, request=%s, remoteAddress=%s)", this.header, this.request, this.remoteAddress);
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            this.buffer.release();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder$KafkaHeaderAndResponse.class */
    public static class KafkaHeaderAndResponse implements Closeable {
        private final short apiVersion;
        private final ResponseHeader header;
        private final AbstractResponse response;

        private KafkaHeaderAndResponse(short s, ResponseHeader responseHeader, AbstractResponse abstractResponse) {
            this.apiVersion = s;
            this.header = responseHeader;
            this.response = abstractResponse;
        }

        public short getApiVersion() {
            return this.apiVersion;
        }

        public ResponseHeader getHeader() {
            return this.header;
        }

        public AbstractResponse getResponse() {
            return this.response;
        }

        static KafkaHeaderAndResponse responseForRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, AbstractResponse abstractResponse) {
            return new KafkaHeaderAndResponse(kafkaHeaderAndRequest.getHeader().apiVersion(), kafkaHeaderAndRequest.getHeader().toResponseHeader(), abstractResponse);
        }

        public String toString() {
            return String.format("KafkaHeaderAndResponse(header=%s,responseFuture=%s)", this.header.toStruct().toString(), this.response.toString(getApiVersion()));
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder$ResponseAndRequest.class */
    public static class ResponseAndRequest {
        private final CompletableFuture<AbstractResponse> responseFuture;
        private final KafkaHeaderAndRequest request;
        private final long createdTimestamp = MathUtils.nowInNano();
        private long firstBlockedTimestamp = 0;

        public static ResponseAndRequest of(CompletableFuture<AbstractResponse> completableFuture, KafkaHeaderAndRequest kafkaHeaderAndRequest) {
            return new ResponseAndRequest(completableFuture, kafkaHeaderAndRequest);
        }

        public long nanoSecondsSinceCreated() {
            return MathUtils.elapsedNanos(this.createdTimestamp);
        }

        public boolean expired(int i) {
            return MathUtils.elapsedNanos(this.createdTimestamp) > TimeUnit.MILLISECONDS.toNanos((long) i);
        }

        public void updateStats(RequestStats requestStats) {
            RequestStats.REQUEST_QUEUE_SIZE_INSTANCE.decrementAndGet();
            requestStats.getStatsLogger().scopeLabel(KopServerStats.REQUEST_SCOPE, this.request.getHeader().apiKey().name).getOpStatsLogger(KopServerStats.REQUEST_QUEUED_LATENCY).registerSuccessfulEvent(MathUtils.elapsedNanos(this.createdTimestamp), TimeUnit.NANOSECONDS);
        }

        ResponseAndRequest(CompletableFuture<AbstractResponse> completableFuture, KafkaHeaderAndRequest kafkaHeaderAndRequest) {
            this.responseFuture = completableFuture;
            this.request = kafkaHeaderAndRequest;
        }

        public CompletableFuture<AbstractResponse> getResponseFuture() {
            return this.responseFuture;
        }

        public KafkaHeaderAndRequest getRequest() {
            return this.request;
        }

        public long getCreatedTimestamp() {
            return this.createdTimestamp;
        }

        public long getFirstBlockedTimestamp() {
            return this.firstBlockedTimestamp;
        }

        public void setFirstBlockedTimestamp(long j) {
            this.firstBlockedTimestamp = j;
        }
    }

    public KafkaCommandDecoder(StatsLogger statsLogger, KafkaServiceConfiguration kafkaServiceConfiguration) {
        this.requestStats = new RequestStats(statsLogger);
        this.kafkaConfig = kafkaServiceConfiguration;
        this.requestQueue = new LinkedBlockingQueue<>(kafkaServiceConfiguration.getMaxQueuedRequests());
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelActive(channelHandlerContext);
        this.remoteAddress = channelHandlerContext.channel().remoteAddress();
        this.ctx = channelHandlerContext;
        this.isActive.set(true);
    }

    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!(obj instanceof IdleStateEvent)) {
            super.userEventTriggered(channelHandlerContext, obj);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("About to close the idle connection from {} due to being idle for {} millis", getRemoteAddress(), Long.valueOf(this.kafkaConfig.getConnectionMaxIdleMs()));
        }
        close();
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        log.error("[{}] Got exception: {}", this.remoteAddress, th.getMessage(), th);
        close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void close() {
        log.info("close channel {} with {} pending responses", this.ctx.channel(), Integer.valueOf(this.requestQueue.size()));
        while (true) {
            ResponseAndRequest poll = this.requestQueue.poll();
            if (poll == null) {
                this.ctx.close();
                return;
            } else {
                poll.getResponseFuture().cancel(true);
                RequestStats.REQUEST_QUEUE_SIZE_INSTANCE.decrementAndGet();
            }
        }
    }

    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("Channel writability has changed to: {}", Boolean.valueOf(channelHandlerContext.channel().isWritable()));
        }
    }

    protected KafkaHeaderAndRequest byteBufToRequest(ByteBuf byteBuf) {
        return byteBufToRequest(byteBuf, null);
    }

    protected KafkaHeaderAndRequest byteBufToRequest(ByteBuf byteBuf, SocketAddress socketAddress) {
        Preconditions.checkArgument(byteBuf.readableBytes() > 0);
        ByteBuffer nioBuffer = byteBuf.nioBuffer();
        RequestHeader parse = RequestHeader.parse(nioBuffer);
        if (isUnsupportedApiVersionsRequest(parse)) {
            return new KafkaHeaderAndRequest(parse, new ApiVersionsRequest((short) 0, Short.valueOf(parse.apiVersion())), byteBuf, socketAddress);
        }
        ApiKeys apiKey = parse.apiKey();
        short apiVersion = parse.apiVersion();
        return new KafkaHeaderAndRequest(parse, AbstractRequest.parseRequest(apiKey, apiVersion, apiKey.parseRequest(apiVersion, nioBuffer)), byteBuf, socketAddress);
    }

    protected static ByteBuf responseToByteBuf(AbstractResponse abstractResponse, KafkaHeaderAndRequest kafkaHeaderAndRequest) {
        try {
            KafkaHeaderAndResponse responseForRequest = KafkaHeaderAndResponse.responseForRequest(kafkaHeaderAndRequest, abstractResponse);
            Throwable th = null;
            try {
                try {
                    short apiVersion = responseForRequest.getApiVersion();
                    if (kafkaHeaderAndRequest.getHeader().apiKey() == ApiKeys.API_VERSIONS && !ApiKeys.API_VERSIONS.isVersionSupported(apiVersion)) {
                        apiVersion = ApiKeys.API_VERSIONS.oldestVersion();
                    }
                    ByteBuf serializeResponse = ResponseUtils.serializeResponse(apiVersion, responseForRequest.getHeader(), responseForRequest.getResponse());
                    if (responseForRequest != null) {
                        if (0 != 0) {
                            try {
                                responseForRequest.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            responseForRequest.close();
                        }
                    }
                    return serializeResponse;
                } finally {
                }
            } finally {
            }
        } finally {
            kafkaHeaderAndRequest.close();
        }
    }

    protected Boolean channelReady() {
        return Boolean.valueOf(hasAuthenticated());
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        ByteBuf byteBuf = (ByteBuf) obj;
        BiConsumer<Long, Throwable> biConsumer = (l, th) -> {
            this.requestStats.getRequestParseLatencyStats().registerSuccessfulEvent(MathUtils.elapsedNanos(l.longValue()), TimeUnit.NANOSECONDS);
        };
        BiConsumer<String, Long> biConsumer2 = (str, l2) -> {
            this.requestStats.getStatsLogger().scopeLabel(KopServerStats.REQUEST_SCOPE, str).getOpStatsLogger(KopServerStats.REQUEST_LATENCY).registerSuccessfulEvent(MathUtils.elapsedNanos(l2.longValue()), TimeUnit.NANOSECONDS);
        };
        if (this.isActive.get() && !channelReady().booleanValue()) {
            try {
                try {
                    channelPrepare(channelHandlerContext, byteBuf, biConsumer, biConsumer2);
                    byteBuf.release();
                    return;
                } catch (AuthenticationException e) {
                    log.error("Failed authentication with [{}] ({})", this.remoteAddress, e.getMessage());
                    maybeDelayCloseOnAuthenticationFailure();
                    byteBuf.release();
                    return;
                }
            } finally {
                byteBuf.release();
            }
        }
        Channel channel = channelHandlerContext.channel();
        SocketAddress socketAddress = null;
        if (null != channel) {
            socketAddress = channel.remoteAddress();
        }
        long nowInNano = MathUtils.nowInNano();
        KafkaHeaderAndRequest byteBufToRequest = byteBufToRequest(byteBuf, socketAddress);
        biConsumer.accept(Long.valueOf(nowInNano), null);
        try {
            try {
                if (log.isDebugEnabled()) {
                    Logger logger = log;
                    Object[] objArr = new Object[3];
                    objArr[0] = channelHandlerContext.channel() != null ? channelHandlerContext.channel().remoteAddress() : "Null channel";
                    objArr[1] = byteBufToRequest.getHeader();
                    objArr[2] = byteBufToRequest;
                    logger.debug("[{}] Received kafka cmd {}, the request content is: {}", objArr);
                }
                CompletableFuture<AbstractResponse> completableFuture = new CompletableFuture<>();
                long nowInNano2 = MathUtils.nowInNano();
                completableFuture.whenComplete((abstractResponse, th2) -> {
                    if (!(th2 instanceof CancellationException)) {
                        biConsumer2.accept(byteBufToRequest.getHeader().apiKey().name, Long.valueOf(nowInNano2));
                        channelHandlerContext.channel().eventLoop().execute(() -> {
                            writeAndFlushResponseToClient(channel);
                        });
                    } else if (log.isDebugEnabled()) {
                        log.debug("[{}] Request {} is cancelled", channelHandlerContext.channel(), byteBufToRequest.getHeader());
                    }
                });
                this.requestQueue.put(ResponseAndRequest.of(completableFuture, byteBufToRequest));
                RequestStats.REQUEST_QUEUE_SIZE_INSTANCE.incrementAndGet();
                if (this.isActive.get()) {
                    switch (AnonymousClass1.$SwitchMap$org$apache$kafka$common$protocol$ApiKeys[byteBufToRequest.getHeader().apiKey().ordinal()]) {
                        case 1:
                            handleApiVersionsRequest(byteBufToRequest, completableFuture);
                            break;
                        case 2:
                            handleTopicMetadataRequest(byteBufToRequest, completableFuture);
                            break;
                        case 3:
                            handleProduceRequest(byteBufToRequest, completableFuture);
                            break;
                        case 4:
                            handleFindCoordinatorRequest(byteBufToRequest, completableFuture);
                            break;
                        case 5:
                            handleListOffsetRequest(byteBufToRequest, completableFuture);
                            break;
                        case 6:
                            handleOffsetFetchRequest(byteBufToRequest, completableFuture);
                            break;
                        case 7:
                            handleOffsetCommitRequest(byteBufToRequest, completableFuture);
                            break;
                        case 8:
                            handleFetchRequest(byteBufToRequest, completableFuture);
                            break;
                        case 9:
                            handleJoinGroupRequest(byteBufToRequest, completableFuture);
                            break;
                        case 10:
                            handleSyncGroupRequest(byteBufToRequest, completableFuture);
                            break;
                        case 11:
                            handleHeartbeatRequest(byteBufToRequest, completableFuture);
                            break;
                        case 12:
                            handleLeaveGroupRequest(byteBufToRequest, completableFuture);
                            break;
                        case 13:
                            handleDescribeGroupRequest(byteBufToRequest, completableFuture);
                            break;
                        case 14:
                            handleListGroupsRequest(byteBufToRequest, completableFuture);
                            break;
                        case StdKeyDeserializer.TYPE_CLASS /* 15 */:
                            handleDeleteGroupsRequest(byteBufToRequest, completableFuture);
                            break;
                        case 16:
                            handleSaslHandshake(byteBufToRequest, completableFuture);
                            break;
                        case 17:
                            handleSaslAuthenticate(byteBufToRequest, completableFuture);
                            break;
                        case LegacyRecord.KEY_OFFSET_V1 /* 18 */:
                            handleCreateTopics(byteBufToRequest, completableFuture);
                            break;
                        case KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH /* 19 */:
                            handleInitProducerId(byteBufToRequest, completableFuture);
                            break;
                        case 20:
                            handleAddPartitionsToTxn(byteBufToRequest, completableFuture);
                            break;
                        case DefaultRecord.MAX_RECORD_OVERHEAD /* 21 */:
                            handleAddOffsetsToTxn(byteBufToRequest, completableFuture);
                            break;
                        case LegacyRecord.RECORD_OVERHEAD_V1 /* 22 */:
                            handleTxnOffsetCommit(byteBufToRequest, completableFuture);
                            break;
                        case DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET /* 23 */:
                            handleEndTxn(byteBufToRequest, completableFuture);
                            break;
                        case 24:
                            handleWriteTxnMarkers(byteBufToRequest, completableFuture);
                            break;
                        case 25:
                            handleDescribeConfigs(byteBufToRequest, completableFuture);
                            break;
                        case 26:
                            handleDeleteTopics(byteBufToRequest, completableFuture);
                            break;
                        default:
                            handleError(byteBufToRequest, completableFuture);
                            break;
                    }
                } else {
                    handleInactive(byteBufToRequest, completableFuture);
                }
            } catch (Throwable th3) {
                byteBuf.release();
                throw th3;
            }
        } catch (Exception e2) {
            log.error("error while handle command:", (Throwable) e2);
            close();
            byteBuf.release();
        }
    }

    protected void writeAndFlushResponseToClient(Channel channel) {
        ResponseAndRequest peek;
        while (this.isActive.get() && (peek = this.requestQueue.peek()) != null) {
            CompletableFuture<AbstractResponse> responseFuture = peek.getResponseFuture();
            long nanoSecondsSinceCreated = peek.nanoSecondsSinceCreated();
            boolean z = nanoSecondsSinceCreated > TimeUnit.MILLISECONDS.toNanos((long) this.kafkaConfig.getRequestTimeoutMs());
            if (!responseFuture.isDone() && !z) {
                this.requestStats.getResponseBlockedTimes().inc();
                if (peek.getFirstBlockedTimestamp() == 0) {
                    peek.setFirstBlockedTimestamp(MathUtils.nowInNano());
                    return;
                }
                return;
            }
            if (this.requestQueue.remove(peek)) {
                peek.updateStats(this.requestStats);
                if (peek.getFirstBlockedTimestamp() != 0) {
                    this.requestStats.getResponseBlockedLatency().registerSuccessfulEvent(MathUtils.elapsedNanos(peek.getFirstBlockedTimestamp()), TimeUnit.NANOSECONDS);
                }
                KafkaHeaderAndRequest request = peek.getRequest();
                if (responseFuture.isCompletedExceptionally()) {
                    responseFuture.exceptionally(th -> {
                        log.error("[{}] request {} completed exceptionally", channel, request.getHeader(), th);
                        channel.writeAndFlush(request.createErrorResponse(th));
                        this.requestStats.getStatsLogger().scopeLabel(KopServerStats.REQUEST_SCOPE, peek.request.getHeader().apiKey().name).getOpStatsLogger(KopServerStats.REQUEST_QUEUED_LATENCY).registerFailedEvent(MathUtils.elapsedNanos(peek.getCreatedTimestamp()), TimeUnit.NANOSECONDS);
                        return null;
                    });
                } else if (responseFuture.isDone()) {
                    responseFuture.thenAccept(abstractResponse -> {
                        if (abstractResponse == null) {
                            log.error("[{}] Unexpected null completed future for request {}", this.ctx.channel(), request.getHeader());
                            channel.writeAndFlush(request.createErrorResponse(new ApiException("response is null")));
                        } else {
                            if (log.isDebugEnabled()) {
                                log.debug("Write kafka cmd to client. request content: {} responseAndRequest content: {}", request, abstractResponse.toString(request.getRequest().version()));
                            }
                            channel.writeAndFlush(responseToByteBuf(abstractResponse, request)).addListener(future -> {
                                if (abstractResponse instanceof ResponseCallbackWrapper) {
                                    ((ResponseCallbackWrapper) abstractResponse).responseComplete();
                                }
                                if (future.isSuccess()) {
                                    return;
                                }
                                log.error("[{}] Failed to write {}", channel, request.getHeader(), future.cause());
                            });
                        }
                    });
                } else if (z) {
                    log.error("[{}] request {} is not completed for {} ns (> {} ms)", channel, request.getHeader(), Long.valueOf(nanoSecondsSinceCreated), Integer.valueOf(this.kafkaConfig.getRequestTimeoutMs()));
                    responseFuture.cancel(true);
                    channel.writeAndFlush(request.createErrorResponse(new ApiException("request is expired from server side")));
                    this.requestStats.getStatsLogger().scopeLabel(KopServerStats.REQUEST_SCOPE, peek.request.getHeader().apiKey().name).getOpStatsLogger(KopServerStats.REQUEST_QUEUED_LATENCY).registerFailedEvent(MathUtils.elapsedNanos(peek.getCreatedTimestamp()), TimeUnit.NANOSECONDS);
                }
            }
        }
    }

    protected abstract boolean hasAuthenticated();

    protected abstract void channelPrepare(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, BiConsumer<Long, Throwable> biConsumer, BiConsumer<String, Long> biConsumer2) throws AuthenticationException;

    protected abstract void maybeDelayCloseOnAuthenticationFailure();

    protected abstract void completeCloseOnAuthenticationFailure();

    protected abstract void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleInactive(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleApiVersionsRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleTopicMetadataRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleProduceRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleFindCoordinatorRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleListOffsetRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleOffsetFetchRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleOffsetCommitRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleFetchRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleJoinGroupRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleSyncGroupRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleHeartbeatRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleLeaveGroupRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleDescribeGroupRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleListGroupsRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleDeleteGroupsRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleSaslAuthenticate(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleSaslHandshake(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleCreateTopics(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleDescribeConfigs(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleInitProducerId(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleAddPartitionsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleAddOffsetsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleTxnOffsetCommit(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleEndTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleWriteTxnMarkers(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    protected abstract void handleDeleteTopics(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    private static boolean isUnsupportedApiVersionsRequest(RequestHeader requestHeader) {
        return requestHeader.apiKey() == ApiKeys.API_VERSIONS && !ApiKeys.API_VERSIONS.isVersionSupported(requestHeader.apiVersion());
    }

    public SocketAddress getRemoteAddress() {
        return this.remoteAddress;
    }

    public AtomicBoolean getIsActive() {
        return this.isActive;
    }

    public KafkaServiceConfiguration getKafkaConfig() {
        return this.kafkaConfig;
    }
}
