/*
 * Decompiled with CFR 0.152.
 */
package io.axual.client.producer.generic;

import io.axual.client.config.BaseProducerConfig;
import io.axual.client.config.DeliveryStrategy;
import io.axual.client.config.OrderingStrategy;
import io.axual.client.exception.BufferFullException;
import io.axual.client.exception.ProducerWorkerCancelledException;
import io.axual.client.producer.ProducedMessage;
import io.axual.client.producer.generic.ProduceFuture;
import io.axual.client.producer.generic.ProduceJob;
import io.axual.client.proxy.axual.producer.AxualProducer;
import io.axual.client.proxy.generic.producer.ProducerProxy;
import io.axual.common.config.ClientConfig;
import io.axual.common.tools.KafkaUtil;
import io.axual.common.tools.SleepUtil;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerWorker<K, V>
extends Thread {
    private static final AtomicInteger WORKERCOUNT = new AtomicInteger(0);
    private static final Logger LOG = LoggerFactory.getLogger(ProducerWorker.class);
    private final AtomicBoolean isCancelled = new AtomicBoolean(false);
    private final ClientConfig clientConfig;
    private final BaseProducerConfig producerConfig;
    private final boolean blockedMessageInsert;
    private final int messageBufferSize;
    private final long messageBufferWaitTimeout;
    private final ArrayBlockingQueue<ProduceJob<K, V>> messageBuffer;
    private final ProducerProxy<K, V> producer;
    private final AtomicBoolean isRunning;

    public ProducerWorker(ClientConfig clientConfig, BaseProducerConfig producerConfig) {
        LOG.info("Creating new ProducerWorker");
        this.clientConfig = clientConfig;
        this.producerConfig = producerConfig;
        this.messageBufferSize = producerConfig.getMessageBufferSize();
        this.messageBufferWaitTimeout = producerConfig.getMessageBufferWaitTimeout();
        this.blockedMessageInsert = producerConfig.isBlocking();
        this.messageBuffer = new ArrayBlockingQueue(this.messageBufferSize);
        this.producer = this.createNewKafkaProducer();
        this.isRunning = new AtomicBoolean(false);
    }

    private ProducerProxy<K, V> createNewKafkaProducer() {
        Map configs = KafkaUtil.getKafkaConfigs((ClientConfig)this.clientConfig);
        configs.put("key.serializer", this.producerConfig.getKeySerializer());
        configs.put("value.serializer", this.producerConfig.getValueSerializer());
        configs.put("acks", this.producerConfig.getDeliveryStrategy() == DeliveryStrategy.AT_LEAST_ONCE ? "-1" : "0");
        configs.put("retries", "0");
        configs.put("reconnect.backoff.ms", "1000");
        configs.put("retry.backoff.ms", "1000");
        configs.put("max.in.flight.requests.per.connection", this.producerConfig.getOrderingStrategy() == OrderingStrategy.LOSING_ORDER ? "10" : "1");
        configs.put("linger.ms", this.producerConfig.getLingerMs());
        configs.put("batch.size", this.producerConfig.getBatchSize());
        configs.put("axualproducer.chain", this.producerConfig.getProxyChain());
        LOG.info("Creating a new Axual producer with properties: {}", (Object)configs);
        AxualProducer result = new AxualProducer(configs);
        LOG.info("Created a new Axual producer");
        return result;
    }

    @Override
    public synchronized void start() {
        this.isRunning.set(true);
        super.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        int producerWorkerNr = WORKERCOUNT.getAndIncrement();
        Thread.currentThread().setName(ProducerWorker.class.getSimpleName() + Integer.toString(producerWorkerNr));
        LOG.info("Starting ProducerWorker thread {}", (Object)Thread.currentThread().getName());
        try {
            while (!this.isCancelled.get()) {
                this.takeJob(this.producer);
            }
            LOG.info("ProducerWorker cancelled, flushing {} messages in buffer", (Object)this.messageBuffer.size());
            for (ProduceJob<K, V> job : this.messageBuffer) {
                job.execute(this.producer);
            }
            this.messageBuffer.clear();
        }
        catch (Exception e) {
            LOG.error("ProducerWorker caught exception", (Throwable)e);
        }
        finally {
            this.isRunning.set(false);
            this.producer.close();
        }
        LOG.info("Exiting ProducerWorker thread {}", (Object)Thread.currentThread().getName());
    }

    void cancel() {
        LOG.info("ProducerWorker cancel requested");
        this.isCancelled.set(true);
    }

    private void takeJob(ProducerProxy<K, V> producer) {
        LOG.debug("Taking new job from queue");
        try {
            ProduceJob<K, V> job = this.messageBuffer.poll(100L, TimeUnit.MILLISECONDS);
            if (job != null) {
                this.executeJob(producer, job);
            }
        }
        catch (InterruptedException e) {
            if (this.isCancelled.get()) {
                LOG.info("The ProducerWorker got cancelled and the take from messageBuffer was interrupted");
            } else {
                LOG.warn("Take from messageBuffer was interrupted while the ProducerWorker is not cancelled", (Throwable)e);
            }
            Thread.currentThread().interrupt();
        }
    }

    private void executeJob(ProducerProxy<K, V> producer, ProduceJob<K, V> job) {
        try {
            LOG.debug("Producing message");
            ProduceFuture<K, V> future = job.execute(producer);
            if (this.producerConfig.getOrderingStrategy() == OrderingStrategy.KEEPING_ORDER) {
                while (!future.isDone()) {
                    SleepUtil.sleep((Duration)Duration.ofMillis(25L));
                }
            }
        }
        catch (Exception e) {
            LOG.error("Produce message failed", (Throwable)e);
            job.completeProduce("Produce message failed", e);
        }
    }

    public Future<ProducedMessage<K, V>> queueJob(ProduceJob<K, V> produceJob) {
        if (this.isCancelled.get()) {
            LOG.error("Trying to produce while the ProducerWorker is cancelled");
            produceJob.completeProduce((Throwable)((Object)new ProducerWorkerCancelledException("Cannot produce messages on cancelled ProducerWorker")));
            return produceJob.getFuture();
        }
        if (!this.isRunning.get()) {
            LOG.error("Trying to produce while the ProducerWorker is stopped or failed");
            produceJob.completeProduce((Throwable)((Object)new ProducerWorkerCancelledException("Cannot produce messages on stopped ProducerWorker")));
            return produceJob.getFuture();
        }
        try {
            if (this.blockedMessageInsert) {
                this.messageBuffer.put(produceJob);
                return produceJob.getFuture();
            }
            if (this.messageBufferWaitTimeout < 1L) {
                if (!this.messageBuffer.offer(produceJob)) {
                    produceJob.completeProduce((Throwable)((Object)new BufferFullException(String.format("Could not add message to buffer. Buffer maximum size is %d", this.messageBufferSize))));
                }
                return produceJob.getFuture();
            }
            if (!this.messageBuffer.offer(produceJob, this.messageBufferWaitTimeout, TimeUnit.MILLISECONDS)) {
                produceJob.completeProduce((Throwable)((Object)new BufferFullException(String.format("Could not add message to buffer. Buffer wait timeout is %d. Buffer maximum size is %d", this.messageBufferWaitTimeout, this.messageBufferSize))));
            }
            return produceJob.getFuture();
        }
        catch (InterruptedException e) {
            LOG.warn("Put message into buffer was interrupted");
            produceJob.completeProduce("Put message into buffer was interrupted", e);
            Thread.currentThread().interrupt();
            return produceJob.getFuture();
        }
    }
}

