package co.cask.cdap.logging.save;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.logging.LoggingConfiguration;
import co.cask.cdap.logging.appender.kafka.KafkaTopic;
import co.cask.cdap.logging.appender.kafka.LoggingEventSerializer;
import co.cask.cdap.logging.kafka.KafkaLogEvent;
import co.cask.cdap.logging.write.AvroFileWriter;
import co.cask.cdap.logging.write.FileMetaDataManager;
import co.cask.cdap.logging.write.LogCleanup;
import co.cask.cdap.logging.write.LogFileWriter;
import co.cask.cdap.watchdog.election.PartitionChangeHandler;
import co.cask.tephra.TransactionSystemClient;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.collect.RowSortedTable;
import com.google.common.collect.TreeBasedTable;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/save/LogSaver.class */
public final class LogSaver extends AbstractIdleService implements PartitionChangeHandler {
    private static final Logger LOG = LoggerFactory.getLogger(LogSaver.class);
    private final String topic;
    private final LoggingEventSerializer serializer;
    private final KafkaClientService kafkaClient;
    private final CheckpointManager checkpointManager;
    private final RowSortedTable<Long, String, Map.Entry<Long, List<KafkaLogEvent>>> messageTable;
    private final long eventBucketIntervalMs;
    private final int logCleanupIntervalMins;
    private final long maxNumberOfBucketsInTable;
    private final LogFileWriter<KafkaLogEvent> logFileWriter;
    private final ListeningScheduledExecutorService scheduledExecutor;
    private final LogCleanup logCleanup;
    private ScheduledFuture<?> logWriterFuture;
    private ScheduledFuture<?> cleanupFuture;
    private Map<Integer, Cancellable> kafkaCancelMap;
    private Map<Integer, CountDownLatch> kafkaCancelCallbackLatchMap;

