package io.confluent.connect.elasticsearch.jest;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.confluent.connect.elasticsearch.ElasticsearchClient;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
import io.confluent.connect.elasticsearch.IndexableRecord;
import io.confluent.connect.elasticsearch.Key;
import io.confluent.connect.elasticsearch.Mapping;
import io.confluent.connect.elasticsearch.RetryUtil;
import io.confluent.connect.elasticsearch.bulk.BulkRequest;
import io.confluent.connect.elasticsearch.bulk.BulkResponse;
import io.confluent.connect.elasticsearch.jest.actions.PortableJestCreateIndexBuilder;
import io.confluent.connect.elasticsearch.jest.actions.PortableJestGetMappingBuilder;
import io.confluent.connect.elasticsearch.jest.actions.PortableJestPutMappingBuilder;
import io.searchbox.action.BulkableAction;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.JestResult;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.cluster.NodesInfo;
import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;
import io.searchbox.core.Delete;
import io.searchbox.core.DocumentResult;
import io.searchbox.core.Index;
import io.searchbox.core.Search;
import io.searchbox.core.SearchResult;
import io.searchbox.core.Update;
import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.DeleteIndex;
import io.searchbox.indices.IndicesExists;
import io.searchbox.indices.Refresh;
import io.searchbox.indices.mapping.GetMapping;
import io.searchbox.indices.mapping.PutMapping;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import org.apache.http.HttpHost;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.class */
public class JestElasticsearchClient implements ElasticsearchClient {
    /** Bulk item error type for which {@code executeBulk} clears the boolean flag it passes to {@code BulkResponse.failure}. */
    protected static final String MAPPER_PARSE_EXCEPTION = "mapper_parse_exception";
    /** Bulk item error type that {@code executeBulk} logs as a warning and otherwise ignores. */
    protected static final String VERSION_CONFLICT_ENGINE_EXCEPTION = "version_conflict_engine_exception";
    /** Index name passed to the delete-index request issued by {@code deleteAll}. */
    protected static final String ALL_FIELD_PARAM = "_all";
    /** Error message fragment that {@code createIndex} treats as "the index already exists". */
    protected static final String RESOURCE_ALREADY_EXISTS_EXCEPTION = "resource_already_exists_exception";
    /** Jest client used to execute every request; closed by {@code close()}. */
    private final JestClient client;
    /** Server version detected once in the constructor by {@code getServerVersion()}. */
    private final ElasticsearchClient.Version version;
    /** Passed to the create-index builder; set from {@code readTimeoutMs()} by the map-based constructor only. */
    private long timeout;
    /** Selects index vs. update requests for records with a payload (see {@code toBulkableAction}). */
    private WriteMethod writeMethod;
    /** Names of indices already seen to exist or created through this client; avoids repeated existence checks. */
    private final Set<String> indexCache;
    /** Number of additional create-index attempts after the first failure (see {@code createIndices}). */
    private int maxRetries;
    /** Backoff value handed to {@code RetryUtil.computeRandomRetryWaitTimeInMillis} between create-index attempts. */
    private long retryBackoffMs;
    /** Clock used to sleep between create-index retries. */
    private final Time time;
    /** Value sent as the {@code retry_on_conflict} parameter of update requests. */
    private int retryOnConflict;
    // NOTE(review): `log` and `LOG` are two loggers for the same class; both are used in this file.
    private static final Logger log = LoggerFactory.getLogger(JestElasticsearchClient.class);
    private static final Logger LOG = LoggerFactory.getLogger(JestElasticsearchClient.class);
    /** Shared Jackson mapper used to parse bulk item error JSON. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /* loaded from: input_file:io/confluent/connect/elasticsearch/jest/JestElasticsearchClient$WriteMethod.class */
    public enum WriteMethod {
        INSERT,
        UPSERT;

