package de.thksystems.util.concurrent.scalingworkerqueue;

import de.thksystems.util.concurrent.Consumers;
import de.thksystems.util.concurrent.ThreadUtils;
import de.thksystems.util.concurrent.scalingworkerqueue.WorkerQueueConfiguration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadFactory;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/thksystems/util/concurrent/scalingworkerqueue/ScalingWorkerQueue.class */
/**
 * Work queue with one dispatcher thread and a varying number of runner threads.
 *
 * <p>The dispatcher repeatedly asks the {@code supplier} for elements, puts the ones that are
 * neither queued nor currently being processed into an internal queue, and starts additional
 * {@link Runner}s depending on the size of the last fetch. Each runner takes elements from the
 * internal queue and passes them, together with the configuration, to the {@code worker}.
 *
 * <p>The {@code with...} methods configure the queue and (except
 * {@link #withRunnersMonitoring(Consumer)}) may only be called before {@link #start()}.
 *
 * <p>Thread-safety: the lifecycle status and the dispatcher thread id are {@code volatile}, the
 * internal queue, the in-work set and the runner list are concurrent collections, and
 * {@link #getNextElement()} is synchronized. The configuration callbacks are written before the
 * dispatcher thread is started and only read afterwards.
 *
 * @param <E> type of the elements to process
 * @param <C> type of the configuration handed to the worker
 */
public class ScalingWorkerQueue<E, C extends WorkerQueueConfiguration> {
    private static final Logger LOG = LoggerFactory.getLogger(ScalingWorkerQueue.class);

    /** Interval passed to {@code ThreadUtils.sleepWithoutException} while polling for a status change. */
    private static final long WAIT_FOR_STATUS_PERIOD = 10;

    /** Id of the dispatcher thread; written by the dispatcher, read by runners when firing events. */
    private volatile long dispatcherThreadId;
    private final Function<Integer, Collection<E>> supplier;
    private final BiConsumer<E, C> worker;
    private final C configuration;

    /** Lifecycle state; volatile because it is polled from the caller, dispatcher and runner threads. */
    private volatile Status status = Status.CREATED;

    private ThreadFactory threadFactory = new BasicThreadFactory.Builder()
            .uncaughtExceptionHandler((thread, th) ->
                    LOG.error("Uncaught error in thread '{}': {}", thread, th.getMessage(), th))
            .build();
    private Function<Thread, String> dispatcherThreadNameSupplier = Thread::getName;
    private BiFunction<Thread, Integer, String> workerThreadNameSupplier = (thread, num) -> thread.getName();
    private Function<E, Boolean> trylockFunction = element -> true;
    private Consumer<E> unlockFunction = Consumers.noOp();
    private Function<E, Boolean> integrityCheckFunction = element -> true;
    private final Map<ListenerEvent, BiConsumer<Long, E>> eventListenerMap = new HashMap<>();
    private final Queue<E> internalQueue = new ConcurrentLinkedQueue<>();
    private final Set<E> elementsInWork = ConcurrentHashMap.newKeySet();

    /**
     * Active runners. The dispatcher adds, every runner removes itself on exit, and the list is
     * handed out by {@link #withRunnersMonitoring(Consumer)}, so it must tolerate concurrent
     * modification and iteration.
     */
    private final List<Runner> runners = new CopyOnWriteArrayList<>();

    /** Events for which a listener can be registered via {@link #withEventListener}. */
    public enum ListenerEvent {
        /** Fired by the dispatcher after an element was added to the internal queue. */
        ADDED_TO_QUEUE,
        /** Fired after a runner finished handling an element (whether or not the worker ran). */
        REMOVED_FROM_QUEUE
    }

    /**
     * Worker loop executed on its own thread: takes elements from the internal queue until the
     * queue is asked to stop or, for non-mandatory runners, until it has been idle for too long.
     *
     * <p>Note: the decompiler output this file was recovered from reports that the access modifier
     * of this class was originally {@code private}.
     */
    public class Runner implements Runnable {
        private final int number;
        private final boolean canDieIfIdle;

