package io.confluent.connect.elasticsearch.integration;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import com.github.tomakehurst.wiremock.stubbing.Scenario;
import io.confluent.common.utils.IntegrationTest;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnector;
import io.confluent.connect.elasticsearch.helper.ElasticSearchMockUtil;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.storage.StringConverter;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorNetworkIT.class */
public class ElasticsearchConnectorNetworkIT extends BaseConnectorIT {

    @Rule
    public WireMockRule wireMockRule = new WireMockRule(WireMockConfiguration.options().dynamicPort().extensions(new String[]{BlockingTransformer.class.getName()}), false);
    private static final int NUM_RECORDS = 5;
    private static final int TASKS_MAX = 1;
    private static final String CONNECTOR_NAME = "es-connector";
    private static final String TOPIC = "test";
    private Map<String, String> props;

    @Before
    public void setup() {
        startConnect();
        this.connect.kafka().createTopic(TOPIC);
        this.props = createProps();
        WireMock.stubFor(WireMock.any(WireMock.anyUrl()).atPriority(10).willReturn(ElasticSearchMockUtil.basicEmptyOk()));
    }

    @After
    public void cleanup() {
        stopConnect();
    }

    @Test
    public void testRetry() throws Exception {
        this.wireMockRule.stubFor(WireMock.post(WireMock.urlPathEqualTo("/_bulk")).inScenario("bulkRetry1").whenScenarioStateIs("Started").withRequestBody(WireMock.containing("{\"doc_num\":0}")).willReturn(ElasticSearchMockUtil.addMinimalHeaders(WireMock.aResponse().withStatus(500).withBody(ElasticSearchMockUtil.minimumResponseJson()))).willSetStateTo("Failed"));
        this.wireMockRule.stubFor(WireMock.post(WireMock.urlPathEqualTo("/_bulk")).inScenario("bulkRetry1").whenScenarioStateIs("Failed").withRequestBody(WireMock.containing("{\"doc_num\":0}")).willSetStateTo("Fixed").willReturn(ElasticSearchMockUtil.addMinimalHeaders(WireMock.okJson(errorBulkResponse()))));
        this.connect.configureConnector(CONNECTOR_NAME, this.props);
        waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);
        writeRecords(4);
        Awaitility.await().untilAsserted(() -> {
            Assertions.assertThat(((Scenario) this.wireMockRule.getAllScenarios().getScenarios().get(0)).getState()).isEqualTo("Fixed");
        });
        Assertions.assertThat(((ConnectorStateInfo.TaskState) this.connect.connectorStatus(CONNECTOR_NAME).tasks().get(0)).state()).isEqualTo("RUNNING");
    }

    @Test
    public void testConcurrentRequests() throws Exception {
        this.wireMockRule.stubFor(WireMock.post(WireMock.urlPathEqualTo("/_bulk")).willReturn(WireMock.okJson(errorBulkResponse()).withTransformers(new String[]{BlockingTransformer.NAME})));
        this.props.put("connection.url", this.wireMockRule.url("/"));
        this.props.put("read.timeout.ms", "60000");
        this.props.put("max.retries", "0");
        this.props.put("linger.ms", "60000");
        this.props.put("batch.size", "1");
        this.props.put("max.in.flight.requests", "4");
        this.connect.configureConnector(CONNECTOR_NAME, this.props);
        waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);
        writeRecords(10);
        BlockingTransformer blockingTransformer = BlockingTransformer.getInstance(this.wireMockRule);
        Awaitility.await().untilAsserted(() -> {
            Assertions.assertThat(blockingTransformer.queueLength()).isEqualTo(3);
        });
        blockingTransformer.release(10);
        Awaitility.await().untilAsserted(() -> {
            Assertions.assertThat(blockingTransformer.requestCount()).isEqualTo(10);
        });
    }

    @Test
    public void testReadTimeout() throws Exception {
        this.wireMockRule.stubFor(WireMock.post(WireMock.urlPathEqualTo("/_bulk")).willReturn(ElasticSearchMockUtil.addMinimalHeaders(WireMock.aResponse().withFixedDelay(2000))));
        this.connect.configureConnector(CONNECTOR_NAME, this.props);
        waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);
        writeRecords(NUM_RECORDS);
        Awaitility.await().atMost(Duration.ofMinutes(1L)).untilAsserted(() -> {
            Assertions.assertThat(((ConnectorStateInfo.TaskState) this.connect.connectorStatus(CONNECTOR_NAME).tasks().get(0)).state()).isEqualTo("FAILED");
        });
        Assertions.assertThat(((ConnectorStateInfo.TaskState) this.connect.connectorStatus(CONNECTOR_NAME).tasks().get(0)).trace()).contains(new CharSequence[]{"Failed to execute bulk request due to 'java.net.SocketTimeoutException: 1,000 milliseconds timeout on connection"}).contains(new CharSequence[]{"after 3 attempt(s)"});
        WireMock.verify(3, WireMock.postRequestedFor(WireMock.urlPathEqualTo("/_bulk")));
    }

    @Test
    public void testTooManyRequests() throws Exception {
        this.wireMockRule.stubFor(WireMock.post(WireMock.urlPathEqualTo("/_bulk")).willReturn(WireMock.aResponse().withStatus(429).withHeader("Content-Type", new String[]{"application/json"}).withBody("{\n  \"error\": {\n    \"type\": \"circuit_breaking_exception\",\n    \"reason\": \"Data too large\",\n    \"bytes_wanted\": 123848638,\n    \"bytes_limit\": 123273216,\n    \"durability\": \"TRANSIENT\"\n  },\n  \"status\": 429\n}")));
        this.connect.configureConnector(CONNECTOR_NAME, this.props);
        waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);
        writeRecords(NUM_RECORDS);
        Awaitility.await().atMost(Duration.ofMinutes(1L)).untilAsserted(() -> {
            Assertions.assertThat(((ConnectorStateInfo.TaskState) this.connect.connectorStatus(CONNECTOR_NAME).tasks().get(0)).state()).isEqualTo("FAILED");
        });
        Assertions.assertThat(((ConnectorStateInfo.TaskState) this.connect.connectorStatus(CONNECTOR_NAME).tasks().get(0)).trace()).contains(new CharSequence[]{"Failed to execute bulk request due to 'ElasticsearchStatusException[Elasticsearch exception [type=circuit_breaking_exception, reason=Data too large]]' after 3 attempt(s)"});
        WireMock.verify(3, WireMock.postRequestedFor(WireMock.urlPathEqualTo("/_bulk")));
    }

    @Test
    public void testServiceUnavailable() throws Exception {
        this.wireMockRule.stubFor(WireMock.post(WireMock.urlPathEqualTo("/_bulk")).willReturn(WireMock.aResponse().withStatus(503)));
        this.connect.configureConnector(CONNECTOR_NAME, this.props);
        waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);
        writeRecords(NUM_RECORDS);
        Awaitility.await().atMost(Duration.ofMinutes(1L)).untilAsserted(() -> {
            Assertions.assertThat(((ConnectorStateInfo.TaskState) this.connect.connectorStatus(CONNECTOR_NAME).tasks().get(0)).state()).isEqualTo("FAILED");
        });
        Assertions.assertThat(((ConnectorStateInfo.TaskState) this.connect.connectorStatus(CONNECTOR_NAME).tasks().get(0)).trace()).contains(new CharSequence[]{"[HTTP/1.1 503 Service Unavailable]"}).contains(new CharSequence[]{"after 3 attempt(s)"});
        WireMock.verify(3, WireMock.postRequestedFor(WireMock.urlPathEqualTo("/_bulk")));
    }

    @Test
    public void testPausePartitions() throws Exception {
        this.wireMockRule.stubFor(WireMock.post(WireMock.urlPathEqualTo("/_bulk")).willReturn(WireMock.okJson(errorBulkResponse()).withTransformers(new String[]{BlockingTransformer.NAME})));
        this.props.put("connection.url", this.wireMockRule.url("/"));
        this.props.put("read.timeout.ms", "600000");
        this.props.put("linger.ms", "600000");
        this.props.put("flush.timeout.ms", "600000");
        this.props.put("max.retries", "0");
        this.props.put("batch.size", "1");
        this.props.put("max.buffered.records", "10");
        this.props.put("max.in.flight.requests", "4");
        this.connect.configureConnector(CONNECTOR_NAME, this.props);
        waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);
        writeRecords(TASKS_MAX);
        BlockingTransformer blockingTransformer = BlockingTransformer.getInstance(this.wireMockRule);
        Awaitility.await().untilAsserted(() -> {
            Assertions.assertThat(blockingTransformer.queueLength()).isEqualTo(TASKS_MAX);
        });
        this.wireMockRule.stubFor(WireMock.post(WireMock.urlPathEqualTo("/_bulk")).willReturn(WireMock.okJson(errorBulkResponse())));
        writeRecords(1000);
        Awaitility.await().untilAsserted(() -> {
            this.wireMockRule.verify(WireMock.moreThanOrExactly(99), WireMock.postRequestedFor(WireMock.urlPathEqualTo("/_bulk")));
        });
        Awaitility.await().pollDelay(Duration.ofSeconds(2L)).untilAsserted(() -> {
            this.wireMockRule.verify(WireMock.lessThan(1001), WireMock.postRequestedFor(WireMock.urlPathEqualTo("/_bulk")));
        });
        blockingTransformer.release(TASKS_MAX);
        Awaitility.await().untilAsserted(() -> {
            this.wireMockRule.verify(WireMock.moreThanOrExactly(1001), WireMock.postRequestedFor(WireMock.urlPathEqualTo("/_bulk")));
        });
    }

    @Test
    public void testPausePartitionsAndFail() throws Exception {
        this.wireMockRule.stubFor(WireMock.post(WireMock.urlPathEqualTo("/_bulk")).willReturn(WireMock.aResponse().withStatus(500).withTransformers(new String[]{BlockingTransformer.NAME})));
        this.props.put("connection.url", this.wireMockRule.url("/"));
        this.props.put("read.timeout.ms", "600000");
        this.props.put("linger.ms", "600000");
        this.props.put("flush.timeout.ms", "600000");
        this.props.put("max.retries", "0");
        this.props.put("batch.size", "1");
        this.props.put("max.buffered.records", "10");
        this.props.put("max.in.flight.requests", "4");
        this.props.put("flush.synchronously", "false");
        this.connect.configureConnector(CONNECTOR_NAME, this.props);
        waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);
        writeRecords(TASKS_MAX);
        BlockingTransformer blockingTransformer = BlockingTransformer.getInstance(this.wireMockRule);
        Awaitility.await().untilAsserted(() -> {
            Assertions.assertThat(blockingTransformer.queueLength()).isEqualTo(TASKS_MAX);
        });
        this.wireMockRule.stubFor(WireMock.post(WireMock.urlPathEqualTo("/_bulk")).willReturn(WireMock.okJson(errorBulkResponse())));
        writeRecords(1000);
        Awaitility.await().untilAsserted(() -> {
            this.wireMockRule.verify(WireMock.moreThanOrExactly(99), WireMock.postRequestedFor(WireMock.urlPathEqualTo("/_bulk")));
        });
        Awaitility.await().pollDelay(Duration.ofSeconds(2L)).untilAsserted(() -> {
            this.wireMockRule.verify(WireMock.lessThan(1001), WireMock.postRequestedFor(WireMock.urlPathEqualTo("/_bulk")));
        });
        blockingTransformer.release(TASKS_MAX);
        Awaitility.await().untilAsserted(() -> {
            Assertions.assertThat(((ConnectorStateInfo.TaskState) this.connect.connectorStatus(CONNECTOR_NAME).tasks().get(0)).state()).isEqualTo("FAILED");
        });
        Assertions.assertThat(((ConnectorStateInfo.TaskState) this.connect.connectorStatus(CONNECTOR_NAME).tasks().get(0)).trace()).contains(new CharSequence[]{"status line [HTTP/1.1 500 Server Error]"});
    }

    protected Map<String, String> createProps() {
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", ElasticsearchSinkConnector.class.getName());
        hashMap.put("topics", TOPIC);
        hashMap.put("tasks.max", Integer.toString(TASKS_MAX));
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", JsonConverter.class.getName());
        hashMap.put("value.converter.schemas.enable", "false");
        hashMap.put("connection.url", this.wireMockRule.url("/"));
        hashMap.put("key.ignore", "true");
        hashMap.put("schema.ignore", "true");
        hashMap.put("read.timeout.ms", "1000");
        hashMap.put("max.retries", "2");
        hashMap.put("retry.backoff.ms", "10");
        hashMap.put("linger.ms", "60000");
        hashMap.put("batch.size", "4");
        hashMap.put("max.in.flight.requests", "1");
        return hashMap;
    }

    protected void writeRecords(int i) {
        for (int i2 = 0; i2 < i; i2 += TASKS_MAX) {
            this.connect.kafka().produce(TOPIC, String.valueOf(i2), String.format("{\"doc_num\":%d}", Integer.valueOf(i2)));
        }
    }

    public static String errorBulkResponse() throws JsonProcessingException {
        return errorBulkResponse(TASKS_MAX);
    }

    public static String errorBulkResponse(int i) throws JsonProcessingException {
        ObjectNode createObjectNode = ElasticSearchMockUtil.MAPPER.createObjectNode();
        ArrayNode putArray = createObjectNode.put("errors", false).putArray("items");
        for (int i2 = 0; i2 < i; i2 += TASKS_MAX) {
            putArray.addObject().putObject("index").put("_index", TOPIC).put("_type", "_doc").put("_id", Integer.toString(i2 + TASKS_MAX)).put("_version", "1").put("result", "created").put("status", 201).put("_seq_no", 0);
        }
        return ElasticSearchMockUtil.MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(createObjectNode);
    }

    public static String errorBulkResponse(int i, String str, int... iArr) throws JsonProcessingException {
        ObjectNode createObjectNode = ElasticSearchMockUtil.MAPPER.createObjectNode();
        ArrayNode putArray = createObjectNode.put("errors", true).putArray("items");
        Set set = (Set) IntStream.of(iArr).boxed().collect(Collectors.toSet());
        for (int i2 = 0; i2 < i; i2 += TASKS_MAX) {
            ObjectNode put = putArray.addObject().putObject("index").put("_index", TOPIC).put("_type", "_doc").put("_id", Integer.toString(i2 + TASKS_MAX)).put("_version", "1").put("_seq_no", 0);
            if (set.contains(Integer.valueOf(i2))) {
                put.put("status", 400).putObject("error").put("type", str).put("reason", "Reason for " + str);
            } else {
                put.put("result", "created").put("status", 201);
            }
        }
        return ElasticSearchMockUtil.MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(createObjectNode);
    }
}
