package org.springframework.yarn.batch.partition;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.StepExecutionSplitter;
import org.springframework.yarn.am.container.ContainerRequestHint;
import org.springframework.yarn.am.container.ContainerResolver;
import org.springframework.yarn.batch.am.AbstractBatchAppmaster;
import org.springframework.yarn.batch.listener.PartitionedStepExecutionStateListener;
import org.springframework.yarn.listener.AppmasterStateListener;

/* loaded from: input_file:org/springframework/yarn/batch/partition/AbstractBatchPartitionHandler.class */
public abstract class AbstractBatchPartitionHandler implements PartitionHandler {
    private static final Log log = LogFactory.getLog(AbstractBatchPartitionHandler.class);
    private AbstractBatchAppmaster batchAppmaster;
    private String stepName = "remoteStep";
    private ContainerResolver containerResolver;

    public AbstractBatchPartitionHandler() {
    }

    public AbstractBatchPartitionHandler(AbstractBatchAppmaster abstractBatchAppmaster) {
        this.batchAppmaster = abstractBatchAppmaster;
    }

    public void setBatchAppmaster(AbstractBatchAppmaster abstractBatchAppmaster) {
        this.batchAppmaster = abstractBatchAppmaster;
    }

    protected abstract Set<StepExecution> createStepExecutionSplits(StepExecutionSplitter stepExecutionSplitter, StepExecution stepExecution) throws Exception;

    protected Map<StepExecution, ContainerRequestHint> createResourceRequestData(Set<StepExecution> set) throws Exception {
        return new HashMap();
    }

    public final Collection<StepExecution> handle(StepExecutionSplitter stepExecutionSplitter, StepExecution stepExecution) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("partition job parameters:");
            for (Map.Entry entry : stepExecution.getJobParameters().getParameters().entrySet()) {
                log.debug("entry: " + ((String) entry.getKey()) + " / " + entry.getValue());
            }
        }
        ArrayList arrayList = new ArrayList();
        Set<StepExecution> createStepExecutionSplits = createStepExecutionSplits(stepExecutionSplitter, stepExecution);
        if (log.isDebugEnabled()) {
            log.debug("Created " + createStepExecutionSplits.size() + " splits for stepName=" + this.stepName + " with parent stepExecution=" + stepExecution);
            for (StepExecution stepExecution2 : createStepExecutionSplits) {
                log.debug("Splitted step execution: " + stepExecution2 + " with executionContext=" + stepExecution2.getExecutionContext());
            }
        }
        Map<StepExecution, ContainerRequestHint> createResourceRequestData = createResourceRequestData(createStepExecutionSplits);
        if (log.isDebugEnabled()) {
            log.debug("Resource request map size is " + createResourceRequestData.size());
            for (Map.Entry<StepExecution, ContainerRequestHint> entry2 : createResourceRequestData.entrySet()) {
                log.debug("Entry stepExecution=" + entry2.getKey() + " requestData=" + entry2.getValue());
            }
        }
        this.batchAppmaster.addStepSplits(stepExecution, this.stepName, createStepExecutionSplits, createResourceRequestData);
        waitCompleteState(stepExecution);
        arrayList.addAll(this.batchAppmaster.getStepExecutions());
        if (log.isDebugEnabled()) {
            log.debug("Statuses of remote execution ");
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                log.debug("Remote step execution: " + ((StepExecution) it.next()));
            }
        }
        return arrayList;
    }

    public String getStepName() {
        return this.stepName;
    }

    public void setStepName(String str) {
        this.stepName = str;
    }

    public ContainerResolver getContainerResolver() {
        return this.containerResolver;
    }

    public void setContainerResolver(ContainerResolver containerResolver) {
        this.containerResolver = containerResolver;
    }

    protected void waitCompleteState(final StepExecution stepExecution) {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.batchAppmaster.addAppmasterStateListener(new AppmasterStateListener() { // from class: org.springframework.yarn.batch.partition.AbstractBatchPartitionHandler.1
            public void state(AppmasterStateListener.AppmasterState appmasterState) {
                if (AbstractBatchPartitionHandler.log.isDebugEnabled()) {
                    AbstractBatchPartitionHandler.log.debug("AppmasterStateListener state: state=" + appmasterState);
                }
                if (appmasterState == AppmasterStateListener.AppmasterState.COMPLETED) {
                    countDownLatch.countDown();
                }
            }
        });
        this.batchAppmaster.addPartitionedStepExecutionStateListener(new PartitionedStepExecutionStateListener() { // from class: org.springframework.yarn.batch.partition.AbstractBatchPartitionHandler.2
            @Override // org.springframework.yarn.batch.listener.PartitionedStepExecutionStateListener
            public void state(PartitionedStepExecutionStateListener.PartitionedStepExecutionState partitionedStepExecutionState, StepExecution stepExecution2) {
                if (AbstractBatchPartitionHandler.log.isDebugEnabled()) {
                    AbstractBatchPartitionHandler.log.debug("PartitionedStepExecutionStateListener state: state=" + partitionedStepExecutionState + " stepExecution=" + stepExecution2 + " masterStepExecution=" + stepExecution);
                }
                if (partitionedStepExecutionState == PartitionedStepExecutionStateListener.PartitionedStepExecutionState.COMPLETED && stepExecution.equals(stepExecution2)) {
                    if (AbstractBatchPartitionHandler.log.isDebugEnabled()) {
                        AbstractBatchPartitionHandler.log.debug("Got complete state for stepExecution=" + stepExecution2);
                    }
                    countDownLatch.countDown();
                }
            }
        });
        try {
            countDownLatch.await();
        } catch (Exception e) {
            log.warn("Latch wait interrupted, we may not be finished!");
        }
    }
}
