package pl.allegro.tech.hermes.tracker.elasticsearch.frontend;

import com.codahale.metrics.MetricRegistry;
import java.io.IOException;
import java.util.Map;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import pl.allegro.tech.hermes.api.PublishedMessageTraceStatus;
import pl.allegro.tech.hermes.common.http.ExtraRequestHeadersCollector;
import pl.allegro.tech.hermes.metrics.PathsCompiler;
import pl.allegro.tech.hermes.tracker.BatchingLogRepository;
import pl.allegro.tech.hermes.tracker.elasticsearch.ElasticsearchDocument;
import pl.allegro.tech.hermes.tracker.elasticsearch.ElasticsearchQueueCommitter;
import pl.allegro.tech.hermes.tracker.elasticsearch.IndexFactory;
import pl.allegro.tech.hermes.tracker.elasticsearch.LogSchemaAware;
import pl.allegro.tech.hermes.tracker.elasticsearch.SchemaManager;
import pl.allegro.tech.hermes.tracker.elasticsearch.metrics.Gauges;
import pl.allegro.tech.hermes.tracker.frontend.LogRepository;

/* loaded from: input_file:pl/allegro/tech/hermes/tracker/elasticsearch/frontend/FrontendElasticsearchLogRepository.class */
public class FrontendElasticsearchLogRepository extends BatchingLogRepository<ElasticsearchDocument> implements LogRepository, LogSchemaAware {
    private static final int DOCUMENT_EXPECTED_SIZE = 1024;
    private final Client elasticClient;

    /* loaded from: input_file:pl/allegro/tech/hermes/tracker/elasticsearch/frontend/FrontendElasticsearchLogRepository$Builder.class */
    public static class Builder {
        private Client elasticClient;
        private String clusterName = "primary";
        private String hostName = "unknown";
        private int queueSize = 1000;
        private int commitInterval = 100;
        private FrontendIndexFactory indexFactory = new FrontendDailyIndexFactory();
        private String typeName = SchemaManager.PUBLISHED_TYPE;
        private final MetricRegistry metricRegistry;
        private final PathsCompiler pathsCompiler;

        public Builder(Client client, PathsCompiler pathsCompiler, MetricRegistry metricRegistry) {
            this.elasticClient = client;
            this.pathsCompiler = pathsCompiler;
            this.metricRegistry = metricRegistry;
        }

        public Builder withElasticClient(Client client) {
            this.elasticClient = client;
            return this;
        }

        public Builder withClusterName(String str) {
            this.clusterName = str;
            return this;
        }

        public Builder withHostName(String str) {
            this.hostName = str;
            return this;
        }

        public Builder withQueueSize(int i) {
            this.queueSize = i;
            return this;
        }

        public Builder withCommitInterval(int i) {
            this.commitInterval = i;
            return this;
        }

        public Builder withTypeName(String str) {
            this.typeName = str;
            return this;
        }

        public Builder withIndexFactory(FrontendIndexFactory frontendIndexFactory) {
            this.indexFactory = frontendIndexFactory;
            return this;
        }

        public FrontendElasticsearchLogRepository build() {
            return new FrontendElasticsearchLogRepository(this.elasticClient, this.clusterName, this.hostName, this.queueSize, this.commitInterval, this.indexFactory, this.typeName, this.metricRegistry, this.pathsCompiler);
        }
    }

    private FrontendElasticsearchLogRepository(Client client, String str, String str2, int i, int i2, IndexFactory indexFactory, String str3, MetricRegistry metricRegistry, PathsCompiler pathsCompiler) {
        super(i, str, str2, metricRegistry, pathsCompiler);
        this.elasticClient = client;
        registerQueueSizeGauge(Gauges.PRODUCER_TRACKER_ELASTICSEARCH_QUEUE_SIZE);
        registerRemainingCapacityGauge(Gauges.PRODUCER_TRACKER_ELASTICSEARCH_REMAINING_CAPACITY);
        ElasticsearchQueueCommitter.scheduleCommitAtFixedRate(this.queue, indexFactory, str3, client, metricRegistry.timer(pathsCompiler.compile("tracker.elasticsearch.commit-latency")), i2);
    }

    public void logPublished(String str, long j, String str2, String str3, Map<String, String> map) {
        this.queue.offer(ElasticsearchDocument.build(() -> {
            return document(str, j, str2, PublishedMessageTraceStatus.SUCCESS, str3, map);
        }));
    }

    public void logError(String str, long j, String str2, String str3, String str4, Map<String, String> map) {
        this.queue.offer(ElasticsearchDocument.build(() -> {
            return document(str, j, str2, PublishedMessageTraceStatus.ERROR, str3, str4, map);
        }));
    }

    public void logInflight(String str, long j, String str2, String str3, Map<String, String> map) {
        this.queue.offer(ElasticsearchDocument.build(() -> {
            return document(str, j, str2, PublishedMessageTraceStatus.INFLIGHT, str3, map);
        }));
    }

    public void close() {
        this.elasticClient.close();
    }

    private XContentBuilder document(String str, long j, String str2, PublishedMessageTraceStatus publishedMessageTraceStatus, String str3, Map<String, String> map) throws IOException {
        return notEndedDocument(str, j, str2, publishedMessageTraceStatus.toString(), str3, map).endObject();
    }

    private XContentBuilder document(String str, long j, String str2, PublishedMessageTraceStatus publishedMessageTraceStatus, String str3, String str4, Map<String, String> map) throws IOException {
        return notEndedDocument(str, j, str2, publishedMessageTraceStatus.toString(), str4, map).field(LogSchemaAware.REASON, str3).endObject();
    }

    protected XContentBuilder notEndedDocument(String str, long j, String str2, String str3, String str4, Map<String, String> map) throws IOException {
        return XContentFactory.jsonBuilder(new BytesStreamOutput(DOCUMENT_EXPECTED_SIZE)).startObject().field(LogSchemaAware.MESSAGE_ID, str).field(LogSchemaAware.TIMESTAMP, j).field(LogSchemaAware.TIMESTAMP_SECONDS, toSeconds(j)).field(LogSchemaAware.TOPIC_NAME, str2).field(LogSchemaAware.STATUS, str3).field(LogSchemaAware.CLUSTER, this.clusterName).field(LogSchemaAware.SOURCE_HOSTNAME, this.hostname).field(LogSchemaAware.REMOTE_HOSTNAME, str4).field(LogSchemaAware.EXTRA_REQUEST_HEADERS, (String) map.entrySet().stream().collect(ExtraRequestHeadersCollector.extraRequestHeadersCollector()));
    }

    private long toSeconds(long j) {
        return j / 1000;
    }
}
