/*
 * Decompiled with CFR 0.152.
 */
package io.axual.client.consumer.base;

import io.axual.client.config.BaseConsumerConfig;
import io.axual.client.config.DeliveryStrategy;
import io.axual.client.consumer.base.BaseMessage;
import io.axual.client.consumer.base.CommitStrategy;
import io.axual.client.consumer.base.CommitStrategyALO;
import io.axual.client.consumer.base.CommitStrategyAMO;
import io.axual.client.consumer.base.Committer;
import io.axual.client.exception.NoExistingStreamException;
import io.axual.client.proxy.axual.consumer.AxualConsumer;
import io.axual.client.proxy.generic.consumer.ConsumerProxy;
import io.axual.common.config.ClientConfig;
import io.axual.common.tools.KafkaUtil;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for message sources that lazily create an Axual consumer for a single stream,
 * poll it for records and delegate offset committing to a {@link CommitStrategy}.
 *
 * <p>The commit strategy is selected from the configured {@link DeliveryStrategy}:
 * {@link CommitStrategyAMO} for {@code AT_MOST_ONCE}, {@link CommitStrategyALO} otherwise.
 *
 * <p>This class performs no synchronization of its own; the consumer field and the map of
 * processed offsets are plain, unsynchronized state.
 *
 * @param <K> key type of the consumed records
 * @param <V> value type of the consumed records
 */
