package io.streamnative.pulsar.handlers.kop.format;

import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.streamnative.pulsar.handlers.kop.exceptions.KoPMessageMetadataNotFoundException;
import io.streamnative.pulsar.handlers.kop.utils.ByteBufUtils;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.StreamSupport;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.class */
public class PulsarEntryFormatter implements EntryFormatter {
    private static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
    private static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 131072;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PulsarEntryFormatter.class);
    private static final DecodeResult EMPTY_DECODE_RESULT = new DecodeResult(MemoryRecords.readableRecords(ByteBuffer.allocate(0)));

    @Override // io.streamnative.pulsar.handlers.kop.format.EntryFormatter
    public ByteBuf encode(MemoryRecords memoryRecords, int i) {
        long j = 0;
        int i2 = 0;
        long j2 = -1;
        ByteBuf buffer = PulsarByteBufAllocator.DEFAULT.buffer(Math.min(INITIAL_BATCH_BUFFER_SIZE, MAX_MESSAGE_BATCH_SIZE_BYTES));
        ArrayList<MessageImpl> newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(i);
        MessageMetadata messageMetadata = new MessageMetadata();
        memoryRecords.batches().forEach(mutableRecordBatch -> {
            StreamSupport.stream(mutableRecordBatch.spliterator(), true).forEachOrdered(record -> {
                newArrayListWithExpectedSize.add(recordToEntry(record));
                if (mutableRecordBatch.isTransactional()) {
                    messageMetadata.setTxnidMostBits(mutableRecordBatch.producerId());
                    messageMetadata.setTxnidLeastBits(mutableRecordBatch.producerEpoch());
                }
            });
        });
        for (MessageImpl messageImpl : newArrayListWithExpectedSize) {
            i2++;
            if (i2 == 1) {
                j2 = Commands.initBatchMessageMetadata(messageMetadata, messageImpl.getMessageBuilder());
            }
            j += messageImpl.getDataBuffer().readableBytes();
            if (log.isDebugEnabled()) {
                log.debug("recordsToByteBuf , sequenceId: {}, numMessagesInBatch: {}, currentBatchSizeBytes: {} ", Long.valueOf(j2), Integer.valueOf(i2), Long.valueOf(j));
            }
            buffer = Commands.serializeSingleMessageInBatchWithPayload(messageImpl.getMessageBuilder(), messageImpl.getDataBuffer(), buffer);
        }
        messageMetadata.setNumMessagesInBatch(i2);
        ByteBuf serializeMetadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, buffer);
        buffer.release();
        return serializeMetadataAndPayload;
    }

    @Override // io.streamnative.pulsar.handlers.kop.format.EntryFormatter
    public DecodeResult decode(List<Entry> list, byte b) {
        ArrayList arrayList = new ArrayList();
        list.parallelStream().forEachOrdered(entry -> {
            try {
                try {
                    long peekBaseOffsetFromEntry = MessageIdUtils.peekBaseOffsetFromEntry(entry);
                    ByteBuf dataBuffer = entry.getDataBuffer();
                    Commands.skipBrokerEntryMetadataIfExist(dataBuffer);
                    arrayList.add(ByteBufUtils.decodePulsarEntryToKafkaRecords(Commands.parseMessageMetadata(dataBuffer), dataBuffer, peekBaseOffsetFromEntry, b));
                    entry.release();
                } catch (KoPMessageMetadataNotFoundException | IOException e) {
                    log.error("[{}:{}] Failed to decode entry", Long.valueOf(entry.getLedgerId()), Long.valueOf(entry.getEntryId()));
                    entry.release();
                }
            } catch (Throwable th) {
                entry.release();
                throw th;
            }
        });
        if (arrayList.isEmpty()) {
            return EMPTY_DECODE_RESULT;
        }
        if (arrayList.size() == 1) {
            return (DecodeResult) arrayList.get(0);
        }
        ByteBuf directBuffer = PulsarByteBufAllocator.DEFAULT.directBuffer(arrayList.stream().mapToInt(decodeResult -> {
            return decodeResult.getRecords().sizeInBytes();
        }).sum());
        arrayList.forEach(decodeResult2 -> {
            directBuffer.writeBytes(decodeResult2.getRecords().buffer());
        });
        arrayList.forEach((v0) -> {
            v0.release();
        });
        return new DecodeResult(MemoryRecords.readableRecords(directBuffer.nioBuffer()), directBuffer);
    }

    private static MessageImpl<byte[]> recordToEntry(Record record) {
        TypedMessageBuilderImpl typedMessageBuilderImpl = new TypedMessageBuilderImpl((ProducerBase) null, Schema.BYTES);
        if (record.hasKey()) {
            byte[] bArr = new byte[record.keySize()];
            record.key().get(bArr);
            typedMessageBuilderImpl.keyBytes(bArr);
            typedMessageBuilderImpl.orderingKey(bArr);
        }
        if (record.hasValue()) {
            byte[] bArr2 = new byte[record.valueSize()];
            record.value().get(bArr2);
            typedMessageBuilderImpl.value(bArr2);
        } else {
            typedMessageBuilderImpl.value((Object) null);
        }
        typedMessageBuilderImpl.getMetadataBuilder().setProducerName("");
        if (record.sequence() >= 0) {
            typedMessageBuilderImpl.sequenceId(record.sequence());
        } else {
            typedMessageBuilderImpl.sequenceId(0L);
        }
        if (record.timestamp() >= 0) {
            typedMessageBuilderImpl.eventTime(record.timestamp());
            typedMessageBuilderImpl.getMetadataBuilder().setPublishTime(record.timestamp());
        } else {
            typedMessageBuilderImpl.getMetadataBuilder().setPublishTime(System.currentTimeMillis());
        }
        for (Header header : record.headers()) {
            typedMessageBuilderImpl.property(header.key(), new String(header.value(), StandardCharsets.UTF_8));
        }
        return typedMessageBuilderImpl.getMessage();
    }
}
