package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StandbyTask.class */
public class StandbyTask extends AbstractTask {
    private boolean updateOffsetLimits;
    private final Sensor closeTaskSensor;
    private final Map<TopicPartition, Long> offsetLimits;

    /* JADX INFO: Access modifiers changed from: package-private */
    public StandbyTask(TaskId taskId, Set<TopicPartition> set, ProcessorTopology processorTopology, Consumer<byte[], byte[]> consumer, ChangelogReader changelogReader, StreamsConfig streamsConfig, StreamsMetricsImpl streamsMetricsImpl, StateDirectory stateDirectory) {
        super(taskId, set, processorTopology, consumer, changelogReader, true, stateDirectory, streamsConfig);
        this.offsetLimits = new HashMap();
        this.closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetricsImpl);
        this.processorContext = new StandbyContextImpl(taskId, streamsConfig, this.stateMgr, streamsMetricsImpl);
        HashSet hashSet = new HashSet(processorTopology.storeToChangelogTopic().values());
        set.stream().filter(topicPartition -> {
            return hashSet.contains(topicPartition.topic());
        }).forEach(topicPartition2 -> {
            this.offsetLimits.put(topicPartition2, 0L);
        });
        this.updateOffsetLimits = true;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void initializeMetadata() {
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public boolean initializeStateStores() {
        this.log.trace("Initializing state stores");
        registerStateStores();
        this.processorContext.initialize();
        this.taskInitialized = true;
        return true;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void initializeTopology() {
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void resume() {
        this.log.debug("Resuming");
        allowUpdateOfOffsetLimit();
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void commit() {
        this.log.trace("Committing");
        flushAndCheckpointState();
        allowUpdateOfOffsetLimit();
        this.commitNeeded = false;
    }

    private void flushAndCheckpointState() {
        this.stateMgr.flush();
        this.stateMgr.checkpoint(Collections.emptyMap());
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void close(boolean z, boolean z2) {
        this.closeTaskSensor.record();
        if (this.taskInitialized) {
            this.log.debug("Closing");
            if (z) {
                try {
                    commit();
                } finally {
                    closeStateManager(true);
                }
            }
            this.taskClosed = true;
        }
    }

    public List<ConsumerRecord<byte[], byte[]>> update(TopicPartition topicPartition, List<ConsumerRecord<byte[], byte[]>> list) {
        if (list.isEmpty()) {
            return Collections.emptyList();
        }
        this.log.trace("Updating standby replicas of its state store for partition [{}]", topicPartition);
        long longValue = this.offsetLimits.getOrDefault(topicPartition, Long.MAX_VALUE).longValue();
        long j = -1;
        ArrayList arrayList = new ArrayList(list.size());
        ArrayList arrayList2 = new ArrayList();
        for (ConsumerRecord<byte[], byte[]> consumerRecord : list) {
            if (consumerRecord.offset() >= longValue && this.updateOffsetLimits) {
                longValue = updateOffsetLimits(topicPartition);
            }
            if (consumerRecord.offset() < longValue) {
                arrayList.add(consumerRecord);
                j = consumerRecord.offset();
            } else {
                arrayList2.add(consumerRecord);
            }
        }
        if (!arrayList.isEmpty()) {
            this.stateMgr.updateStandbyStates(topicPartition, arrayList, j);
            this.commitNeeded = true;
        }
        return arrayList2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<TopicPartition, Long> checkpointedOffsets() {
        return Collections.unmodifiableMap(this.stateMgr.checkpointed());
    }

    public Map<TopicPartition, Long> changelogPositions() {
        return this.stateMgr.standbyRestoredOffsets();
    }

    private long updateOffsetLimits(TopicPartition topicPartition) {
        if (!this.offsetLimits.containsKey(topicPartition)) {
            throw new IllegalArgumentException("Topic is not both a source and a changelog: " + topicPartition);
        }
        Map<TopicPartition, Long> committedOffsetForPartitions = committedOffsetForPartitions(this.offsetLimits.keySet());
        for (Map.Entry<TopicPartition, Long> entry : committedOffsetForPartitions.entrySet()) {
            Long l = this.offsetLimits.get(entry.getKey());
            if (l != null && l.longValue() > entry.getValue().longValue()) {
                throw new IllegalStateException("Offset limit should monotonically increase, but was reduced. New limit: " + entry.getValue() + ". Previous limit: " + l);
            }
        }
        this.offsetLimits.putAll(committedOffsetForPartitions);
        this.updateOffsetLimits = false;
        return this.offsetLimits.get(topicPartition).longValue();
    }

    private Map<TopicPartition, Long> committedOffsetForPartitions(Set<TopicPartition> set) {
        try {
            return (Map) this.consumer.committed(set).entrySet().stream().collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, entry -> {
                return Long.valueOf(entry.getValue() == null ? 0L : ((OffsetAndMetadata) entry.getValue()).offset());
            }));
        } catch (AuthorizationException e) {
            throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", this.id, set), e);
        } catch (WakeupException e2) {
            throw e2;
        } catch (KafkaException e3) {
            throw new ProcessorStateException(String.format("task [%s] Failed to initialize offsets for %s", this.id, set), e3);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void allowUpdateOfOffsetLimit() {
        this.updateOffsetLimits = true;
    }
}
