package co.cask.cdap.logging.save;

import co.cask.cdap.logging.kafka.KafkaLogEvent;
import co.cask.cdap.logging.write.LogFileWriter;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.RowSortedTable;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/save/LogWriter.class */
/**
 * Periodic task that moves buffered log events out of a shared message table and appends them
 * to a {@link LogFileWriter}.
 *
 * <p>Each {@link #run()} invocation has two phases:
 * <ol>
 *   <li>If nothing is pending from an earlier invocation, drain the entries of the oldest row of
 *       the message table that are old enough into an internal pending multimap.</li>
 *   <li>Sort and append every pending list of events, removing each list from the pending
 *       multimap only after its append returned normally.</li>
 * </ol>
 *
 * <p>Because pending events are removed only after a successful append, events whose append
 * failed stay pending and are retried on the next invocation; no new events are drained from
 * the table until the pending ones have been written.
 *
 * <p>The message table is accessed while holding its own monitor ({@code synchronized
 * (messageTable)}). The pending multimap and the message counter are not synchronized.
 * NOTE(review): this presumably relies on {@code run()} never being executed concurrently for
 * the same instance, and on producers locking the same table object - confirm against callers.
 */
public class LogWriter implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(LogWriter.class);

    private final LogFileWriter<KafkaLogEvent> logFileWriter;
    private final RowSortedTable<Long, String, Map.Entry<Long, List<KafkaLogEvent>>> messageTable;
    private final long eventBucketIntervalMs;
    private final long maxNumberOfBucketsInTable;

    /** Events drained from the table but not yet successfully appended, grouped by table column key. */
    private final ListMultimap<String, KafkaLogEvent> writeListMap = ArrayListMultimap.create();

    /** Number of events collected by the most recent drain of the message table. */
    private int messages = 0;

    /**
     * Creates a writer task.
     *
     * @param logFileWriter             sink that receives the sorted event lists
     * @param messageTable              shared table of buffered events; also used as the lock
     *                                  object while it is being drained
     * @param eventBucketIntervalMs     divisor applied to the current time in milliseconds to
     *                                  obtain the current bucket number
     * @param maxNumberOfBucketsInTable number of buckets an entry must lag behind the current
     *                                  bucket number before it is drained
     */
    public LogWriter(LogFileWriter<KafkaLogEvent> logFileWriter,
                     RowSortedTable<Long, String, Map.Entry<Long, List<KafkaLogEvent>>> messageTable,
                     long eventBucketIntervalMs,
                     long maxNumberOfBucketsInTable) {
        this.logFileWriter = logFileWriter;
        this.messageTable = messageTable;
        this.eventBucketIntervalMs = eventBucketIntervalMs;
        this.maxNumberOfBucketsInTable = maxNumberOfBucketsInTable;
    }

    /**
     * Drains eligible events from the message table (unless a previous write is still pending)
     * and appends all pending events to the log file writer.
     *
     * <p>Never throws: any {@link Throwable} is logged and swallowed so that the task can be
     * invoked again and retry the events that are still pending.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            // Read new messages only if the previous write went through completely; otherwise
            // the leftovers in writeListMap must be written first.
            if (writeListMap.isEmpty()) {
                drainExpiredBuckets();
            }

            LOG.debug("Got {} log messages to save", messages);

            // The write phase deliberately runs on every invocation, not only after a drain:
            // entries left behind by a failed append are retried here.
            Iterator<Map.Entry<String, Collection<KafkaLogEvent>>> pending =
                writeListMap.asMap().entrySet().iterator();
            while (pending.hasNext()) {
                // The asMap() view of a ListMultimap holds List values, so this cast is safe.
                List<KafkaLogEvent> events = (List<KafkaLogEvent>) pending.next().getValue();
                Collections.sort(events);
                logFileWriter.append(events);
                // Drop the events only after append() returned normally.
                pending.remove();
            }
        } catch (Throwable th) {
            LOG.error("Caught exception during save, will try again.", th);
        }
    }

    /**
     * Moves entries of the oldest table row into {@link #writeListMap} and resets
     * {@link #messages} to the number of events moved. Iteration over the row stops at the first
     * entry that is not yet old enough.
     */
    private void drainExpiredBuckets() {
        messages = 0;
        long currentBucket = System.currentTimeMillis() / eventBucketIntervalMs;
        synchronized (messageTable) {
            SortedSet<Long> rowKeys = messageTable.rowKeySet();
            if (rowKeys.isEmpty()) {
                return;
            }
            // Row keys are sorted, so first() is the smallest (oldest) row.
            Long oldestRowKey = rowKeys.first();
            Iterator<Map.Entry<String, Map.Entry<Long, List<KafkaLogEvent>>>> cells =
                messageTable.row(oldestRowKey).entrySet().iterator();
            while (cells.hasNext()) {
                Map.Entry<String, Map.Entry<Long, List<KafkaLogEvent>>> cell = cells.next();
                Map.Entry<Long, List<KafkaLogEvent>> bucket = cell.getValue();
                // Keep the entry buffered until it is at least maxNumberOfBucketsInTable
                // buckets behind the current bucket number.
                if (currentBucket < bucket.getKey() + maxNumberOfBucketsInTable) {
                    break;
                }
                writeListMap.putAll(cell.getKey(), bucket.getValue());
                messages += bucket.getValue().size();
                cells.remove();
            }
        }
    }
}
