/*
 * Decompiled with CFR 0.152.
 */
package io.axual.client.producer.generic;

import io.axual.client.config.BaseProducerConfig;
import io.axual.client.producer.generic.GenericProducer;
import io.axual.client.producer.generic.ProducerWorker;
import io.axual.common.config.ClientConfig;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ProducerWorkerManager
implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerWorkerManager.class);
    private static final Duration JOIN_TIMOUT = Duration.ofSeconds(10L);
    private final Map<GenericProducer, ProducerWorker> registry = new ConcurrentHashMap<GenericProducer, ProducerWorker>();
    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    public synchronized <K, V> ProducerWorker<K, V> claimWorker(ClientConfig clientConfig, GenericProducer<K, V> producer, BaseProducerConfig<K, V> producerConfig) {
        if (this.isClosed.get()) {
            throw new IllegalStateException("The ProducerWorkerManager is already closed");
        }
        if (LOG.isInfoEnabled()) {
            LOG.info(String.format("Claiming worker\tProducer                 : %s\tSink                     : %s\tDeliveryStrategy         : %s\tOrderingStrategy         : %s\tMessageBufferSize        : %d\tmessageBufferWaitTimeout : %d\tblockedMessageInsert     : %s", new Object[]{producer, producerConfig, producerConfig.getDeliveryStrategy(), producerConfig.getOrderingStrategy(), producerConfig.getMessageBufferSize(), producerConfig.getMessageBufferWaitTimeout(), producerConfig.isBlocking()}));
        }
        if (this.registry.containsKey(producer)) {
            LOG.debug("Producer found in registry");
            return this.registry.get(producer);
        }
        LOG.debug("Producer not found in registry, starting new ProducerWorker");
        ProducerWorker worker = new ProducerWorker(clientConfig, producerConfig);
        worker.start();
        this.registry.put(producer, worker);
        return worker;
    }

    public synchronized <K, V> void releaseWorker(GenericProducer<K, V> producer) {
        LOG.info("Releasing producer {}", producer);
        if (this.registry.containsKey(producer)) {
            LOG.debug("Send ProducerWorker cancel command");
            this.stopWorker(this.registry.get(producer));
            this.registry.remove(producer);
        } else {
            LOG.debug("Producer not found in registry");
        }
    }

    @Override
    public void close() {
        if (this.isClosed.compareAndSet(false, true)) {
            LOG.info("Shutting down ProducerWorkerManager, cancelling all workers");
            for (ProducerWorker worker : this.registry.values()) {
                this.stopWorker(worker);
            }
        } else {
            LOG.warn("ProducerWorkerManager already closed, ignoring...");
        }
    }

    private void stopWorker(ProducerWorker worker) {
        worker.cancel();
        long startTime = System.currentTimeMillis();
        try {
            worker.join(JOIN_TIMOUT.toMillis());
        }
        catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for ProducerWorker to stop, ignoring");
            Thread.currentThread().interrupt();
        }
        Duration duration = Duration.ofMillis(System.currentTimeMillis() - startTime);
        LOG.info("Stopping producer worker took {}", (Object)duration);
    }
}

