package org.springframework.yarn.batch.partition;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.StepExecutionSplitter;
import org.springframework.yarn.am.YarnAppmaster;
import org.springframework.yarn.am.container.ContainerRequestHint;
import org.springframework.yarn.batch.BatchSystemConstants;
import org.springframework.yarn.batch.am.BatchYarnAppmaster;
import org.springframework.yarn.batch.listener.PartitionedStepExecutionStateListener;
import org.springframework.yarn.listener.AppmasterStateListener;

/* loaded from: input_file:org/springframework/yarn/batch/partition/AbstractPartitionHandler.class */
public abstract class AbstractPartitionHandler implements PartitionHandler {
    private static final Log log = LogFactory.getLog(AbstractPartitionHandler.class);
    private BatchYarnAppmaster batchAppmaster;
    private String stepName = "remoteStep";
    private String keySplitLocations = BatchSystemConstants.KEY_SPLITLOCATIONS;

    public AbstractPartitionHandler() {
    }

    public AbstractPartitionHandler(BatchYarnAppmaster batchYarnAppmaster) {
        this.batchAppmaster = batchYarnAppmaster;
    }

    public final Collection<StepExecution> handle(StepExecutionSplitter stepExecutionSplitter, StepExecution stepExecution) throws Exception {
        log.info("Handling stepExecution=[" + stepExecution + "] with jobParameters=[" + stepExecution.getJobParameters() + "]");
        Set<StepExecution> createSplits = createSplits(stepExecutionSplitter, stepExecution);
        log.info("Created " + createSplits.size() + " splits for stepName=" + this.stepName);
        Map<StepExecution, ContainerRequestHint> createRequestData = createRequestData(createSplits);
        log.info("Resource request map size is " + createRequestData.size());
        if (log.isDebugEnabled()) {
            for (Map.Entry<StepExecution, ContainerRequestHint> entry : createRequestData.entrySet()) {
                log.debug("Entry stepExecution=[" + entry.getKey() + "] requestData=[" + entry.getValue() + "]");
            }
        }
        this.batchAppmaster.addStepSplits(stepExecution, this.stepName, createSplits, createRequestData);
        waitCompleteState(stepExecution);
        ArrayList arrayList = new ArrayList(this.batchAppmaster.getStepExecutions());
        log.info("Listing statuses of remote executions");
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            log.info("Remote stepExecution=[" + ((StepExecution) it.next()) + "]");
        }
        return arrayList;
    }

    public void setBatchAppmaster(BatchYarnAppmaster batchYarnAppmaster) {
        this.batchAppmaster = batchYarnAppmaster;
    }

    public void setYarnAppmaster(YarnAppmaster yarnAppmaster) {
        if (yarnAppmaster instanceof BatchYarnAppmaster) {
            setBatchAppmaster((BatchYarnAppmaster) yarnAppmaster);
        }
    }

    public void setKeySplitLocations(String str) {
        this.keySplitLocations = str;
    }

    public String getKeySplitLocations() {
        return this.keySplitLocations;
    }

    public String getStepName() {
        return this.stepName;
    }

    public void setStepName(String str) {
        this.stepName = str;
    }

    protected abstract Set<StepExecution> createSplits(StepExecutionSplitter stepExecutionSplitter, StepExecution stepExecution) throws Exception;

    protected Map<StepExecution, ContainerRequestHint> createRequestData(Set<StepExecution> set) throws Exception {
        return new HashMap();
    }

    protected void waitCompleteState(final StepExecution stepExecution) {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.batchAppmaster.addAppmasterStateListener(new AppmasterStateListener() { // from class: org.springframework.yarn.batch.partition.AbstractPartitionHandler.1
            public void state(AppmasterStateListener.AppmasterState appmasterState) {
                AbstractPartitionHandler.log.info("AppmasterStateListener state=[" + appmasterState + "]");
                if (appmasterState == AppmasterStateListener.AppmasterState.COMPLETED || appmasterState == AppmasterStateListener.AppmasterState.FAILED) {
                    countDownLatch.countDown();
                }
            }
        });
        this.batchAppmaster.addPartitionedStepExecutionStateListener(new PartitionedStepExecutionStateListener() { // from class: org.springframework.yarn.batch.partition.AbstractPartitionHandler.2
            @Override // org.springframework.yarn.batch.listener.PartitionedStepExecutionStateListener
            public void state(PartitionedStepExecutionStateListener.PartitionedStepExecutionState partitionedStepExecutionState, StepExecution stepExecution2) {
                AbstractPartitionHandler.log.info("PartitionedStepExecutionStateListener state=[" + partitionedStepExecutionState + "] stepExecution=[" + stepExecution2 + "] masterStepExecution=[" + stepExecution + "]");
                if (partitionedStepExecutionState == PartitionedStepExecutionStateListener.PartitionedStepExecutionState.COMPLETED && stepExecution.equals(stepExecution2)) {
                    AbstractPartitionHandler.log.info("Got complete state for stepExecution=[" + stepExecution2 + "]");
                    countDownLatch.countDown();
                }
            }
        });
        try {
            countDownLatch.await();
        } catch (Exception e) {
            log.warn("Latch wait interrupted, we may not be finished!");
        }
        log.info("Waiting latch complete");
    }
}
