package kz.greetgo.kafka.consumer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import kz.greetgo.kafka.consumer.config.ConsumerReactorConfig;
import kz.greetgo.kafka.core.ConsumerPortionInvoking;
import kz.greetgo.kafka.core.ProducerSynchronizer;
import kz.greetgo.kafka.core.logger.Logger;
import kz.greetgo.kafka.core.logger.LoggerType;
import kz.greetgo.kafka.model.Box;
import kz.greetgo.kafka.serializer.BoxDeserializer;
import kz.greetgo.strconverter.StrConverter;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

/* loaded from: input_file:kz/greetgo/kafka/consumer/ConsumerReactorImpl.class */
public class ConsumerReactorImpl implements ConsumerReactor {
    Logger logger;
    ProducerSynchronizer producerSynchronizer;
    Supplier<StrConverter> strConverter;
    ConsumerDefinition consumerDefinition;
    Supplier<String> bootstrapServers;
    ConsumerReactorConfig consumerConfig;
    Supplier<String> topicPrefix;
    Supplier<String> consumerThreadPrefix;
    BooleanSupplier externalWorkingSupplier;
    private final AtomicBoolean working = new AtomicBoolean(true);
    private final ConcurrentHashMap<Long, Worker> workers = new ConcurrentHashMap<>();
    private final AtomicLong nextValue = new AtomicLong(1);
    private final AtomicReference<Set<String>> currentTopicList = new AtomicReference<>(Set.of());
    private final AtomicBoolean started = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kz/greetgo/kafka/consumer/ConsumerReactorImpl$Worker.class */
    public class Worker extends Thread {
        private final long id;
        private final AtomicBoolean running = new AtomicBoolean(true);

        private Worker() {
            this.id = ConsumerReactorImpl.this.nextValue.getAndIncrement();
        }

