package co.cask.cdap.logging.save;

import co.cask.cdap.logging.kafka.KafkaLogEvent;
import co.cask.cdap.logging.write.LogFileWriter;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Table;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/save/LogWriter.class */
public class LogWriter implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(LogWriter.class);
    private final LogFileWriter<KafkaLogEvent> logFileWriter;
    private final Table<Long, String, List<KafkaLogEvent>> messageTable;
    private final long eventProcessingDelayMs;
    private final long eventBucketIntervalMs;
    private final ListMultimap<String, KafkaLogEvent> writeListMap = ArrayListMultimap.create();
    private int messages = 0;

    public LogWriter(LogFileWriter<KafkaLogEvent> logFileWriter, Table<Long, String, List<KafkaLogEvent>> table, long j, long j2) {
        this.logFileWriter = logFileWriter;
        this.messageTable = table;
        this.eventProcessingDelayMs = j;
        this.eventBucketIntervalMs = j2;
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            if (this.writeListMap.isEmpty()) {
                this.messages = 0;
                long currentTimeMillis = (System.currentTimeMillis() - this.eventProcessingDelayMs) / this.eventBucketIntervalMs;
                synchronized (this.messageTable) {
                    Iterator it = this.messageTable.cellSet().iterator();
                    while (it.hasNext()) {
                        Table.Cell cell = (Table.Cell) it.next();
                        if (((Long) cell.getRowKey()).longValue() < currentTimeMillis) {
                            this.writeListMap.putAll(cell.getColumnKey(), (Iterable) cell.getValue());
                            this.messages += ((List) cell.getValue()).size();
                            it.remove();
                        }
                    }
                }
            }
            LOG.debug("Got {} log messages to save", Integer.valueOf(this.messages));
            Iterator it2 = this.writeListMap.keySet().iterator();
            while (it2.hasNext()) {
                List<KafkaLogEvent> list = this.writeListMap.get((String) it2.next());
                Collections.sort(list);
                this.logFileWriter.append(list);
                it2.remove();
            }
        } catch (Throwable th) {
            LOG.error("Caught exception during save, will try again.", th);
        }
    }
}
