package pl.allegro.tech.hermes.tracker.elasticsearch.management;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import pl.allegro.tech.hermes.api.MessageTrace;
import pl.allegro.tech.hermes.api.PublishedMessageTrace;
import pl.allegro.tech.hermes.api.SentMessageTrace;
import pl.allegro.tech.hermes.api.SentMessageTraceStatus;
import pl.allegro.tech.hermes.tracker.elasticsearch.ElasticsearchRepositoryException;
import pl.allegro.tech.hermes.tracker.elasticsearch.LogSchemaAware;
import pl.allegro.tech.hermes.tracker.elasticsearch.SchemaManager;
import pl.allegro.tech.hermes.tracker.management.LogRepository;

/* loaded from: input_file:pl/allegro/tech/hermes/tracker/elasticsearch/management/ElasticsearchLogRepository.class */
public class ElasticsearchLogRepository implements LogRepository, LogSchemaAware {
    private static final int LIMIT = 1000;
    private final Client elasticClient;
    private final ObjectMapper objectMapper;

    public ElasticsearchLogRepository(Client client) {
        this(client, SchemaManager.schemaManagerWithDailyIndexes(client));
    }

    public ElasticsearchLogRepository(Client client, SchemaManager schemaManager) {
        this.objectMapper = new ObjectMapper();
        this.elasticClient = client;
        schemaManager.ensureSchema();
    }

    public List<SentMessageTrace> getLastUndeliveredMessages(String str, String str2, int i) {
        return (List) Arrays.stream(searchSentMessages(i, SortOrder.DESC, QueryBuilders.boolQuery().must(QueryBuilders.termQuery(LogSchemaAware.TOPIC_NAME, str)).must(QueryBuilders.termQuery(LogSchemaAware.SUBSCRIPTION, str2)).must(QueryBuilders.termQuery(LogSchemaAware.STATUS, SentMessageTraceStatus.DISCARDED.name()))).getHits().getHits()).map(searchHit -> {
            return (SentMessageTrace) toMessageTrace(searchHit, SentMessageTrace.class);
        }).collect(Collectors.toList());
    }

    public List<MessageTrace> getMessageStatus(String str, String str2, String str3) {
        try {
            return (List) Stream.concat(Arrays.stream(searchPublishedMessages(LIMIT, QueryBuilders.boolQuery().must(QueryBuilders.termQuery(LogSchemaAware.TOPIC_NAME, str)).must(QueryBuilders.termQuery(LogSchemaAware.MESSAGE_ID, str3))).getHits().getHits()).map(searchHit -> {
                return (PublishedMessageTrace) toMessageTrace(searchHit, PublishedMessageTrace.class);
            }), Arrays.stream(searchSentMessages(LIMIT, SortOrder.ASC, QueryBuilders.boolQuery().must(QueryBuilders.termQuery(LogSchemaAware.TOPIC_NAME, str)).must(QueryBuilders.termQuery(LogSchemaAware.SUBSCRIPTION, str2)).must(QueryBuilders.termQuery(LogSchemaAware.MESSAGE_ID, str3))).getHits().getHits()).map(searchHit2 -> {
                return (SentMessageTrace) toMessageTrace(searchHit2, SentMessageTrace.class);
            })).collect(Collectors.toList());
        } catch (InterruptedException | ExecutionException e) {
            throw new ElasticsearchRepositoryException(e);
        }
    }

    private SearchResponse searchSentMessages(int i, SortOrder sortOrder, QueryBuilder queryBuilder) {
        return (SearchResponse) this.elasticClient.prepareSearch(new String[]{SchemaManager.SENT_ALIAS_NAME}).setTypes(new String[]{SchemaManager.SENT_TYPE}).setTrackScores(true).setQuery(queryBuilder).addSort(LogSchemaAware.TIMESTAMP_SECONDS, sortOrder).setSize(i).execute().actionGet();
    }

    private SearchResponse searchPublishedMessages(int i, QueryBuilder queryBuilder) throws InterruptedException, ExecutionException {
        return (SearchResponse) this.elasticClient.prepareSearch(new String[]{SchemaManager.PUBLISHED_ALIAS_NAME}).setTypes(new String[]{SchemaManager.PUBLISHED_TYPE}).setTrackScores(true).setQuery(queryBuilder).addSort(LogSchemaAware.TIMESTAMP_SECONDS, SortOrder.ASC).setSize(i).execute().get();
    }

    private <T> T toMessageTrace(SearchHit searchHit, Class<T> cls) {
        try {
            return (T) this.objectMapper.readValue(searchHit.getSourceRef().streamInput(), cls);
        } catch (IOException e) {
            throw new RuntimeException("Exception during deserialization of message trace class named " + cls.getCanonicalName(), e);
        }
    }
}