        public static final WriteMethod DEFAULT = INSERT;
        public static final ConfigDef.Validator VALIDATOR = new ConfigDef.Validator() { // from class: io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.WriteMethod.1
            private final ConfigDef.ValidString validator = ConfigDef.ValidString.in(WriteMethod.names());

            public void ensureValid(String str, Object obj) {
                this.validator.ensureValid(str, obj);
            }

            public String toString() {
                return "One of " + WriteMethod.INSERT.toString() + " or " + WriteMethod.UPSERT.toString();
            }
        };

        public static String[] names() {
            return new String[]{INSERT.toString(), UPSERT.toString()};
        }

        @Override // java.lang.Enum
        public String toString() {
            return name().toLowerCase(Locale.ROOT);
        }
    }

    /**
     * Wraps an existing Jest client and detects the server version through it.
     * Retry, timeout and conflict settings keep their Java defaults.
     *
     * @param jestClient client used for all requests
     * @throws ConnectException if the version request fails with an {@link IOException}
     */
    public JestElasticsearchClient(JestClient jestClient) {
        this.time = new SystemTime();
        this.indexCache = new HashSet<>();
        this.writeMethod = WriteMethod.DEFAULT;
        // The assignment itself cannot fail, so only the version lookup needs the try.
        this.client = jestClient;
        try {
            this.version = getServerVersion();
        } catch (IOException e) {
            throw new ConnectException("Couldn't start ElasticsearchSinkTask due to connection error:", e);
        }
    }

    public JestElasticsearchClient(String str) {
        this.writeMethod = WriteMethod.DEFAULT;
        this.indexCache = new HashSet();
        this.time = new SystemTime();
        try {
            JestClientFactory jestClientFactory = new JestClientFactory();
            jestClientFactory.setHttpClientConfig(new HttpClientConfig.Builder(str).multiThreaded(true).build());
            this.client = jestClientFactory.getObject();
            this.version = getServerVersion();
        } catch (ConfigException e) {
            throw new ConnectException("Couldn't start ElasticsearchSinkTask due to configuration error:", e);
        } catch (IOException e2) {
            throw new ConnectException("Couldn't start ElasticsearchSinkTask due to connection error:", e2);
        }
    }

    /**
     * Creates a client from connector properties using a fresh {@link JestClientFactory}.
     *
     * @param map connector configuration properties
     */
    public JestElasticsearchClient(Map<String, String> map) {
        this(map, new JestClientFactory());
    }

    protected JestElasticsearchClient(Map<String, String> map, JestClientFactory jestClientFactory) {
        this.writeMethod = WriteMethod.DEFAULT;
        this.indexCache = new HashSet();
        this.time = new SystemTime();
        try {
            ElasticsearchSinkConnectorConfig elasticsearchSinkConnectorConfig = new ElasticsearchSinkConnectorConfig(map);
            jestClientFactory.setHttpClientConfig(getClientConfig(elasticsearchSinkConnectorConfig));
            this.client = jestClientFactory.getObject();
            this.version = getServerVersion();
            this.writeMethod = elasticsearchSinkConnectorConfig.writeMethod();
            this.retryBackoffMs = elasticsearchSinkConnectorConfig.retryBackoffMs();
            this.maxRetries = elasticsearchSinkConnectorConfig.maxRetries();
            this.timeout = elasticsearchSinkConnectorConfig.readTimeoutMs();
            this.retryOnConflict = elasticsearchSinkConnectorConfig.maxInFlightRequests();
        } catch (ConfigException e) {
            throw new ConnectException("Couldn't start ElasticsearchSinkTask due to configuration error:", e);
        } catch (IOException e2) {
            throw new ConnectException("Couldn't start ElasticsearchSinkTask due to connection error:", e2);
        }
    }

