package io.github.askmeagain.meshinery.connectors.kafka.factories;

import io.github.askmeagain.meshinery.connectors.kafka.MeshineryKafkaProperties;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/askmeagain/meshinery/connectors/kafka/factories/KafkaConsumerFactory.class */
/**
 * Creates and caches {@link KafkaConsumer} instances, one per distinct list of topics.
 *
 * <p>Consumers are created lazily on the first {@link #get(List)} call for a topic list and are
 * closed together when {@link #close()} is invoked.
 */
public class KafkaConsumerFactory implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerFactory.class);

    /** Consumers created so far, keyed by the topic list they are subscribed to. */
    private final Map<List<String>, KafkaConsumer<String, byte[]>> consumers = new ConcurrentHashMap<>();

    /** Base configuration every created consumer starts from. */
    private final Properties properties = new Properties();

    /** Set once {@link #close()} has run, so repeated calls are no-ops. */
    private boolean disposed;

    /**
     * Builds the base consumer configuration.
     *
     * <p>The user supplied consumer properties are copied first; the bootstrap servers, group id,
     * deserializers and {@code auto.offset.reset} are set afterwards and therefore replace any
     * entries with the same keys.
     *
     * @param meshineryKafkaProperties source of the bootstrap servers, group id and additional
     *     consumer properties
     */
    public KafkaConsumerFactory(MeshineryKafkaProperties meshineryKafkaProperties) {
        this.properties.putAll(meshineryKafkaProperties.getConsumerProperties());
        this.properties.setProperty("bootstrap.servers", meshineryKafkaProperties.getBootstrapServers());
        this.properties.setProperty("group.id", meshineryKafkaProperties.getGroupId());
        this.properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        this.properties.setProperty("auto.offset.reset", "earliest");
    }

    /**
     * Returns the consumer subscribed to the given topics, creating it on first request.
     *
     * <p>The list itself is used as the cache key, so it should not be modified after being
     * passed in.
     *
     * @param list topics the consumer subscribes to
     * @return the cached or newly created consumer
     */
    public KafkaConsumer<String, byte[]> get(List<String> list) {
        return this.consumers.computeIfAbsent(list, this::createKafkaConsumer);
    }

    /**
     * Creates a consumer for the given topics with a group id derived from the base group id
     * and the topic list.
     */
    private KafkaConsumer<String, byte[]> createKafkaConsumer(List<String> list) {
        log.info("Creating kafka consumer for topic '{}'", list);
        Properties consumerProperties = new Properties();
        consumerProperties.putAll(this.properties);
        // The suffix is the list's toString(); it is part of the consumer group identity and
        // must stay stable.
        consumerProperties.setProperty("group.id", this.properties.getProperty("group.id") + "-" + list);
        KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(consumerProperties);
        kafkaConsumer.subscribe(list);
        // NOTE(review): poll(long) is deprecated in newer kafka-clients. poll(Duration) has
        // different blocking semantics, so migrating is a behaviour change - confirm first.
        kafkaConsumer.poll(0L);
        return kafkaConsumer;
    }

    /**
     * Closes every consumer created by this factory. Subsequent calls do nothing.
     *
     * <p>A failure while closing one consumer does not prevent the remaining consumers from being
     * closed; the first failure is rethrown afterwards with later failures attached as suppressed
     * exceptions.
     */
    @Override
    public void close() {
        if (this.disposed) {
            return;
        }
        this.disposed = true;

        RuntimeException failure = null;
        for (KafkaConsumer<String, byte[]> kafkaConsumer : this.consumers.values()) {
            try {
                kafkaConsumer.close();
            } catch (RuntimeException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }
}
