package io.confluent.connect.elasticsearch;

import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.http.HttpHost;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestHighLevelClientBuilder;
import org.elasticsearch.client.indices.CreateDataStreamRequest;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetMappingsRequest;
import org.elasticsearch.client.indices.GetMappingsResponse;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.VersionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/elasticsearch/ElasticsearchClient.class */
public class ElasticsearchClient {
    private static final long WAIT_TIME_MS = 10;
    private static final long CLOSE_WAIT_TIME_MS = 5000;
    private static final String RESOURCE_ALREADY_EXISTS_EXCEPTION = "resource_already_exists_exception";
    private static final String VERSION_CONFLICT_EXCEPTION = "version_conflict_engine_exception";
    private static final String UNKNOWN_VERSION_TAG = "Unknown";
    protected final BulkProcessor bulkProcessor;
    private final ConcurrentMap<Long, List<SinkRecordAndOffset>> inFlightRequests;
    private final ElasticsearchSinkConnectorConfig config;
    private final ErrantRecordReporter reporter;
    private final RestHighLevelClient client;
    private final ExecutorService bulkExecutorService;
    private final Time clock;
    private final String esVersion;
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchClient.class);
    private static final Set<String> MALFORMED_DOC_ERRORS = new HashSet(Arrays.asList("strict_dynamic_mapping_exception", "mapper_parsing_exception", "illegal_argument_exception", "action_request_validation_exception"));
    private final Lock inFlightRequestLock = new ReentrantLock();
    private final Condition inFlightRequestsUpdated = this.inFlightRequestLock.newCondition();
    protected final AtomicInteger numBufferedRecords = new AtomicInteger(0);
    private final AtomicReference<ConnectException> error = new AtomicReference<>();
    private final ConcurrentMap<DocWriteRequest<?>, SinkRecordAndOffset> requestToSinkRecord = new ConcurrentHashMap();

    /* loaded from: input_file:io/confluent/connect/elasticsearch/ElasticsearchClient$ReportingException.class */
    public static class ReportingException extends RuntimeException {
        public ReportingException(String str) {
            super(str);
        }

        @Override // java.lang.Throwable
        public synchronized Throwable fillInStackTrace() {
            return this;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/connect/elasticsearch/ElasticsearchClient$SinkRecordAndOffset.class */
    public static class SinkRecordAndOffset {
        private final SinkRecord sinkRecord;
        private final OffsetState offsetState;

        public SinkRecordAndOffset(SinkRecord sinkRecord, OffsetState offsetState) {
            this.sinkRecord = sinkRecord;
            this.offsetState = offsetState;
        }
    }

    public ElasticsearchClient(ElasticsearchSinkConnectorConfig elasticsearchSinkConnectorConfig, ErrantRecordReporter errantRecordReporter, Runnable runnable) {
        this.bulkExecutorService = Executors.newFixedThreadPool(elasticsearchSinkConnectorConfig.maxInFlightRequests());
        this.inFlightRequests = errantRecordReporter != null ? new ConcurrentHashMap() : null;
        this.config = elasticsearchSinkConnectorConfig;
        this.reporter = errantRecordReporter;
        this.clock = Time.SYSTEM;
        RestClient build = RestClient.builder((HttpHost[]) ((List) elasticsearchSinkConnectorConfig.connectionUrls().stream().map(HttpHost::create).collect(Collectors.toList())).toArray(new HttpHost[elasticsearchSinkConnectorConfig.connectionUrls().size()])).setHttpClientConfigCallback(new ConfigCallbackHandler(elasticsearchSinkConnectorConfig)).build();
        this.esVersion = getServerVersion(build);
        RestHighLevelClientBuilder restHighLevelClientBuilder = new RestHighLevelClientBuilder(build);
        if (shouldSetCompatibilityToES8()) {
            log.info("Staring client in ES 8 compatibility mode");
            restHighLevelClientBuilder.setApiCompatibilityMode(true);
        }
        this.client = restHighLevelClientBuilder.build();
        this.bulkProcessor = BulkProcessor.builder(buildConsumer(), buildListener(runnable)).setBulkActions(elasticsearchSinkConnectorConfig.batchSize()).setBulkSize(elasticsearchSinkConnectorConfig.bulkSize()).setConcurrentRequests(elasticsearchSinkConnectorConfig.maxInFlightRequests() - 1).setFlushInterval(TimeValue.timeValueMillis(elasticsearchSinkConnectorConfig.lingerMs())).setBackoffPolicy(BackoffPolicy.noBackoff()).build();
    }

    private boolean shouldSetCompatibilityToES8() {
        return !version().equals(UNKNOWN_VERSION_TAG) && Integer.parseInt(version().split("\\.")[0]) >= 8;
    }

    private String getServerVersion(RestClient restClient) {
        RestHighLevelClient build = new RestHighLevelClientBuilder(restClient).build();
        String str = UNKNOWN_VERSION_TAG;
        try {
            str = build.info(RequestOptions.DEFAULT).getVersion().getNumber();
        } catch (Exception e) {
            log.warn("Failed to get ES server version", e);
        }
        return str;
    }

    private BiConsumer<BulkRequest, ActionListener<BulkResponse>> buildConsumer() {
        return (bulkRequest, actionListener) -> {
            this.bulkExecutorService.submit(() -> {
                try {
                    actionListener.onResponse((BulkResponse) callWithRetries("execute bulk request", () -> {
                        return this.client.bulk(bulkRequest, RequestOptions.DEFAULT);
                    }));
                } catch (Exception e) {
                    actionListener.onFailure(e);
                } catch (Throwable th) {
                    actionListener.onFailure(new ConnectException("Bulk request failed", th));
                }
            });
        };
    }

    public RestHighLevelClient client() {
        return this.client;
    }

    public void close() {
        try {
            try {
                if (this.bulkProcessor.awaitClose(this.config.flushTimeoutMs(), TimeUnit.MILLISECONDS)) {
                } else {
                    throw new ConnectException("Failed to process outstanding requests in time while closing the ElasticsearchClient.");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new ConnectException("Interrupted while processing all in-flight requests on ElasticsearchClient close.", e);
            }
        } finally {
            closeResources();
        }
    }

    public boolean createIndexOrDataStream(String str) {
        if (indexExists(str)) {
            return false;
        }
        return this.config.isDataStream() ? createDataStream(str) : createIndex(str);
    }

    public void createMapping(String str, Schema schema) {
        PutMappingRequest source = new PutMappingRequest(new String[]{str}).source(Mapping.buildMapping(schema));
        callWithRetries(String.format("create mapping for index %s with schema %s", str, schema), () -> {
            return this.client.indices().putMapping(source, RequestOptions.DEFAULT);
        });
    }

    public String version() {
        return this.esVersion;
    }

    public void flush() {
        this.bulkProcessor.flush();
    }

    public void waitForInFlightRequests() {
        this.inFlightRequestLock.lock();
        while (this.numBufferedRecords.get() > 0) {
            try {
                try {
                    this.inFlightRequestsUpdated.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new ConnectException(e);
                }
            } finally {
                this.inFlightRequestLock.unlock();
            }
        }
    }

    public boolean hasMapping(String str) {
        MappingMetadata mapping = mapping(str);
        return (mapping == null || mapping.sourceAsMap() == null || mapping.sourceAsMap().isEmpty()) ? false : true;
    }

    public void index(SinkRecord sinkRecord, DocWriteRequest<?> docWriteRequest, OffsetState offsetState) {
        throwIfFailed();
        verifyNumBufferedRecords();
        this.requestToSinkRecord.put(docWriteRequest, new SinkRecordAndOffset(sinkRecord, offsetState));
        this.numBufferedRecords.incrementAndGet();
        this.bulkProcessor.add(docWriteRequest);
    }

    public void throwIfFailed() {
        if (isFailed()) {
            try {
                close();
            } catch (ConnectException e) {
                log.warn("Couldn't close elasticsearch client", e);
            }
            throw this.error.get();
        }
    }

    private void verifyNumBufferedRecords() {
        long milliseconds = this.clock.milliseconds() + this.config.flushTimeoutMs();
        while (this.numBufferedRecords.get() >= this.config.maxBufferedRecords()) {
            this.clock.sleep(WAIT_TIME_MS);
            if (this.clock.milliseconds() > milliseconds) {
                throw new ConnectException(String.format("Could not make space in the internal buffer fast enough. Consider increasing %s or %s.", ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG, ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG));
            }
        }
    }

    public boolean indexExists(String str) {
        GetIndexRequest getIndexRequest = new GetIndexRequest(new String[]{str});
        return ((Boolean) callWithRetries("check if index " + str + " exists", () -> {
            return Boolean.valueOf(this.client.indices().exists(getIndexRequest, RequestOptions.DEFAULT));
        })).booleanValue();
    }

    private BulkProcessor.Listener buildListener(final Runnable runnable) {
        return new BulkProcessor.Listener() { // from class: io.confluent.connect.elasticsearch.ElasticsearchClient.1
            public void beforeBulk(long j, BulkRequest bulkRequest) {
                if (ElasticsearchClient.this.inFlightRequests != null) {
                    Stream stream = bulkRequest.requests().stream();
                    ConcurrentMap concurrentMap = ElasticsearchClient.this.requestToSinkRecord;
                    concurrentMap.getClass();
                    ElasticsearchClient.this.inFlightRequests.put(Long.valueOf(j), (List) stream.map((v1) -> {
                        return r1.get(v1);
                    }).collect(Collectors.toList()));
                }
            }

            public void afterBulk(long j, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                List requests = bulkRequest.requests();
                int i = 0;
                Iterator it = bulkResponse.iterator();
                while (it.hasNext()) {
                    BulkItemResponse bulkItemResponse = (BulkItemResponse) it.next();
                    DocWriteRequest<?> docWriteRequest = i < requests.size() ? (DocWriteRequest) requests.get(i) : null;
                    if (!ElasticsearchClient.this.handleResponse(bulkItemResponse, docWriteRequest, j) && docWriteRequest != null) {
                        ((SinkRecordAndOffset) ElasticsearchClient.this.requestToSinkRecord.get(docWriteRequest)).offsetState.markProcessed();
                    }
                    i++;
                }
                runnable.run();
                bulkFinished(j, bulkRequest);
            }

            public void afterBulk(long j, BulkRequest bulkRequest, Throwable th) {
                ElasticsearchClient.log.warn("Bulk request {} failed", Long.valueOf(j), th);
                ElasticsearchClient.this.error.compareAndSet(null, new ConnectException("Bulk request failed", th));
                bulkFinished(j, bulkRequest);
            }

            private void bulkFinished(long j, BulkRequest bulkRequest) {
                List requests = bulkRequest.requests();
                ConcurrentMap concurrentMap = ElasticsearchClient.this.requestToSinkRecord;
                concurrentMap.getClass();
                requests.forEach((v1) -> {
                    r1.remove(v1);
                });
                ElasticsearchClient.this.removeFromInFlightRequests(j);
                ElasticsearchClient.this.inFlightRequestLock.lock();
                try {
                    ElasticsearchClient.this.numBufferedRecords.addAndGet(-bulkRequest.requests().size());
                    ElasticsearchClient.this.inFlightRequestsUpdated.signalAll();
                    ElasticsearchClient.this.inFlightRequestLock.unlock();
                } catch (Throwable th) {
                    ElasticsearchClient.this.inFlightRequestLock.unlock();
                    throw th;
                }
            }
        };
    }

    private <T> T callWithRetries(String str, Callable<T> callable) {
        return (T) RetryUtil.callWithRetries(str, callable, this.config.maxRetries() + 1, this.config.retryBackoffMs());
    }

    private void closeResources() {
        this.bulkExecutorService.shutdown();
        try {
            if (!this.bulkExecutorService.awaitTermination(CLOSE_WAIT_TIME_MS, TimeUnit.MILLISECONDS)) {
                this.bulkExecutorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            this.bulkExecutorService.shutdownNow();
            Thread.currentThread().interrupt();
            log.warn("Interrupted while awaiting for executor service shutdown.", e);
        }
        try {
            this.client.close();
        } catch (IOException e2) {
            log.warn("Failed to close Elasticsearch client.", e2);
        }
    }

    private boolean createDataStream(String str) {
        CreateDataStreamRequest createDataStreamRequest = new CreateDataStreamRequest(str);
        return ((Boolean) callWithRetries("create data stream " + str, () -> {
            try {
                this.client.indices().createDataStream(createDataStreamRequest, RequestOptions.DEFAULT);
                return true;
            } catch (ElasticsearchStatusException | IOException e) {
                if (e.getMessage().contains(RESOURCE_ALREADY_EXISTS_EXCEPTION)) {
                    return false;
                }
                throw e;
            }
        })).booleanValue();
    }

    private boolean createIndex(String str) {
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(str);
        return ((Boolean) callWithRetries("create index " + str, () -> {
            try {
                this.client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
                return true;
            } catch (ElasticsearchStatusException | IOException e) {
                if (e.getMessage().contains(RESOURCE_ALREADY_EXISTS_EXCEPTION)) {
                    return false;
                }
                throw e;
            }
        })).booleanValue();
    }

    protected boolean handleResponse(BulkItemResponse bulkItemResponse, DocWriteRequest<?> docWriteRequest, long j) {
        if (!bulkItemResponse.isFailed()) {
            return false;
        }
        Iterator<String> it = MALFORMED_DOC_ERRORS.iterator();
        while (it.hasNext()) {
            if (bulkItemResponse.getFailureMessage().contains(it.next())) {
                boolean handleMalformedDocResponse = handleMalformedDocResponse(bulkItemResponse);
                if (!handleMalformedDocResponse) {
                    reportBadRecord(bulkItemResponse, j);
                }
                return handleMalformedDocResponse;
            }
        }
        if (!bulkItemResponse.getFailureMessage().contains(VERSION_CONFLICT_EXCEPTION)) {
            this.error.compareAndSet(null, new ConnectException("Indexing record failed.", bulkItemResponse.getFailure().getCause()));
            return true;
        }
        if (docWriteRequest != null && docWriteRequest.versionType() == VersionType.EXTERNAL) {
            log.debug("Ignoring EXTERNAL version conflict for operation {} on document '{}' version {} in index '{}'.", new Object[]{bulkItemResponse.getOpType(), bulkItemResponse.getId(), Long.valueOf(docWriteRequest.version()), bulkItemResponse.getIndex()});
            return false;
        }
        Logger logger = log;
        Object[] objArr = new Object[5];
        objArr[0] = docWriteRequest != null ? docWriteRequest.versionType() : "UNKNOWN";
        objArr[1] = bulkItemResponse.getOpType();
        objArr[2] = bulkItemResponse.getId();
        objArr[3] = Long.valueOf(bulkItemResponse.getVersion());
        objArr[4] = bulkItemResponse.getIndex();
        logger.warn("{} version conflict for operation {} on document '{}' version {} in index '{}'.", objArr);
        Logger logger2 = log;
        Object[] objArr2 = new Object[7];
        objArr2[0] = docWriteRequest != null ? docWriteRequest.versionType() : "UNKNOWN";
        objArr2[1] = bulkItemResponse.getOpType();
        objArr2[2] = bulkItemResponse.getId();
        objArr2[3] = Long.valueOf(bulkItemResponse.getVersion());
        objArr2[4] = bulkItemResponse.getIndex();
        objArr2[5] = bulkItemResponse.getFailure().getCause();
        objArr2[6] = bulkItemResponse.getFailure().getCause().getStackTrace();
        logger2.trace("{} version conflict for operation {} on document '{}' version {} in index '{}' and stacktrace '{}'", objArr2);
        reportBadRecord(bulkItemResponse, j);
        return false;
    }

    private boolean handleMalformedDocResponse(BulkItemResponse bulkItemResponse) {
        String format = String.format("Encountered an illegal document error '%s'. Ignoring and will not index record.", bulkItemResponse.getFailureMessage());
        switch (this.config.behaviorOnMalformedDoc()) {
            case IGNORE:
                log.debug(format);
                return false;
            case WARN:
                log.warn(format);
                return false;
            case FAIL:
            default:
                log.error("Encountered an illegal document error '{}'. To ignore future records like this, change the configuration '{}' to '{}'.", new Object[]{bulkItemResponse.getFailureMessage(), ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG, ElasticsearchSinkConnectorConfig.BehaviorOnMalformedDoc.IGNORE});
                this.error.compareAndSet(null, new ConnectException("Indexing record failed.", bulkItemResponse.getFailure().getCause()));
                return true;
        }
    }

    public boolean isFailed() {
        return this.error.get() != null;
    }

    private MappingMetadata mapping(String str) {
        GetMappingsRequest indices = new GetMappingsRequest().indices(new String[]{str});
        return (MappingMetadata) ((GetMappingsResponse) callWithRetries("get mapping for index " + str, () -> {
            return this.client.indices().getMapping(indices, RequestOptions.DEFAULT);
        })).mappings().get(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void removeFromInFlightRequests(long j) {
        if (this.inFlightRequests != null) {
            this.inFlightRequests.remove(Long.valueOf(j));
        }
    }

    private synchronized void reportBadRecord(BulkItemResponse bulkItemResponse, long j) {
        if (bulkItemResponse.getFailureMessage().contains(VERSION_CONFLICT_EXCEPTION) && this.config.isDataStream()) {
            log.info("Skipping DLQ insertion for DataStream type.");
            return;
        }
        if (this.reporter != null) {
            List<SinkRecordAndOffset> orDefault = this.inFlightRequests.getOrDefault(Long.valueOf(j), new ArrayList());
            SinkRecordAndOffset sinkRecordAndOffset = orDefault.size() > bulkItemResponse.getItemId() ? orDefault.get(bulkItemResponse.getItemId()) : null;
            if (sinkRecordAndOffset != null) {
                this.reporter.report(sinkRecordAndOffset.sinkRecord, new ReportingException("Indexing failed: " + bulkItemResponse.getFailureMessage()));
            }
        }
    }
}
