package io.streamnative.pulsar.handlers.kop.format;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.streamnative.pulsar.handlers.kop.utils.ByteBufUtils;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.class */
public class PulsarEntryFormatter implements EntryFormatter {
    private static final Logger log = LoggerFactory.getLogger(PulsarEntryFormatter.class);
    private static final int DEFAULT_FETCH_BUFFER_SIZE = 1048576;
    private static final int MAX_RECORDS_BUFFER_SIZE = 104857600;
    private static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
    private static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 131072;

    @Override // io.streamnative.pulsar.handlers.kop.format.EntryFormatter
    public ByteBuf encode(MemoryRecords memoryRecords, int i) {
        long j = 0;
        int i2 = 0;
        long j2 = -1;
        PulsarApi.CompressionType compressionType = PulsarApi.CompressionType.NONE;
        ByteBuf buffer = PulsarByteBufAllocator.DEFAULT.buffer(Math.min(INITIAL_BATCH_BUFFER_SIZE, MAX_MESSAGE_BATCH_SIZE_BYTES));
        ArrayList<MessageImpl> newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(i);
        PulsarApi.MessageMetadata.Builder newBuilder = PulsarApi.MessageMetadata.newBuilder();
        StreamSupport.stream(memoryRecords.records().spliterator(), true).forEachOrdered(record -> {
            MessageImpl<byte[]> recordToEntry = recordToEntry(record);
            newArrayListWithExpectedSize.add(recordToEntry);
            if (newBuilder.getPublishTime() <= 0) {
                newBuilder.setPublishTime(recordToEntry.getPublishTime());
            }
        });
        for (MessageImpl messageImpl : newArrayListWithExpectedSize) {
            i2++;
            if (i2 == 1) {
                j2 = Commands.initBatchMessageMetadata(newBuilder, messageImpl.getMessageBuilder());
            }
            j += messageImpl.getDataBuffer().readableBytes();
            if (log.isDebugEnabled()) {
                log.debug("recordsToByteBuf , sequenceId: {}, numMessagesInBatch: {}, currentBatchSizeBytes: {} ", new Object[]{Long.valueOf(j2), Integer.valueOf(i2), Long.valueOf(j)});
            }
            PulsarApi.MessageMetadata.Builder messageBuilder = messageImpl.getMessageBuilder();
            buffer = Commands.serializeSingleMessageInBatchWithPayload(messageBuilder, messageImpl.getDataBuffer(), buffer);
            messageBuilder.recycle();
        }
        int readableBytes = buffer.readableBytes();
        if (PulsarApi.CompressionType.NONE != compressionType) {
            newBuilder.setCompression(compressionType);
            newBuilder.setUncompressedSize(readableBytes);
        }
        newBuilder.setNumMessagesInBatch(i2);
        PulsarApi.MessageMetadata build = newBuilder.build();
        ByteBuf serializeMetadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, build, buffer);
        newBuilder.recycle();
        build.recycle();
        buffer.release();
        return serializeMetadataAndPayload;
    }

    @Override // io.streamnative.pulsar.handlers.kop.format.EntryFormatter
    public MemoryRecords decode(List<Entry> list, byte b) {
        try {
            ByteBufferOutputStream byteBufferOutputStream = new ByteBufferOutputStream(DEFAULT_FETCH_BUFFER_SIZE);
            Throwable th = null;
            try {
                try {
                    MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(byteBufferOutputStream, b, CompressionType.NONE, TimestampType.CREATE_TIME, MessageIdUtils.getOffset(list.get(0).getLedgerId(), list.get(0).getEntryId(), 0), -1L, -1L, (short) -1, -1, false, false, -1, 104857600);
                    list.parallelStream().forEachOrdered(entry -> {
                        ByteBuf dataBuffer = entry.getDataBuffer();
                        PulsarApi.MessageMetadata parseMessageMetadata = Commands.parseMessageMetadata(dataBuffer);
                        try {
                            ByteBuf decode = CompressionCodecProvider.getCompressionCodec(parseMessageMetadata.getCompression()).decode(dataBuffer, parseMessageMetadata.getUncompressedSize());
                            int numMessagesInBatch = parseMessageMetadata.getNumMessagesInBatch();
                            boolean z = numMessagesInBatch == 1 && !parseMessageMetadata.hasNumMessagesInBatch();
                            if (log.isDebugEnabled()) {
                                Logger logger = log;
                                Object[] objArr = new Object[7];
                                objArr[0] = Integer.valueOf(numMessagesInBatch);
                                objArr[1] = Boolean.valueOf(!z);
                                objArr[2] = Integer.valueOf(list.size());
                                objArr[3] = Long.valueOf(entry.getLedgerId());
                                objArr[4] = Long.valueOf(entry.getEntryId());
                                objArr[5] = Integer.valueOf(decode.readerIndex());
                                objArr[6] = Integer.valueOf(decode.writerIndex());
                                logger.debug("entriesToRecords.  NumMessagesInBatch: {}, isBatchMessage: {}, entries in list: {}. new entryId {}:{}, readerIndex: {},  writerIndex: {}", objArr);
                            }
                            Preconditions.checkState(parseMessageMetadata.getEncryptionKeysCount() == 0);
                            if (z) {
                                memoryRecordsBuilder.appendWithOffset(MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId()), parseMessageMetadata.getEventTime() > 0 ? parseMessageMetadata.getEventTime() : parseMessageMetadata.getPublishTime(), ByteBufUtils.getKeyByteBuffer(parseMessageMetadata), ByteBufUtils.getNioBuffer(decode), getHeadersFromMetadata(parseMessageMetadata.getPropertiesList()));
                            } else {
                                IntStream.range(0, numMessagesInBatch).parallel().forEachOrdered(i -> {
                                    if (log.isDebugEnabled()) {
                                        log.debug(" processing message num - {} in batch", Integer.valueOf(i));
                                    }
                                    try {
                                        PulsarApi.SingleMessageMetadata.Builder newBuilder = PulsarApi.SingleMessageMetadata.newBuilder();
                                        ByteBuf deSerializeSingleMessageInBatch = Commands.deSerializeSingleMessageInBatch(decode, newBuilder, i, numMessagesInBatch);
                                        PulsarApi.SingleMessageMetadata build = newBuilder.build();
                                        Header[] headersFromMetadata = getHeadersFromMetadata(build.getPropertiesList());
                                        memoryRecordsBuilder.appendWithOffset(MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId(), i), parseMessageMetadata.getEventTime() > 0 ? parseMessageMetadata.getEventTime() : parseMessageMetadata.getPublishTime(), ByteBufUtils.getKeyByteBuffer(build), build.getNullValue() ? null : ByteBufUtils.getNioBuffer(deSerializeSingleMessageInBatch), headersFromMetadata);
                                        deSerializeSingleMessageInBatch.release();
                                        newBuilder.recycle();
                                    } catch (IOException e) {
                                        log.error("Meet IOException: {}", e);
                                        throw new UncheckedIOException(e);
                                    }
                                });
                            }
                            parseMessageMetadata.recycle();
                            decode.release();
                            entry.release();
                        } catch (IOException e) {
                            log.error("Meet IOException: {}", e);
                            throw new UncheckedIOException(e);
                        }
                    });
                    MemoryRecords build = memoryRecordsBuilder.build();
                    if (byteBufferOutputStream != null) {
                        if (0 != 0) {
                            try {
                                byteBufferOutputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            byteBufferOutputStream.close();
                        }
                    }
                    return build;
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            log.error("Meet IOException: {}", e);
            throw new UncheckedIOException(e);
        } catch (Exception e2) {
            log.error("Meet exception: {}", e2);
            throw e2;
        }
    }

    private static MessageImpl<byte[]> recordToEntry(Record record) {
        TypedMessageBuilderImpl typedMessageBuilderImpl = new TypedMessageBuilderImpl((ProducerBase) null, Schema.BYTES);
        if (record.hasKey()) {
            byte[] bArr = new byte[record.keySize()];
            record.key().get(bArr);
            typedMessageBuilderImpl.keyBytes(bArr);
            typedMessageBuilderImpl.orderingKey(bArr);
        }
        if (record.hasValue()) {
            byte[] bArr2 = new byte[record.valueSize()];
            record.value().get(bArr2);
            typedMessageBuilderImpl.value(bArr2);
        } else {
            typedMessageBuilderImpl.value((Object) null);
        }
        if (record.sequence() >= 0) {
            typedMessageBuilderImpl.sequenceId(record.sequence());
        }
        if (record.timestamp() >= 0) {
            typedMessageBuilderImpl.eventTime(record.timestamp());
            typedMessageBuilderImpl.getMetadataBuilder().setPublishTime(record.timestamp());
        } else {
            typedMessageBuilderImpl.getMetadataBuilder().setPublishTime(System.currentTimeMillis());
        }
        for (Header header : record.headers()) {
            typedMessageBuilderImpl.property(header.key(), new String(header.value(), StandardCharsets.UTF_8));
        }
        return typedMessageBuilderImpl.getMessage();
    }

    private Header[] getHeadersFromMetadata(List<PulsarApi.KeyValue> list) {
        Header[] headerArr = new Header[list.size()];
        if (log.isDebugEnabled()) {
            log.debug("getHeadersFromMetadata. Header size: {}", Integer.valueOf(list.size()));
        }
        int i = 0;
        for (PulsarApi.KeyValue keyValue : list) {
            headerArr[i] = new RecordHeader(keyValue.getKey(), keyValue.getValue().getBytes(StandardCharsets.UTF_8));
            if (log.isDebugEnabled()) {
                log.debug("index: {} kv.getKey: {}. kv.getValue: {}", new Object[]{Integer.valueOf(i), keyValue.getKey(), keyValue.getValue()});
            }
            i++;
        }
        return headerArr;
    }
}