        public boolean isRunning() {
            return this.running.get();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            ConsumerRecords<byte[], Box> poll;
            try {
                Thread.currentThread().setName(ConsumerReactorImpl.this.consumerThreadPrefix.get() + ConsumerReactorImpl.this.consumerDefinition.logDisplay() + "-" + this.id);
                if (ConsumerReactorImpl.this.logger.isShow(LoggerType.LOG_START_CONSUMER_WORKER)) {
                    ConsumerReactorImpl.this.logger.logConsumerStartWorker(ConsumerReactorImpl.this.consumerDefinition, this.id);
                }
                HashMap hashMap = new HashMap(ConsumerReactorImpl.this.consumerConfig.getConfigMap());
                hashMap.put("bootstrap.servers", ConsumerReactorImpl.this.bootstrapServers.get());
                hashMap.put("auto.offset.reset", ConsumerReactorImpl.this.consumerDefinition.getAutoOffsetReset().name().toLowerCase());
                hashMap.put("group.id", ConsumerReactorImpl.this.applyPrefix(ConsumerReactorImpl.this.consumerDefinition.getGroupId()));
                hashMap.put("enable.auto.commit", ConsumerReactorImpl.this.consumerDefinition.isAutoCommit() ? "true" : "false");
                if (ConsumerReactorImpl.this.logger.isShow(LoggerType.SHOW_CONSUMER_WORKER_CONFIG)) {
                    ConsumerReactorImpl.this.logger.logConsumerWorkerConfig(ConsumerReactorImpl.this.consumerDefinition, this.id, hashMap);
                }
                ByteArrayDeserializer byteArrayDeserializer = new ByteArrayDeserializer();
                BoxDeserializer boxDeserializer = new BoxDeserializer(ConsumerReactorImpl.this.strConverter.get());
                InvokeSession createSession = ConsumerReactorImpl.this.consumerDefinition.getInvoker().createSession();
                while (ConsumerReactorImpl.this.isWorking() && ConsumerReactorImpl.this.workers.containsKey(Long.valueOf(this.id))) {
                    try {
                        KafkaConsumer kafkaConsumer = new KafkaConsumer(hashMap, byteArrayDeserializer, boxDeserializer);
                        try {
                            kafkaConsumer.subscribe(ConsumerReactorImpl.this.updateCurrentTopicList(ConsumerReactorImpl.this.applyPrefix(ConsumerReactorImpl.this.consumerDefinition.topicList())));
                            while (ConsumerReactorImpl.this.isWorking() && ConsumerReactorImpl.this.workers.containsKey(Long.valueOf(this.id))) {
                                try {
                                    poll = kafkaConsumer.poll(ConsumerReactorImpl.this.consumerConfig.getPollDuration());
                                } catch (RuntimeException e) {
                                    if (ConsumerReactorImpl.this.logger.isShow(LoggerType.LOG_CONSUMER_POLL_EXCEPTION_HAPPENED)) {
                                        ConsumerReactorImpl.this.logger.logConsumerPollExceptionHappened(e, ConsumerReactorImpl.this.consumerDefinition);
                                    }
                                    ConsumerReactorImpl.this.working.set(false);
                                    kafkaConsumer.close();
                                }
                                if (poll.count() != 0) {
                                    long currentTimeMillis = System.currentTimeMillis();
                                    try {
                                        ConsumerPortionInvoking startPortionInvoking = ConsumerReactorImpl.this.producerSynchronizer.startPortionInvoking();
                                        try {
                                            if (createSession.invoke(poll).needToCommit()) {
                                                if (startPortionInvoking != null) {
                                                    startPortionInvoking.close();
                                                }
                                                try {
                                                    kafkaConsumer.commitSync();
                                                    long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                                                    if (ConsumerReactorImpl.this.logger.isShow(LoggerType.LOG_CONSUMER_POLL_PERFORM_DELAY)) {
                                                        ConsumerReactorImpl.this.logger.logConsumerPollPerformDelay(ConsumerReactorImpl.this.consumerDefinition, currentTimeMillis2, poll);
                                                    }
                                                } catch (RuntimeException e2) {
                                                    if (ConsumerReactorImpl.this.logger.isShow(LoggerType.LOG_CONSUMER_COMMIT_SYNC_EXCEPTION_HAPPENED)) {
                                                        ConsumerReactorImpl.this.logger.logConsumerCommitSyncExceptionHappened(e2, ConsumerReactorImpl.this.consumerDefinition);
                                                    }
                                                    long currentTimeMillis3 = System.currentTimeMillis() - currentTimeMillis;
                                                    if (ConsumerReactorImpl.this.logger.isShow(LoggerType.LOG_CONSUMER_POLL_PERFORM_DELAY)) {
                                                        ConsumerReactorImpl.this.logger.logConsumerPollPerformDelay(ConsumerReactorImpl.this.consumerDefinition, currentTimeMillis3, poll);
                                                    }
                                                    kafkaConsumer.close();
                                                }
                                            } else {
                                                if (startPortionInvoking != null) {
                                                    startPortionInvoking.close();
                                                }
                                                long currentTimeMillis4 = System.currentTimeMillis() - currentTimeMillis;
                                                if (ConsumerReactorImpl.this.logger.isShow(LoggerType.LOG_CONSUMER_POLL_PERFORM_DELAY)) {
                                                    ConsumerReactorImpl.this.logger.logConsumerPollPerformDelay(ConsumerReactorImpl.this.consumerDefinition, currentTimeMillis4, poll);
                                                }
                                                kafkaConsumer.close();
                                            }
                                        } catch (Throwable th) {
                                            if (startPortionInvoking != null) {
                                                try {
                                                    startPortionInvoking.close();
                                                } catch (Throwable th2) {
                                                    th.addSuppressed(th2);
                                                }
                                            }
                                            throw th;
                                        }
                                    } catch (Throwable th3) {
                                        long currentTimeMillis5 = System.currentTimeMillis() - currentTimeMillis;
                                        if (ConsumerReactorImpl.this.logger.isShow(LoggerType.LOG_CONSUMER_POLL_PERFORM_DELAY)) {
                                            ConsumerReactorImpl.this.logger.logConsumerPollPerformDelay(ConsumerReactorImpl.this.consumerDefinition, currentTimeMillis5, poll);
                                        }
                                        throw th3;
                                    }
                                } else if (ConsumerReactorImpl.this.logger.isShow(LoggerType.LOG_CONSUMER_POLL_ZERO)) {
                                    ConsumerReactorImpl.this.logger.logConsumerPollZero(ConsumerReactorImpl.this.consumerDefinition);
                                }
                            }
                            kafkaConsumer.close();
                        } catch (Throwable th4) {
                            try {
                                kafkaConsumer.close();
                            } catch (Throwable th5) {
                                th4.addSuppressed(th5);
                            }
                            throw th4;
                        }
                    } catch (Throwable th6) {
                        if (createSession != null) {
                            try {
                                createSession.close();
                            } catch (Throwable th7) {
                                th6.addSuppressed(th7);
                            }
                        }
                        throw th6;
                    }
                }
                if (createSession != null) {
                    createSession.close();
                }
            } finally {
                this.running.set(false);
                ConsumerReactorImpl.this.workers.remove(Long.valueOf(this.id));
                if (ConsumerReactorImpl.this.logger.isShow(LoggerType.LOG_CONSUMER_FINISH_WORKER)) {
                    ConsumerReactorImpl.this.logger.logConsumerFinishWorker(ConsumerReactorImpl.this.consumerDefinition, this.id);
                }
            }
        }
    }

