package io.streamnative.pulsar.handlers.kop;

import com.google.common.annotations.VisibleForTesting;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/InternalServerCnx.class */
public class InternalServerCnx extends ServerCnx {
    KafkaRequestHandler kafkaRequestHandler;
    private volatile long kopMessagePublishBufferSize;
    private static final Logger log = LoggerFactory.getLogger(InternalServerCnx.class);
    private static final AtomicLongFieldUpdater<InternalServerCnx> KOP_MSG_PUBLISH_BUFFER_SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(InternalServerCnx.class, "kopMessagePublishBufferSize");

    public InternalServerCnx(KafkaRequestHandler kafkaRequestHandler) {
        super(kafkaRequestHandler.getPulsarService());
        this.kopMessagePublishBufferSize = 0L;
        this.kafkaRequestHandler = kafkaRequestHandler;
        this.remoteAddress = kafkaRequestHandler.getRemoteAddress();
        if (this.remoteAddress == null) {
            this.remoteAddress = new InetSocketAddress("localhost", 9999);
        }
    }

    public void closeProducer(Producer producer) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed topic: {}'s producer: {}.", new Object[]{this.remoteAddress, producer.getTopic().getName(), producer});
        }
        this.kafkaRequestHandler.close();
    }

    public void updateCtx() {
        this.remoteAddress = this.kafkaRequestHandler.getRemoteAddress();
    }

    public void enableCnxAutoRead() {
        if (this.kafkaRequestHandler.ctx.channel().config().isAutoRead()) {
            return;
        }
        this.kafkaRequestHandler.ctx.channel().config().setAutoRead(true);
        this.kafkaRequestHandler.ctx.read();
        if (log.isDebugEnabled()) {
            log.debug("Channel {}  auto read has set to true.", this.kafkaRequestHandler.ctx.channel());
        }
    }

    public void disableCnxAutoRead() {
        if (this.kafkaRequestHandler.ctx.channel().config().isAutoRead()) {
            this.kafkaRequestHandler.ctx.channel().config().setAutoRead(false);
            if (log.isDebugEnabled()) {
                log.debug("Channel {} auto read has set to false.", this.kafkaRequestHandler.ctx.channel());
            }
        }
    }

    public void increasePublishBuffer(long j) {
        KOP_MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, j);
        if (getBrokerService().isReachMessagePublishBufferThreshold()) {
            disableCnxAutoRead();
        }
    }

    public void decreasePublishBuffer(long j) {
        KOP_MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, -j);
    }

    public long getMessagePublishBufferSize() {
        return this.kopMessagePublishBufferSize;
    }

    public void cancelPublishBufferLimiting() {
    }

    @VisibleForTesting
    public void setMessagePublishBufferSize(long j) {
        this.kopMessagePublishBufferSize = j;
    }

    public KafkaRequestHandler getKafkaRequestHandler() {
        return this.kafkaRequestHandler;
    }
}