        /** Time (ms) of the first unsuccessful poll of the current idle phase; {@code null} while busy. */
        private Long noResultStartTime = null;

        /**
         * @param number       number of this runner, passed to the worker thread name supplier
         * @param canDieIfIdle whether this runner terminates after the configured maximum idle period
         */
        Runner(int number, boolean canDieIfIdle) {
            this.number = number;
            this.canDieIfIdle = canDieIfIdle;
        }

        /**
         * Processes elements until stopped or idle for too long. Exceptions are logged; in every
         * case the thread name is restored and the runner removes itself from the runner list.
         */
        @Override
        public void run() {
            final Thread self = Thread.currentThread();
            final String originalName = self.getName();
            try {
                self.setName(workerThreadNameSupplier.apply(self, number));
                LOG.info("Additional runner started: {} (mandatory: {})", number, !canDieIfIdle);
                final long sleepPeriod = configuration.getSleepPeriod();
                final long runnerMaxIdlePeriod = configuration.getRunnerMaxIdlePeriod();
                final long runnerSleepIdlePeriod = configuration.getRunnerSleepIdlePeriod();
                while (!shouldStop()) {
                    Optional<E> next = getNextElement();
                    if (next.isPresent()) {
                        process(next.get());
                        continue;
                    }
                    LOG.trace("Got no next element");
                    if (noResultStartTime == null) {
                        noResultStartTime = System.currentTimeMillis();
                    } else if (canDieIfIdle && System.currentTimeMillis() > noResultStartTime + runnerMaxIdlePeriod) {
                        LOG.info("Runner is idle and will be stopped.");
                        break;
                    }
                    sleepWhileIdle(runnerSleepIdlePeriod, sleepPeriod);
                }
            } catch (Exception e) {
                LOG.error("Caught exception {}", e.getMessage(), e);
            } finally {
                LOG.info("Runner {} stopped", number);
                self.setName(originalName);
                removeRunner(this);
            }
        }

        /**
         * Handles one element: tries to lock it, checks its integrity, runs the worker and marks
         * the element as processed. Whenever the lock was acquired, it is released exactly once,
         * including when the integrity check fails or the worker throws.
         */
        private void process(E element) {
            LOG.info("Got next: {}", element);
            try {
                noResultStartTime = null;
                if (trylockFunction.apply(element)) {
                    try {
                        if (integrityCheckFunction.apply(element)) {
                            try {
                                worker.accept(element, configuration);
                            } catch (Throwable t) {
                                // A failing worker must not terminate the runner.
                                LOG.error(t.getMessage(), t);
                            }
                        }
                    } finally {
                        unlockFunction.accept(element);
                    }
                }
            } finally {
                markElementAsProcessed(element);
            }
        }

        /** Sleeps in {@code sleepPeriod} steps until {@code idlePeriod} has elapsed or a stop is requested. */
        private void sleepWhileIdle(long idlePeriod, long sleepPeriod) {
            final long wakeUpAt = System.currentTimeMillis() + idlePeriod;
            while (!shouldStop() && System.currentTimeMillis() < wakeUpAt) {
                try {
                    Thread.sleep(sleepPeriod);
                } catch (InterruptedException e) {
                    throw new UnsupportedOperationException("The runner thread must not interrupted.", e);
                }
            }
        }
    }

    /**
     * Lifecycle states of the queue.
     *
     * <p>Note: the decompiler output this file was recovered from reports that the access modifier
     * of this enum was originally {@code private}.
     */
    public enum Status {
        CREATED,
        START_TRIGGERED,
        STARTED,
        STOP_TRIGGERED,
        STOPPED
    }

