package org.springframework.integration.kafka.core;

import com.gs.collections.api.block.function.Function;
import com.gs.collections.impl.block.factory.Functions;
import com.gs.collections.impl.list.mutable.FastList;
import com.gs.collections.impl.map.mutable.UnifiedMap;
import com.gs.collections.impl.utility.ListIterate;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import kafka.client.ClientUtils$;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataResponse;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
import scala.collection.JavaConversions;

/* loaded from: input_file:org/springframework/integration/kafka/core/DefaultConnectionFactory.class */
public class DefaultConnectionFactory implements InitializingBean, ConnectionFactory, DisposableBean {
    private final Configuration configuration;
    private final GetBrokersByPartitionFunction getBrokersByPartitionFunction = new GetBrokersByPartitionFunction();
    private final ConnectionInstantiationFunction connectionInstantiationFunction = new ConnectionInstantiationFunction();
    private final AtomicReference<PartitionBrokerMap> partitionBrokerMapReference = new AtomicReference<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private String clientId = KafkaConsumerDefaults.GROUP_ID;
    private int minBytes = 1;
    private int maxWait = 100;
    private int bufferSize = KafkaConsumerDefaults.SOCKET_BUFFER_SIZE_INT;
    private int socketTimeout = KafkaConsumerDefaults.SOCKET_TIMEOUT_INT;
    private int fetchMetadataTimeout = KafkaConsumerDefaults.FETCH_METADATA_TIMEOUT;
    private final UnifiedMap<BrokerAddress, Connection> kafkaBrokersCache = UnifiedMap.newMap();

    /* loaded from: input_file:org/springframework/integration/kafka/core/DefaultConnectionFactory$ConnectionInstantiationFunction.class */
    private class ConnectionInstantiationFunction implements Function<BrokerAddress, Connection> {
        private ConnectionInstantiationFunction() {
        }

        public Connection valueOf(BrokerAddress brokerAddress) {
            return new DefaultConnection(brokerAddress, DefaultConnectionFactory.this.clientId, DefaultConnectionFactory.this.bufferSize, DefaultConnectionFactory.this.socketTimeout, DefaultConnectionFactory.this.minBytes, DefaultConnectionFactory.this.maxWait);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/kafka/core/DefaultConnectionFactory$GetBrokersByPartitionFunction.class */
    public class GetBrokersByPartitionFunction implements Function<Partition, BrokerAddress> {
        private GetBrokersByPartitionFunction() {
        }

        public BrokerAddress valueOf(Partition partition) {
            return (BrokerAddress) ((PartitionBrokerMap) DefaultConnectionFactory.this.partitionBrokerMapReference.get()).getBrokersByPartition().get(partition);
        }
    }

    public DefaultConnectionFactory(Configuration configuration) {
        this.configuration = configuration;
    }

    public Configuration getConfiguration() {
        return this.configuration;
    }

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(this.configuration, "Kafka configuration cannot be empty");
        refreshLeaders(this.configuration.getDefaultTopic() == null ? Collections.emptyList() : Collections.singletonList(this.configuration.getDefaultTopic()));
    }

    public void destroy() throws Exception {
        Iterator it = this.kafkaBrokersCache.iterator();
        while (it.hasNext()) {
            ((Connection) it.next()).close();
        }
    }

    public void setMinBytes(int i) {
        this.minBytes = i;
    }

    public void setMaxWait(int i) {
        this.maxWait = i;
    }

    public String getClientId() {
        return this.clientId;
    }

    public void setClientId(String str) {
        this.clientId = str;
    }

    public void setBufferSize(int i) {
        this.bufferSize = i;
    }

    public void setSocketTimeout(int i) {
        this.socketTimeout = i;
    }

    public void setFetchMetadataTimeout(int i) {
        this.fetchMetadataTimeout = i;
    }

    @Override // org.springframework.integration.kafka.core.ConnectionFactory
    public Map<Partition, BrokerAddress> getLeaders(Iterable<Partition> iterable) {
        return FastList.newList(iterable).toMap(Functions.getPassThru(), this.getBrokersByPartitionFunction);
    }

    @Override // org.springframework.integration.kafka.core.ConnectionFactory
    public BrokerAddress getLeader(Partition partition) {
        try {
            this.lock.readLock().lock();
            BrokerAddress brokerAddress = getLeaders(Collections.singleton(partition)).get(partition);
            this.lock.readLock().unlock();
            return brokerAddress;
        } catch (Throwable th) {
            this.lock.readLock().unlock();
            throw th;
        }
    }

    @Override // org.springframework.integration.kafka.core.ConnectionFactory
    public Connection connect(BrokerAddress brokerAddress) {
        return (Connection) this.kafkaBrokersCache.getIfAbsentPutWithKey(brokerAddress, this.connectionInstantiationFunction);
    }

    @Override // org.springframework.integration.kafka.core.ConnectionFactory
    public void refreshLeaders(Collection<String> collection) {
        try {
            this.lock.writeLock().lock();
            Iterator it = this.kafkaBrokersCache.iterator();
            while (it.hasNext()) {
                ((Connection) it.next()).close();
            }
            TopicMetadataResponse topicMetadataResponse = new TopicMetadataResponse(ClientUtils$.MODULE$.fetchTopicMetadata(JavaConversions.asScalaSet(new HashSet(collection)), ClientUtils$.MODULE$.parseBrokerList(ListIterate.collect(this.configuration.getBrokerAddresses(), Functions.getToString()).makeString(",")), getClientId(), this.fetchMetadataTimeout, 0));
            HashMap hashMap = new HashMap();
            for (TopicMetadata topicMetadata : topicMetadataResponse.topicsMetadata()) {
                for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
                    hashMap.put(new Partition(topicMetadata.topic(), partitionMetadata.partitionId()), new BrokerAddress(partitionMetadata.leader().host(), partitionMetadata.leader().port()));
                }
            }
            this.partitionBrokerMapReference.set(new PartitionBrokerMap(UnifiedMap.newMap(hashMap)));
            this.lock.writeLock().unlock();
        } catch (Throwable th) {
            this.lock.writeLock().unlock();
            throw th;
        }
    }

    @Override // org.springframework.integration.kafka.core.ConnectionFactory
    public Collection<Partition> getPartitions(String str) {
        if (getPartitionBrokerMap().getPartitionsByTopic().containsKey(str)) {
            return getPartitionBrokerMap().getPartitionsByTopic().get(str).toList();
        }
        throw new TopicNotFoundException(str);
    }

    private PartitionBrokerMap getPartitionBrokerMap() {
        return this.partitionBrokerMapReference.get();
    }
}