    /**
     * Builds the Jest HTTP client configuration from the connector configuration:
     * connection and read timeouts, request compression, the per-route connection
     * limit (max in-flight requests) and multi-threading. Credentials with
     * preemptive authentication for every connection URL are added when the
     * connection is authenticated, and SSL settings when it is secured.
     *
     * @param elasticsearchSinkConnectorConfig parsed connector configuration
     * @return the built HTTP client configuration
     */
    public static HttpClientConfig getClientConfig(ElasticsearchSinkConnectorConfig elasticsearchSinkConnectorConfig) {
        Set<String> urls = elasticsearchSinkConnectorConfig.connectionUrls();
        HttpClientConfig.Builder builder = new HttpClientConfig.Builder(urls)
                .connTimeout(elasticsearchSinkConnectorConfig.connectionTimeoutMs())
                .readTimeout(elasticsearchSinkConnectorConfig.readTimeoutMs())
                .requestCompressionEnabled(elasticsearchSinkConnectorConfig.compression())
                .defaultMaxTotalConnectionPerRoute(elasticsearchSinkConnectorConfig.maxInFlightRequests())
                .multiThreaded(true);

        if (elasticsearchSinkConnectorConfig.isAuthenticatedConnection()) {
            builder.defaultCredentials(
                    elasticsearchSinkConnectorConfig.username(),
                    elasticsearchSinkConnectorConfig.password().value());
            Set<HttpHost> authTargets = urls.stream().map(HttpHost::create).collect(Collectors.toSet());
            builder.preemptiveAuthTargetHosts(authTargets);
        }

        if (!elasticsearchSinkConnectorConfig.secured()) {
            log.info("Using unsecured connection to {}", urls);
        } else {
            log.info("Using secured connection to {}", urls);
            configureSslContext(builder, elasticsearchSinkConnectorConfig);
        }
        return builder.build();
    }

    /**
     * Configures the builder's blocking and non-blocking SSL strategies from the
     * connector's SSL settings. Hostname verification is replaced by an
     * accept-everything verifier when the configuration asks to disable it.
     *
     * @param builder HTTP client config builder to receive the SSL settings
     * @param elasticsearchSinkConnectorConfig parsed connector configuration
     * @throws ConnectException if no SSLContext can be obtained from {@link SslFactory}
     */
    private static void configureSslContext(HttpClientConfig.Builder builder, ElasticsearchSinkConnectorConfig elasticsearchSinkConnectorConfig) {
        SslFactory sslFactory = new SslFactory(Mode.CLIENT, (String) null, false);
        sslFactory.configure(elasticsearchSinkConnectorConfig.sslConfigs());
        SSLContext sslContext = resolveSslContext(sslFactory);

        HostnameVerifier hostnameVerifier;
        if (elasticsearchSinkConnectorConfig.shouldDisableHostnameVerification()) {
            // Explicitly requested by configuration: accept any hostname.
            hostnameVerifier = (hostname, session) -> true;
        } else {
            hostnameVerifier = SSLConnectionSocketFactory.getDefaultHostnameVerifier();
        }
        builder.sslSocketFactory(new SSLConnectionSocketFactory(sslContext, hostnameVerifier));
        builder.httpsIOSessionStrategy(new SSLIOSessionStrategy(sslContext, hostnameVerifier));
    }

    /**
     * Obtains the SSLContext from a configured SslFactory by reflection, trying in order:
     * {@code SslFactory.sslContext()}, then {@code sslEngineBuilder().sslContext()},
     * then {@code sslEngineFactory().sslContext()}. The debug messages associate these
     * with AK 2.2, AK 2.3-2.5 and AK 2.6+ respectively.
     */
    private static SSLContext resolveSslContext(SslFactory sslFactory) {
        try {
            SSLContext direct = (SSLContext) SslFactory.class.getDeclaredMethod("sslContext").invoke(sslFactory);
            log.debug("Using AK 2.2 SslFactory methods.");
            return direct;
        } catch (Exception ignored) {
            // Expected on newer Kafka versions; fall through to the next lookup.
            log.debug("Could not find AK 2.2 SslFactory methods. Trying AK 2.3+ methods for SslFactory.");
        }

        Object engineProvider;
        try {
            engineProvider = SslFactory.class.getDeclaredMethod("sslEngineBuilder").invoke(sslFactory);
            // Fixed: this branch is only reached after the AK 2.2 lookup failed, so the range starts at 2.3.
            log.debug("Using AK 2.3-2.5 SslFactory methods.");
        } catch (Exception ignored) {
            log.debug("Could not find AK 2.3-2.5 methods for SslFactory. Trying AK 2.6+ methods for SslFactory.");
            try {
                engineProvider = SslFactory.class.getDeclaredMethod("sslEngineFactory").invoke(sslFactory);
                log.debug("Using AK 2.6+ SslFactory methods.");
            } catch (Exception notFound) {
                throw new ConnectException("Failed to find methods for SslFactory.", notFound);
            }
        }

        try {
            return (SSLContext) engineProvider.getClass().getDeclaredMethod("sslContext").invoke(engineProvider);
        } catch (Exception failure) {
            throw new ConnectException("Could not create SSLContext.", failure);
        }
    }