    /**
     * Creates a queue in status {@code CREATED}; nothing runs until {@link #start()} is called.
     *
     * @param c          configuration read by the dispatcher and runners and passed to the worker
     * @param function   supplier called with the wanted number of elements, returning the fetched elements
     * @param biConsumer worker called for every element together with the configuration
     */
    public ScalingWorkerQueue(C c, Function<Integer, Collection<E>> function, BiConsumer<E, C> biConsumer) {
        this.worker = biConsumer;
        this.configuration = c;
        this.supplier = function;
    }

    /**
     * Sets the lock, unlock and integrity-check callbacks; a {@code null} argument keeps the
     * current callback.
     *
     * @param function  try-lock callback; the element is only processed if it returns {@code true}
     * @param consumer  unlock callback, called after processing of a locked element
     * @param function2 integrity check; the worker is only called if it returns {@code true}
     * @return this queue
     * @throws IllegalStateException if the queue is no longer in status {@code CREATED}
     */
    public ScalingWorkerQueue<E, C> withDistributedSetup(Function<E, Boolean> function, Consumer<E> consumer, Function<E, Boolean> function2) {
        assertStatusCreated();
        if (function != null) {
            this.trylockFunction = function;
        }
        if (consumer != null) {
            this.unlockFunction = consumer;
        }
        if (function2 != null) {
            this.integrityCheckFunction = function2;
        }
        return this;
    }

    /**
     * Sets the functions that compute the names of the dispatcher thread and the runner threads.
     *
     * @param function   receives the dispatcher thread and returns its name
     * @param biFunction receives a runner thread and the runner number and returns the thread name
     * @return this queue
     * @throws IllegalStateException if the queue is no longer in status {@code CREATED}
     */
    public ScalingWorkerQueue<E, C> withThreadNames(Function<Thread, String> function, BiFunction<Thread, Integer, String> biFunction) {
        assertStatusCreated();
        this.dispatcherThreadNameSupplier = function;
        this.workerThreadNameSupplier = biFunction;
        return this;
    }

    /**
     * Sets the factory used to create the dispatcher thread and the runner threads.
     *
     * @param threadFactory factory to use
     * @return this queue
     * @throws IllegalStateException if the queue is no longer in status {@code CREATED}
     */
    public ScalingWorkerQueue<E, C> withThreadFactory(ThreadFactory threadFactory) {
        assertStatusCreated();
        this.threadFactory = threadFactory;
        return this;
    }

    /**
     * Registers the listener for an event, replacing any listener registered before for it.
     *
     * @param listenerEvent event to listen for
     * @param biConsumer    listener called with the dispatcher thread id and the element
     * @return this queue
     * @throws IllegalStateException if the queue is no longer in status {@code CREATED}
     */
    public ScalingWorkerQueue<E, C> withEventListener(ListenerEvent listenerEvent, BiConsumer<Long, E> biConsumer) {
        assertStatusCreated();
        this.eventListenerMap.put(listenerEvent, biConsumer);
        return this;
    }

    /**
     * Hands the live list of runners to the given consumer. The list is not a copy; it reflects
     * runners being added and removed later on.
     *
     * @param consumer receives the runner list
     * @return this queue
     */
    @SuppressWarnings("rawtypes") // raw Collection is part of the existing public signature
    public ScalingWorkerQueue<E, C> withRunnersMonitoring(Consumer<Collection> consumer) {
        consumer.accept(this.runners);
        return this;
    }

    /**
     * Calls the listener registered for the event (if any) with the dispatcher thread id and the element.
     *
     * @param listenerEvent event that occurred
     * @param e             element the event refers to
     */
    protected void executeEventListener(ListenerEvent listenerEvent, E e) {
        this.eventListenerMap.getOrDefault(listenerEvent, Consumers.noBiOp()).accept(this.dispatcherThreadId, e);
    }

    /** Guards configuration methods against being called after the queue left status {@code CREATED}. */
    private void assertStatusCreated() {
        if (this.status != Status.CREATED) {
            throw new IllegalStateException("The configuration of the scaling worker queue must not be changed after it is started.");
        }
    }

