package colesico.framework.asyncjob.internal;

import colesico.framework.asyncjob.JobDao;
import colesico.framework.asyncjob.JobQueueConfig;
import colesico.framework.asyncjob.JobRecord;
import colesico.framework.asyncjob.JobServiceConfig;
import colesico.framework.asyncjob.PayloadConverter;
import colesico.framework.ioc.ThreadScope;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:colesico/framework/asyncjob/internal/Worker.class */
public final class Worker implements Runnable {
    private final Logger logger = LoggerFactory.getLogger(Worker.class);
    private final JobServiceConfig srvConfig;
    private final JobDao jobsDao;
    private final PayloadConverter payloadTransformer;
    private final QueuePool queuePool;
    private final ThreadScope threadScope;
    private final Object workersMonitor;
    private String workerThreadName;

    public Worker(JobServiceConfig jobServiceConfig, JobDao jobDao, PayloadConverter payloadConverter, QueuePool queuePool, ThreadScope threadScope, Object obj) {
        this.srvConfig = jobServiceConfig;
        this.jobsDao = jobDao;
        this.payloadTransformer = payloadConverter;
        this.queuePool = queuePool;
        this.threadScope = threadScope;
        this.workersMonitor = obj;
    }

    private void idle() {
        try {
            synchronized (this.workersMonitor) {
                this.workersMonitor.wait(this.srvConfig.getWorkerIdleTimeoutMs());
            }
        } catch (InterruptedException e) {
            this.logger.warn("Job queue worker process has interrupted");
        }
    }

    private Object transformPayload(JobQueueConfig jobQueueConfig, JobRecord jobRecord) {
        return this.payloadTransformer.toObject(jobQueueConfig.getPayloadType(), jobRecord.getPayload());
    }

    private void notifyWorker() {
        synchronized (this.workersMonitor) {
            this.workersMonitor.notify();
        }
    }

    private void processQueue(QueueRef queueRef) {
        boolean isDebugEnabled = this.logger.isDebugEnabled();
        if (isDebugEnabled) {
            this.logger.debug("Process queue " + queueRef.getConfig() + " in thread " + this.workerThreadName);
        }
        try {
            this.srvConfig.getTransactionalShell().requiresNew(() -> {
                JobQueueConfig config = queueRef.getConfig();
                JobRecord pick = this.jobsDao.pick(config);
                if (pick == null) {
                    if (!isDebugEnabled) {
                        return null;
                    }
                    this.logger.debug("Queue is empty: " + queueRef.getConfig());
                    return null;
                }
                this.queuePool.release(queueRef);
                notifyWorker();
                Object transformPayload = transformPayload(config, pick);
                if (isDebugEnabled) {
                    try {
                        this.logger.debug("Call job consumer: " + queueRef.getConfig() + " with payload: " + pick.getPayload());
                    } catch (Exception e) {
                        this.logger.error("Call job consumer error: " + ExceptionUtils.getRootCauseMessage(e), e);
                    }
                }
                config.getJobConsumer().consume(transformPayload);
                return pick;
            });
        } catch (Exception e) {
            this.logger.error("Error processing job {}. Message: {}" + this.jobsDao, ExceptionUtils.getRootCauseMessage(e), e);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        this.workerThreadName = Thread.currentThread().getName();
        while (!Thread.currentThread().isInterrupted()) {
            try {
                QueueRef capture = this.queuePool.capture(this.srvConfig.getQueueCaptureTimeoutMs());
                if (capture == null) {
                    idle();
                } else {
                    processQueue(capture);
                }
            } finally {
                this.threadScope.destroy();
            }
        }
    }
}