    /**
     * Replaces the write method used for records that carry a payload.
     *
     * @param writeMethod the write method to use from now on
     */
    protected void setWriteMethod(WriteMethod writeMethod) {
        this.writeMethod = writeMethod;
    }

    /**
     * Asks the cluster for node info restricted to "version" and maps the first
     * node's version string onto a {@link ElasticsearchClient.Version} by its
     * major-version prefix. Whenever the response lacks usable version
     * information, or the major version is not recognised, {@code ES_V6} is returned.
     *
     * @return the detected version, or {@code ES_V6} as the fallback
     * @throws IOException if executing the request fails
     * @throws ConnectException if the response has both "nodes" and an "error" object
     */
    private ElasticsearchClient.Version getServerVersion() throws IOException {
        final ElasticsearchClient.Version defaultVersion = ElasticsearchClient.Version.ES_V6;

        NodesInfo request = ((NodesInfo.Builder) new NodesInfo.Builder().addCleanApiParameter("version")).build();
        JsonObject result = this.client.execute(request).getJsonObject();
        if (result == null) {
            LOG.warn("Couldn't get Elasticsearch version (result is null); assuming {}", defaultVersion);
            return defaultVersion;
        }
        if (!result.has("nodes")) {
            // NOTE(review): an error response without "nodes" ends here with a warning and
            // never reaches checkForError below; confirm that this ordering is intended.
            LOG.warn("Couldn't get Elasticsearch version from result {} (result has no nodes). Assuming {}.", result, defaultVersion);
            return defaultVersion;
        }
        checkForError(result);

        // Guard the element kind before converting so a JSON null or scalar falls back instead of throwing.
        JsonElement nodesElement = result.get("nodes");
        if (nodesElement == null || !nodesElement.isJsonObject() || nodesElement.getAsJsonObject().entrySet().isEmpty()) {
            LOG.warn("Couldn't get Elasticsearch version (response nodesRoot is null or empty); assuming {}", defaultVersion);
            return defaultVersion;
        }

        JsonElement firstNode = nodesElement.getAsJsonObject().entrySet().iterator().next().getValue();
        if (firstNode == null || !firstNode.isJsonObject()) {
            LOG.warn("Couldn't get Elasticsearch version (response nodeRoot is null); assuming {}", defaultVersion);
            return defaultVersion;
        }

        // A missing or non-scalar "version" previously caused an exception; treat it as unknown instead.
        JsonElement versionElement = firstNode.getAsJsonObject().get("version");
        if (versionElement == null || !versionElement.isJsonPrimitive()) {
            LOG.warn("Couldn't get Elasticsearch version (response version is null); assuming {}", defaultVersion);
            return defaultVersion;
        }

        String versionString = versionElement.getAsString();
        ElasticsearchClient.Version detected;
        if (versionString.startsWith("1.")) {
            detected = ElasticsearchClient.Version.ES_V1;
        } else if (versionString.startsWith("2.")) {
            detected = ElasticsearchClient.Version.ES_V2;
        } else if (versionString.startsWith("5.")) {
            detected = ElasticsearchClient.Version.ES_V5;
        } else if (versionString.startsWith("6.")) {
            detected = ElasticsearchClient.Version.ES_V6;
        } else if (versionString.startsWith("7.")) {
            detected = ElasticsearchClient.Version.ES_V7;
        } else {
            log.info("Detected unexpected Elasticsearch version {}, using {}", versionString, defaultVersion);
            return defaultVersion;
        }
        log.info("Detected Elasticsearch version is {}", detected);
        return detected;
    }

