package pl.allegro.tech.hermes.common.kafka;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import pl.allegro.tech.hermes.common.broker.BrokerStorage;
import pl.allegro.tech.hermes.common.exception.BrokerNotFoundForPartitionException;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;

/* loaded from: input_file:pl/allegro/tech/hermes/common/kafka/KafkaConsumerPool.class */
public class KafkaConsumerPool {
    private final LoadingCache<Integer, KafkaConsumer<byte[], byte[]>> kafkaConsumers;
    private final BrokerStorage brokerStorage;

    /* loaded from: input_file:pl/allegro/tech/hermes/common/kafka/KafkaConsumerPool$KafkaConsumerRemoveListener.class */
    private static class KafkaConsumerRemoveListener implements RemovalListener<Integer, KafkaConsumer<byte[], byte[]>> {
        private KafkaConsumerRemoveListener() {
        }

        public void onRemoval(RemovalNotification<Integer, KafkaConsumer<byte[], byte[]>> removalNotification) {
            ((KafkaConsumer) removalNotification.getValue()).close();
        }
    }

    /* loaded from: input_file:pl/allegro/tech/hermes/common/kafka/KafkaConsumerPool$KafkaConsumerSupplier.class */
    private static class KafkaConsumerSupplier extends CacheLoader<Integer, KafkaConsumer<byte[], byte[]>> {
        private final KafkaConsumerPoolConfig poolConfig;
        private final String configuredBootstrapServers;

        KafkaConsumerSupplier(KafkaConsumerPoolConfig kafkaConsumerPoolConfig, String str) {
            this.poolConfig = kafkaConsumerPoolConfig;
            this.configuredBootstrapServers = str;
        }

        public KafkaConsumer<byte[], byte[]> load(Integer num) throws Exception {
            return createKafkaConsumer();
        }

        private KafkaConsumer<byte[], byte[]> createKafkaConsumer() {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", this.configuredBootstrapServers);
            properties.put("group.id", this.poolConfig.getIdPrefix() + HermesMetrics.REPLACEMENT_CHAR + this.poolConfig.getConsumerGroupName());
            properties.put("receive.buffer.bytes", Integer.valueOf(this.poolConfig.getBufferSizeBytes()));
            properties.put("enable.auto.commit", false);
            properties.put("fetch.max.wait.ms", Integer.valueOf(this.poolConfig.getFetchMaxWaitMillis()));
            properties.put("fetch.min.bytes", Integer.valueOf(this.poolConfig.getFetchMinBytes()));
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            if (this.poolConfig.isSaslEnabled()) {
                properties.put("sasl.mechanism", this.poolConfig.getSecurityMechanism());
                properties.put("security.protocol", this.poolConfig.getSecurityProtocol());
                properties.put("sasl.jaas.config", this.poolConfig.getSaslJaasConfig());
            }
            return new KafkaConsumer<>(properties);
        }
    }

    public KafkaConsumerPool(KafkaConsumerPoolConfig kafkaConsumerPoolConfig, BrokerStorage brokerStorage, String str) {
        this.brokerStorage = brokerStorage;
        this.kafkaConsumers = CacheBuilder.newBuilder().expireAfterAccess(kafkaConsumerPoolConfig.getCacheExpirationSeconds(), TimeUnit.SECONDS).removalListener(new KafkaConsumerRemoveListener()).build(new KafkaConsumerSupplier(kafkaConsumerPoolConfig, str));
    }

    public KafkaConsumer<byte[], byte[]> get(KafkaTopic kafkaTopic, int i) {
        return get(kafkaTopic.name().asString(), i);
    }

    public KafkaConsumer<byte[], byte[]> get(String str, int i) {
        try {
            return (KafkaConsumer) this.kafkaConsumers.get(Integer.valueOf(this.brokerStorage.readLeaderForPartition(new TopicPartition(str, i))));
        } catch (ExecutionException e) {
            throw new KafkaConsumerPoolException(String.format("Cannot get KafkaConsumer for topic %s and partition %d", str, Integer.valueOf(i)), e);
        } catch (UncheckedExecutionException e2) {
            if (e2.getCause() instanceof BrokerNotFoundForPartitionException) {
                throw ((BrokerNotFoundForPartitionException) e2.getCause());
            }
            throw e2;
        }
    }
}
