package ca.uhn.fhir.batch2.maintenance;

import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;

/* loaded from: input_file:ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.class */
public class JobInstanceProcessor {
    private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
    public static final long PURGE_THRESHOLD = 604800000;
    private final IJobPersistence myJobPersistence;
    private final BatchJobSender myBatchJobSender;
    private final JobChunkProgressAccumulator myProgressAccumulator;
    private final JobInstanceProgressCalculator myJobInstanceProgressCalculator;
    private final JobInstanceStatusUpdater myJobInstanceStatusUpdater;
    private final IReductionStepExecutorService myReductionStepExecutorService;
    private final String myInstanceId;
    private final JobDefinitionRegistry myJobDefinitionegistry;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor$1, reason: invalid class name */
    /* loaded from: input_file:ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$ca$uhn$fhir$batch2$model$StatusEnum = new int[StatusEnum.values().length];

        static {
            try {
                $SwitchMap$ca$uhn$fhir$batch2$model$StatusEnum[StatusEnum.QUEUED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$ca$uhn$fhir$batch2$model$StatusEnum[StatusEnum.FINALIZE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$ca$uhn$fhir$batch2$model$StatusEnum[StatusEnum.IN_PROGRESS.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$ca$uhn$fhir$batch2$model$StatusEnum[StatusEnum.ERRORED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$ca$uhn$fhir$batch2$model$StatusEnum[StatusEnum.COMPLETED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$ca$uhn$fhir$batch2$model$StatusEnum[StatusEnum.FAILED.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$ca$uhn$fhir$batch2$model$StatusEnum[StatusEnum.CANCELLED.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
        }
    }

    public JobInstanceProcessor(IJobPersistence iJobPersistence, BatchJobSender batchJobSender, String str, JobChunkProgressAccumulator jobChunkProgressAccumulator, IReductionStepExecutorService iReductionStepExecutorService, JobDefinitionRegistry jobDefinitionRegistry) {
        this.myJobPersistence = iJobPersistence;
        this.myBatchJobSender = batchJobSender;
        this.myInstanceId = str;
        this.myProgressAccumulator = jobChunkProgressAccumulator;
        this.myReductionStepExecutorService = iReductionStepExecutorService;
        this.myJobDefinitionegistry = jobDefinitionRegistry;
        this.myJobInstanceProgressCalculator = new JobInstanceProgressCalculator(iJobPersistence, jobChunkProgressAccumulator, jobDefinitionRegistry);
        this.myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(jobDefinitionRegistry);
    }

    public void process() {
        ourLog.debug("Starting job processing: {}", this.myInstanceId);
        StopWatch stopWatch = new StopWatch();
        JobInstance orElse = this.myJobPersistence.fetchInstance(this.myInstanceId).orElse(null);
        if (orElse == null) {
            return;
        }
        if (handleCancellation(orElse)) {
            orElse = this.myJobPersistence.fetchInstance(this.myInstanceId).orElseThrow();
        }
        cleanupInstance(orElse);
        triggerGatedExecutions(orElse);
        ourLog.debug("Finished job processing: {} - {}", this.myInstanceId, stopWatch);
    }

    private boolean handleCancellation(JobInstance jobInstance) {
        if (!jobInstance.isPendingCancellationRequest()) {
            return false;
        }
        String buildCancelledMessage = buildCancelledMessage(jobInstance);
        ourLog.info("Job {} moving to CANCELLED", jobInstance.getInstanceId());
        return this.myJobPersistence.updateInstance(jobInstance.getInstanceId(), jobInstance2 -> {
            boolean updateInstanceStatus = this.myJobInstanceStatusUpdater.updateInstanceStatus(jobInstance2, StatusEnum.CANCELLED);
            if (updateInstanceStatus) {
                jobInstance2.setErrorMessage(buildCancelledMessage);
            }
            return updateInstanceStatus;
        });
    }

    private String buildCancelledMessage(JobInstance jobInstance) {
        String str;
        str = "Job instance cancelled";
        return jobInstance.hasGatedStep() ? str + " while running step " + jobInstance.getCurrentGatedStepId() : "Job instance cancelled";
    }

    private void cleanupInstance(JobInstance jobInstance) {
        switch (AnonymousClass1.$SwitchMap$ca$uhn$fhir$batch2$model$StatusEnum[jobInstance.getStatus().ordinal()]) {
            case 2:
                return;
            case WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT /* 3 */:
            case 4:
                this.myJobInstanceProgressCalculator.calculateAndStoreInstanceProgress(jobInstance.getInstanceId());
                break;
            case JobMaintenanceServiceImpl.MAINTENANCE_TRIGGER_RUN_WITHOUT_SCHEDULER_TIMEOUT /* 5 */:
            case 6:
                if (purgeExpiredInstance(jobInstance)) {
                    return;
                }
                break;
            case 7:
                purgeExpiredInstance(jobInstance);
                return;
        }
        if (!jobInstance.isFinished() || jobInstance.isWorkChunksPurged()) {
            return;
        }
        this.myJobPersistence.deleteChunksAndMarkInstanceAsChunksPurged(jobInstance.getInstanceId());
    }

    private boolean purgeExpiredInstance(JobInstance jobInstance) {
        if (jobInstance.getEndTime() == null) {
            return false;
        }
        if (jobInstance.getEndTime().getTime() >= System.currentTimeMillis() - PURGE_THRESHOLD) {
            return false;
        }
        ourLog.info("Deleting old job instance {}", jobInstance.getInstanceId());
        this.myJobPersistence.deleteInstanceAndChunks(jobInstance.getInstanceId());
        return true;
    }

    private void triggerGatedExecutions(JobInstance jobInstance) {
        if (!jobInstance.isRunning()) {
            ourLog.debug("JobInstance {} is not in a \"running\" state. Status {}", jobInstance.getInstanceId(), jobInstance.getStatus());
            return;
        }
        if (jobInstance.hasGatedStep()) {
            JobDefinition jobDefinitionOrThrowException = this.myJobDefinitionegistry.getJobDefinitionOrThrowException(jobInstance);
            JobWorkCursor fromJobDefinitionAndRequestedStepId = JobWorkCursor.fromJobDefinitionAndRequestedStepId(jobDefinitionOrThrowException, jobInstance.getCurrentGatedStepId());
            if (fromJobDefinitionAndRequestedStepId.isFinalStep() && !fromJobDefinitionAndRequestedStepId.isReductionStep()) {
                ourLog.debug("Job instance {} is in final step and it's not a reducer step", jobInstance.getInstanceId());
                return;
            }
            String instanceId = jobInstance.getInstanceId();
            String currentStepId = fromJobDefinitionAndRequestedStepId.getCurrentStepId();
            if (!this.myJobPersistence.canAdvanceInstanceToNextStep(instanceId, currentStepId)) {
                ourLog.debug("Not ready to advance gated execution of instance {} from step {} to {}.", new Object[]{instanceId, currentStepId, fromJobDefinitionAndRequestedStepId.nextStep.getStepId()});
                return;
            }
            String stepId = fromJobDefinitionAndRequestedStepId.nextStep.getStepId();
            ourLog.info("All processing is complete for gated execution of instance {} step {}. Proceeding to step {}", new Object[]{instanceId, currentStepId, stepId});
            if (fromJobDefinitionAndRequestedStepId.nextStep.isReductionStep()) {
                this.myReductionStepExecutorService.triggerReductionStep(instanceId, JobWorkCursor.fromJobDefinitionAndRequestedStepId(jobDefinitionOrThrowException, fromJobDefinitionAndRequestedStepId.nextStep.getStepId()));
            } else {
                processChunksForNextSteps(jobInstance, stepId);
            }
        }
    }

    private void processChunksForNextSteps(JobInstance jobInstance, String str) {
        String instanceId = jobInstance.getInstanceId();
        List<String> chunkIdsWithStatus = this.myProgressAccumulator.getChunkIdsWithStatus(instanceId, str, WorkChunkStatusEnum.QUEUED);
        int totalChunkCountForInstanceAndStep = this.myProgressAccumulator.getTotalChunkCountForInstanceAndStep(instanceId, str);
        if (totalChunkCountForInstanceAndStep != chunkIdsWithStatus.size()) {
            ourLog.debug("Total ProgressAccumulator QUEUED chunk count does not match QUEUED chunk size! [instanceId={}, stepId={}, totalChunks={}, queuedChunks={}]", new Object[]{instanceId, str, Integer.valueOf(totalChunkCountForInstanceAndStep), Integer.valueOf(chunkIdsWithStatus.size())});
        }
        List<String> fetchAllChunkIdsForStepWithStatus = this.myJobPersistence.fetchAllChunkIdsForStepWithStatus(instanceId, str, WorkChunkStatusEnum.QUEUED);
        if (!this.myJobPersistence.updateInstance(instanceId, jobInstance2 -> {
            if (jobInstance2.getCurrentGatedStepId().equals(str)) {
                return false;
            }
            jobInstance2.setCurrentGatedStepId(str);
            return true;
        })) {
            ourLog.warn("Skipping gate advance to {} for instance {} - already advanced.", str, instanceId);
            return;
        }
        Iterator<String> it = fetchAllChunkIdsForStepWithStatus.iterator();
        while (it.hasNext()) {
            this.myBatchJobSender.sendWorkChannelMessage(new JobWorkNotification(jobInstance, str, it.next()));
        }
        ourLog.debug("Submitted a batch of chunks for processing. [chunkCount={}, instanceId={}, stepId={}]", new Object[]{Integer.valueOf(fetchAllChunkIdsForStepWithStatus.size()), instanceId, str});
    }
}