    @Override // kz.greetgo.kafka.consumer.ConsumerReactor
    public void refreshTopicList() {
        if (isWorking() && !Set.copyOf(applyPrefix(this.consumerDefinition.topicList())).equals(this.currentTopicList.get())) {
            restartAllWorkers();
        }
    }

    private Collection<String> updateCurrentTopicList(Collection<String> collection) {
        if (collection == null) {
            this.currentTopicList.set(Set.of());
            return Set.of();
        }
        this.currentTopicList.set(Set.copyOf(collection));
        return collection;
    }

    @Override // kz.greetgo.kafka.consumer.ConsumerReactor
    public boolean isWorking() {
        return this.working.get() && this.externalWorkingSupplier.getAsBoolean();
    }

    private String applyPrefix(String str) {
        String str2 = this.topicPrefix.get();
        return str2 == null ? str : str2 + str;
    }

    private Collection<String> applyPrefix(Collection<String> collection) {
        String str = this.topicPrefix.get();
        return str == null ? collection : (Collection) collection.stream().map(str2 -> {
            return str + str2;
        }).collect(Collectors.toList());
    }

    @Override // kz.greetgo.kafka.consumer.ConsumerReactor
    public ConsumerDefinition getConsumerDefinition() {
        return this.consumerDefinition;
    }

    @Override // kz.greetgo.kafka.consumer.ConsumerReactor
    public ConsumerReactor stop() {
        this.working.set(false);
        return this;
    }

    @Override // kz.greetgo.kafka.consumer.ConsumerReactor
    public void start() {
        if (this.working.get() && !this.started.get() && this.started.compareAndSet(false, true)) {
            this.consumerConfig.onChangeWorkerCount(this::correctWorkersCount);
            this.consumerConfig.onChangeParams(this::restartAllWorkers);
            correctWorkersCount();
        }
    }

    @Override // kz.greetgo.kafka.consumer.ConsumerReactor
    public void join() {
        while (true) {
            Iterator<Worker> it = this.workers.values().iterator();
            if (!it.hasNext()) {
                return;
            }
            try {
                it.next().join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private void restartAllWorkers() {
        if (this.working.get()) {
            this.workers.clear();
            correctWorkersCount();
        }
    }

    private void correctWorkersCount() {
        if (this.working.get() && this.externalWorkingSupplier.getAsBoolean()) {
            HashSet hashSet = new HashSet();
            int i = 0;
            for (Map.Entry<Long, Worker> entry : this.workers.entrySet()) {
                if (entry.getValue().isRunning()) {
                    i++;
                } else {
                    hashSet.add(entry.getKey());
                }
            }
            ConcurrentHashMap<Long, Worker> concurrentHashMap = this.workers;
            Objects.requireNonNull(concurrentHashMap);
            hashSet.forEach((v1) -> {
                r1.remove(v1);
            });
            int workerCount = this.consumerConfig.getWorkerCount();
            if (this.logger.isShow(LoggerType.LOG_CONSUMER_REACTOR_REFRESH)) {
                this.logger.logConsumerReactorRefresh(this.consumerDefinition, i, workerCount);
            }
            if (workerCount <= i) {
                if (workerCount < i) {
                    removeWorkers(i - workerCount);
                }
            } else {
                int i2 = workerCount - i;
                for (int i3 = 0; i3 < i2; i3++) {
                    appendWorker();
                }
            }
        }
    }

    private void appendWorker() {
        Worker worker = new Worker();
        this.workers.put(Long.valueOf(worker.id), worker);
        worker.start();
    }

    private void removeWorkers(int i) {
        int i2 = 0;
        while (i2 < i) {
            ArrayList arrayList = new ArrayList(this.workers.keySet());
            if (arrayList.isEmpty()) {
                return;
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                Worker remove = this.workers.remove((Long) it.next());
                if (remove != null && remove.isRunning()) {
                    i2++;
                    if (i2 >= i) {
                        return;
                    }
                }
            }
        }
    }
}
