package io.streamnative.pulsar.handlers.kop.format;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.streamnative.pulsar.handlers.kop.exceptions.KoPMessageMetadataNotFoundException;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatterFactory;
import io.streamnative.pulsar.handlers.kop.utils.ByteBufUtils;
import io.streamnative.pulsar.handlers.kop.utils.KopRecordsUtil;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.class */
public class KafkaEntryFormatter implements EntryFormatter {
    private static final String IDENTITY_KEY = "entry.format";
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaEntryFormatter.class);
    private static final String IDENTITY_VALUE = EntryFormatterFactory.EntryFormat.KAFKA.name().toLowerCase();

    @Override // io.streamnative.pulsar.handlers.kop.format.EntryFormatter
    public ByteBuf encode(MemoryRecords memoryRecords, int i) {
        ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(memoryRecords.buffer());
        ByteBuf serializeMetadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.None, getMessageMetadataWithNumberMessages(i), wrappedBuffer);
        wrappedBuffer.release();
        return serializeMetadataAndPayload;
    }

    @Override // io.streamnative.pulsar.handlers.kop.format.EntryFormatter
    public DecodeResult decode(List<Entry> list, byte b) {
        Optional empty = Optional.empty();
        ArrayList arrayList = new ArrayList();
        for (Entry entry : list) {
            try {
                long peekBaseOffsetFromEntry = MessageIdUtils.peekBaseOffsetFromEntry(entry);
                ByteBuf dataBuffer = entry.getDataBuffer();
                MessageMetadata parseMessageMetadata = Commands.parseMessageMetadata(dataBuffer);
                if (isKafkaEntryFormat(parseMessageMetadata)) {
                    byte b2 = dataBuffer.getByte(dataBuffer.readerIndex() + 16);
                    dataBuffer.setLong(dataBuffer.readerIndex() + 0, peekBaseOffsetFromEntry);
                    if (b2 > b || b2 != 2) {
                        ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(KopRecordsUtil.convertAndAssignOffsets(MemoryRecords.readableRecords(ByteBufUtils.getNioBuffer(dataBuffer)).batches(), b, peekBaseOffsetFromEntry).records().buffer());
                        arrayList.add(wrappedBuffer);
                        if (!empty.isPresent()) {
                            empty = Optional.of(new ArrayList());
                        }
                        empty.ifPresent(list2 -> {
                            list2.add(wrappedBuffer);
                        });
                        if (log.isTraceEnabled()) {
                            log.trace("[{}:{}] convertAndAssignOffsets record for down converted or assign offsets with v0 and v1 magic, start offset {}, entry magic: {}, client magic: {}", Long.valueOf(entry.getLedgerId()), Long.valueOf(entry.getEntryId()), Long.valueOf(peekBaseOffsetFromEntry), Byte.valueOf(b2), Byte.valueOf(b));
                        }
                    } else {
                        arrayList.add(dataBuffer.slice(dataBuffer.readerIndex(), dataBuffer.readableBytes()));
                    }
                } else {
                    ByteBuf orCreateByteBuf = ByteBufUtils.decodePulsarEntryToKafkaRecords(parseMessageMetadata, dataBuffer, peekBaseOffsetFromEntry, b).getOrCreateByteBuf();
                    arrayList.add(orCreateByteBuf);
                    if (!empty.isPresent()) {
                        empty = Optional.of(new ArrayList());
                    }
                    empty.ifPresent(list3 -> {
                        list3.add(orCreateByteBuf);
                    });
                }
            } catch (KoPMessageMetadataNotFoundException | IOException | KafkaException e) {
                log.error("[{}:{}] Failed to decode entry. ", Long.valueOf(entry.getLedgerId()), Long.valueOf(entry.getEntryId()), e);
                entry.release();
            }
        }
        ByteBuf directBuffer = PulsarByteBufAllocator.DEFAULT.directBuffer(arrayList.stream().mapToInt((v0) -> {
            return v0.readableBytes();
        }).sum());
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            directBuffer.writeBytes((ByteBuf) it.next());
        }
        empty.ifPresent(list4 -> {
            list4.forEach((v0) -> {
                v0.release();
            });
        });
        list.forEach((v0) -> {
            v0.release();
        });
        return new DecodeResult(MemoryRecords.readableRecords(ByteBufUtils.getNioBuffer(directBuffer)), directBuffer);
    }

    private static MessageMetadata getMessageMetadataWithNumberMessages(int i) {
        MessageMetadata messageMetadata = new MessageMetadata();
        messageMetadata.addProperty().setKey(IDENTITY_KEY).setValue(IDENTITY_VALUE);
        messageMetadata.setProducerName("");
        messageMetadata.setSequenceId(0L);
        messageMetadata.setPublishTime(System.currentTimeMillis());
        messageMetadata.setNumMessagesInBatch(i);
        return messageMetadata;
    }

    private static boolean isKafkaEntryFormat(MessageMetadata messageMetadata) {
        for (KeyValue keyValue : messageMetadata.getPropertiesList()) {
            if (keyValue.hasKey() && keyValue.getKey().equals(IDENTITY_KEY) && keyValue.getValue().equals(IDENTITY_VALUE)) {
                return true;
            }
        }
        return false;
    }
}
