package io.streamnative.pulsar.handlers.kop.coordinator.group;

import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.class */
/**
 * Tracks one Pulsar consumer per (Kafka group id, topic partition) and uses those
 * consumers to cumulatively acknowledge the positions that correspond to committed
 * Kafka offsets.
 *
 * <p>Consumers are created lazily through {@code subscribeAsync()} and cached as futures
 * in {@link #consumers}, keyed first by group id and then by {@link TopicPartition}.
 * The subscription name of each consumer is the group id.
 */
public class OffsetAcker implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(OffsetAcker.class);

    /** Template builder; it is cloned for every consumer that gets created. */
    private final ConsumerBuilder<byte[]> consumerBuilder;

    /** groupId -> (topic partition -> pending or completed consumer subscription). */
    Map<String, Map<TopicPartition, CompletableFuture<Consumer<byte[]>>>> consumers = new ConcurrentHashMap<>();

    /**
     * Creates an acker whose consumers are built from the given client.
     *
     * @param pulsarClientImpl client used to obtain the consumer builder template
     */
    public OffsetAcker(PulsarClientImpl pulsarClientImpl) {
        this.consumerBuilder = pulsarClientImpl.newConsumer()
                .receiverQueueSize(0)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
    }

    /**
     * Eagerly starts creating a consumer for every partition contained in a serialized
     * Kafka member assignment.
     *
     * @param groupId    Kafka group id, used as the subscription name
     * @param assignment serialized assignment as understood by
     *                   {@link ConsumerProtocol#deserializeAssignment(ByteBuffer)};
     *                   a {@code null} or empty array is treated as "no partitions"
     */
    public void addOffsetsTracker(String groupId, byte[] assignment) {
        // Nothing to track; also avoids the NullPointerException ByteBuffer.wrap(null) would throw.
        if (assignment == null || assignment.length == 0) {
            return;
        }
        PartitionAssignor.Assignment memberAssignment =
                ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(assignment));
        if (log.isDebugEnabled()) {
            log.debug(" Add offsets after sync group: {}", memberAssignment.toString());
        }
        memberAssignment.partitions().forEach(topicPartition -> getConsumer(groupId, topicPartition));
    }

    /**
     * Cumulatively acknowledges, for every partition in {@code offsets}, the message id
     * derived from the committed offset. The acknowledgements are issued asynchronously;
     * failures are logged and not propagated to the caller.
     *
     * @param groupId Kafka group id whose consumers should acknowledge
     * @param offsets committed offsets keyed by topic partition
     */
    public void ackOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (log.isDebugEnabled()) {
            log.debug(" ack offsets after commit offset for group: {}", groupId);
            offsets.forEach((topicPartition, offsetAndMetadata) ->
                    log.debug("\t partition: {}, offset: {}",
                            topicPartition, MessageIdUtils.getPosition(offsetAndMetadata.offset())));
        }
        offsets.forEach((topicPartition, offsetAndMetadata) ->
                getConsumer(groupId, topicPartition).whenComplete((consumer, throwable) -> {
                    if (throwable != null) {
                        log.warn("Error when get consumer for offset ack:", throwable);
                        return;
                    }
                    consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset()))
                            .exceptionally(ackError -> {
                                // Surface ack failures instead of silently dropping the returned future.
                                log.warn("Error when ack offset for group: {}, partition: {}, offset: {}.",
                                        groupId, topicPartition, offsetAndMetadata.offset(), ackError);
                                return null;
                            });
                }));
    }

    /**
     * Closes and forgets all consumers that belong to the given groups. Group ids that
     * are not currently tracked are ignored.
     *
     * @param groupIds ids of the groups whose consumers should be closed
     */
    public void close(Set<String> groupIds) {
        groupIds.forEach(groupId -> {
            // Remove first so later lookups create fresh consumers instead of reusing closed ones.
            Map<TopicPartition, CompletableFuture<Consumer<byte[]>>> groupConsumers = this.consumers.remove(groupId);
            if (groupConsumers == null) {
                return;
            }
            groupConsumers.values().forEach(consumerFuture ->
                    consumerFuture.whenComplete((consumer, throwable) -> {
                        if (throwable != null) {
                            log.warn("Error when get consumer for consumer group close:", throwable);
                            return;
                        }
                        try {
                            consumer.close();
                        } catch (Exception e) {
                            log.warn("Error when close consumer topic: {}, sub: {}.",
                                    consumer.getTopic(), consumer.getSubscription(), e);
                        }
                    }));
        });
    }

    /** Closes the consumers of every tracked group. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        log.info("close OffsetAcker with {} groupIds", Integer.valueOf(this.consumers.size()));
        // The backing map is a ConcurrentHashMap, whose key-set view tolerates the
        // removals performed by close(Set) while it is being traversed.
        close(this.consumers.keySet());
    }

    /**
     * Returns the cached consumer future for the group/partition pair, creating it when absent.
     * A future that has already failed is evicted (and still returned, so the caller sees the
     * error); the next lookup then retries the subscription instead of failing forever.
     */
    private CompletableFuture<Consumer<byte[]>> getConsumer(String groupId, TopicPartition topicPartition) {
        Map<TopicPartition, CompletableFuture<Consumer<byte[]>>> groupConsumers =
                this.consumers.computeIfAbsent(groupId, ignored -> new ConcurrentHashMap<>());
        CompletableFuture<Consumer<byte[]>> consumerFuture =
                groupConsumers.computeIfAbsent(topicPartition, partition -> createConsumer(groupId, partition));
        if (consumerFuture.isCompletedExceptionally()) {
            // Two-argument remove: only evict if this exact failed future is still the cached one.
            groupConsumers.remove(topicPartition, consumerFuture);
        }
        return consumerFuture;
    }

    /** Starts an asynchronous subscription to the partition's topic, using the group id as subscription name. */
    private CompletableFuture<Consumer<byte[]>> createConsumer(String groupId, TopicPartition topicPartition) {
        String partitionName = new KopTopic(topicPartition.topic()).getPartitionName(topicPartition.partition());
        return this.consumerBuilder.clone()
                .topic(partitionName)
                .subscriptionName(groupId)
                .subscribeAsync();
    }
}
