package kz.greetgo.kafka.core;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import kz.greetgo.kafka.consumer.ConsumerDefinition;
import kz.greetgo.kafka.consumer.ConsumerReactor;
import kz.greetgo.kafka.consumer.InvokeSession;
import kz.greetgo.kafka.consumer.config.ConsumerReactorConfigFactory;
import kz.greetgo.kafka.core.logger.Logger;
import kz.greetgo.kafka.core.logger.LoggerType;
import kz.greetgo.kafka.model.Box;
import kz.greetgo.kafka.producer.ProducerSource;
import kz.greetgo.kafka.producer.RecordData;
import kz.greetgo.kafka.producer.config.ProducerReactorConfigFactory;
import kz.greetgo.kafka.serializer.BoxSerializer;
import kz.greetgo.kafka.util.KeyUtil;
import kz.greetgo.strconverter.StrConverter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

/* loaded from: input_file:kz/greetgo/kafka/core/KafkaReactorImpl.class */
/**
 * {@link KafkaReactorAbstract} implementation that builds a {@link ConsumerReactor} for every
 * non-direct consumer definition and invokes direct consumer definitions synchronously from
 * {@link #afterKafkaSent(RecordData)}.
 *
 * <p>{@link #consumerConfigFactory}, {@link #producerConfigFactory} and
 * {@link #consumerThreadPrefix} are not assigned by this class; they are read as-is when
 * consumers are started or producers are created.
 */
public class KafkaReactorImpl extends KafkaReactorAbstract {
    ConsumerReactorConfigFactory consumerConfigFactory;
    ProducerReactorConfigFactory producerConfigFactory;
    Supplier<String> consumerThreadPrefix;

    /** Reactors built by {@link #startConsumers()} for non-direct consumer definitions. */
    private final List<ConsumerReactor> consumerReactorList;

    /** Definitions flagged as direct; they are invoked in-process by {@link #afterKafkaSent(RecordData)}. */
    private final List<ConsumerDefinition> directDefinitionList;

    /** Producer-side callbacks, all delegating to the state of the enclosing reactor. */
    private final ProducerSource producerSource;

    /**
     * Creates the reactor with empty consumer lists and a {@link ProducerSource} bound to it.
     *
     * @param logger logger handed to the superclass constructor
     */
    // NOTE(review): the decompiler reports this constructor was originally package-private;
    // it is kept public here so that existing callers of the decompiled class still compile.
    public KafkaReactorImpl(Logger logger) {
        super(logger);
        this.consumerReactorList = new ArrayList<>();
        this.directDefinitionList = new ArrayList<>();
        this.producerSource = new ProducerSource() {
            @Override
            public Logger logger() {
                return KafkaReactorImpl.this.logger;
            }

            @Override
            public StrConverter getStrConverter() {
                return KafkaReactorImpl.this.strConverter.get();
            }

            @Override
            public byte[] extractKey(Object obj) {
                return KeyUtil.extractKey(obj);
            }

            /** Returns the author from the enclosing reactor's supplier, or {@code null} when no supplier is set. */
            @Override
            public String author() {
                if (KafkaReactorImpl.this.authorSupplier == null) {
                    return null;
                }
                return KafkaReactorImpl.this.authorSupplier.get();
            }

            @Override
            public Map<String, String> getConfigFor(String str) {
                return KafkaReactorImpl.this.producerConfigFactory.getConfig(str).getConfigMap();
            }

            /**
             * Creates a Kafka producer from the configuration returned by
             * {@link #getConfigFor(String)}, with {@code bootstrap.servers} overridden by the
             * enclosing reactor's current value.
             */
            @Override
            public Producer<byte[], Box> createProducer(String str, ByteArraySerializer byteArraySerializer, BoxSerializer boxSerializer) {
                // Copy first, so the map owned by the config factory is never mutated.
                Map<String, Object> config = new HashMap<>(getConfigFor(str));
                config.put("bootstrap.servers", KafkaReactorImpl.this.bootstrapServers.get());
                if (KafkaReactorImpl.this.logger.isShow(LoggerType.SHOW_PRODUCER_CONFIG)) {
                    KafkaReactorImpl.this.logger.logProducerConfigOnCreating(str, config);
                }
                return new KafkaProducer<>(config, byteArraySerializer, boxSerializer);
            }
        };
    }

    /**
     * Accumulates the consumer definitions and registers each of them: direct definitions are
     * remembered for in-process invocation, every other definition gets its own
     * {@link ConsumerReactor}.
     */
    @Override
    public void startConsumers() {
        accumulateConsumerDefinitionList();

        // First pass: request the config of every definition before any reactor is built.
        // The results are intentionally discarded.
        // NOTE(review): presumably this forces the factory to prepare all configs up front - confirm.
        for (ConsumerDefinition definition : this.consumerDefinitionList) {
            this.consumerConfigFactory.getConsumerConfig(definition);
        }

        // Second pass: sort definitions into direct ones and reactor-backed ones.
        for (ConsumerDefinition definition : this.consumerDefinitionList) {
            if (definition.isDirect()) {
                this.directDefinitionList.add(definition);
                continue;
            }
            ConsumerReactor reactor = ConsumerReactor.builder()
                .logger(this.logger)
                .producerSynchronizer(this.producerSynchronizer)
                .strConverter(this.strConverter)
                .bootstrapServers(this.bootstrapServers)
                .topicPrefix(this.topicPrefix)
                .consumerDefinition(definition)
                .consumerConfig(this.consumerConfigFactory.getConsumerConfig(definition))
                .consumerThreadPrefix(this.consumerThreadPrefix)
                .build();
            this.consumerReactorList.add(reactor);
        }
    }

    /**
     * Feeds the given record to every direct consumer definition, each in its own invoke session.
     *
     * <p>An {@link Exception} thrown by the invocation is reported through the logger (when
     * {@link LoggerType#LOG_PRODUCER_DIRECT_CONSUMER_ERROR} is shown) and does not stop the
     * remaining definitions from being invoked. Failures while creating or closing a session
     * are not caught here and propagate to the caller.
     *
     * @param recordData the record that is converted to consumer records for each invocation
     */
    @Override
    protected void afterKafkaSent(RecordData recordData) {
        for (ConsumerDefinition definition : this.directDefinitionList) {
            try (InvokeSession session = definition.getInvoker().createSession()) {
                // The catch is deliberately nested inside the resource scope, so that only
                // invocation failures are handled here, never failures of close().
                try {
                    session.invoke(recordData.toConsumerRecords());
                } catch (Exception e) {
                    if (this.logger.isShow(LoggerType.LOG_PRODUCER_DIRECT_CONSUMER_ERROR)) {
                        this.logger.logProducerDirectConsumerError("VpJ3Dl9570", e, definition);
                    }
                }
            }
        }
    }

    /** Calls {@code stop()} on every consumer reactor built by {@link #startConsumers()}. */
    @Override
    public void close() {
        this.consumerReactorList.forEach(ConsumerReactor::stop);
    }

    /** Calls {@code join()} on every consumer reactor built by {@link #startConsumers()}. */
    @Override
    public void join() {
        this.consumerReactorList.forEach(ConsumerReactor::join);
    }

    /** Returns the {@link ProducerSource} created in the constructor. */
    @Override
    public ProducerSource getProducerSource() {
        return this.producerSource;
    }
}
