package ca.uhn.fhir.jpa.subscription.triggering;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.api.svc.ISearchSvc;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
import ca.uhn.fhir.rest.annotation.IdParam;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
import ca.uhn.fhir.util.ParametersUtil;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.UrlUtil;
import ca.uhn.fhir.util.ValidateUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.hl7.fhir.dstu2.model.IdType;
import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:ca/uhn/fhir/jpa/subscription/triggering/SubscriptionTriggeringSvcImpl.class */
public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc {
    private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringSvcImpl.class);
    private static final int DEFAULT_MAX_SUBMIT = 10000;

    @Autowired
    private FhirContext myFhirContext;

    @Autowired
    private DaoRegistry myDaoRegistry;

    @Autowired
    private DaoConfig myDaoConfig;

    @Autowired
    private ISearchCoordinatorSvc mySearchCoordinatorSvc;

    @Autowired
    private MatchUrlService myMatchUrlService;

    @Autowired
    private IResourceModifiedConsumer myResourceModifiedConsumer;
    private ExecutorService myExecutorService;

    @Autowired
    private ISchedulerService mySchedulerService;

    @Autowired
    private ISearchSvc mySearchService;
    private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList();
    private int myMaxSubmitPerPass = DEFAULT_MAX_SUBMIT;

    /* loaded from: input_file:ca/uhn/fhir/jpa/subscription/triggering/SubscriptionTriggeringSvcImpl$Job.class */
    public static class Job implements HapiJob {

        @Autowired
        private ISubscriptionTriggeringSvc myTarget;

        public void execute(JobExecutionContext jobExecutionContext) {
            this.myTarget.runDeliveryPass();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ca/uhn/fhir/jpa/subscription/triggering/SubscriptionTriggeringSvcImpl$SubscriptionTriggeringJobDetails.class */
    public static class SubscriptionTriggeringJobDetails {
        private String myJobId;
        private String mySubscriptionId;
        private List<String> myRemainingResourceIds;
        private List<String> myRemainingSearchUrls;
        private String myCurrentSearchUuid;
        private String myCurrentSearchUrl;
        private Integer myCurrentSearchCount;
        private String myCurrentSearchResourceType;
        private int myCurrentSearchLastUploadedIndex;
        private int myCurrentOffset;

        private SubscriptionTriggeringJobDetails() {
        }

        Integer getCurrentSearchCount() {
            return this.myCurrentSearchCount;
        }

        void setCurrentSearchCount(Integer num) {
            this.myCurrentSearchCount = num;
        }

        String getCurrentSearchResourceType() {
            return this.myCurrentSearchResourceType;
        }

        void setCurrentSearchResourceType(String str) {
            this.myCurrentSearchResourceType = str;
        }

        String getJobId() {
            return this.myJobId;
        }

        void setJobId(String str) {
            this.myJobId = str;
        }

        String getSubscriptionId() {
            return this.mySubscriptionId;
        }

        void setSubscriptionId(String str) {
            this.mySubscriptionId = str;
        }

        List<String> getRemainingResourceIds() {
            return this.myRemainingResourceIds;
        }

        void setRemainingResourceIds(List<String> list) {
            this.myRemainingResourceIds = list;
        }

        List<String> getRemainingSearchUrls() {
            return this.myRemainingSearchUrls;
        }

        void setRemainingSearchUrls(List<String> list) {
            this.myRemainingSearchUrls = list;
        }

        String getCurrentSearchUuid() {
            return this.myCurrentSearchUuid;
        }

        void setCurrentSearchUuid(String str) {
            this.myCurrentSearchUuid = str;
        }

        public String getCurrentSearchUrl() {
            return this.myCurrentSearchUrl;
        }

        public void setCurrentSearchUrl(String str) {
            this.myCurrentSearchUrl = str;
        }

        int getCurrentSearchLastUploadedIndex() {
            return this.myCurrentSearchLastUploadedIndex;
        }

        void setCurrentSearchLastUploadedIndex(int i) {
            this.myCurrentSearchLastUploadedIndex = i;
        }

        public void clearCurrentSearchUrl() {
            this.myCurrentSearchUrl = null;
        }

        public int getCurrentOffset() {
            return this.myCurrentOffset;
        }

        public void setCurrentOffset(Integer num) {
            this.myCurrentOffset = ((Integer) ObjectUtils.defaultIfNull(num, 0)).intValue();
        }
    }

    public IBaseParameters triggerSubscription(List<IPrimitiveType<String>> list, List<IPrimitiveType<String>> list2, @IdParam IIdType iIdType) {
        if (this.myDaoConfig.getSupportedSubscriptionTypes().isEmpty()) {
            throw new PreconditionFailedException(Msg.code(22) + "Subscription processing not active on this server");
        }
        if (iIdType != null) {
            IFhirResourceDao subscriptionDao = this.myDaoRegistry.getSubscriptionDao();
            IIdType iIdType2 = iIdType;
            if (!iIdType2.hasResourceType()) {
                iIdType2 = iIdType2.withResourceType(ResourceTypeEnum.SUBSCRIPTION.getCode());
            }
            subscriptionDao.read(iIdType2, SystemRequestDetails.forAllPartitions());
        }
        List list3 = (List) ObjectUtils.defaultIfNull(list, Collections.emptyList());
        List list4 = (List) ObjectUtils.defaultIfNull(list2, Collections.emptyList());
        if (list3.size() == 0 && list4.size() == 0) {
            throw new InvalidRequestException(Msg.code(23) + "No resource IDs or search URLs specified for triggering");
        }
        Iterator it = list3.iterator();
        while (it.hasNext()) {
            IdType idType = new IdType((String) ((IPrimitiveType) it.next()).getValue());
            ValidateUtil.isTrueOrThrowInvalidRequest(idType.hasResourceType(), "resourceId parameter must have resource type", new Object[0]);
            ValidateUtil.isTrueOrThrowInvalidRequest(idType.hasIdPart(), "resourceId parameter must have resource ID part", new Object[0]);
        }
        Iterator it2 = list4.iterator();
        while (it2.hasNext()) {
            if (!((String) ((IPrimitiveType) it2.next()).getValue()).contains("?")) {
                throw new InvalidRequestException(Msg.code(24) + "Search URL is not valid (must be in the form \"[resource type]?[optional params]\")");
            }
        }
        SubscriptionTriggeringJobDetails subscriptionTriggeringJobDetails = new SubscriptionTriggeringJobDetails();
        subscriptionTriggeringJobDetails.setJobId(UUID.randomUUID().toString());
        subscriptionTriggeringJobDetails.setRemainingResourceIds((List) list3.stream().map(iPrimitiveType -> {
            return (String) iPrimitiveType.getValue();
        }).collect(Collectors.toList()));
        subscriptionTriggeringJobDetails.setRemainingSearchUrls((List) list4.stream().map(iPrimitiveType2 -> {
            return (String) iPrimitiveType2.getValue();
        }).collect(Collectors.toList()));
        if (iIdType != null) {
            subscriptionTriggeringJobDetails.setSubscriptionId(iIdType.getIdPart());
        }
        synchronized (this.myActiveJobs) {
            this.myActiveJobs.add(subscriptionTriggeringJobDetails);
            ourLog.info("Subscription triggering requested for {} resource and {} search - Gave job ID: {} and have {} jobs", new Object[]{Integer.valueOf(list3.size()), Integer.valueOf(list4.size()), subscriptionTriggeringJobDetails.getJobId(), Integer.valueOf(this.myActiveJobs.size())});
        }
        IBaseParameters newInstance = ParametersUtil.newInstance(this.myFhirContext);
        IPrimitiveType newInstance2 = this.myFhirContext.getElementDefinition("string").newInstance();
        newInstance2.setValueAsString("Subscription triggering job submitted as JOB ID: " + subscriptionTriggeringJobDetails.myJobId);
        ParametersUtil.addParameterToParameters(this.myFhirContext, newInstance, "information", newInstance2);
        return newInstance;
    }

    public void runDeliveryPass() {
        synchronized (this.myActiveJobs) {
            if (this.myActiveJobs.isEmpty()) {
                return;
            }
            ourLog.info("Starting pass: currently have {} active job IDs: {}", Integer.valueOf(this.myActiveJobs.size()), (String) this.myActiveJobs.stream().map((v0) -> {
                return v0.getJobId();
            }).collect(Collectors.joining(", ")));
            SubscriptionTriggeringJobDetails subscriptionTriggeringJobDetails = this.myActiveJobs.get(0);
            runJob(subscriptionTriggeringJobDetails);
            if (subscriptionTriggeringJobDetails.getRemainingResourceIds().isEmpty() && subscriptionTriggeringJobDetails.getRemainingSearchUrls().isEmpty() && jobHasCompleted(subscriptionTriggeringJobDetails)) {
                this.myActiveJobs.remove(0);
                ourLog.info("Subscription triggering job {} is complete{}", subscriptionTriggeringJobDetails.getJobId(), this.myActiveJobs.size() > 0 ? "(" + this.myActiveJobs.size() + " jobs remaining)" : "");
            }
        }
    }

    private void runJob(SubscriptionTriggeringJobDetails subscriptionTriggeringJobDetails) {
        List<IBaseResource> allResources;
        StopWatch stopWatch = new StopWatch();
        ourLog.info("Starting pass of subscription triggering job {}", subscriptionTriggeringJobDetails.getJobId());
        int i = 0;
        ArrayList arrayList = new ArrayList();
        while (subscriptionTriggeringJobDetails.getRemainingResourceIds().size() > 0 && i < this.myMaxSubmitPerPass) {
            i++;
            String remove = subscriptionTriggeringJobDetails.getRemainingResourceIds().remove(0);
            arrayList.add(Pair.of(remove, submitResource(subscriptionTriggeringJobDetails.getSubscriptionId(), remove)));
        }
        if (validateFuturesAndReturnTrueIfWeShouldAbort(arrayList)) {
            return;
        }
        IBundleProvider iBundleProvider = null;
        if (isInitialStep(subscriptionTriggeringJobDetails) && CollectionUtils.isNotEmpty(subscriptionTriggeringJobDetails.getRemainingSearchUrls()) && i < this.myMaxSubmitPerPass) {
            String remove2 = subscriptionTriggeringJobDetails.getRemainingSearchUrls().remove(0);
            RuntimeResourceDefinition parseUrlResourceType = UrlUtil.parseUrlResourceType(this.myFhirContext, remove2);
            SearchParameterMap translateMatchUrl = this.myMatchUrlService.translateMatchUrl(remove2.substring(remove2.indexOf(63)), parseUrlResourceType, new MatchUrlService.Flag[0]);
            String name = parseUrlResourceType.getName();
            IFhirResourceDao resourceDao = this.myDaoRegistry.getResourceDao(name);
            ourLog.info("Triggering job[{}] is starting a search for {}", subscriptionTriggeringJobDetails.getJobId(), remove2);
            iBundleProvider = this.mySearchCoordinatorSvc.registerSearch(resourceDao, translateMatchUrl, name, new CacheControlDirective(), (RequestDetails) null, RequestPartitionId.allPartitions());
            if (Objects.isNull(iBundleProvider.getUuid())) {
                subscriptionTriggeringJobDetails.setCurrentSearchUrl(remove2);
                subscriptionTriggeringJobDetails.setCurrentOffset(translateMatchUrl.getOffset());
            } else {
                subscriptionTriggeringJobDetails.setCurrentSearchUuid(iBundleProvider.getUuid());
            }
            subscriptionTriggeringJobDetails.setCurrentSearchResourceType(name);
            subscriptionTriggeringJobDetails.setCurrentSearchCount(translateMatchUrl.getCount());
            subscriptionTriggeringJobDetails.setCurrentSearchLastUploadedIndex(-1);
        }
        if (StringUtils.isNotBlank(subscriptionTriggeringJobDetails.getCurrentSearchUrl()) && i < this.myMaxSubmitPerPass) {
            int currentSearchLastUploadedIndex = subscriptionTriggeringJobDetails.getCurrentSearchLastUploadedIndex() + 1;
            String currentSearchUrl = subscriptionTriggeringJobDetails.getCurrentSearchUrl();
            ourLog.info("Triggered job [{}] - Starting synchronous processing at offset {} and index {}", new Object[]{subscriptionTriggeringJobDetails.getJobId(), Integer.valueOf(subscriptionTriggeringJobDetails.getCurrentOffset()), Integer.valueOf(currentSearchLastUploadedIndex)});
            int i2 = currentSearchLastUploadedIndex + (this.myMaxSubmitPerPass - i);
            if (!Objects.nonNull(iBundleProvider) || iBundleProvider.isEmpty()) {
                if (subscriptionTriggeringJobDetails.getCurrentSearchCount() != null) {
                    i2 = Math.min(i2, subscriptionTriggeringJobDetails.getCurrentSearchCount().intValue());
                }
                RuntimeResourceDefinition parseUrlResourceType2 = UrlUtil.parseUrlResourceType(this.myFhirContext, currentSearchUrl);
                SearchParameterMap translateMatchUrl2 = this.myMatchUrlService.translateMatchUrl(currentSearchUrl.substring(currentSearchUrl.indexOf(63)), parseUrlResourceType2, new MatchUrlService.Flag[0]);
                int currentOffset = subscriptionTriggeringJobDetails.getCurrentOffset() + currentSearchLastUploadedIndex;
                translateMatchUrl2.setOffset(Integer.valueOf(currentOffset));
                translateMatchUrl2.setCount(Integer.valueOf(i2));
                ourLog.info("Triggered job[{}] requesting {} resources from offset {}", new Object[]{subscriptionTriggeringJobDetails.getJobId(), Integer.valueOf(i2), Integer.valueOf(currentOffset)});
                allResources = this.mySearchService.executeQuery(parseUrlResourceType2.getName(), translateMatchUrl2, RequestPartitionId.allPartitions()).getAllResources();
            } else {
                ourLog.info("Triggered job[{}] will process up to {} resources", subscriptionTriggeringJobDetails.getJobId(), Integer.valueOf(i2));
                allResources = iBundleProvider.getResources(0, i2);
            }
            ourLog.info("Triggered job[{}] delivering {} resources", subscriptionTriggeringJobDetails.getJobId(), Integer.valueOf(allResources.size()));
            int currentSearchLastUploadedIndex2 = subscriptionTriggeringJobDetails.getCurrentSearchLastUploadedIndex();
            for (IBaseResource iBaseResource : allResources) {
                arrayList.add(Pair.of(iBaseResource.getIdElement().getIdPart(), submitResource(subscriptionTriggeringJobDetails.getSubscriptionId(), iBaseResource)));
                i++;
                currentSearchLastUploadedIndex2++;
            }
            if (validateFuturesAndReturnTrueIfWeShouldAbort(arrayList)) {
                return;
            }
            subscriptionTriggeringJobDetails.setCurrentSearchLastUploadedIndex(currentSearchLastUploadedIndex2);
            ourLog.info("Triggered job[{}] lastUploadedIndex is {}", subscriptionTriggeringJobDetails.getJobId(), Integer.valueOf(subscriptionTriggeringJobDetails.getCurrentSearchLastUploadedIndex()));
            if (allResources.isEmpty() || (Objects.nonNull(subscriptionTriggeringJobDetails.getCurrentSearchCount()) && i2 >= subscriptionTriggeringJobDetails.getCurrentSearchCount().intValue())) {
                ourLog.info("Triggered job[{}] for search URL {} has completed ", subscriptionTriggeringJobDetails.getJobId(), subscriptionTriggeringJobDetails.getCurrentSearchUrl());
                subscriptionTriggeringJobDetails.setCurrentSearchResourceType(null);
                subscriptionTriggeringJobDetails.clearCurrentSearchUrl();
                subscriptionTriggeringJobDetails.setCurrentSearchLastUploadedIndex(-1);
                subscriptionTriggeringJobDetails.setCurrentSearchCount(null);
            }
        }
        if (StringUtils.isNotBlank(subscriptionTriggeringJobDetails.getCurrentSearchUuid()) && i < this.myMaxSubmitPerPass) {
            int currentSearchLastUploadedIndex3 = subscriptionTriggeringJobDetails.getCurrentSearchLastUploadedIndex() + 1;
            IFhirResourceDao resourceDao2 = this.myDaoRegistry.getResourceDao(subscriptionTriggeringJobDetails.getCurrentSearchResourceType());
            int i3 = currentSearchLastUploadedIndex3 + (this.myMaxSubmitPerPass - i);
            if (subscriptionTriggeringJobDetails.getCurrentSearchCount() != null) {
                i3 = Math.min(i3, subscriptionTriggeringJobDetails.getCurrentSearchCount().intValue());
            }
            ourLog.info("Triggering job[{}] search {} requesting resources {} - {}", new Object[]{subscriptionTriggeringJobDetails.getJobId(), subscriptionTriggeringJobDetails.getCurrentSearchUuid(), Integer.valueOf(currentSearchLastUploadedIndex3), Integer.valueOf(i3)});
            List resources = this.mySearchCoordinatorSvc.getResources(subscriptionTriggeringJobDetails.getCurrentSearchUuid(), currentSearchLastUploadedIndex3, i3, (RequestDetails) null);
            ourLog.info("Triggering job[{}] delivering {} resources", subscriptionTriggeringJobDetails.getJobId(), Integer.valueOf(resources.size()));
            int currentSearchLastUploadedIndex4 = subscriptionTriggeringJobDetails.getCurrentSearchLastUploadedIndex();
            Iterator it = resources.iterator();
            while (it.hasNext()) {
                IBaseResource readByPid = resourceDao2.readByPid((ResourcePersistentId) it.next());
                arrayList.add(Pair.of(readByPid.getIdElement().getIdPart(), submitResource(subscriptionTriggeringJobDetails.getSubscriptionId(), readByPid)));
                i++;
                currentSearchLastUploadedIndex4++;
            }
            if (validateFuturesAndReturnTrueIfWeShouldAbort(arrayList)) {
                return;
            }
            subscriptionTriggeringJobDetails.setCurrentSearchLastUploadedIndex(currentSearchLastUploadedIndex4);
            if (resources.size() == 0 || (subscriptionTriggeringJobDetails.getCurrentSearchCount() != null && i3 >= subscriptionTriggeringJobDetails.getCurrentSearchCount().intValue())) {
                ourLog.info("Triggering job[{}] search {} has completed ", subscriptionTriggeringJobDetails.getJobId(), subscriptionTriggeringJobDetails.getCurrentSearchUuid());
                subscriptionTriggeringJobDetails.setCurrentSearchResourceType(null);
                subscriptionTriggeringJobDetails.setCurrentSearchUuid(null);
                subscriptionTriggeringJobDetails.setCurrentSearchLastUploadedIndex(-1);
                subscriptionTriggeringJobDetails.setCurrentSearchCount(null);
            }
        }
        ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", new Object[]{subscriptionTriggeringJobDetails.getJobId(), Integer.valueOf(i), Long.valueOf(stopWatch.getMillis()), Double.valueOf(stopWatch.getThroughput(i, TimeUnit.SECONDS))});
    }

    private boolean isInitialStep(SubscriptionTriggeringJobDetails subscriptionTriggeringJobDetails) {
        return StringUtils.isBlank(subscriptionTriggeringJobDetails.myCurrentSearchUuid) && StringUtils.isBlank(subscriptionTriggeringJobDetails.myCurrentSearchUrl);
    }

    private boolean jobHasCompleted(SubscriptionTriggeringJobDetails subscriptionTriggeringJobDetails) {
        return isInitialStep(subscriptionTriggeringJobDetails);
    }

    private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Pair<String, Future<Void>>> list) {
        for (Pair<String, Future<Void>> pair : list) {
            String str = (String) pair.getKey();
            try {
                ((Future) pair.getValue()).get();
                ourLog.info("Finished redelivering {}", str);
            } catch (Exception e) {
                ourLog.error("Failure triggering resource " + str, e);
                return true;
            }
        }
        list.clear();
        return false;
    }

    private Future<Void> submitResource(String str, String str2) {
        org.hl7.fhir.r4.model.IdType idType = new org.hl7.fhir.r4.model.IdType(str2);
        return submitResource(str, this.myDaoRegistry.getResourceDao(idType.getResourceType()).read(idType, SystemRequestDetails.forAllPartitions()));
    }

    private Future<Void> submitResource(String str, IBaseResource iBaseResource) {
        ourLog.info("Submitting resource {} to subscription {}", iBaseResource.getIdElement().toUnqualifiedVersionless().getValue(), str);
        ResourceModifiedMessage resourceModifiedMessage = new ResourceModifiedMessage(this.myFhirContext, iBaseResource, BaseResourceMessage.OperationTypeEnum.UPDATE);
        resourceModifiedMessage.setSubscriptionId(str);
        return this.myExecutorService.submit(() -> {
            int i = 0;
            while (true) {
                try {
                    this.myResourceModifiedConsumer.submitResourceModified(resourceModifiedMessage);
                    return null;
                } catch (Exception e) {
                    if (i >= 3) {
                        throw new InternalErrorException(Msg.code(25) + e);
                    }
                    ourLog.warn("Exception while retriggering subscriptions (going to sleep and retry): {}", e.toString());
                    Thread.sleep(1000L);
                    i++;
                }
            }
        });
    }

    public void cancelAll() {
        synchronized (this.myActiveJobs) {
            this.myActiveJobs.clear();
        }
    }

    public void setMaxSubmitPerPass(Integer num) {
        Integer num2 = num;
        if (num2 == null) {
            num2 = Integer.valueOf(DEFAULT_MAX_SUBMIT);
        }
        Validate.isTrue(num2.intValue() > 0, "theMaxSubmitPerPass must be > 0", new Object[0]);
        this.myMaxSubmitPerPass = num2.intValue();
    }

    @PostConstruct
    public void start() {
        createExecutorService();
        scheduleJob();
    }

    private void createExecutorService() {
        final LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(1000);
        this.myExecutorService = new ThreadPoolExecutor(0, 10, 0L, TimeUnit.MILLISECONDS, linkedBlockingQueue, new BasicThreadFactory.Builder().namingPattern("SubscriptionTriggering-%d").daemon(false).priority(5).build(), new RejectedExecutionHandler() { // from class: ca.uhn.fhir.jpa.subscription.triggering.SubscriptionTriggeringSvcImpl.1
            @Override // java.util.concurrent.RejectedExecutionHandler
            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                SubscriptionTriggeringSvcImpl.ourLog.info("Note: Subscription triggering queue is full ({} elements), waiting for a slot to become available!", Integer.valueOf(linkedBlockingQueue.size()));
                StopWatch stopWatch = new StopWatch();
                try {
                    linkedBlockingQueue.put(runnable);
                    SubscriptionTriggeringSvcImpl.ourLog.info("Slot become available after {}ms", Long.valueOf(stopWatch.getMillis()));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RejectedExecutionException(Msg.code(26) + "Task " + runnable.toString() + " rejected from " + e.toString());
                }
            }
        });
    }

    private void scheduleJob() {
        ScheduledJobDefinition scheduledJobDefinition = new ScheduledJobDefinition();
        scheduledJobDefinition.setId(getClass().getName());
        scheduledJobDefinition.setJobClass(Job.class);
        this.mySchedulerService.scheduleLocalJob(5000L, scheduledJobDefinition);
    }

    public int getActiveJobCount() {
        return this.myActiveJobs.size();
    }
}
