package io.streamnative.pulsar.handlers.kop;

import io.netty.buffer.ByteBuf;
import io.streamnative.pulsar.handlers.kop.utils.MessageRecordUtils;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/PendingProduce.class */
public class PendingProduce {
    private static final Logger log = LoggerFactory.getLogger(PendingProduce.class);
    private final CompletableFuture<ProduceResponse.PartitionResponse> responseFuture;
    private final KafkaTopicManager topicManager;
    private final String partitionName;
    private final int numMessages;
    private final CompletableFuture<PersistentTopic> topicFuture;
    private final CompletableFuture<ByteBuf> byteBufFuture = new CompletableFuture<>();
    private CompletableFuture<Long> offsetFuture;

    public PendingProduce(CompletableFuture<ProduceResponse.PartitionResponse> completableFuture, KafkaTopicManager kafkaTopicManager, String str, MemoryRecords memoryRecords, ExecutorService executorService) {
        this.responseFuture = completableFuture;
        this.topicManager = kafkaTopicManager;
        this.partitionName = str;
        this.numMessages = parseNumMessages(memoryRecords);
        this.topicFuture = kafkaTopicManager.getTopic(str).exceptionally(th -> {
            log.error("Failed to getTopic for partition '{}': {}", str, th);
            return null;
        });
        this.byteBufFuture.exceptionally(th2 -> {
            log.error("Failed to compute ByteBuf for partition '{}': {}", str, th2);
            return null;
        });
        executorService.execute(() -> {
            this.byteBufFuture.complete(MessageRecordUtils.recordsToByteBuf(memoryRecords, this.numMessages));
        });
        this.offsetFuture = new CompletableFuture<>();
    }

    public boolean ready() {
        return this.topicFuture.isDone() && this.byteBufFuture.isDone();
    }

    public void whenComplete(Runnable runnable) {
        CompletableFuture.allOf(this.topicFuture, this.byteBufFuture).whenComplete((r7, th) -> {
            if (th == null) {
                runnable.run();
                return;
            }
            if (this.topicFuture.isCompletedExceptionally()) {
                this.responseFuture.complete(new ProduceResponse.PartitionResponse(Errors.LEADER_NOT_AVAILABLE));
            } else if (this.byteBufFuture.isCompletedExceptionally()) {
                this.responseFuture.complete(new ProduceResponse.PartitionResponse(Errors.CORRUPT_MESSAGE));
            } else {
                this.responseFuture.completeExceptionally(th);
            }
        });
    }

    public void publishMessages() {
        if (!ready()) {
            throw new RuntimeException("Try to send while PendingProduce is not ready");
        }
        try {
            PersistentTopic persistentTopic = this.topicFuture.get();
            ByteBuf byteBuf = this.byteBufFuture.get();
            if (log.isDebugEnabled()) {
                log.debug("publishMessages for topic partition: {}, records size is {}", this.partitionName, Integer.valueOf(this.numMessages));
            }
            this.topicManager.registerProducerInPersistentTopic(this.partitionName, persistentTopic);
            this.topicManager.getReferenceProducer(this.partitionName).getTopic().incrementPublishCount(this.numMessages, byteBuf.readableBytes());
            persistentTopic.publishMessage(byteBuf, MessagePublishContext.get(this.offsetFuture, persistentTopic, System.nanoTime()));
            this.offsetFuture.whenComplete((l, th) -> {
                if (th == null) {
                    this.responseFuture.complete(new ProduceResponse.PartitionResponse(Errors.NONE, l.longValue(), -1L, -1L));
                } else {
                    log.error("publishMessages for topic partition: {} failed when write.", this.partitionName, th);
                    this.responseFuture.complete(new ProduceResponse.PartitionResponse(Errors.KAFKA_STORAGE_ERROR));
                }
                byteBuf.release();
            });
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    private static int parseNumMessages(MemoryRecords memoryRecords) {
        int i = 0;
        for (Record record : memoryRecords.records()) {
            i++;
        }
        return i;
    }
}
