package ca.uhn.fhir.batch2.coordinator;

import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import java.util.Optional;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/* loaded from: input_file:ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandler.class */
/**
 * Message handler that consumes {@link JobWorkNotificationJsonMessage}s and runs the
 * job step for the work chunk each notification refers to.
 *
 * <p>For every notification the handler looks up the work chunk and the owning job
 * instance through {@link IJobPersistence}, checks that the chunk's target step agrees
 * with the step named by the notification, and then either executes the step or, when
 * the instance has been cancelled, marks the instance as completed without running it.
 */
class WorkChannelMessageHandler implements MessageHandler {
    private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
    private final IJobPersistence myJobPersistence;
    private final JobDefinitionRegistry myJobDefinitionRegistry;
    private final JobStepExecutorFactory myJobStepExecutorFactory;
    private final JobInstanceStatusUpdater myJobInstanceStatusUpdater;

    /**
     * Wires the handler to its collaborators. The step executor factory and the
     * instance status updater are created here from the supplied services.
     *
     * <p>The constructor is declared {@code public} even though the class itself is
     * package-private.
     *
     * @param iJobPersistence        persistence used to load chunks and instances
     * @param jobDefinitionRegistry  registry used to resolve job definitions
     * @param batchJobSender         passed through to the step executor factory
     * @param workChunkProcessor     passed through to the step executor factory
     * @param iJobMaintenanceService passed through to the step executor factory
     */
    public WorkChannelMessageHandler(
            @Nonnull IJobPersistence iJobPersistence,
            @Nonnull JobDefinitionRegistry jobDefinitionRegistry,
            @Nonnull BatchJobSender batchJobSender,
            @Nonnull WorkChunkProcessor workChunkProcessor,
            @Nonnull IJobMaintenanceService iJobMaintenanceService) {
        myJobPersistence = iJobPersistence;
        myJobDefinitionRegistry = jobDefinitionRegistry;
        myJobStepExecutorFactory = new JobStepExecutorFactory(
                iJobPersistence, batchJobSender, workChunkProcessor, iJobMaintenanceService);
        myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(iJobPersistence);
    }

    /**
     * Entry point for incoming messages; delegates to the work-notification handling.
     *
     * @param message the incoming message, which must be a
     *                {@link JobWorkNotificationJsonMessage}
     * @throws ClassCastException if {@code message} is of any other type
     * @throws MessagingException declared by the handler contract
     */
    public void handleMessage(@Nonnull Message<?> message) throws MessagingException {
        JobWorkNotificationJsonMessage workMessage = (JobWorkNotificationJsonMessage) message;
        handleWorkChannelMessage(workMessage);
    }

    /**
     * Processes one work notification end to end.
     *
     * <ol>
     *   <li>Loads the chunk named by the notification; if it cannot be found the
     *       problem is logged and processing stops.</li>
     *   <li>Verifies that the chunk's target step equals the step resolved from the
     *       notification.</li>
     *   <li>Loads the job instance, moves it from QUEUED to IN_PROGRESS if needed, and
     *       attaches its job definition.</li>
     *   <li>Executes the step, unless the instance is cancelled, in which case the
     *       instance is marked as completed instead.</li>
     * </ol>
     *
     * @param workMessage the message carrying the work notification
     * @throws InternalErrorException if the notification's instance does not exist
     */
    private void handleWorkChannelMessage(JobWorkNotificationJsonMessage workMessage) {
        final JobWorkNotification notification = workMessage.m26getPayload();

        final String chunkId = notification.getChunkId();
        Validate.notNull(chunkId);

        // NOTE(review): going by its name, this lookup presumably also stamps the chunk's
        // start time and flags it as in progress - confirm against IJobPersistence.
        Optional<WorkChunk> chunkOpt = myJobPersistence.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId);
        if (chunkOpt.isEmpty()) {
            ourLog.error("Unable to find chunk with ID {} - Aborting", chunkId);
            return;
        }
        WorkChunk chunk = chunkOpt.get();

        JobWorkCursor<?, ?, ?> cursor = buildCursorFromNotification(notification);

        // The chunk and the notification must agree on which step is being executed.
        Validate.isTrue(
                chunk.getTargetStepId().equals(cursor.getCurrentStepId()),
                "Chunk %s has target step %s but expected %s",
                chunkId,
                chunk.getTargetStepId(),
                cursor.getCurrentStepId());

        Optional<JobInstance> instanceOpt = myJobPersistence.fetchInstance(notification.getInstanceId());
        JobInstance instance = instanceOpt.orElseThrow(
                () -> new InternalErrorException("Unknown instance: " + notification.getInstanceId()));

        markInProgressIfQueued(instance);
        myJobDefinitionRegistry.setJobDefinition(instance);
        String instanceId = instance.getInstanceId();

        if (instance.isCancelled()) {
            // A cancelled job does no further work; the instance is closed out instead.
            ourLog.info("Skipping chunk {} because job instance is cancelled", chunkId);
            myJobPersistence.markInstanceAsCompleted(instanceId);
            return;
        }

        myJobStepExecutorFactory.newJobStepExecutor(instance, chunk, cursor).executeStep();
    }

    /**
     * Moves the given instance to {@link StatusEnum#IN_PROGRESS} when its current
     * status is {@link StatusEnum#QUEUED}; any other status is left alone.
     *
     * @param instance the job instance whose status is inspected
     */
    private void markInProgressIfQueued(JobInstance instance) {
        if (instance.getStatus() != StatusEnum.QUEUED) {
            return;
        }
        myJobInstanceStatusUpdater.updateInstanceStatus(instance, StatusEnum.IN_PROGRESS);
    }

    /**
     * Builds the work cursor for a notification by resolving the job definition
     * (by id and version) from the registry and pairing it with the notification's
     * target step id.
     *
     * @param notification the notification identifying definition, version and step
     * @return the cursor for the requested step of the resolved job definition
     */
    private JobWorkCursor<?, ?, ?> buildCursorFromNotification(JobWorkNotification notification) {
        return JobWorkCursor.fromJobDefinitionAndRequestedStepId(
                myJobDefinitionRegistry.getJobDefinitionOrThrowException(
                        notification.getJobDefinitionId(),
                        notification.getJobDefinitionVersion()),
                notification.getTargetStepId());
    }
}
