package pl.allegro.tech.hermes.tracker.elasticsearch.management;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.FilteredQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import pl.allegro.tech.hermes.api.MessageTrace;
import pl.allegro.tech.hermes.api.PublishedMessageTrace;
import pl.allegro.tech.hermes.api.PublishedMessageTraceStatus;
import pl.allegro.tech.hermes.api.SentMessageTrace;
import pl.allegro.tech.hermes.api.SentMessageTraceStatus;
import pl.allegro.tech.hermes.tracker.elasticsearch.ElasticsearchRepositoryException;
import pl.allegro.tech.hermes.tracker.elasticsearch.LogSchemaAware;
import pl.allegro.tech.hermes.tracker.elasticsearch.SchemaManager;
import pl.allegro.tech.hermes.tracker.management.LogRepository;

/* loaded from: input_file:pl/allegro/tech/hermes/tracker/elasticsearch/management/ElasticsearchLogRepository.class */
public class ElasticsearchLogRepository implements LogRepository, LogSchemaAware {
    private static final int LIMIT = 1000;
    private final Client elasticClient;

    public ElasticsearchLogRepository(Client client) {
        this(client, SchemaManager.schemaManagerWithDailyIndexes(client));
    }

    public ElasticsearchLogRepository(Client client, SchemaManager schemaManager) {
        this.elasticClient = client;
        schemaManager.ensureSchema();
    }

    public List<SentMessageTrace> getLastUndeliveredMessages(String str, String str2, int i) {
        try {
            return (List) Arrays.stream(searchSentMessages(i, SortOrder.DESC, createFilterQuery(filter(LogSchemaAware.TOPIC_NAME, str), filter(LogSchemaAware.SUBSCRIPTION, str2), filter(LogSchemaAware.STATUS, SentMessageTraceStatus.DISCARDED.name()))).getHits().hits()).map(this::toSentMessageTrace).collect(Collectors.toList());
        } catch (InterruptedException | ExecutionException e) {
            throw new ElasticsearchRepositoryException(e);
        }
    }

    public List<MessageTrace> getMessageStatus(String str, String str2, String str3) {
        try {
            return (List) Stream.concat(Arrays.stream(searchPublishedMessages(LIMIT, createFilterQuery(filter(LogSchemaAware.TOPIC_NAME, str), filter(LogSchemaAware.MESSAGE_ID, str3))).getHits().hits()).map(this::toPublishedMessageTrace), Arrays.stream(searchSentMessages(LIMIT, SortOrder.ASC, createFilterQuery(filter(LogSchemaAware.TOPIC_NAME, str), filter(LogSchemaAware.SUBSCRIPTION, str2), filter(LogSchemaAware.MESSAGE_ID, str3))).getHits().hits()).map(this::toSentMessageTrace)).collect(Collectors.toList());
        } catch (InterruptedException | ExecutionException e) {
            throw new ElasticsearchRepositoryException(e);
        }
    }

    private FilteredQueryBuilder createFilterQuery(FilterBuilder... filterBuilderArr) {
        return QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(), FilterBuilders.andFilter(filterBuilderArr));
    }

    private FilterBuilder filter(String str, String str2) {
        return FilterBuilders.termFilter(str, str2);
    }

    private SearchResponse searchSentMessages(int i, SortOrder sortOrder, QueryBuilder queryBuilder) throws InterruptedException, ExecutionException {
        return (SearchResponse) this.elasticClient.prepareSearch(new String[]{SchemaManager.SENT_ALIAS_NAME}).addFields(new String[]{LogSchemaAware.MESSAGE_ID, LogSchemaAware.TIMESTAMP, LogSchemaAware.SUBSCRIPTION, LogSchemaAware.TOPIC_NAME, LogSchemaAware.STATUS, LogSchemaAware.REASON, LogSchemaAware.PARTITION, LogSchemaAware.OFFSET, LogSchemaAware.CLUSTER}).setTrackScores(true).setQuery(queryBuilder).addSort(LogSchemaAware.TIMESTAMP_SECONDS, sortOrder).setSize(i).execute().get();
    }

    private SearchResponse searchPublishedMessages(int i, QueryBuilder queryBuilder) throws InterruptedException, ExecutionException {
        return (SearchResponse) this.elasticClient.prepareSearch(new String[]{SchemaManager.PUBLISHED_ALIAS_NAME}).addFields(new String[]{LogSchemaAware.MESSAGE_ID, LogSchemaAware.TIMESTAMP, LogSchemaAware.TOPIC_NAME, LogSchemaAware.STATUS, LogSchemaAware.REASON, LogSchemaAware.CLUSTER}).setTrackScores(true).setQuery(queryBuilder).addSort(LogSchemaAware.TIMESTAMP_SECONDS, SortOrder.ASC).setSize(i).execute().get();
    }

    private PublishedMessageTrace toPublishedMessageTrace(SearchHit searchHit) {
        return new PublishedMessageTrace((String) searchHit.field(LogSchemaAware.MESSAGE_ID).getValue(), Long.valueOf(((Number) searchHit.field(LogSchemaAware.TIMESTAMP).getValue()).longValue()), (String) searchHit.field(LogSchemaAware.TOPIC_NAME).getValue(), PublishedMessageTraceStatus.valueOf((String) searchHit.field(LogSchemaAware.STATUS).getValue()), searchHit.getFields().containsKey(LogSchemaAware.REASON) ? (String) searchHit.field(LogSchemaAware.REASON).getValue() : null, (String) null, (String) searchHit.field(LogSchemaAware.CLUSTER).getValue());
    }

    private SentMessageTrace toSentMessageTrace(SearchHit searchHit) {
        return new SentMessageTrace((String) searchHit.field(LogSchemaAware.MESSAGE_ID).getValue(), searchHit.getFields().containsKey(LogSchemaAware.BATCH_ID) ? (String) searchHit.field(LogSchemaAware.BATCH_ID).getValue() : null, Long.valueOf(((Number) searchHit.field(LogSchemaAware.TIMESTAMP).getValue()).longValue()), (String) searchHit.field(LogSchemaAware.SUBSCRIPTION).getValue(), (String) searchHit.field(LogSchemaAware.TOPIC_NAME).getValue(), SentMessageTraceStatus.valueOf((String) searchHit.field(LogSchemaAware.STATUS).getValue()), searchHit.getFields().containsKey(LogSchemaAware.REASON) ? (String) searchHit.field(LogSchemaAware.REASON).getValue() : null, (String) null, (Integer) searchHit.field(LogSchemaAware.PARTITION).getValue(), Long.valueOf(((Number) searchHit.field(LogSchemaAware.OFFSET).getValue()).longValue()), (String) searchHit.field(LogSchemaAware.CLUSTER).getValue());
    }
}
