package io.confluent.connect.elasticsearch.integration;

import io.confluent.common.utils.IntegrationTest;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
import io.confluent.connect.elasticsearch.helper.ElasticsearchContainer;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.test.TestUtils;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.elasticsearch.search.SearchHit;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorIT.class */
public class ElasticsearchConnectorIT extends ElasticsearchConnectorBaseIT {
    @BeforeClass
    public static void setupBeforeAll() {
        container = ElasticsearchContainer.fromSystemProperties().withBasicAuth(getUsers(), getRoles());
        container.start();
    }

    @Override // io.confluent.connect.elasticsearch.integration.ElasticsearchConnectorBaseIT
    public void setup() {
        if (!container.isRunning()) {
            setupBeforeAll();
        }
        super.setup();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.connect.elasticsearch.integration.ElasticsearchConnectorBaseIT
    public Map<String, String> createProps() {
        this.props = super.createProps();
        this.props.put("connection.username", ElasticsearchConnectorBaseIT.ELASTIC_MINIMAL_PRIVILEGES_NAME);
        this.props.put("connection.password", ElasticsearchConnectorBaseIT.ELASTIC_MINIMAL_PRIVILEGES_PASSWORD);
        return this.props;
    }

    @Test
    public void testStrictMappings() throws Exception {
        this.helperClient.createIndex("test", "{ \"dynamic\" : \"strict\",  \"properties\": { \"longProp\": { \"type\": \"long\" } } } }");
        this.props.put("batch.size", "1");
        this.props.put("max.retries", "1");
        this.props.put("retry.backoff.ms", "10");
        this.props.put("max.in.flight.requests", "2");
        this.connect.configureConnector("es-connector", this.props);
        waitForConnectorToStart("es-connector", 1);
        this.connect.kafka().produce("test", "key1", "{\"longProp\":1}");
        this.connect.kafka().produce("test", "key2", "{\"any-prop\":1}");
        this.connect.kafka().produce("test", "key3", "{\"any-prop\":1}");
        this.connect.kafka().produce("test", "key4", "{\"any-prop\":1}");
        Awaitility.await().atMost(Duration.ofMinutes(1L)).untilAsserted(() -> {
            Assertions.assertThat(((ConnectorStateInfo.TaskState) this.connect.connectorStatus("es-connector").tasks().get(0)).state()).isEqualTo("FAILED");
        });
        Assertions.assertThat(((ConnectorStateInfo.TaskState) this.connect.connectorStatus("es-connector").tasks().get(0)).trace()).contains(new CharSequence[]{"ElasticsearchException[Elasticsearch exception [type=strict_dynamic_mapping_exception, reason=mapping set to strict, dynamic introduction of"});
        Assertions.assertThat(getConnectorOffset("es-connector", "test", 0)).isLessThanOrEqualTo(1L);
    }

    private long getConnectorOffset(String str, String str2, int i) throws Exception {
        OffsetAndMetadata offsetAndMetadata = (OffsetAndMetadata) ((Map) this.connect.kafka().createAdminClient().listConsumerGroupOffsets("connect-" + str).partitionsToOffsetAndMetadata().get()).get(new TopicPartition(str2, i));
        if (offsetAndMetadata == null) {
            return 0L;
        }
        return offsetAndMetadata.offset();
    }

    @Test
    public void testBatchByByteSize() throws Exception {
        this.props.put("bulk.size.bytes", Integer.toString(60 * 2));
        this.props.put("linger.ms", "180000");
        this.connect.configureConnector("es-connector", this.props);
        waitForConnectorToStart("es-connector", 1);
        writeRecords(3);
        verifySearchResults(2);
        writeRecords(1);
        verifySearchResults(4);
    }

    @Test
    public void testStopESContainer() throws Exception {
        this.props.put("max.retries", "2");
        this.props.put("retry.backoff.ms", "10");
        this.props.put("batch.size", "1");
        this.props.put("max.in.flight.requests", Integer.toString(4));
        runSimpleTest(this.props);
        container.stop();
        writeRecords(5);
        Awaitility.await().atMost(Duration.ofMinutes(1L)).untilAsserted(() -> {
            Assertions.assertThat(((ConnectorStateInfo.TaskState) this.connect.connectorStatus("es-connector").tasks().get(0)).state()).isEqualTo("FAILED");
        });
        Assertions.assertThat(((ConnectorStateInfo.TaskState) this.connect.connectorStatus("es-connector").tasks().get(0)).trace()).contains(new CharSequence[]{"'java.net.ConnectException: Connection refused' after 3 attempt(s)"});
    }

    @Test
    public void testChangeConfigsAndRestart() throws Exception {
        runSimpleTest(this.props);
        this.props.put("batch.size", "10");
        this.props.put("linger.ms", "1000");
        this.connect.configureConnector("es-connector", this.props);
        writeRecords(5);
        verifySearchResults(10);
    }

    @Test
    public void testDelete() throws Exception {
        this.props.put("behavior.on.null.values", ElasticsearchSinkConnectorConfig.BehaviorOnNullValues.DELETE.name());
        this.props.put("key.ignore", "false");
        runSimpleTest(this.props);
        this.connect.kafka().produce("test", String.valueOf(4), (String) null);
        verifySearchResults(4);
    }

    @Test
    public void testHappyPath() throws Exception {
        runSimpleTest(this.props);
    }

    @Test
    public void testHappyPathDataStream() throws Exception {
        setDataStream();
        runSimpleTest(this.props);
        Assert.assertEquals(this.index, this.helperClient.getDataStream(this.index).getName());
    }

    @Test
    public void testNullValue() throws Exception {
        runSimpleTest(this.props);
        this.connect.kafka().produce("test", String.valueOf(5), (String) null);
        verifySearchResults(5);
    }

    @Test
    public void testPrimitive() throws Exception {
        this.props.put("value.converter", StringConverter.class.getName());
        this.connect.configureConnector("es-connector", this.props);
        waitForConnectorToStart("es-connector", 1);
        for (int i = 0; i < 5; i++) {
            this.connect.kafka().produce("test", String.valueOf(i), String.valueOf(i));
        }
        waitForRecords(0);
    }

    @Test
    public void testUpsert() throws Exception {
        this.props.put("write.method", ElasticsearchSinkConnectorConfig.WriteMethod.UPSERT.toString());
        this.props.put("key.ignore", "false");
        runSimpleTest(this.props);
        this.connect.kafka().produce("test", String.valueOf(4), String.format("{\"doc_num\":%d}", 0));
        writeRecordsFromStartIndex(5, 5);
        verifySearchResults(10);
        Iterator it = this.helperClient.search("test").iterator();
        while (it.hasNext()) {
            if (Integer.parseInt(((SearchHit) it.next()).getId()) == 4) {
                Assert.assertEquals(0L, ((Integer) r0.getSourceAsMap().get("doc_num")).intValue());
            }
        }
    }

    @Test
    public void testBackwardsCompatibilityDataStream() throws Exception {
        container.close();
        container = ElasticsearchContainer.withESVersion("7.0.1");
        container.start();
        setupFromContainer();
        runSimpleTest(this.props);
        this.helperClient = null;
        container.close();
        container = ElasticsearchContainer.fromSystemProperties();
        container.start();
    }

    @Test
    public void testRoutingSmtSynchronousMode() throws Exception {
        this.index = addRoutingSmt("YYYYMM", "route-it-to-here-${topic}-at-${timestamp}");
        this.props.put("flush.synchronously", "true");
        runSimpleTest(this.props);
        waitForCommittedOffsets("es-connector", "test", 0, 5);
    }

    @Test
    public void testRoutingSmtAsynchronousMode() throws Exception {
        this.index = addRoutingSmt("YYYYMM", "route-it-to-here-${topic}-at-${timestamp}");
        this.props.put("flush.synchronously", "false");
        assertConnectorFailsOnWriteRecords(this.props, "Connector doesn't support topic mutating SMTs");
    }

    @Test
    public void testReconfigureToUseRoutingSMT() throws Exception {
        this.props.put("flush.synchronously", "false");
        runSimpleTest(this.props);
        this.props.put("flush.synchronously", "true");
        this.index = addRoutingSmt("YYYYMM", "route-it-to-here-${topic}-at-${timestamp}");
        runSimpleTest(this.props);
        waitForCommittedOffsets("es-connector", "test", 0, 10);
        this.props.put("flush.synchronously", "false");
        assertConnectorFailsOnWriteRecords(this.props, "Connector doesn't support topic mutating SMTs");
    }

    public void waitForCommittedOffsets(String str, String str2, int i, int i2) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            return ((long) i2) == getConnectorOffset(str, str2, i);
        }, CONNECTOR_COMMIT_DURATION_MS, "Connector tasks did not commit offsets in time.");
    }

    private String addRoutingSmt(String str, String str2) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat(str);
        Date date = new Date(System.currentTimeMillis());
        this.props.put("transforms", "TimestampRouter");
        this.props.put("transforms.TimestampRouter.type", "org.apache.kafka.connect.transforms.TimestampRouter");
        this.props.put("transforms.TimestampRouter.topic.format", str2);
        this.props.put("transforms.TimestampRouter.timestamp.format", str);
        return str2.replace("${topic}", "test").replace("${timestamp}", simpleDateFormat.format(date));
    }
}
