package colesico.framework.asyncjob.internal;

import colesico.framework.asyncjob.JobDao;
import colesico.framework.asyncjob.JobQueueConfig;
import colesico.framework.asyncjob.JobService;
import colesico.framework.asyncjob.JobServiceConfig;
import colesico.framework.asyncjob.PayloadConverter;
import colesico.framework.ioc.Polysupplier;
import colesico.framework.ioc.ThreadScope;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link JobService} implementation.
 *
 * <p>On {@link #start()} the service collects every {@link JobQueueConfig} supplied by the
 * container, indexes the queues by payload type and launches a fixed number of {@code Worker}
 * threads. {@link #enqueue(Object, Duration)} stores a serialized payload through the
 * {@link JobDao} and signals the workers monitor.
 *
 * <p>{@link #start()} and {@link #stop()} are mutually exclusive (both are {@code synchronized});
 * {@link #enqueue(Object, Duration)} takes no service-level lock and relies on the volatile
 * {@code running} flag to observe the state published by {@code start()}.
 */
@Singleton
public class JobServiceImpl implements JobService {
    private final JobServiceConfig srvConfig;
    private final JobDao jobsDao;
    private final PayloadConverter payloadTransformer;
    private final Polysupplier<JobQueueConfig> queueConfigs;
    private final ThreadScope threadScope;
    private QueuePool queuePool;
    private Thread[] workersThreads;
    private final Logger logger = LoggerFactory.getLogger(JobService.class);

    /** Queue references keyed by the payload class they accept; rebuilt on every start. */
    private final Map<Class<?>, QueueRef> queueRefs = new HashMap<>();

    /** Monitor shared with every worker; notified after each successful enqueue. */
    private final Object workersMonitor = new Object();

    private volatile boolean running = false;

    public JobServiceImpl(JobServiceConfig jobServiceConfig, JobDao jobDao, PayloadConverter payloadConverter, Polysupplier<JobQueueConfig> polysupplier, ThreadScope threadScope) {
        this.srvConfig = jobServiceConfig;
        this.jobsDao = jobDao;
        this.payloadTransformer = payloadConverter;
        this.queueConfigs = polysupplier;
        this.threadScope = threadScope;
    }

    /**
     * Registers all configured queues and launches the worker threads.
     *
     * @throws IllegalStateException if the service is already running
     * @throws RuntimeException      if two queue configs share a table name or a payload type
     */
    @Override // colesico.framework.asyncjob.JobService
    public synchronized void start() {
        this.logger.debug("Starting jobs service...");
        if (this.running) {
            throw new IllegalStateException("Jobs service is already started");
        }

        // NOTE(review): assumes JobQueueConfig.getTableName() returns String - confirm.
        HashSet<String> tableNames = new HashSet<>();
        this.queueRefs.clear();
        this.queueConfigs.forEach(queueConfig -> {
            this.logger.debug("Found job queue definition for payload type '{}' on table '{}'", queueConfig.getPayloadType(), queueConfig.getTableName());
            // add() returns false when the table name has already been registered
            if (!tableNames.add(queueConfig.getTableName())) {
                throw new RuntimeException("Duplicate job queue table '" + queueConfig.getTableName() + "' defined in " + queueConfig);
            }
            if (this.queueRefs.put(queueConfig.getPayloadType(), new QueueRef(queueConfig)) != null) {
                throw new RuntimeException("Duplicate job payload type '" + queueConfig.getPayloadType() + "' defined in " + queueConfig);
            }
        }, (Object) null);

        this.queuePool = new QueuePool(this.queueRefs.values().toArray(new QueueRef[0]));

        Thread[] workers = new Thread[this.srvConfig.getWorkersNum()];
        int startedCount = 0;
        try {
            for (int i = 0; i < workers.length; i++) {
                String threadName = "JOB_QUEUE_WORKER_" + i;
                Thread worker = new Thread(new Worker(this.srvConfig, this.jobsDao, this.payloadTransformer, this.queuePool, this.threadScope, this.workersMonitor), threadName);
                workers[i] = worker;
                worker.start();
                startedCount++;
                this.logger.debug("Job queue worker has started in thread {}", threadName);
            }
        } catch (RuntimeException | Error e) {
            // The service never reaches the running state, so stop() would refuse to act;
            // interrupt the workers that did start so they are not left behind unmanaged.
            for (int i = 0; i < startedCount; i++) {
                workers[i].interrupt();
            }
            throw e;
        }

        this.workersThreads = workers;
        this.running = true;
        this.logger.debug("Jobs service has been started");
    }

    /**
     * Interrupts every worker thread and marks the service as stopped. Does not wait for the
     * workers to terminate.
     *
     * @throws IllegalStateException if the service is not running
     */
    @Override // colesico.framework.asyncjob.JobService
    public synchronized void stop() {
        checkRunning();
        this.logger.debug("Stopping jobs service...");
        for (Thread worker : this.workersThreads) {
            worker.interrupt();
        }
        this.running = false;
        this.logger.debug("Jobs service has been stopped");
    }

    /**
     * @return {@code true} between a successful {@link #start()} and the following {@link #stop()}
     */
    @Override // colesico.framework.asyncjob.JobService
    public boolean isRunning() {
        return this.running;
    }

    /** Fails fast when an operation requires a started service. */
    private void checkRunning() {
        if (!this.running) {
            throw new IllegalStateException("Background jobs service is not started");
        }
    }

    /** Signals a single thread waiting on the workers monitor. */
    private void notifyWorker() {
        synchronized (this.workersMonitor) {
            this.workersMonitor.notify();
        }
    }

    /**
     * Serializes the payload, stores it in the queue registered for its exact runtime class
     * inside a new transaction, then signals a worker.
     *
     * @param p        job payload; its runtime class selects the target queue
     * @param duration passed through unchanged to {@link JobDao#enqueue}
     * @throws IllegalStateException if the service is not running
     * @throws NullPointerException  if {@code p} is {@code null}
     * @throws RuntimeException      if no queue is registered for the payload class
     */
    @Override // colesico.framework.asyncjob.JobEnqueuer
    public <P> void enqueue(P p, Duration duration) {
        checkRunning();
        if (p == null) {
            throw new NullPointerException("Job payload must not be null");
        }
        QueueRef queueRef = this.queueRefs.get(p.getClass());
        if (queueRef == null) {
            throw new RuntimeException("Job queue not found for payload type: " + p.getClass());
        }
        String serializedPayload = this.payloadTransformer.fromObject(p);
        this.srvConfig.getTransactionalShell().requiresNew(() -> {
            this.jobsDao.enqueue(queueRef.getConfig(), serializedPayload, duration);
            return null;
        });
        // NOTE(review): release() presumably makes the queue available to workers again - confirm in QueuePool.
        this.queuePool.release(queueRef);
        notifyWorker();
    }
}