    /**
     * Throws if the response carries an "error" member that is a JSON object.
     * The exception message includes the error's "type" and "reason" members,
     * each replaced by an empty string when absent.
     *
     * @param jsonObject response body to inspect
     * @throws ConnectException if an error object is present
     */
    private void checkForError(JsonObject jsonObject) {
        if (!jsonObject.has("error")) {
            return;
        }
        JsonElement errorElement = jsonObject.get("error");
        if (!errorElement.isJsonObject()) {
            return;
        }
        JsonObject error = errorElement.getAsJsonObject();
        String type = "";
        if (error.has("type")) {
            type = error.get("type").getAsString();
        }
        String reason = "";
        if (error.has("reason")) {
            reason = error.get("reason").getAsString();
        }
        throw new ConnectException("Couldn't connect to Elasticsearch, error: " + type + ", reason: " + reason);
    }

    /**
     * Returns the server version determined when this client was constructed.
     *
     * @return the detected (or fallback) Elasticsearch version
     */
    @Override
    public ElasticsearchClient.Version getVersion() {
        return version;
    }

    /**
     * Reports whether the index exists, consulting the local cache first and
     * asking Elasticsearch only on a cache miss. An index found on the server is
     * added to the cache.
     *
     * @param str index name
     * @return {@code true} if the index is cached or the existence request succeeded
     * @throws ConnectException if the existence request fails with an {@link IOException}
     */
    private boolean indexExists(String str) {
        if (this.indexCache.contains(str)) {
            return true;
        }

        IndicesExists request = new IndicesExists.Builder(str).build();
        JestResult response;
        try {
            log.info("Index '{}' not found in local cache; checking for existence", str);
            response = this.client.execute(request);
        } catch (IOException e) {
            throw new ConnectException(e);
        }
        log.debug("Received response for checking existence of index '{}'", str);

        if (!response.isSucceeded()) {
            log.info("Index '{}' not found in Elasticsearch. Error message: {}", str, response.getErrorMessage());
            return false;
        }
        this.indexCache.add(str);
        log.info("Index '{}' exists in Elasticsearch; adding to local cache", str);
        return true;
    }

    /**
     * Ensures every named index exists. Indices not yet known to exist are
     * created; a failed creation is retried until {@code maxRetries + 1} attempts
     * have been made, sleeping between attempts for a time computed by
     * {@code RetryUtil.computeRandomRetryWaitTimeInMillis}.
     *
     * @param set index names to discover or create
     * @throws ConnectException if the last allowed attempt for an index fails
     */
    @Override
    public void createIndices(Set<String> set) {
        log.trace("Attempting to discover or create indexes in Elasticsearch: {}", set);
        for (String indexName : set) {
            if (indexExists(indexName)) {
                continue;
            }
            final int maxAttempts = this.maxRetries + 1;
            CreateIndex request = new PortableJestCreateIndexBuilder(indexName, this.version, this.timeout).m17build();
            for (int attempt = 1; ; attempt++) {
                try {
                    createIndex(indexName, request);
                    break;
                } catch (ConnectException e) {
                    if (attempt >= maxAttempts) {
                        throw e;
                    }
                    long waitMs = RetryUtil.computeRandomRetryWaitTimeInMillis(attempt - 1, this.retryBackoffMs);
                    log.warn("Failed to create index {} with attempt {}/{}, will attempt retry after {} ms. Failure reason: {}", indexName, attempt, maxAttempts, waitMs, e.getMessage());
                    this.time.sleep(waitMs);
                }
            }
        }
    }

