package ml.shifu.guagua.worker;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import ml.shifu.guagua.io.Bytable;
import ml.shifu.guagua.io.GuaguaFileSplit;
import ml.shifu.guagua.io.GuaguaRecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ml/shifu/guagua/worker/AbstractCombineWorkerComputable.class */
public abstract class AbstractCombineWorkerComputable<MASTER_RESULT extends Bytable, WORKER_RESULT extends Bytable, KEY extends Bytable, VALUE extends Bytable> implements WorkerComputable<MASTER_RESULT, WORKER_RESULT> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractCombineWorkerComputable.class);
    private AtomicBoolean isLoaded;
    private GuaguaRecordReader<KEY, VALUE> recordReader;
    private Map<KEY, VALUE> dataMap;

    protected AbstractCombineWorkerComputable() {
        this(false);
    }

    protected AbstractCombineWorkerComputable(boolean z) {
        this.isLoaded = new AtomicBoolean(false);
        this.dataMap = null;
        if (z) {
            this.dataMap = new TreeMap();
        } else {
            this.dataMap = new HashMap();
        }
    }

    /* JADX WARN: Finally extract failed */
    @Override // ml.shifu.guagua.worker.WorkerComputable
    public WORKER_RESULT compute(WorkerContext<MASTER_RESULT, WORKER_RESULT> workerContext) throws IOException {
        if (!workerContext.isFirstIteration()) {
            long nanoTime = System.nanoTime();
            try {
                for (Map.Entry<KEY, VALUE> entry : this.dataMap.entrySet()) {
                    doCompute(entry.getKey(), entry.getValue(), workerContext);
                }
                LOG.info("Computation time for application {} container {} iteration {}: {}ms.", new Object[]{workerContext.getAppId(), workerContext.getContainerId(), Integer.valueOf(workerContext.getCurrentIteration()), Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime))});
            } catch (Throwable th) {
                LOG.info("Computation time for application {} container {} iteration {}: {}ms.", new Object[]{workerContext.getAppId(), workerContext.getContainerId(), Integer.valueOf(workerContext.getCurrentIteration()), Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime))});
                throw th;
            }
        } else if (this.isLoaded.compareAndSet(false, true)) {
            init(workerContext);
            long nanoTime2 = System.nanoTime();
            preLoad(workerContext);
            long j = 0;
            for (GuaguaFileSplit guaguaFileSplit : workerContext.getFileSplits()) {
                LOG.info("Loading filesplit: {}", guaguaFileSplit);
                try {
                    initRecordReader(guaguaFileSplit);
                    while (getRecordReader().nextKeyValue()) {
                        KEY currentKey = getRecordReader().getCurrentKey();
                        VALUE currentValue = getRecordReader().getCurrentValue();
                        doCompute(currentKey, currentValue, workerContext);
                        this.dataMap.put(currentKey, currentValue);
                        j++;
                    }
                    if (getRecordReader() != null) {
                        getRecordReader().close();
                    }
                } catch (Throwable th2) {
                    if (getRecordReader() != null) {
                        getRecordReader().close();
                    }
                    throw th2;
                }
            }
            postLoad(workerContext);
            LOG.info("Load {} records.", Long.valueOf(j));
            LOG.info("Data loading time with first iteration computing:{}ms", Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime2)));
        }
        return workerContext.getWorkerResult();
    }

    protected void preLoad(WorkerContext<MASTER_RESULT, WORKER_RESULT> workerContext) {
    }

    protected void postLoad(WorkerContext<MASTER_RESULT, WORKER_RESULT> workerContext) {
    }

    public abstract void initRecordReader(GuaguaFileSplit guaguaFileSplit) throws IOException;

    public abstract void init(WorkerContext<MASTER_RESULT, WORKER_RESULT> workerContext);

    public abstract void doCompute(KEY key, VALUE value, WorkerContext<MASTER_RESULT, WORKER_RESULT> workerContext);

    public GuaguaRecordReader<KEY, VALUE> getRecordReader() {
        return this.recordReader;
    }

    public void setRecordReader(GuaguaRecordReader<KEY, VALUE> guaguaRecordReader) {
        this.recordReader = guaguaRecordReader;
    }
}