    /**
     * Starts the dispatcher thread without waiting for it to come up.
     *
     * @return this queue
     */
    public ScalingWorkerQueue<E, C> start() {
        return start(false);
    }

    /**
     * Starts the dispatcher thread.
     *
     * @param z if {@code true}, blocks (polling) until the dispatcher reports status {@code STARTED}
     * @return this queue
     */
    public ScalingWorkerQueue<E, C> start(boolean z) {
        this.status = Status.START_TRIGGERED;
        this.threadFactory.newThread(this::run).start();
        while (z && !isStarted()) {
            ThreadUtils.sleepWithoutException(WAIT_FOR_STATUS_PERIOD);
        }
        return this;
    }

    /** @return {@code true} while the status is {@code STARTED} */
    public boolean isStarted() {
        return this.status == Status.STARTED;
    }

    /**
     * Requests a stop without waiting for the dispatcher and runners to finish.
     *
     * @return this queue
     */
    public ScalingWorkerQueue<E, C> stop() {
        return stop(false);
    }

    /**
     * Requests a stop of the dispatcher and all runners.
     *
     * @param z if {@code true}, blocks (polling) until the status is {@code STOPPED}
     * @return this queue
     */
    public ScalingWorkerQueue<E, C> stop(boolean z) {
        LOG.info("Requesting stop of worker queue");
        this.status = Status.STOP_TRIGGERED;
        while (z && !isStopped()) {
            ThreadUtils.sleepWithoutException(WAIT_FOR_STATUS_PERIOD);
        }
        return this;
    }

    /** @return {@code true} while a stop has been requested but is not yet completed */
    public boolean shouldStop() {
        return this.status == Status.STOP_TRIGGERED;
    }

    /** @return {@code true} once the dispatcher has finished and set status {@code STOPPED} */
    public boolean isStopped() {
        return this.status == Status.STOPPED;
    }