    /**
     * Executes one create-index request and records the index in the local cache
     * when it was created or turns out to exist already. A failed request counts
     * as "already exists" when its error message contains
     * {@code resource_already_exists_exception} or a follow-up existence check succeeds.
     *
     * @param str index name, used for logging and caching
     * @param createIndex prepared create-index action
     * @throws ConnectException if the index could not be created and does not exist,
     *     or if the request fails with an {@link IOException}
     */
    private void createIndex(String str, CreateIndex createIndex) throws ConnectException {
        try {
            log.info("Requesting Elasticsearch create index '{}'", str);
            JestResult result = this.client.execute(createIndex);
            log.debug("Received response for request to create index '{}'", str);
            if (result.isSucceeded()) {
                log.info("Index '{}' created in Elasticsearch; adding to local cache", str);
            } else {
                // The error message may be null; guard before inspecting it so a failed
                // request without a message reaches the existence check instead of an NPE.
                String errorMessage = result.getErrorMessage();
                boolean reportedAsExisting = errorMessage != null && errorMessage.contains(RESOURCE_ALREADY_EXISTS_EXCEPTION);
                if (!reportedAsExisting && !indexExists(str)) {
                    throw new ConnectException("Could not create index '" + str + "'" + (errorMessage != null ? ": " + errorMessage : ""));
                }
                log.info("Index '{}' exists in Elasticsearch; adding to local cache", str);
            }
            this.indexCache.add(str);
        } catch (IOException e) {
            throw new ConnectException(e);
        }
    }

    /**
     * Infers a mapping from the schema and submits it as a put-mapping request
     * for the given index and type.
     *
     * @param str index name
     * @param str2 mapping type; also the key under which the inferred mapping is nested
     * @param schema schema the mapping is inferred from
     * @throws IOException if executing the request fails
     * @throws ConnectException if Elasticsearch rejects the mapping
     */
    @Override
    public void createMapping(String str, String str2, Schema schema) throws IOException {
        ObjectNode mappingRoot = JsonNodeFactory.instance.objectNode();
        mappingRoot.set(str2, Mapping.inferMapping(this, schema));
        String mappingJson = mappingRoot.toString();
        PutMapping request = new PortableJestPutMappingBuilder(str, str2, mappingJson, this.version).m19build();

        log.info("Submitting put mapping (type={}) for index '{}' and schema {}", str2, str, schema);
        JestResult response = this.client.execute(request);
        if (response.isSucceeded()) {
            log.info("Completed put mapping (type={}) for index '{}' and schema {}", str2, str, schema);
            return;
        }
        throw new ConnectException("Cannot create mapping " + mappingRoot + " -- " + response.getErrorMessage());
    }

    /**
     * Fetches the mapping of one type in one index.
     *
     * @param str index name
     * @param str2 mapping type
     * @return the mapping object for the type, or {@code null} when the response has
     *     no JSON body, no entry for the index, no "mappings" member, or no entry
     *     for the type
     * @throws IOException if executing the request fails
     */
    @Override
    public JsonObject getMapping(String str, String str2) throws IOException {
        log.info("Get mapping (type={}) for index '{}'", str2, str);
        GetMapping request = ((GetMapping.Builder) ((GetMapping.Builder) new PortableJestGetMappingBuilder(this.version).addIndex(str)).addType(str2)).build();
        JsonObject response = this.client.execute(request).getJsonObject();
        // A result without a JSON body is handled like a missing index entry rather than
        // dereferenced; getServerVersion likewise treats a null result JSON as possible.
        JsonObject indexRoot = response == null ? null : response.getAsJsonObject(str);
        if (indexRoot == null) {
            log.debug("Received null (root) mapping (type={}) for index '{}'", str2, str);
            return null;
        }
        JsonObject mappings = indexRoot.getAsJsonObject("mappings");
        if (mappings == null) {
            log.debug("Received null mapping (type={}) for index '{}'", str2, str);
            return null;
        }
        log.debug("Received mapping (type={}) for index '{}'", str2, str);
        return mappings.getAsJsonObject(str2);
    }

