package co.cask.cdap.logging.save;

import ch.qos.logback.classic.spi.ILoggingEvent;
import co.cask.cdap.common.logging.LoggingContext;
import co.cask.cdap.logging.appender.kafka.LoggingEventSerializer;
import co.cask.cdap.logging.context.LoggingContextHelper;
import co.cask.cdap.logging.kafka.KafkaLogEvent;
import com.google.common.collect.Lists;
import com.google.common.collect.Table;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/save/LogCollectorCallback.class */
public class LogCollectorCallback implements KafkaConsumer.MessageCallback {
    private static final Logger LOG = LoggerFactory.getLogger(LogCollectorCallback.class);
    private final Table<Long, String, List<KafkaLogEvent>> messageTable;
    private final LoggingEventSerializer serializer;
    private final long eventBucketIntervalMs;

    public LogCollectorCallback(Table<Long, String, List<KafkaLogEvent>> table, LoggingEventSerializer loggingEventSerializer, long j) {
        this.messageTable = table;
        this.serializer = loggingEventSerializer;
        this.eventBucketIntervalMs = j;
    }

    public void onReceived(Iterator<FetchedMessage> it) {
        int i = 0;
        while (it.hasNext()) {
            FetchedMessage next = it.next();
            try {
                GenericRecord genericRecord = this.serializer.toGenericRecord(next.getPayload());
                ILoggingEvent fromGenericRecord = this.serializer.fromGenericRecord(genericRecord);
                LoggingContext loggingContext = LoggingContextHelper.getLoggingContext(fromGenericRecord.getMDCPropertyMap());
                synchronized (this.messageTable) {
                    long timeStamp = fromGenericRecord.getTimeStamp() / this.eventBucketIntervalMs;
                    List list = (List) this.messageTable.get(Long.valueOf(timeStamp), loggingContext.getLogPathFragment());
                    if (list == null) {
                        list = Lists.newArrayList();
                        this.messageTable.put(Long.valueOf(timeStamp), loggingContext.getLogPathFragment(), list);
                    }
                    list.add(new KafkaLogEvent(genericRecord, fromGenericRecord, loggingContext, next.getTopicPartition().getPartition(), next.getNextOffset()));
                }
            } catch (Throwable th) {
                LOG.warn("Exception while processing message with nextOffset {}. Skipping it.", Long.valueOf(next.getNextOffset()), th);
            }
            i++;
        }
        LOG.debug("Got {} messages from kafka", Integer.valueOf(i));
    }

    public void finished() {
        LOG.info("LogCollectorCallback finished.");
    }
}