public abstract class BaseMessageSource<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(BaseMessageSource.class);

    /** Maximum time a single {@code poll} call in {@link #getMessages()} waits for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100L);

    /** Created on the first call to {@link #getMessages()}; reset to {@code null} by {@link #close()}. */
    private ConsumerProxy<K, V> consumer = null;
    private final ClientConfig clientConfig;
    private final BaseConsumerConfig<K, V> consumerConfig;
    private final CommitStrategy<K, V> commitStrategy;

    /**
     * Stores the configuration and wires up the commit strategy. No consumer is created here;
     * that happens on the first call to {@link #getMessages()}.
     *
     * @param clientConfig   client configuration passed to {@link KafkaUtil} when building the
     *                       consumer properties
     * @param consumerConfig consumer configuration (stream, delivery strategy, deserializers,
     *                       proxy chain, maximum poll size)
     */
    public BaseMessageSource(ClientConfig clientConfig, final BaseConsumerConfig<K, V> consumerConfig) {
        this.clientConfig = clientConfig;
        this.consumerConfig = consumerConfig;

        final Committer<K, V> committer = new Committer<K, V>() {
            /** Monotonic counter used only to correlate commit log lines. */
            private final AtomicLong lastCommitId = new AtomicLong(0L);
            /** Next offset to commit per partition, accumulated since the last commit. */
            private final Map<TopicPartition, OffsetAndMetadata> processed = new HashMap<>();

            @Override
            public void markAsProcessed(BaseMessage<K, V> message) {
                // The committed offset is the offset of the NEXT record to read, hence + 1.
                this.processed.put(
                        new TopicPartition(message.getRecord().topic(), message.getRecord().partition()),
                        new OffsetAndMetadata(
                                message.getRecord().offset() + 1L,
                                "{\"strategy\":\"" + consumerConfig.getDeliveryStrategy().name() + "\"}"));
            }

            @Override
            public void commitProcessedOffsets(boolean synchronous) {
                if (this.processed.isEmpty() || BaseMessageSource.this.consumer == null) {
                    return;
                }
                final long commitId = this.lastCommitId.incrementAndGet();
                // Hand the consumer its own copy: the live map is cleared right below, and the
                // async path must not depend on whether the proxy copies its argument.
                final Map<TopicPartition, OffsetAndMetadata> snapshot = new HashMap<>(this.processed);
                if (!synchronous) {
                    LOG.debug("Commit async: id={}, partition_count={}", commitId, snapshot.size());
                    BaseMessageSource.this.consumer.commitAsync(snapshot, (map, e) -> {
                        if (e == null) {
                            LOG.debug("Commit successful: id={}", commitId);
                        } else {
                            // A failed commit must be visible in normal operation, not only at debug.
                            LOG.warn("Commit not successful: id={}", commitId, e);
                        }
                    });
                } else {
                    LOG.debug("Commit sync: id={}", commitId);
                    // If this throws, the processed offsets are intentionally kept for a later attempt.
                    BaseMessageSource.this.consumer.commitSync(snapshot);
                }
                this.processed.clear();
            }
        };

        if (consumerConfig.getDeliveryStrategy() == DeliveryStrategy.AT_MOST_ONCE) {
            this.commitStrategy = new CommitStrategyAMO<K, V>(committer);
        } else {
            this.commitStrategy = new CommitStrategyALO<K, V>(committer);
        }
    }

    /**
     * @return the consumer configuration this source was created with
     */
    public BaseConsumerConfig<K, V> getConsumerConfig() {
        return this.consumerConfig;
    }

    /**
     * @return a short description of this source, naming the configured stream
     */
    public String getInfo() {
        return "stream = " + this.consumerConfig.getStream();
    }

    /**
     * Polls the consumer once and wraps every returned record in a {@link BaseMessage}.
     * On the first call (or the first call after {@link #close()}) the consumer is created
     * and subscribed to the configured stream.
     *
     * @return the messages of this poll, possibly empty; the list has already been passed to
     *         {@link CommitStrategy#onAfterFetchBatch}
     * @throws NoExistingStreamException if the configured stream has no partitions; in that case
     *                                   the freshly created consumer is closed and a later call
     *                                   retries the creation
     */
    public List<BaseMessage<K, V>> getMessages() {
        if (this.consumer == null) {
            this.consumer = this.createSubscribedConsumer();
        }
        ConsumerRecords<K, V> records = this.consumer.poll(POLL_TIMEOUT);
        List<BaseMessage<K, V>> result = new ArrayList<>(records.count());
        LOG.debug("Poll retrieved {} messages", records.count());
        for (ConsumerRecord<K, V> record : records) {
            result.add(new BaseMessage<>(record));
        }
        this.commitStrategy.onAfterFetchBatch(result);
        return result;
    }

    /**
     * Creates a consumer, verifies that the configured stream has partitions and subscribes to it.
     * The consumer is closed again if any of these steps fails, so a failed attempt leaves no
     * open consumer behind.
     */
    private ConsumerProxy<K, V> createSubscribedConsumer() {
        final String stream = this.consumerConfig.getStream();
        Map<String, Object> configs = this.getConsumerConfigs();
        configs.put("axualconsumer.chain", this.consumerConfig.getProxyChain());
        // NOTE(review): the full property map is logged; confirm it cannot contain credentials.
        LOG.debug("Creating a new Axual consumer with properties: {}", configs);
        ConsumerProxy<K, V> created = new AxualConsumer<>(configs);
        LOG.debug("Created a new Axual consumer");
        try {
            List<?> partitionInfo = created.partitionsFor(stream);
            if (partitionInfo == null || partitionInfo.isEmpty()) {
                throw new NoExistingStreamException("No partitions found for stream", stream);
            }
            created.subscribe(Collections.singletonList(stream));
        } catch (RuntimeException e) {
            try {
                created.close();
            } catch (RuntimeException closeError) {
                // Keep the original failure as the primary error.
                e.addSuppressed(closeError);
            }
            throw e;
        }
        LOG.debug("Subscribed consumer to stream: {}", stream);
        return created;
    }

    /**
     * Closes the commit strategy and then the consumer, if one was created. The consumer is
     * closed even when closing the commit strategy throws.
     */
    public void close() {
        try {
            // Closed first, while the consumer is still available to the committer.
            this.commitStrategy.close();
        } finally {
            this.closeConsumer();
        }
    }

    /** Closes and forgets the consumer; the field is reset even if {@code close()} throws. */
    private void closeConsumer() {
        LOG.info("Closing the Kafka consumer");
        if (this.consumer == null) {
            LOG.info("Kafka consumer was already closed");
            return;
        }
        try {
            this.consumer.close();
        } finally {
            this.consumer = null;
        }
        LOG.info("Closed the Kafka consumer");
    }

    /**
     * Builds the consumer properties: the settings derived from the client configuration plus
     * the deserializers, disabled auto-commit, the offset reset policy ({@code earliest} for
     * {@code AT_LEAST_ONCE}, {@code latest} otherwise) and the maximum poll size.
     *
     * @return the consumer properties
     */
    protected Map<String, Object> getConsumerConfigs() {
        Map<String, Object> result = KafkaUtil.getKafkaConfigs(this.clientConfig);
        result.put("key.deserializer", this.consumerConfig.getKeyDeserializer());
        result.put("value.deserializer", this.consumerConfig.getValueDeserializer());
        result.put("enable.auto.commit", "false");
        result.put("auto.offset.reset", this.consumerConfig.getDeliveryStrategy() == DeliveryStrategy.AT_LEAST_ONCE ? "earliest" : "latest");
        result.put("max.poll.records", this.consumerConfig.getMaximumPollSize().toString());
        return result;
    }

    /**
     * Forwards the end-of-batch notification to the commit strategy.
     */
    public void onAfterProcessBatch() {
        this.commitStrategy.onAfterProcessBatch();
    }

    /**
     * Forwards the per-message completion notification to the commit strategy.
     *
     * @param message the message whose processing finished
     * @param error   passed through unchanged to the commit strategy
     */
    public void onAfterProcessMessage(BaseMessage<K, V> message, Throwable error) {
        this.commitStrategy.onAfterProcessMessage(message, error);
    }
}