    /**
     * Requests deletion of the {@code _all} index target. A failed request is
     * logged as a warning and not raised.
     *
     * @throws IOException if executing the request fails
     */
    @Override
    public void deleteAll() throws IOException {
        log.info("Request deletion of all indexes");
        DeleteIndex request = new DeleteIndex.Builder(ALL_FIELD_PARAM).build();
        JestResult response = this.client.execute(request);
        if (!response.isSucceeded()) {
            String detail = response.getErrorMessage() != null ? ": " + response.getErrorMessage() : "";
            log.warn("Could not delete all indexes: {}", detail);
            return;
        }
        log.info("Deletion of all indexes succeeded");
    }

    /**
     * Issues a refresh request built without any index restriction. A failed
     * request is logged as a warning and not raised.
     *
     * @throws IOException if executing the request fails
     */
    @Override
    public void refresh() throws IOException {
        log.info("Request refresh");
        Refresh request = new Refresh.Builder().build();
        JestResult response = this.client.execute(request);
        if (!response.isSucceeded()) {
            String detail = response.getErrorMessage() != null ? ": " + response.getErrorMessage() : "";
            log.warn("Could not refresh: {}", detail);
            return;
        }
        log.info("Refresh completed");
    }

    /**
     * Converts each record into a bulkable action and wraps the resulting bulk
     * action in a {@code JestBulkRequest}, preserving the list order.
     *
     * @param list records to include in the bulk request
     * @return the bulk request holding one action per record
     */
    @Override
    public BulkRequest createBulkRequest(List<IndexableRecord> list) {
        Bulk.Builder bulk = new Bulk.Builder();
        for (IndexableRecord record : list) {
            bulk.addAction(toBulkableAction(record));
        }
        return new JestBulkRequest(bulk.build());
    }

    /**
     * Picks the action for a record: a delete when the payload is {@code null},
     * an index request when the write method is INSERT, otherwise an update request.
     *
     * @param indexableRecord record to convert
     * @return the delete, index or update action for the record
     */
    protected BulkableAction<DocumentResult> toBulkableAction(IndexableRecord indexableRecord) {
        if (indexableRecord.payload == null) {
            return toDeleteRequest(indexableRecord);
        }
        if (this.writeMethod == WriteMethod.INSERT) {
            return toIndexRequest(indexableRecord);
        }
        return toUpdateRequest(indexableRecord);
    }

    /**
     * Builds a delete action addressed by the record key's id, index and type.
     *
     * @param indexableRecord record whose key identifies the document to delete
     * @return the delete action
     */
    private Delete toDeleteRequest(IndexableRecord indexableRecord) {
        Delete.Builder builder = new Delete.Builder(indexableRecord.key.id);
        builder = (Delete.Builder) builder.index(indexableRecord.key.index);
        builder = (Delete.Builder) builder.type(indexableRecord.key.type);
        return builder.build();
    }

    /**
     * Builds an index action from the record's payload and key. When the record
     * carries a version, it is sent with {@code version_type=external}.
     *
     * @param indexableRecord record to index
     * @return the index action
     */
    private Index toIndexRequest(IndexableRecord indexableRecord) {
        Index.Builder builder = new Index.Builder(indexableRecord.payload);
        builder = (Index.Builder) builder.index(indexableRecord.key.index);
        builder = (Index.Builder) builder.type(indexableRecord.key.type);
        builder = (Index.Builder) builder.id(indexableRecord.key.id);
        if (indexableRecord.version != null) {
            builder.setParameter("version_type", "external");
            builder.setParameter("version", indexableRecord.version);
        }
        return builder.build();
    }

    /**
     * Builds an update action whose body wraps the payload as {@code doc} with
     * {@code doc_as_upsert} enabled, and sets {@code retry_on_conflict} from this
     * client's configured value.
     *
     * @param indexableRecord record to upsert
     * @return the update action
     */
    private Update toUpdateRequest(IndexableRecord indexableRecord) {
        String body = "{\"doc\":" + indexableRecord.payload + ", \"doc_as_upsert\":true}";
        Update.Builder builder = new Update.Builder(body);
        builder = (Update.Builder) builder.index(indexableRecord.key.index);
        builder = (Update.Builder) builder.type(indexableRecord.key.type);
        builder = (Update.Builder) builder.id(indexableRecord.key.id);
        builder = (Update.Builder) builder.setParameter("retry_on_conflict", Integer.valueOf(this.retryOnConflict));
        return builder.build();
    }