    @Inject
    public LogSaver(LogSaverTableUtil logSaverTableUtil, TransactionSystemClient transactionSystemClient, KafkaClientService kafkaClientService, CConfiguration cConfiguration, LocationFactory locationFactory) throws Exception {
        LOG.info("Initializing LogSaver...");
        this.topic = KafkaTopic.getTopic();
        LOG.info(String.format("Kafka topic is %s", this.topic));
        this.serializer = new LoggingEventSerializer();
        this.checkpointManager = new CheckpointManager(logSaverTableUtil, transactionSystemClient, this.topic);
        FileMetaDataManager fileMetaDataManager = new FileMetaDataManager(logSaverTableUtil, transactionSystemClient, locationFactory);
        this.messageTable = TreeBasedTable.create();
        this.kafkaClient = kafkaClientService;
        String str = cConfiguration.get(LoggingConfiguration.LOG_BASE_DIR);
        Preconditions.checkNotNull(str, "Log base dir cannot be null");
        Location create = locationFactory.create(str);
        LOG.info(String.format("Log base dir is %s", create.toURI()));
        long j = cConfiguration.getLong(LoggingConfiguration.LOG_RETENTION_DURATION_DAYS, 30L);
        Preconditions.checkArgument(j > 0, "Log file retention duration is invalid: %s", new Object[]{Long.valueOf(j)});
        long convert = TimeUnit.MILLISECONDS.convert(j, TimeUnit.DAYS);
        long j2 = cConfiguration.getLong(LoggingConfiguration.LOG_MAX_FILE_SIZE_BYTES, 20971520L);
        Preconditions.checkArgument(j2 > 0, "Max log file size is invalid: %s", new Object[]{Long.valueOf(j2)});
        int i = cConfiguration.getInt(LoggingConfiguration.LOG_FILE_SYNC_INTERVAL_BYTES, 51200);
        Preconditions.checkArgument(i > 0, "Log file sync interval is invalid: %s", new Object[]{Integer.valueOf(i)});
        long j3 = cConfiguration.getLong(LoggingConfiguration.LOG_SAVER_CHECKPOINT_INTERVAL_MS, LoggingConfiguration.DEFAULT_LOG_SAVER_CHECKPOINT_INTERVAL_MS);
        Preconditions.checkArgument(j3 > 0, "Checkpoint interval is invalid: %s", new Object[]{Long.valueOf(j3)});
        long j4 = cConfiguration.getLong(LoggingConfiguration.LOG_SAVER_INACTIVE_FILE_INTERVAL_MS, LoggingConfiguration.DEFAULT_LOG_SAVER_INACTIVE_FILE_INTERVAL_MS);
        Preconditions.checkArgument(j4 > 0, "Inactive interval is invalid: %s", new Object[]{Long.valueOf(j4)});
        this.eventBucketIntervalMs = cConfiguration.getLong(LoggingConfiguration.LOG_SAVER_EVENT_BUCKET_INTERVAL_MS, 1000L);
        Preconditions.checkArgument(this.eventBucketIntervalMs > 0, "Event bucket interval is invalid: %s", new Object[]{Long.valueOf(this.eventBucketIntervalMs)});
        this.maxNumberOfBucketsInTable = cConfiguration.getLong(LoggingConfiguration.LOG_SAVER_MAXIMUM_INMEMORY_EVENT_BUCKETS, 8L);
        Preconditions.checkArgument(this.maxNumberOfBucketsInTable > 0, "Maximum number of event buckets in memory is invalid: %s", new Object[]{Long.valueOf(this.maxNumberOfBucketsInTable)});
        long j5 = cConfiguration.getLong(LoggingConfiguration.LOG_SAVER_TOPIC_WAIT_SLEEP_MS, LoggingConfiguration.DEFAULT_LOG_SAVER_TOPIC_WAIT_SLEEP_MS);
        Preconditions.checkArgument(j5 > 0, "Topic creation wait sleep is invalid: %s", new Object[]{Long.valueOf(j5)});
        this.logCleanupIntervalMins = cConfiguration.getInt(LoggingConfiguration.LOG_CLEANUP_RUN_INTERVAL_MINS, LoggingConfiguration.DEFAULT_LOG_CLEANUP_RUN_INTERVAL_MINS);
        Preconditions.checkArgument(this.logCleanupIntervalMins > 0, "Log cleanup run interval is invalid: %s", new Object[]{Integer.valueOf(this.logCleanupIntervalMins)});
        this.logFileWriter = new CheckpointingLogFileWriter(new AvroFileWriter(fileMetaDataManager, create, this.serializer.getAvroSchema(), j2, i, j4), this.checkpointManager, j3);
        this.scheduledExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("log-saver-main")));
        this.logCleanup = new LogCleanup(fileMetaDataManager, create, convert);
        this.kafkaCancelMap = new HashMap();
        this.kafkaCancelCallbackLatchMap = new HashMap();
    }

    @Override // co.cask.cdap.watchdog.election.PartitionChangeHandler
    public void partitionsChanged(Set<Integer> set) {
        try {
            LOG.info("Changed partitions: {}", set);
            unscheduleTasks();
            scheduleTasks(set);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    protected void startUp() throws Exception {
        LOG.info("Starting LogSaver...");
    }

    protected void shutDown() throws Exception {
        LOG.info("Stopping LogSaver...");
        cancelLogCollectorCallbacks();
        this.scheduledExecutor.shutdown();
        this.logFileWriter.flush();
        this.logFileWriter.close();
    }

    private void scheduleTasks(Set<Integer> set) throws Exception {
        if (!isRunning()) {
            LOG.info("Not scheduling when stopping!");
            return;
        }
        subscribe(set);
        this.logWriterFuture = this.scheduledExecutor.scheduleWithFixedDelay(new LogWriter(this.logFileWriter, this.messageTable, this.eventBucketIntervalMs, this.maxNumberOfBucketsInTable), 100L, 200L, TimeUnit.MILLISECONDS);
        if (set.contains(0)) {
            LOG.info("Scheduling cleanup task");
            this.cleanupFuture = this.scheduledExecutor.scheduleAtFixedRate(this.logCleanup, 10L, this.logCleanupIntervalMins, TimeUnit.MINUTES);
        }
    }

    private void unscheduleTasks() throws Exception {
        if (this.logWriterFuture != null && !this.logWriterFuture.isCancelled() && !this.logWriterFuture.isDone()) {
            this.logWriterFuture.cancel(false);
            this.logWriterFuture = null;
        }
        if (this.cleanupFuture != null && !this.cleanupFuture.isCancelled() && !this.cleanupFuture.isDone()) {
            this.cleanupFuture.cancel(false);
            this.cleanupFuture = null;
        }
        this.logFileWriter.flush();
        cancelLogCollectorCallbacks();
        this.messageTable.clear();
    }

    private void cancelLogCollectorCallbacks() {
        for (Map.Entry<Integer, Cancellable> entry : this.kafkaCancelMap.entrySet()) {
            if (entry.getValue() != null) {
                LOG.info("Cancelling kafka callback for partition {}", entry.getKey());
                this.kafkaCancelCallbackLatchMap.get(entry.getKey()).countDown();
                entry.getValue().cancel();
            }
        }
        this.kafkaCancelMap.clear();
        this.kafkaCancelCallbackLatchMap.clear();
    }

    private void subscribe(Set<Integer> set) throws Exception {
        LOG.info("Prepare to subscribe for partitions: {}", set);
        HashMap newHashMap = Maps.newHashMap();
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            KafkaConsumer.Preparer prepare = this.kafkaClient.getConsumer().prepare();
            long checkpoint = this.checkpointManager.getCheckpoint(intValue);
            newHashMap.put(Integer.valueOf(intValue), Long.valueOf(checkpoint));
            if (checkpoint >= 0) {
                prepare.add(this.topic, intValue, checkpoint);
            } else {
                prepare.addFromBeginning(this.topic, intValue);
            }
            this.kafkaCancelCallbackLatchMap.put(Integer.valueOf(intValue), new CountDownLatch(1));
            this.kafkaCancelMap.put(Integer.valueOf(intValue), prepare.consume(new LogCollectorCallback(this.messageTable, this.serializer, this.eventBucketIntervalMs, this.maxNumberOfBucketsInTable, this.kafkaCancelCallbackLatchMap.get(Integer.valueOf(intValue)))));
        }
        LOG.info("Consumer created for topic {}, partitions {}", this.topic, newHashMap);
    }
}