    /**
     * Dispatcher loop, executed on the thread created by {@link #start(boolean)}. Fetches elements,
     * fills the internal queue, starts runners and then waits until the queue runs low or, after
     * an empty fetch, until the configured wait period has passed. On exit it waits for all
     * runners to finish before setting status {@code STOPPED}.
     */
    private void run() {
        final Thread self = Thread.currentThread();
        final String originalName = self.getName();
        try {
            this.dispatcherThreadId = self.getId();
            self.setName(this.dispatcherThreadNameSupplier.apply(self));
            LOG.info("Worker queue started");
            this.status = Status.STARTED;
            final int minElementsCountToSupply = this.configuration.getMinElementsCountToSupply();
            final int spareElementsCountToSupply = this.configuration.getSpareElementsCountToSupply();
            final int countOfElementsPerRunner = this.configuration.getCountOfElementsPerRunner();
            final int maxRunnerCount = this.configuration.getMaxRunnerCount();
            final int minRunnerCount = this.configuration.getMinRunnerCount();
            final long dispatcherWaitPeriodOnEmptyFetch = this.configuration.getDispatcherWaitPeriodOnEmptyFetch();
            final long sleepPeriod = this.configuration.getSleepPeriod();
            final int sleepPeriodCountOnError = this.configuration.getSleepPeriodCountOnError();
            while (!shouldStop()) {
                try {
                    LOG.trace("Fetching additional elements");
                    final int wanted = Math.max(minElementsCountToSupply,
                            (countOfElementsPerRunner * this.runners.size()) + spareElementsCountToSupply);
                    final Collection<E> fetched = this.supplier.apply(wanted);
                    if (fetched.size() > 0) {
                        LOG.debug("Fetched {} additional elements", fetched.size());
                        if (fetched.size() > wanted) {
                            LOG.warn("Fetching more elements than wanted: {} > {}.", fetched.size(), wanted);
                        }
                    } else {
                        LOG.trace("Fetched no additional elements");
                    }

                    long addedCount = 0;
                    for (E element : fetched) {
                        if (this.internalQueue.contains(element) || this.elementsInWork.contains(element)) {
                            LOG.debug("Skipping fetched element. It is already in the internal queue or currently processed: {}", element);
                        } else {
                            LOG.trace("Adding fetched element to internal queue: {}", element);
                            this.internalQueue.add(element);
                            executeEventListener(ListenerEvent.ADDED_TO_QUEUE, element);
                            addedCount++;
                        }
                    }

                    // After a fetch without new elements, wait a fixed period instead of watching the queue size.
                    final boolean nothingAdded = addedCount == 0;
                    long waitUntil = 0L;
                    if (nothingAdded) {
                        LOG.trace("No (new) element fetched. Waiting some time.");
                        waitUntil = System.currentTimeMillis() + dispatcherWaitPeriodOnEmptyFetch;
                    }

                    while (!shouldStop()
                            && this.runners.size() < Math.min(maxRunnerCount, fetched.size() / countOfElementsPerRunner)) {
                        final int runnerCount = this.runners.size();
                        // NOTE(review): with '>' a runner created while minRunnerCount runners exist is
                        // still mandatory, i.e. up to minRunnerCount + 1 mandatory runners - confirm
                        // whether '>=' was intended.
                        final Runner runner = new Runner(runnerCount, runnerCount > minRunnerCount);
                        this.runners.add(runner);
                        this.threadFactory.newThread(runner).start();
                    }

                    while (!shouldStop()
                            && (nothingAdded
                                    ? System.currentTimeMillis() < waitUntil
                                    : this.internalQueue.size() >= spareElementsCountToSupply)) {
                        try {
                            Thread.sleep(sleepPeriod);
                        } catch (InterruptedException e) {
                            throw new UnsupportedOperationException("The scaling worker queue must not interrupted. Use stop() instead.", e);
                        }
                    }
                } catch (Exception e) {
                    LOG.error("Caught exception: {} -> Sleeping some time ({} ms)",
                            e.getMessage(), sleepPeriodCountOnError * sleepPeriod, e);
                    // Back off before retrying; as before, this sleeps sleepPeriodCountOnError + 1
                    // periods unless a stop is requested.
                    long remaining = sleepPeriodCountOnError;
                    while (!shouldStop() && remaining-- >= 0) {
                        ThreadUtils.sleepWithoutException(sleepPeriod);
                    }
                }
            }
        } finally {
            // Runners remove themselves on exit; only report STOPPED once all of them are gone.
            while (getRunnersCount() > 0) {
                ThreadUtils.sleepWithoutException(WAIT_FOR_STATUS_PERIOD);
            }
            this.status = Status.STOPPED;
            LOG.info("Worker queue stopped");
            self.setName(originalName);
        }
    }

    /**
     * Takes the head of the internal queue, if any. The element is added to the in-work set
     * before it is removed from the queue, so it is always present in at least one of the two.
     *
     * @return the next element, or an empty {@link Optional} if the internal queue is empty
     */
    synchronized Optional<E> getNextElement() {
        final E head = this.internalQueue.peek();
        if (head != null) {
            this.elementsInWork.add(head);
            this.internalQueue.remove();
        }
        return Optional.ofNullable(head);
    }

    /** @return {@code true} if the internal queue is not empty */
    boolean hasNextElement() {
        return !this.internalQueue.isEmpty();
    }

    /**
     * Removes the element from the in-work set and fires {@link ListenerEvent#REMOVED_FROM_QUEUE}.
     * Does nothing for {@code null}.
     */
    void markElementAsProcessed(E e) {
        if (e != null) {
            this.elementsInWork.remove(e);
            executeEventListener(ListenerEvent.REMOVED_FROM_QUEUE, e);
        }
    }

    /** Removes the given runner from the list of active runners. */
    void removeRunner(Runner runner) {
        this.runners.remove(runner);
    }

    /** @return the number of runners currently registered as active */
    public int getRunnersCount() {
        return this.runners.size();
    }
}
