package org.springframework.integration.kafka.listener;

import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import kafka.common.ErrorMapping;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.kafka.core.BrokerAddress;
import org.springframework.integration.kafka.core.ConnectionFactory;
import org.springframework.integration.kafka.core.ConsumerException;
import org.springframework.integration.kafka.core.KafkaConsumerDefaults;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.core.Result;
import org.springframework.integration.metadata.MetadataStore;
import org.springframework.integration.metadata.SimpleMetadataStore;

/* loaded from: input_file:org/springframework/integration/kafka/listener/MetadataStoreOffsetManager.class */
public class MetadataStoreOffsetManager implements OffsetManager {
    private static final Log log = LogFactory.getLog(MetadataStoreOffsetManager.class);
    private String consumerId;
    private MetadataStore metadataStore;
    private ConnectionFactory connectionFactory;
    private Map<Partition, Long> initialOffsets;
    private long referenceTimestamp;

    public MetadataStoreOffsetManager(ConnectionFactory connectionFactory) {
        this(connectionFactory, null);
    }

    public MetadataStoreOffsetManager(ConnectionFactory connectionFactory, Map<Partition, Long> map) {
        this.consumerId = KafkaConsumerDefaults.GROUP_ID;
        this.metadataStore = new SimpleMetadataStore();
        this.referenceTimestamp = KafkaConsumerDefaults.DEFAULT_OFFSET_RESET;
        this.connectionFactory = connectionFactory;
        this.initialOffsets = map == null ? new HashMap<>() : map;
    }

    public String getConsumerId() {
        return this.consumerId;
    }

    public void setConsumerId(String str) {
        this.consumerId = str;
    }

    public MetadataStore getMetadataStore() {
        return this.metadataStore;
    }

    public void setMetadataStore(MetadataStore metadataStore) {
        this.metadataStore = metadataStore;
    }

    public long getReferenceTimestamp() {
        return this.referenceTimestamp;
    }

    public void setReferenceTimestamp(long j) {
        this.referenceTimestamp = j;
    }

    @Override // org.springframework.integration.kafka.listener.OffsetManager
    public synchronized void updateOffset(Partition partition, long j) {
        this.metadataStore.put(asKey(partition), Long.toString(j));
    }

    @Override // org.springframework.integration.kafka.listener.OffsetManager
    public synchronized long getOffset(Partition partition) {
        Long offsetFromMetadataStore = getOffsetFromMetadataStore(partition);
        if (offsetFromMetadataStore != null) {
            return offsetFromMetadataStore.longValue();
        }
        if (this.initialOffsets.containsKey(partition)) {
            return this.initialOffsets.get(partition).longValue();
        }
        BrokerAddress leader = this.connectionFactory.getLeader(partition);
        if (leader == null) {
            throw new ConsumerException("No leader found for " + partition.toString());
        }
        Result<Long> fetchInitialOffset = this.connectionFactory.connect(leader).fetchInitialOffset(this.referenceTimestamp, partition);
        if (fetchInitialOffset.getErrors().size() > 0) {
            throw new ConsumerException(ErrorMapping.exceptionFor(fetchInitialOffset.getError(partition)));
        }
        if (fetchInitialOffset.getResults().containsKey(partition)) {
            return fetchInitialOffset.getResult(partition).longValue();
        }
        throw new IllegalStateException("Result does not contain an expected value");
    }

    @Override // org.springframework.integration.kafka.listener.OffsetManager
    public synchronized void resetOffsets(Collection<Partition> collection) {
        for (Partition partition : collection) {
            this.metadataStore.remove(asKey(partition));
            this.initialOffsets.remove(partition);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        flush();
        if (this.metadataStore instanceof Closeable) {
            this.metadataStore.close();
        }
    }

    @Override // java.io.Flushable
    public void flush() throws IOException {
        if (this.metadataStore instanceof Flushable) {
            this.metadataStore.flush();
        }
    }

    private Long getOffsetFromMetadataStore(Partition partition) {
        String str = this.metadataStore.get(asKey(partition));
        Long l = null;
        if (str != null) {
            try {
                l = Long.valueOf(Long.parseLong(str));
            } catch (NumberFormatException e) {
                log.warn("Invalid value: " + str);
            }
        }
        return l;
    }

    private String asKey(Partition partition) {
        return partition.getTopic() + ":" + partition.getId() + ":" + this.consumerId;
    }
}
