package co.cask.cdap.metrics.process;

import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.metrics.MetricStore;
import co.cask.cdap.api.metrics.MetricValues;
import co.cask.cdap.common.io.BinaryDecoder;
import co.cask.cdap.internal.io.DatumReader;
import co.cask.common.io.ByteBufferInputStream;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/metrics/process/MetricsMessageCallback.class */
public final class MetricsMessageCallback implements KafkaConsumer.MessageCallback {
    private static final Logger LOG = LoggerFactory.getLogger(MetricsMessageCallback.class);
    private final DatumReader<MetricValues> recordReader;
    private final Schema recordSchema;
    private long recordProcessed;
    private MetricStore metricStore;

    public MetricsMessageCallback(DatumReader<MetricValues> datumReader, Schema schema, MetricStore metricStore) {
        this.recordReader = datumReader;
        this.recordSchema = schema;
        this.metricStore = metricStore;
    }

    public void onReceived(Iterator<FetchedMessage> it) {
        final ByteBufferInputStream byteBufferInputStream = new ByteBufferInputStream((ByteBuffer) null);
        ImmutableList copyOf = ImmutableList.copyOf(Iterators.filter(Iterators.transform(it, new Function<FetchedMessage, MetricValues>() { // from class: co.cask.cdap.metrics.process.MetricsMessageCallback.1
            public MetricValues apply(FetchedMessage fetchedMessage) {
                try {
                    return (MetricValues) MetricsMessageCallback.this.recordReader.read(new BinaryDecoder(byteBufferInputStream.reset(fetchedMessage.getPayload())), MetricsMessageCallback.this.recordSchema);
                } catch (IOException e) {
                    MetricsMessageCallback.LOG.info("Failed to decode message to MetricValue. Skipped. {}", e.getMessage());
                    return null;
                }
            }
        }), Predicates.notNull()));
        if (copyOf.isEmpty()) {
            LOG.info("No records to process.");
            return;
        }
        try {
            this.metricStore.add(copyOf);
            this.recordProcessed += copyOf.size();
            if (this.recordProcessed % 1000 == 0) {
                LOG.info("{} metrics records processed", Long.valueOf(this.recordProcessed));
                LOG.info("Last record time: {}", Long.valueOf(((MetricValues) copyOf.get(copyOf.size() - 1)).getTimestamp()));
            }
        } catch (Exception e) {
            LOG.error("Failed to add metrics data to a store");
            throw new RuntimeException("Failed to add metrics data to a store", e);
        }
    }

    public void finished() {
        LOG.info("Metrics MessageCallback completed.");
    }
}
