package io.confluent.connect.elasticsearch;

import io.confluent.connect.elasticsearch.AsyncOffsetTracker;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
import io.confluent.connect.elasticsearch.helper.ElasticsearchContainer;
import io.confluent.connect.elasticsearch.helper.ElasticsearchHelperClient;
import io.confluent.connect.elasticsearch.helper.NetworkErrorContainer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.test.TestUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.search.SearchHit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/confluent/connect/elasticsearch/ElasticsearchClientTest.class */
public class ElasticsearchClientTest {
    private static final String INDEX = "index";
    private static final String ELASTIC_SUPERUSER_NAME = "elastic";
    private static final String ELASTIC_SUPERUSER_PASSWORD = "elastic";
    private static final String TOPIC = "index";
    private static final String DATA_STREAM_TYPE = "logs";
    private static final String DATA_STREAM_DATASET = "dataset";
    private static ElasticsearchContainer container;
    private DataConverter converter;
    private ElasticsearchHelperClient helperClient;
    private ElasticsearchSinkConnectorConfig config;
    private Map<String, String> props;
    private String index;
    private OffsetTracker offsetTracker;

    @BeforeClass
    public static void setupBeforeAll() {
        container = ElasticsearchContainer.fromSystemProperties();
        container.start();
    }

    @AfterClass
    public static void cleanupAfterAll() {
        container.close();
    }

    @Before
    public void setup() {
        this.index = "index";
        this.props = ElasticsearchSinkConnectorConfigTest.addNecessaryProps(new HashMap());
        this.props.put("connection.url", container.getConnectionUrl());
        this.props.put("key.ignore", "true");
        this.props.put("linger.ms", "1000");
        this.config = new ElasticsearchSinkConnectorConfig(this.props);
        this.converter = new DataConverter(this.config);
        this.helperClient = new ElasticsearchHelperClient(container.getConnectionUrl(), this.config);
        this.offsetTracker = (OffsetTracker) Mockito.mock(OffsetTracker.class);
    }

    @After
    public void cleanup() throws IOException {
        if (this.helperClient == null || !this.helperClient.indexExists(this.index)) {
            return;
        }
        this.helperClient.deleteIndex(this.index, this.config.isDataStream());
    }