    /**
     * Executes a bulk request and summarises the outcome. When the bulk result is
     * not successful, each item error is classified by its "type" field:
     * version conflicts are logged and ignored, and all other errors are
     * collected into the failure message. A mapper parse error additionally
     * clears the flag passed to {@code BulkResponse.failure}.
     *
     * @param bulkRequest request created by {@code createBulkRequest}; must be a {@code JestBulkRequest}
     * @return success if the bulk succeeded or only version conflicts occurred, otherwise
     *     a failure carrying the collected item errors (or the overall error message
     *     when no item error was collected)
     * @throws IOException if executing the request or parsing an item error fails
     */
    @Override
    public BulkResponse executeBulk(BulkRequest bulkRequest) throws IOException {
        BulkResult result = this.client.execute(((JestBulkRequest) bulkRequest).getBulk());
        if (result.isSucceeded()) {
            return BulkResponse.success();
        }
        log.debug("Bulk request failed; collecting error(s)");

        // NOTE(review): first argument of BulkResponse.failure, presumably "retriable"; confirm.
        boolean retriable = true;
        List<Key> versionConflicts = new ArrayList<>();
        List<String> errors = new ArrayList<>();
        for (BulkResult.BulkResultItem item : result.getItems()) {
            if (item.error == null) {
                continue;
            }
            // path() yields a missing node rather than null, so an error without a
            // "type" field (or one that is not a JSON object) becomes "" instead of an NPE.
            String errorType = OBJECT_MAPPER.readTree(item.error).path("type").asText("");
            if (VERSION_CONFLICT_ENGINE_EXCEPTION.equals(errorType)) {
                versionConflicts.add(new Key(item.index, item.type, item.id));
                continue;
            }
            if (MAPPER_PARSE_EXCEPTION.equals(errorType)) {
                retriable = false;
            }
            errors.add(item.error);
        }

        if (!versionConflicts.isEmpty()) {
            LOG.warn("Ignoring version conflicts for items: {}", versionConflicts);
            if (errors.isEmpty()) {
                return BulkResponse.success();
            }
        }
        String errorInfo = errors.isEmpty() ? result.getErrorMessage() : errors.toString();
        return BulkResponse.failure(retriable, errorInfo);
    }

    /**
     * Runs a search query, optionally restricted to an index and a type, and
     * returns the raw JSON of the result. A failed search is only logged as a
     * warning; the result JSON is returned either way.
     *
     * @param str query body
     * @param str2 index to search, or {@code null} for no index restriction
     * @param str3 type to search, or {@code null} for no type restriction
     * @return the JSON object of the search result
     * @throws IOException if executing the request fails
     */
    @Override
    public JsonObject search(String str, String str2, String str3) throws IOException {
        Search.Builder searchBuilder = new Search.Builder(str);
        if (str2 != null) {
            searchBuilder.addIndex(str2);
        }
        if (str3 != null) {
            searchBuilder.addType(str3);
        }

        log.info("Executing search on index '{}' (type={}): {}", str2, str3, str);
        SearchResult result = this.client.execute(searchBuilder.build());
        if (!result.isSucceeded()) {
            String detail = result.getErrorMessage() != null ? ": " + result.getErrorMessage() : "";
            log.warn("Failed to execute search: {}", detail);
        } else {
            log.info("Executing search succeeded: {}", result);
        }
        return result.getJsonObject();
    }

    /**
     * Closes the underlying Jest client. An {@link IOException} raised while
     * closing is logged as an error and not rethrown.
     */
    @Override
    public void close() {
        log.debug("Closing Elasticsearch client");
        try {
            this.client.close();
        } catch (IOException closeFailure) {
            LOG.error("Exception while closing the JEST client", closeFailure);
        }
    }
}
