package co.cask.cdap.api.dataset.lib.partitioned;

import co.cask.cdap.api.dataset.lib.PartitionDetail;
import co.cask.cdap.api.dataset.lib.PartitionKey;
import co.cask.cdap.api.dataset.lib.PartitionedFileSet;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/cdap-api-3.4.0.jar:co/cask/cdap/api/dataset/lib/partitioned/ConcurrentPartitionConsumer.class */
public class ConcurrentPartitionConsumer extends AbstractPartitionConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentPartitionConsumer.class);

    public ConcurrentPartitionConsumer(PartitionedFileSet partitionedFileSet, StatePersistor statePersistor) {
        super(partitionedFileSet, statePersistor);
    }

    public ConcurrentPartitionConsumer(PartitionedFileSet partitionedFileSet, StatePersistor statePersistor, ConsumerConfiguration consumerConfiguration) {
        super(partitionedFileSet, statePersistor, consumerConfiguration);
    }

    @Override // co.cask.cdap.api.dataset.lib.partitioned.AbstractPartitionConsumer
    public PartitionConsumerResult doConsume(ConsumerWorkingSet consumerWorkingSet, PartitionAcceptor partitionAcceptor) {
        doExpiry(consumerWorkingSet);
        consumerWorkingSet.populate(getPartitionedFileSet(), getConfiguration());
        return new PartitionConsumerResult(selectPartitions(partitionAcceptor, consumerWorkingSet.getPartitions()), removeDiscardedPartitions(consumerWorkingSet));
    }

    private List<PartitionDetail> selectPartitions(PartitionAcceptor partitionAcceptor, List<? extends ConsumablePartition> list) {
        PartitionDetail partition;
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList arrayList = new ArrayList();
        for (ConsumablePartition consumablePartition : list) {
            if (ProcessState.AVAILABLE == consumablePartition.getProcessState() && (partition = getPartitionedFileSet().getPartition(consumablePartition.getPartitionKey())) != null) {
                switch (partitionAcceptor.accept(partition)) {
                    case ACCEPT:
                        consumablePartition.take();
                        consumablePartition.setTimestamp(currentTimeMillis);
                        arrayList.add(partition);
                        break;
                    case STOP:
                        return arrayList;
                }
            }
        }
        return arrayList;
    }

    @Override // co.cask.cdap.api.dataset.lib.partitioned.AbstractPartitionConsumer
    public void doFinish(ConsumerWorkingSet consumerWorkingSet, List<? extends PartitionKey> list, boolean z) {
        doExpiry(consumerWorkingSet);
        if (z) {
            commit(consumerWorkingSet, list);
        } else {
            abort(consumerWorkingSet, list);
        }
    }

    protected void commit(ConsumerWorkingSet consumerWorkingSet, List<? extends PartitionKey> list) {
        for (PartitionKey partitionKey : list) {
            assertInProgress(consumerWorkingSet.lookup(partitionKey));
            consumerWorkingSet.remove(partitionKey);
        }
    }

    protected void abort(ConsumerWorkingSet consumerWorkingSet, List<? extends PartitionKey> list) {
        ArrayList arrayList = new ArrayList();
        for (PartitionKey partitionKey : list) {
            ConsumablePartition lookup = consumerWorkingSet.lookup(partitionKey);
            assertInProgress(lookup);
            if (lookup.getNumFailures() < getConfiguration().getMaxRetries()) {
                lookup.retry();
            } else {
                arrayList.add(partitionKey);
                consumerWorkingSet.lookup(partitionKey).discard();
            }
        }
        if (arrayList.isEmpty()) {
            return;
        }
        LOG.warn("Discarded keys due to being retried {} times: {}", Integer.valueOf(getConfiguration().getMaxRetries()), arrayList);
    }

    protected void assertInProgress(ConsumablePartition consumablePartition) {
        if (consumablePartition.getProcessState() != ProcessState.IN_PROGRESS) {
            throw new IllegalStateException(String.format("Partition not in progress: %s", consumablePartition.getPartitionKey()));
        }
    }

    protected List<PartitionDetail> removeDiscardedPartitions(ConsumerWorkingSet consumerWorkingSet) {
        ArrayList arrayList = new ArrayList();
        Iterator<ConsumablePartition> it = consumerWorkingSet.getPartitions().iterator();
        while (it.hasNext()) {
            ConsumablePartition next = it.next();
            if (next.getProcessState() == ProcessState.DISCARDED) {
                arrayList.add(getPartitionedFileSet().getPartition(next.getPartitionKey()));
                it.remove();
            }
        }
        return arrayList;
    }

    protected long getExpiryBorder() {
        return System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(getConfiguration().getTimeout());
    }

    protected void doExpiry(ConsumerWorkingSet consumerWorkingSet) {
        long expiryBorder = getExpiryBorder();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (ConsumablePartition consumablePartition : consumerWorkingSet.getPartitions()) {
            if (consumablePartition.getProcessState() == ProcessState.IN_PROGRESS && consumablePartition.getTimestamp() < expiryBorder) {
                if (consumablePartition.getNumFailures() < getConfiguration().getMaxRetries()) {
                    consumablePartition.retry();
                } else {
                    consumablePartition.discard();
                }
                arrayList.add(consumablePartition.getPartitionKey());
            }
        }
        if (arrayList.isEmpty()) {
            return;
        }
        LOG.warn("Expiring in progress partitions: {}", arrayList);
        if (arrayList2.isEmpty()) {
            return;
        }
        LOG.warn("Discarded keys due to being retried {} times: {}", Integer.valueOf(getConfiguration().getMaxRetries()), arrayList2);
    }
}
