package co.cask.cdap.logging.read;

import ch.qos.logback.classic.spi.ILoggingEvent;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.logging.LoggingContext;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.logging.LoggingConfiguration;
import co.cask.cdap.logging.appender.kafka.KafkaTopic;
import co.cask.cdap.logging.appender.kafka.LoggingEventSerializer;
import co.cask.cdap.logging.appender.kafka.StringPartitioner;
import co.cask.cdap.logging.context.LoggingContextHelper;
import co.cask.cdap.logging.filter.AndFilter;
import co.cask.cdap.logging.filter.Filter;
import co.cask.cdap.logging.kafka.KafkaConsumer;
import co.cask.cdap.logging.save.LogSaverTableUtil;
import co.cask.cdap.logging.serialize.LogSchema;
import co.cask.cdap.logging.write.FileMetaDataManager;
import co.cask.tephra.TransactionSystemClient;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.twill.common.Threads;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/read/DistributedLogReader.class */
public final class DistributedLogReader implements LogReader {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedLogReader.class);
    private static final int MAX_THREAD_POOL_SIZE = 20;
    private final List<LoggingConfiguration.KafkaHost> seedBrokers;
    private final String topic;
    private final int numPartitions;
    private final LoggingEventSerializer serializer;
    private final FileMetaDataManager fileMetaDataManager;
    private final Schema schema;
    private final ExecutorService executor;
    private final StringPartitioner partitioner;
    private final int kafkaTailFetchTimeoutMs = 300;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/logging/read/DistributedLogReader$KafkaCallback.class */
    public static class KafkaCallback implements co.cask.cdap.logging.kafka.Callback {
        private final Filter logFilter;
        private final LoggingEventSerializer serializer;
        private final long stopOffset;
        private final int maxEvents;
        private final Callback callback;
        private long lastOffset;
        private int count;

        private KafkaCallback(Filter filter, LoggingEventSerializer loggingEventSerializer, long j, int i, Callback callback) {
            this.lastOffset = -1L;
            this.count = 0;
            this.logFilter = filter;
            this.serializer = loggingEventSerializer;
            this.stopOffset = j;
            this.maxEvents = i;
            this.callback = callback;
        }

        @Override // co.cask.cdap.logging.kafka.Callback
        public void handle(long j, ByteBuffer byteBuffer) {
            ILoggingEvent fromBytes = this.serializer.fromBytes(byteBuffer);
            if (j < this.stopOffset && this.count < this.maxEvents && this.logFilter.match(fromBytes)) {
                this.count++;
                this.callback.handle(new LogEvent(fromBytes, j));
            }
            this.lastOffset = j;
        }

        public long getLastOffset() {
            return this.lastOffset;
        }

        public int getCount() {
            return this.count;
        }
    }

    @Inject
    public DistributedLogReader(DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient, CConfiguration cConfiguration, LocationFactory locationFactory) {
        try {
            this.seedBrokers = LoggingConfiguration.getKafkaSeedBrokers(cConfiguration.get(LoggingConfiguration.KAFKA_SEED_BROKERS));
            Preconditions.checkArgument(!this.seedBrokers.isEmpty(), "Kafka seed brokers list is empty!");
            this.topic = KafkaTopic.getTopic();
            Preconditions.checkArgument(!this.topic.isEmpty(), "Kafka topic is emtpty!");
            this.numPartitions = cConfiguration.getInt(LoggingConfiguration.NUM_PARTITIONS, -1);
            Preconditions.checkArgument(this.numPartitions > 0, "numPartitions should be greater than 0. Got numPartitions=%s", new Object[]{Integer.valueOf(this.numPartitions)});
            this.partitioner = new StringPartitioner(this.numPartitions);
            this.serializer = new LoggingEventSerializer();
            this.fileMetaDataManager = new FileMetaDataManager(new LogSaverTableUtil(datasetFramework, cConfiguration), transactionSystemClient, locationFactory);
            this.schema = new LogSchema().getAvroSchema();
            this.executor = new ThreadPoolExecutor(0, MAX_THREAD_POOL_SIZE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), Threads.createDaemonThreadFactory("dist-log-reader-%d"), new ThreadPoolExecutor.DiscardPolicy());
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    @Override // co.cask.cdap.logging.read.LogReader
    public void getLogNext(final LoggingContext loggingContext, final long j, final int i, final Filter filter, final Callback callback) {
        if (j < 0) {
            getLogPrev(loggingContext, j, i, filter, callback);
        } else {
            this.executor.submit(new Runnable() { // from class: co.cask.cdap.logging.read.DistributedLogReader.1
                @Override // java.lang.Runnable
                public void run() {
                    int partition = DistributedLogReader.this.partitioner.partition(loggingContext.getLogPartition(), DistributedLogReader.this.numPartitions);
                    callback.init();
                    KafkaConsumer kafkaConsumer = new KafkaConsumer(DistributedLogReader.this.seedBrokers, DistributedLogReader.this.topic, partition, 300);
                    try {
                        try {
                            AndFilter andFilter = new AndFilter(ImmutableList.of(LoggingContextHelper.createFilter(loggingContext), filter));
                            long fetchOffset = kafkaConsumer.fetchOffset(KafkaConsumer.Offset.LATEST);
                            long j2 = j + 1;
                            try {
                                if (j2 >= fetchOffset) {
                                    try {
                                        callback.close();
                                        kafkaConsumer.close();
                                        return;
                                    } finally {
                                    }
                                }
                                DistributedLogReader.this.fetchLogEvents(kafkaConsumer, andFilter, j2, fetchOffset, i, callback);
                                try {
                                    try {
                                        callback.close();
                                        kafkaConsumer.close();
                                    } finally {
                                    }
                                } catch (IOException e) {
                                    DistributedLogReader.LOG.error(String.format("Caught exception when closing KafkaConsumer for topic %s, partition %d", DistributedLogReader.this.topic, Integer.valueOf(partition)), e);
                                }
                            } catch (IOException e2) {
                                DistributedLogReader.LOG.error(String.format("Caught exception when closing KafkaConsumer for topic %s, partition %d", DistributedLogReader.this.topic, Integer.valueOf(partition)), e2);
                            }
                        } catch (Throwable th) {
                            try {
                            } catch (IOException e3) {
                                DistributedLogReader.LOG.error(String.format("Caught exception when closing KafkaConsumer for topic %s, partition %d", DistributedLogReader.this.topic, Integer.valueOf(partition)), e3);
                            }
                            try {
                                callback.close();
                                kafkaConsumer.close();
                                throw th;
                            } finally {
                                kafkaConsumer.close();
                            }
                        }
                    } catch (Throwable th2) {
                        DistributedLogReader.LOG.error("Got exception: ", th2);
                        throw Throwables.propagate(th2);
                    }
                }
            });
        }
    }

    @Override // co.cask.cdap.logging.read.LogReader
    public void getLogPrev(final LoggingContext loggingContext, final long j, final int i, final Filter filter, final Callback callback) {
        this.executor.submit(new Runnable() { // from class: co.cask.cdap.logging.read.DistributedLogReader.2
            @Override // java.lang.Runnable
            public void run() {
                int partition = DistributedLogReader.this.partitioner.partition(loggingContext.getLogPartition(), DistributedLogReader.this.numPartitions);
                callback.init();
                KafkaConsumer kafkaConsumer = new KafkaConsumer(DistributedLogReader.this.seedBrokers, DistributedLogReader.this.topic, partition, 300);
                try {
                    try {
                        AndFilter andFilter = new AndFilter(ImmutableList.of(LoggingContextHelper.createFilter(loggingContext), filter));
                        long fetchOffset = kafkaConsumer.fetchOffset(KafkaConsumer.Offset.LATEST);
                        long fetchOffset2 = kafkaConsumer.fetchOffset(KafkaConsumer.Offset.EARLIEST);
                        long j2 = j < 0 ? fetchOffset : j;
                        long j3 = j2 - i;
                        if (j3 < fetchOffset2) {
                            j3 = fetchOffset2;
                        }
                        if (j3 >= j2 || j3 >= fetchOffset) {
                            try {
                                try {
                                    callback.close();
                                    kafkaConsumer.close();
                                    return;
                                } finally {
                                }
                            } catch (IOException e) {
                                DistributedLogReader.LOG.error(String.format("Caught exception when closing KafkaConsumer for topic %s, partition %d", DistributedLogReader.this.topic, Integer.valueOf(partition)), e);
                                return;
                            }
                        }
                        int i2 = 0;
                        while (i2 == 0) {
                            i2 = DistributedLogReader.this.fetchLogEvents(kafkaConsumer, andFilter, j3, j2, i, callback);
                            j2 = j3;
                            if (j2 <= fetchOffset2) {
                                break;
                            }
                            j3 = j2 - i;
                            if (j3 < fetchOffset2) {
                                j3 = fetchOffset2;
                            }
                        }
                        try {
                            try {
                                callback.close();
                                kafkaConsumer.close();
                            } catch (IOException e2) {
                                DistributedLogReader.LOG.error(String.format("Caught exception when closing KafkaConsumer for topic %s, partition %d", DistributedLogReader.this.topic, Integer.valueOf(partition)), e2);
                            }
                        } finally {
                        }
                    } catch (Throwable th) {
                        try {
                            try {
                                callback.close();
                                kafkaConsumer.close();
                            } finally {
                                kafkaConsumer.close();
                            }
                        } catch (IOException e3) {
                            DistributedLogReader.LOG.error(String.format("Caught exception when closing KafkaConsumer for topic %s, partition %d", DistributedLogReader.this.topic, Integer.valueOf(partition)), e3);
                        }
                        throw th;
                    }
                } catch (Throwable th2) {
                    DistributedLogReader.LOG.error("Got exception: ", th2);
                    throw Throwables.propagate(th2);
                }
            }
        });
    }

    @Override // co.cask.cdap.logging.read.LogReader
    public void getLog(final LoggingContext loggingContext, final long j, final long j2, final Filter filter, final Callback callback) {
        this.executor.submit(new Runnable() { // from class: co.cask.cdap.logging.read.DistributedLogReader.3
            @Override // java.lang.Runnable
            public void run() {
                callback.init();
                try {
                    try {
                        AndFilter andFilter = new AndFilter(ImmutableList.of(LoggingContextHelper.createFilter(loggingContext), filter));
                        SortedMap<Long, Location> listFiles = DistributedLogReader.this.fileMetaDataManager.listFiles(loggingContext);
                        long j3 = -1;
                        Location location = null;
                        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(listFiles.size());
                        for (Map.Entry<Long, Location> entry : listFiles.entrySet()) {
                            if (entry.getKey().longValue() >= j && j3 != -1 && j3 < j2) {
                                newArrayListWithExpectedSize.add(location);
                            }
                            j3 = entry.getKey().longValue();
                            location = entry.getValue();
                        }
                        if (j3 != -1 && j3 < j2) {
                            newArrayListWithExpectedSize.add(location);
                        }
                        AvroFileLogReader avroFileLogReader = new AvroFileLogReader(DistributedLogReader.this.schema);
                        Iterator it = newArrayListWithExpectedSize.iterator();
                        while (it.hasNext()) {
                            avroFileLogReader.readLog((Location) it.next(), andFilter, j, j2, Integer.MAX_VALUE, callback);
                        }
                    } catch (Throwable th) {
                        DistributedLogReader.LOG.error("Got exception: ", th);
                        throw Throwables.propagate(th);
                    }
                } finally {
                    callback.close();
                }
            }
        });
    }

    @Override // co.cask.cdap.logging.read.LogReader
    public void close() {
        if (this.executor != null) {
            this.executor.shutdownNow();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int fetchLogEvents(KafkaConsumer kafkaConsumer, Filter filter, long j, long j2, int i, Callback callback) {
        KafkaCallback kafkaCallback = new KafkaCallback(filter, this.serializer, j2, i, callback);
        while (kafkaCallback.getCount() < i && j < j2) {
            kafkaConsumer.fetchMessages(j, kafkaCallback);
            if (kafkaCallback.getLastOffset() == -1) {
                break;
            }
            j = kafkaCallback.getLastOffset() + 1;
        }
        return kafkaCallback.getCount();
    }
}
