package co.cask.cdap.logging.pipeline.kafka;

import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.logging.meta.Checkpoint;
import co.cask.cdap.logging.serialize.LoggingEventSerializer;
import co.cask.cdap.metrics.store.upgrade.UpgradeMetricsConstants;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Iterator;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.twill.kafka.client.BrokerInfo;
import org.apache.twill.kafka.client.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/pipeline/kafka/KafkaOffsetResolver.class */
/**
 * Resolves the Kafka offset from which log processing should start for a given
 * {@link Checkpoint}, by checking the event timestamp stored in the checkpoint against
 * the event timestamps of the messages actually present in the topic partition.
 */
class KafkaOffsetResolver {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetResolver.class);

    /** Fetch size used when reading the single message located at a checkpoint offset. */
    private static final int SINGLE_MESSAGE_MAX_SIZE = 51200;
    /** Buffer size passed to the {@link SimpleConsumer} constructor. */
    private static final int BUFFER_SIZE = 51200000;
    /** Socket timeout, in milliseconds, passed to the {@link SimpleConsumer} constructor. */
    private static final int SO_TIMEOUT_MILLIS = 5000;

    private final BrokerService brokerService;
    private final KafkaPipelineConfig config;
    private final LoggingEventSerializer serializer = new LoggingEventSerializer();

    /**
     * Creates a resolver.
     *
     * @param brokerService used to look up the leader broker of a topic partition
     * @param kafkaPipelineConfig supplies the topic name, event delay and fetch buffer size
     */
    public KafkaOffsetResolver(BrokerService brokerService, KafkaPipelineConfig kafkaPipelineConfig) {
        this.brokerService = brokerService;
        this.config = kafkaPipelineConfig;
    }

    /**
     * Returns the offset to start consuming from for the given checkpoint and partition.
     *
     * <p>If the message at {@code checkpoint.getNextOffset() - 1} carries the event timestamp
     * recorded in the checkpoint, the checkpoint's next offset is returned unchanged. Otherwise
     * (timestamp mismatch, no message, undecodable message, or offset out of range) the
     * partition is searched for an event with the checkpoint's timestamp.
     *
     * @param checkpoint the checkpoint to validate; its next offset must be greater than zero
     * @param partition the topic partition to resolve the offset for
     * @return the offset to start consuming from
     * @throws IllegalArgumentException if the checkpoint's next offset is not positive
     * @throws LeaderNotAvailableException if no leader broker is known for the partition
     */
    public long getStartOffset(Checkpoint checkpoint, int partition) {
        Preconditions.checkArgument(checkpoint.getNextOffset() > 0, "Invalid checkpoint offset");

        String topic = this.config.getTopic();
        BrokerInfo leader = this.brokerService.getLeader(topic, partition);
        if (leader == null) {
            throw new LeaderNotAvailableException(String.format(
                "BrokerInfo from BrokerService is null for topic %s partition %d. Will retry in next run.",
                topic, partition));
        }

        // NOTE(review): EMPTY_TAG is used here as the separator in the client id; its value is
        // not visible in this file and the reference may be a decompiler artifact - confirm.
        SimpleConsumer consumer = new SimpleConsumer(
            leader.getHost(), leader.getPort(), SO_TIMEOUT_MILLIS, BUFFER_SIZE,
            "offset-finder-" + topic + UpgradeMetricsConstants.EMPTY_TAG + partition);
        try {
            // The checkpoint stores the offset *after* the last processed message, so the
            // message whose timestamp we want to verify sits one position earlier.
            long offset = checkpoint.getNextOffset() - 1;
            try {
                long eventTime = getEventTimeByOffset(consumer, partition, offset);
                if (eventTime == checkpoint.getNextEventTime()) {
                    return checkpoint.getNextOffset();
                }
                LOG.debug("Event timestamp in {}:{} at offset {} is {}. It doesn't match with checkpoint timestamp {}",
                          topic, partition, offset, eventTime, checkpoint.getNextEventTime());
            } catch (NotFoundException | OffsetOutOfRangeException e) {
                // Best effort: the timestamp could not be read, fall through to the search below.
                LOG.debug("Cannot get valid log event in {}:{} at offset {}", topic, partition, offset);
            }

            long startOffset = findStartOffset(consumer, partition, checkpoint.getNextEventTime());
            LOG.debug("Found new nextOffset {} for topic {} partition {} with existing checkpoint {}.",
                      startOffset, topic, partition, checkpoint);
            return startOffset;
        } finally {
            // The consumer is created per call and never escapes this method; release its connection.
            consumer.close();
        }
    }

    /**
     * Scans the partition for a message whose event timestamp equals {@code targetTimestamp}.
     *
     * <p>The scan starts at the offset reported by {@code KafkaUtil.getOffsetByTimestamp} for
     * {@code targetTimestamp - eventDelayMillis} and stops when a match is found, when a fetch
     * returns no messages, or when an event later than {@code targetTimestamp + eventDelayMillis}
     * is seen.
     *
     * @param consumer the consumer connected to the partition leader
     * @param partition the topic partition to scan
     * @param targetTimestamp the event timestamp to look for
     * @return the offset following the matching message; if there is no match, the offset
     *         following the last scanned message whose timestamp is smaller than
     *         {@code targetTimestamp - eventDelayMillis}, or the initial scan offset if none was seen
     * @throws KafkaException declared for failures of the underlying Kafka calls
     */
    private long findStartOffset(SimpleConsumer consumer, int partition, long targetTimestamp) throws KafkaException {
        String topic = this.config.getTopic();
        long minTimestamp = targetTimestamp - this.config.getEventDelayMillis();
        long maxTimestamp = targetTimestamp + this.config.getEventDelayMillis();

        long offset = KafkaUtil.getOffsetByTimestamp(consumer, topic, partition, minTimestamp);
        long closestOffset = offset;

        boolean done = false;
        while (!done) {
            // Assume this is the last round; any fetched message resets the flag so that the
            // next batch is fetched once the current one is exhausted.
            done = true;
            Iterator<?> messages = KafkaUtil.fetchMessages(
                consumer, topic, partition, this.config.getKafkaFetchBufferSize(), offset).iterator();
            while (messages.hasNext()) {
                MessageAndOffset messageAndOffset = (MessageAndOffset) messages.next();
                done = false;
                offset = messageAndOffset.nextOffset();
                try {
                    long timestamp = this.serializer.decodeEventTimestamp(messageAndOffset.message().payload());
                    if (timestamp == targetTimestamp) {
                        LOG.debug("Matching offset found in {}:{} at {} for timestamp {}",
                                  topic, partition, messageAndOffset.offset(), targetTimestamp);
                        return offset;
                    }
                    if (timestamp < minTimestamp) {
                        closestOffset = offset;
                    }
                    if (timestamp > maxTimestamp) {
                        // Past the search window: stop scanning altogether.
                        done = true;
                        break;
                    }
                } catch (IOException e) {
                    // Undecodable message: skip it and continue with the next one.
                    LOG.trace("Fail to decode logging event time {}:{} at offset {}. Skipping it.",
                              topic, partition, messageAndOffset.offset(), e);
                }
            }
        }

        LOG.debug("Fail to find a log event with timestamp {} in {}:{}. The largest offset with event timestamp smaller than {} (target event time minus event delay {}) is {}",
                  targetTimestamp, topic, partition, minTimestamp, this.config.getEventDelayMillis(), closestOffset);
        return closestOffset;
    }

    /**
     * Reads the event timestamp of the first message fetched at the given offset.
     *
     * @param consumer the consumer connected to the partition leader
     * @param partition the topic partition to read from
     * @param offset the offset to fetch at
     * @return the event timestamp decoded from the message payload
     * @throws NotFoundException if the fetch returns no message or the payload cannot be decoded
     */
    private long getEventTimeByOffset(SimpleConsumer consumer, int partition, long offset) throws NotFoundException {
        String topic = this.config.getTopic();
        Iterator<?> messages = KafkaUtil.fetchMessages(
            consumer, topic, partition, SINGLE_MESSAGE_MAX_SIZE, offset).iterator();
        if (!messages.hasNext()) {
            throw new NotFoundException("No message found in " + topic + ":" + partition + " at offset " + offset);
        }
        MessageAndOffset messageAndOffset = (MessageAndOffset) messages.next();
        try {
            return this.serializer.decodeEventTimestamp(messageAndOffset.message().payload());
        } catch (IOException e) {
            // Reported as "not found" so the caller falls back to a timestamp search.
            throw new NotFoundException("Invalid log event found in " + topic + ":" + partition + " at offset " + offset);
        }
    }
}