    @Test
    public void testClose() {
        new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        }).close();
    }

    @Test
    public void testCloseFails() throws Exception {
        this.props.put("batch.size", "1");
        this.props.put("max.in.flight.requests", "1");
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, null, () -> {
            this.offsetTracker.updateOffsets();
        }) { // from class: io.confluent.connect.elasticsearch.ElasticsearchClientTest.1
            public void close() {
                try {
                    if (this.bulkProcessor.awaitClose(1L, TimeUnit.MILLISECONDS)) {
                    } else {
                        throw new ConnectException("Failed to process all outstanding requests in time.");
                    }
                } catch (InterruptedException e) {
                }
            }
        };
        writeRecord(sinkRecord(0), elasticsearchClient);
        Assert.assertThrows("Failed to process all outstanding requests in time.", ConnectException.class, () -> {
            elasticsearchClient.close();
        });
        waitUntilRecordsInES(1);
    }

    @Test
    public void testCreateIndex() throws IOException {
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        });
        Assert.assertFalse(this.helperClient.indexExists(this.index));
        elasticsearchClient.createIndexOrDataStream(this.index);
        Assert.assertTrue(this.helperClient.indexExists(this.index));
        elasticsearchClient.close();
    }

    @Test
    public void testCreateExistingDataStream() throws Exception {
        this.props.put("data.stream.type", DATA_STREAM_TYPE);
        this.props.put("data.stream.dataset", DATA_STREAM_DATASET);
        this.config = new ElasticsearchSinkConnectorConfig(this.props);
        this.index = createIndexName("index");
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        });
        this.index = createIndexName("index");
        Assert.assertTrue(elasticsearchClient.createIndexOrDataStream(this.index));
        Assert.assertTrue(this.helperClient.indexExists(this.index));
        Assert.assertFalse(elasticsearchClient.createIndexOrDataStream(this.index));
        elasticsearchClient.close();
    }

    @Test
    public void testCreateNewDataStream() throws Exception {
        this.props.put("data.stream.type", DATA_STREAM_TYPE);
        this.props.put("data.stream.dataset", DATA_STREAM_DATASET);
        this.config = new ElasticsearchSinkConnectorConfig(this.props);
        this.index = createIndexName("index");
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        });
        this.index = createIndexName("index");
        Assert.assertTrue(elasticsearchClient.createIndexOrDataStream(this.index));
        Assert.assertTrue(this.helperClient.indexExists(this.index));
        elasticsearchClient.close();
    }

    @Test
    public void testDoesNotCreateAlreadyExistingIndex() throws IOException {
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        });
        Assert.assertFalse(this.helperClient.indexExists(this.index));
        Assert.assertTrue(elasticsearchClient.createIndexOrDataStream(this.index));
        Assert.assertTrue(this.helperClient.indexExists(this.index));
        Assert.assertFalse(elasticsearchClient.createIndexOrDataStream(this.index));
        Assert.assertTrue(this.helperClient.indexExists(this.index));
        elasticsearchClient.close();
    }

    @Test
    public void testIndexExists() throws IOException {
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        });
        Assert.assertFalse(this.helperClient.indexExists(this.index));
        Assert.assertTrue(elasticsearchClient.createIndexOrDataStream(this.index));
        Assert.assertTrue(elasticsearchClient.indexExists(this.index));
        elasticsearchClient.close();
    }

    @Test
    public void testIndexDoesNotExist() throws IOException {
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        });
        Assert.assertFalse(this.helperClient.indexExists(this.index));
        Assert.assertFalse(elasticsearchClient.indexExists(this.index));
        elasticsearchClient.close();
    }

    @Test
    public void testCreateMapping() throws IOException {
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        });
        elasticsearchClient.createIndexOrDataStream(this.index);
        elasticsearchClient.createMapping(this.index, schema());
        Assert.assertTrue(elasticsearchClient.hasMapping(this.index));
        Map sourceAsMap = this.helperClient.getMapping(this.index).sourceAsMap();
        Assert.assertTrue(sourceAsMap.containsKey("properties"));
        Map map = (Map) sourceAsMap.get("properties");
        Assert.assertTrue(map.containsKey("offset"));
        Assert.assertTrue(map.containsKey("another"));
        Map map2 = (Map) map.get("offset");
        Assert.assertEquals("integer", map2.get("type"));
        Assert.assertEquals(0, map2.get("null_value"));
        Map map3 = (Map) map.get("another");
        Assert.assertEquals("integer", map3.get("type"));
        Assert.assertEquals(0, map3.get("null_value"));
        elasticsearchClient.close();
    }

    @Test
    public void testHasMapping() {
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        });
        elasticsearchClient.createIndexOrDataStream(this.index);
        elasticsearchClient.createMapping(this.index, schema());
        Assert.assertTrue(elasticsearchClient.hasMapping(this.index));
        elasticsearchClient.close();
    }

    @Test
    public void testDoesNotHaveMapping() {
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        });
        elasticsearchClient.createIndexOrDataStream(this.index);
        Assert.assertFalse(elasticsearchClient.hasMapping(this.index));
        elasticsearchClient.close();
    }

    @Test
    public void testBuffersCorrectly() throws Exception {
        this.props.put("max.in.flight.requests", "1");
        this.props.put("max.buffered.records", "1");
        this.config = new ElasticsearchSinkConnectorConfig(this.props);
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        });
        elasticsearchClient.createIndexOrDataStream(this.index);
        writeRecord(sinkRecord(0), elasticsearchClient);
        Assert.assertEquals(1L, elasticsearchClient.numBufferedRecords.get());
        elasticsearchClient.flush();
        waitUntilRecordsInES(1);
        Assert.assertEquals(1L, this.helperClient.getDocCount("index"));
        Assert.assertEquals(0L, elasticsearchClient.numBufferedRecords.get());
        writeRecord(sinkRecord(1), elasticsearchClient);
        Assert.assertEquals(1L, elasticsearchClient.numBufferedRecords.get());
        writeRecord(sinkRecord(2), elasticsearchClient);
        Assert.assertEquals(1L, elasticsearchClient.numBufferedRecords.get());
        waitUntilRecordsInES(3);
        elasticsearchClient.close();
    }

    @Test
    public void testFlush() throws Exception {
        this.props.put("linger.ms", String.valueOf(TimeUnit.DAYS.toMillis(1L)));
        this.config = new ElasticsearchSinkConnectorConfig(this.props);
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        });
        elasticsearchClient.createIndexOrDataStream(this.index);
        writeRecord(sinkRecord(0), elasticsearchClient);
        Assert.assertEquals(0L, this.helperClient.getDocCount(this.index));
        elasticsearchClient.flush();
        waitUntilRecordsInES(1);
        Assert.assertEquals(1L, this.helperClient.getDocCount(this.index));
        elasticsearchClient.close();
    }

    @Test
    public void testIndexRecord() throws Exception {
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        });
        elasticsearchClient.createIndexOrDataStream(this.index);
        writeRecord(sinkRecord(0), elasticsearchClient);
        elasticsearchClient.flush();
        waitUntilRecordsInES(1);
        Assert.assertEquals(1L, this.helperClient.getDocCount(this.index));
        elasticsearchClient.close();
    }

    @Test
    public void testDeleteRecord() throws Exception {
        this.props.put("behavior.on.null.values", ElasticsearchSinkConnectorConfig.BehaviorOnNullValues.DELETE.name());
        this.props.put("key.ignore", "false");
        this.config = new ElasticsearchSinkConnectorConfig(this.props);
        this.converter = new DataConverter(this.config);
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        });
        elasticsearchClient.createIndexOrDataStream(this.index);
        writeRecord(sinkRecord("key0", 0), elasticsearchClient);
        writeRecord(sinkRecord("key1", 1), elasticsearchClient);
        elasticsearchClient.flush();
        waitUntilRecordsInES(2);
        writeRecord(sinkRecord("key0", null, null, 3), elasticsearchClient);
        waitUntilRecordsInES(1);
        elasticsearchClient.close();
    }

    @Test
    public void testUpsertRecords() throws Exception {
        this.props.put("write.method", ElasticsearchSinkConnectorConfig.WriteMethod.UPSERT.name());
        this.props.put("key.ignore", "false");
        this.config = new ElasticsearchSinkConnectorConfig(this.props);
        this.converter = new DataConverter(this.config);
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        });
        elasticsearchClient.createIndexOrDataStream(this.index);
        writeRecord(sinkRecord("key0", 0), elasticsearchClient);
        writeRecord(sinkRecord("key1", 1), elasticsearchClient);
        elasticsearchClient.flush();
        waitUntilRecordsInES(2);
        Schema build = SchemaBuilder.struct().name("record").field("offset", SchemaBuilder.int32().defaultValue(0).build()).field("another", SchemaBuilder.int32().defaultValue(0).build()).build();
        SinkRecord sinkRecord = sinkRecord("key0", build, new Struct(build).put("offset", 2), 2);
        SinkRecord sinkRecord2 = sinkRecord("key0", build, new Struct(build).put("offset", 3), 3);
        writeRecord(sinkRecord, elasticsearchClient);
        writeRecord(sinkRecord2, elasticsearchClient);
        writeRecord(sinkRecord("key2", 4), elasticsearchClient);
        elasticsearchClient.flush();
        waitUntilRecordsInES(3);
        Iterator it = this.helperClient.search(this.index).iterator();
        while (it.hasNext()) {
            SearchHit searchHit = (SearchHit) it.next();
            if (searchHit.getId().equals("key0")) {
                Assert.assertEquals(3, searchHit.getSourceAsMap().get("offset"));
                Assert.assertEquals(0, searchHit.getSourceAsMap().get("another"));
            }
        }
        elasticsearchClient.close();
    }

    @Test
    public void testIgnoreBadRecord() throws Exception {
        this.props.put("behavior.on.malformed.documents", ElasticsearchSinkConnectorConfig.BehaviorOnMalformedDoc.IGNORE.name());
        this.config = new ElasticsearchSinkConnectorConfig(this.props);
        this.converter = new DataConverter(this.config);
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        });
        elasticsearchClient.createIndexOrDataStream(this.index);
        elasticsearchClient.createMapping(this.index, schema());
        Schema build = SchemaBuilder.struct().name("record").field("not_mapped_field", SchemaBuilder.int32().defaultValue(0).build()).build();
        SinkRecord sinkRecord = sinkRecord("key", build, new Struct(build).put("not_mapped_field", 420), 0);
        writeRecord(sinkRecord(0), elasticsearchClient);
        elasticsearchClient.flush();
        writeRecord(sinkRecord, elasticsearchClient);
        elasticsearchClient.flush();
        writeRecord(sinkRecord(1), elasticsearchClient);
        elasticsearchClient.flush();
        waitUntilRecordsInES(2);
        Assert.assertEquals(2L, this.helperClient.getDocCount(this.index));
        elasticsearchClient.close();
    }

    @Test(expected = ConnectException.class)
    public void testFailOnBadRecord() throws Exception {
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        });
        elasticsearchClient.createIndexOrDataStream(this.index);
        elasticsearchClient.createMapping(this.index, schema());
        Schema build = SchemaBuilder.struct().name("record").field("offset", SchemaBuilder.bool().defaultValue(false).build()).build();
        SinkRecord sinkRecord = sinkRecord("key", build, new Struct(build).put("offset", false), 0);
        writeRecord(sinkRecord(0), elasticsearchClient);
        elasticsearchClient.flush();
        waitUntilRecordsInES(1);
        writeRecord(sinkRecord, elasticsearchClient);
        elasticsearchClient.flush();
        for (int i = 0; i < 5; i++) {
            try {
                writeRecord(sinkRecord(i + 1), elasticsearchClient);
                elasticsearchClient.flush();
                waitUntilRecordsInES(i + 2);
            } catch (ConnectException e) {
                elasticsearchClient.close();
                throw e;
            }
        }
    }

    @Test
    public void testRetryRecordsOnSocketTimeoutFailure() throws Exception {
        this.props.put("linger.ms", "60000");
        this.props.put("batch.size", "2");
        this.props.put("max.retries", "100");
        this.props.put("retry.backoff.ms", "1000");
        this.props.put("max.in.flight.requests", "1");
        this.config = new ElasticsearchSinkConnectorConfig(this.props);
        this.converter = new DataConverter(this.config);
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        });
        elasticsearchClient.createIndexOrDataStream(this.index);
        NetworkErrorContainer networkErrorContainer = new NetworkErrorContainer(container.getContainerName());
        networkErrorContainer.start();
        writeRecord(sinkRecord(0), elasticsearchClient);
        elasticsearchClient.flush();
        Thread.sleep(this.config.readTimeoutMs() * 4);
        networkErrorContainer.stop();
        waitUntilRecordsInES(1);
    }

    @Test
    public void testReporter() throws Exception {
        this.props.put("key.ignore", "false");
        this.props.put("behavior.on.malformed.documents", ElasticsearchSinkConnectorConfig.BehaviorOnMalformedDoc.IGNORE.name());
        this.config = new ElasticsearchSinkConnectorConfig(this.props);
        this.converter = new DataConverter(this.config);
        ErrantRecordReporter errantRecordReporter = (ErrantRecordReporter) Mockito.mock(ErrantRecordReporter.class);
        Mockito.when(errantRecordReporter.report((SinkRecord) ArgumentMatchers.any(), (Throwable) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(null));
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, errantRecordReporter, () -> {
            this.offsetTracker.updateOffsets();
        });
        elasticsearchClient.createIndexOrDataStream(this.index);
        elasticsearchClient.createMapping(this.index, schema());
        Schema build = SchemaBuilder.struct().name("record").field("offset", SchemaBuilder.bool().defaultValue(false).build()).build();
        SinkRecord sinkRecord = sinkRecord("key0", build, new Struct(build).put("offset", false), 1);
        writeRecord(sinkRecord("key0", 0), elasticsearchClient);
        elasticsearchClient.flush();
        waitUntilRecordsInES(1);
        writeRecord(sinkRecord, elasticsearchClient);
        elasticsearchClient.flush();
        for (int i = 2; i < 7; i++) {
            writeRecord(sinkRecord("key" + i, i + 1), elasticsearchClient);
            elasticsearchClient.flush();
            waitUntilRecordsInES(i);
        }
        ((ErrantRecordReporter) Mockito.verify(errantRecordReporter, Mockito.times(1))).report((SinkRecord) ArgumentMatchers.eq(sinkRecord), (Throwable) ArgumentMatchers.any(Throwable.class));
        elasticsearchClient.close();
    }

    @Test
    public void testReporterNotCalled() throws Exception {
        ErrantRecordReporter errantRecordReporter = (ErrantRecordReporter) Mockito.mock(ErrantRecordReporter.class);
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, errantRecordReporter, () -> {
            this.offsetTracker.updateOffsets();
        });
        elasticsearchClient.createIndexOrDataStream(this.index);
        writeRecord(sinkRecord(0), elasticsearchClient);
        writeRecord(sinkRecord(1), elasticsearchClient);
        writeRecord(sinkRecord(2), elasticsearchClient);
        elasticsearchClient.flush();
        waitUntilRecordsInES(3);
        Assert.assertEquals(3L, this.helperClient.getDocCount(this.index));
        ((ErrantRecordReporter) Mockito.verify(errantRecordReporter, Mockito.never())).report((SinkRecord) ArgumentMatchers.eq(sinkRecord(0)), (Throwable) ArgumentMatchers.any(Throwable.class));
        elasticsearchClient.close();
    }

    @Test
    public void testNoVersionConflict() throws Exception {
        this.props.put("key.ignore", "false");
        this.props.put("write.method", ElasticsearchSinkConnectorConfig.WriteMethod.UPSERT.name());
        this.config = new ElasticsearchSinkConnectorConfig(this.props);
        this.converter = new DataConverter(this.config);
        ErrantRecordReporter errantRecordReporter = (ErrantRecordReporter) Mockito.mock(ErrantRecordReporter.class);
        ErrantRecordReporter errantRecordReporter2 = (ErrantRecordReporter) Mockito.mock(ErrantRecordReporter.class);
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, errantRecordReporter, () -> {
            this.offsetTracker.updateOffsets();
        });
        ElasticsearchClient elasticsearchClient2 = new ElasticsearchClient(this.config, errantRecordReporter2, () -> {
            this.offsetTracker.updateOffsets();
        });
        elasticsearchClient.createIndexOrDataStream(this.index);
        writeRecord(sinkRecord(0), elasticsearchClient);
        writeRecord(sinkRecord(1), elasticsearchClient2);
        writeRecord(sinkRecord(2), elasticsearchClient);
        writeRecord(sinkRecord(3), elasticsearchClient2);
        writeRecord(sinkRecord(4), elasticsearchClient);
        writeRecord(sinkRecord(5), elasticsearchClient2);
        waitUntilRecordsInES(1);
        Assert.assertEquals(1L, this.helperClient.getDocCount(this.index));
        ((ErrantRecordReporter) Mockito.verify(errantRecordReporter, Mockito.never())).report((SinkRecord) ArgumentMatchers.any(SinkRecord.class), (Throwable) ArgumentMatchers.any(Throwable.class));
        ((ErrantRecordReporter) Mockito.verify(errantRecordReporter2, Mockito.never())).report((SinkRecord) ArgumentMatchers.any(SinkRecord.class), (Throwable) ArgumentMatchers.any(Throwable.class));
        elasticsearchClient.close();
        elasticsearchClient2.close();
    }

    @Test
    public void testSsl() throws Exception {
        container.close();
        container = ElasticsearchContainer.fromSystemProperties().withSslEnabled(true);
        container.start();
        String connectionUrl = container.getConnectionUrl(false);
        this.props.put("connection.url", connectionUrl);
        this.props.put("connection.username", "elastic");
        this.props.put("connection.password", "elastic");
        this.props.put("elastic.security.protocol", ElasticsearchSinkConnectorConfig.SecurityProtocol.SSL.name());
        this.props.put("elastic.https.ssl.keystore.location", container.getKeystorePath());
        this.props.put("elastic.https.ssl.keystore.password", container.getKeystorePassword());
        this.props.put("elastic.https.ssl.truststore.location", container.getTruststorePath());
        this.props.put("elastic.https.ssl.truststore.password", container.getTruststorePassword());
        this.props.put("elastic.https.ssl.key.password", container.getKeyPassword());
        this.config = new ElasticsearchSinkConnectorConfig(this.props);
        this.converter = new DataConverter(this.config);
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        });
        this.helperClient = new ElasticsearchHelperClient(connectionUrl, this.config);
        elasticsearchClient.createIndexOrDataStream(this.index);
        writeRecord(sinkRecord(0), elasticsearchClient);
        elasticsearchClient.flush();
        waitUntilRecordsInES(1);
        Assert.assertEquals(1L, this.helperClient.getDocCount(this.index));
        elasticsearchClient.close();
        this.helperClient = null;
        container.close();
        container = ElasticsearchContainer.fromSystemProperties();
        container.start();
    }

    @Test
    public void testWriteDataStreamInjectTimestamp() throws Exception {
        this.props.put("data.stream.type", DATA_STREAM_TYPE);
        this.props.put("data.stream.dataset", DATA_STREAM_DATASET);
        this.config = new ElasticsearchSinkConnectorConfig(this.props);
        this.converter = new DataConverter(this.config);
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        });
        this.index = createIndexName("index");
        Assert.assertTrue(elasticsearchClient.createIndexOrDataStream(this.index));
        Assert.assertTrue(this.helperClient.indexExists(this.index));
        writeRecord(sinkRecord(0), elasticsearchClient);
        elasticsearchClient.flush();
        waitUntilRecordsInES(1);
        Assert.assertEquals(1L, this.helperClient.getDocCount(this.index));
        elasticsearchClient.close();
    }

    private String createIndexName(String str) {
        return this.config.isDataStream() ? String.format("%s-%s-%s", DATA_STREAM_TYPE, DATA_STREAM_DATASET, str) : str;
    }

    @Test
    public void testConnectionUrlExtraSlash() {
        this.props.put("connection.url", container.getConnectionUrl() + "/");
        this.config = new ElasticsearchSinkConnectorConfig(this.props);
        new ElasticsearchClient(this.config, (ErrantRecordReporter) null, () -> {
            this.offsetTracker.updateOffsets();
        }).close();
    }

    private static Schema schema() {
        return SchemaBuilder.struct().name("record").field("offset", SchemaBuilder.int32().defaultValue(0).build()).field("another", SchemaBuilder.int32().defaultValue(0).build()).build();
    }

    private static SinkRecord sinkRecord(int i) {
        return sinkRecord("key", i);
    }

    private static SinkRecord sinkRecord(String str, int i) {
        return sinkRecord(str, schema(), new Struct(schema()).put("offset", Integer.valueOf(i)).put("another", Integer.valueOf(i + 1)), i);
    }

    private static SinkRecord sinkRecord(String str, Schema schema, Struct struct, int i) {
        return new SinkRecord("index", 0, Schema.STRING_SCHEMA, str, schema, struct, i, Long.valueOf(System.currentTimeMillis()), TimestampType.CREATE_TIME);
    }

    private void waitUntilRecordsInES(int i) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            try {
                return this.helperClient.getDocCount(this.index) == ((long) i);
            } catch (ElasticsearchStatusException e) {
                if (e.getMessage().contains("index_not_found_exception")) {
                    return false;
                }
                throw e;
            }
        }, TimeUnit.MINUTES.toMillis(1L), String.format("Could not find expected documents (%d) in time.", Integer.valueOf(i)));
    }

    private void writeRecord(SinkRecord sinkRecord, ElasticsearchClient elasticsearchClient) {
        elasticsearchClient.index(sinkRecord, this.converter.convertRecord(sinkRecord, createIndexName(sinkRecord.topic())), new AsyncOffsetTracker.AsyncOffsetState(sinkRecord.kafkaOffset()));
    }
}
