package io.confluent.connect.elasticsearch;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.connect.elasticsearch.ElasticsearchWriter;
import io.searchbox.client.JestClient;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
/* loaded from: input_file:io/confluent/connect/elasticsearch/ElasticsearchWriterTest.class */
public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
    private final String key = "key";
    private final Schema schema = createSchema();
    private final Struct record = createRecord(this.schema);
    private final Schema otherSchema = createOtherSchema();
    private final Struct otherRecord = createOtherRecord(this.otherSchema);

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Test
    public void testWriter() throws Exception {
        writeDataAndRefresh(initWriter(this.client, false, false), prepareData(2));
        verifySearchResults(Collections.singletonList(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, this.record, 1L)), false, false);
    }

    @Test
    public void testWriterIgnoreKey() throws Exception {
        Collection<SinkRecord> prepareData = prepareData(2);
        writeDataAndRefresh(initWriter(this.client, true, false), prepareData);
        verifySearchResults(prepareData, true, false);
    }

    @Test
    public void testWriterIgnoreSchema() throws Exception {
        Collection<SinkRecord> prepareData = prepareData(2);
        writeDataAndRefresh(initWriter(this.client, true, true), prepareData);
        verifySearchResults(prepareData, true, true);
    }

    @Test
    public void testTopicIndexOverride() throws Exception {
        Collection<SinkRecord> prepareData = prepareData(2);
        writeDataAndRefresh(initWriter(this.client, true, Collections.emptySet(), true, Collections.emptySet(), Collections.singletonMap("topic", "index"), false), prepareData);
        verifySearchResults(prepareData, "index", true, true);
    }

    @Test
    public void testIncompatible() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.otherSchema, this.otherRecord, 0L));
        ElasticsearchWriter initWriter = initWriter(this.client, true, false);
        initWriter.write(arrayList);
        Thread.sleep(5000L);
        arrayList.clear();
        arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, this.record, 1L));
        initWriter.write(arrayList);
        try {
            initWriter.flush();
            fail("should fail because of mapper_parsing_exception");
        } catch (ConnectException e) {
        }
    }

    @Test
    public void testCompatible() throws Exception {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        SinkRecord sinkRecord = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, this.record, 0L);
        arrayList.add(sinkRecord);
        arrayList2.add(sinkRecord);
        SinkRecord sinkRecord2 = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, this.record, 1L);
        arrayList.add(sinkRecord2);
        arrayList2.add(sinkRecord2);
        ElasticsearchWriter initWriter = initWriter(this.client, true, false);
        initWriter.write(arrayList);
        arrayList.clear();
        SinkRecord sinkRecord3 = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.otherSchema, this.otherRecord, 2L);
        arrayList.add(sinkRecord3);
        arrayList2.add(sinkRecord3);
        SinkRecord sinkRecord4 = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.otherSchema, this.otherRecord, 3L);
        arrayList.add(sinkRecord4);
        arrayList2.add(sinkRecord4);
        writeDataAndRefresh(initWriter, arrayList);
        verifySearchResults(arrayList2, true, false);
    }

    @Test
    public void testSafeRedeliveryRegularKey() throws Exception {
        Struct struct = new Struct(this.schema);
        struct.put("user", "foo");
        struct.put("message", "hi");
        SinkRecord sinkRecord = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, struct, 0L);
        Struct struct2 = new Struct(this.schema);
        struct2.put("user", "foo");
        struct2.put("message", "bye");
        SinkRecord sinkRecord2 = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, struct2, 1L);
        ElasticsearchWriter initWriter = initWriter(this.client, false, false);
        initWriter.write(Arrays.asList(sinkRecord, sinkRecord2));
        initWriter.flush();
        writeDataAndRefresh(initWriter, Collections.singleton(sinkRecord));
        verifySearchResults(Collections.singleton(sinkRecord2), false, false);
    }

    @Test
    public void testSafeRedeliveryOffsetInKey() throws Exception {
        Struct struct = new Struct(this.schema);
        struct.put("user", "foo");
        struct.put("message", "hi");
        SinkRecord sinkRecord = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, struct, 0L);
        Struct struct2 = new Struct(this.schema);
        struct2.put("user", "foo");
        struct2.put("message", "bye");
        List asList = Arrays.asList(sinkRecord, new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, struct2, 1L));
        ElasticsearchWriter initWriter = initWriter(this.client, true, false);
        initWriter.write(asList);
        initWriter.flush();
        writeDataAndRefresh(initWriter, asList);
        verifySearchResults(asList, true, false);
    }

    @Test
    public void testMap() throws Exception {
        Schema build = SchemaBuilder.struct().name("struct").field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build()).build();
        HashMap hashMap = new HashMap();
        hashMap.put(1, "One");
        hashMap.put(2, "Two");
        Struct struct = new Struct(build);
        struct.put("map", hashMap);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", build, struct, 0L));
        writeDataAndRefresh(initWriter(this.client, false, false), arrayList);
        verifySearchResults(arrayList, false, false);
    }

    @Test
    public void testStringKeyedMap() throws Exception {
        Schema build = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build();
        HashMap hashMap = new HashMap();
        hashMap.put("One", 1);
        hashMap.put("Two", 2);
        writeDataAndRefresh(initWriter(this.client, false, false), Collections.singletonList(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", build, hashMap, 0L)));
        verifySearchResults(Collections.singletonList(new ObjectMapper().writeValueAsString(hashMap)), "topic", false, false);
    }

    @Test
    public void testDecimal() throws Exception {
        BigDecimal bigDecimal = new BigDecimal(new BigInteger(ByteBuffer.allocate(4).putInt(2).array()), 2);
        Schema build = SchemaBuilder.struct().name("struct").field("decimal", Decimal.schema(2)).build();
        Struct struct = new Struct(build);
        struct.put("decimal", bigDecimal);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", build, struct, 0L));
        writeDataAndRefresh(initWriter(this.client, false, false), arrayList);
        verifySearchResults(arrayList, false, false);
    }

    @Test
    public void testBytes() throws Exception {
        Schema build = SchemaBuilder.struct().name("struct").field("bytes", SchemaBuilder.BYTES_SCHEMA).build();
        Struct struct = new Struct(build);
        struct.put("bytes", new byte[]{42});
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", build, struct, 0L));
        writeDataAndRefresh(initWriter(this.client, false, false), arrayList);
        verifySearchResults(arrayList, false, false);
    }

    @Test
    public void testInvalidRecordException() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, (Object) null, (Schema) null, new byte[]{42}, 0L));
        ElasticsearchWriter initWriter = initWriter(this.client, false, true, false);
        this.thrown.expect(ConnectException.class);
        this.thrown.expectMessage("Key is used as document id and can not be null");
        initWriter.write(arrayList);
    }

    @Test
    public void testDropInvalidRecord() throws Exception {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        Schema build = SchemaBuilder.struct().name("struct").field("bytes", SchemaBuilder.BYTES_SCHEMA).build();
        Struct struct = new Struct(build);
        struct.put("bytes", new byte[]{42});
        SinkRecord sinkRecord = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, (Object) null, build, struct, 0L);
        SinkRecord sinkRecord2 = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", build, struct, 1L);
        arrayList.add(sinkRecord2);
        arrayList.add(sinkRecord);
        arrayList2.add(sinkRecord2);
        writeDataAndRefresh(initWriter(this.client, false, true, true), arrayList);
        verifySearchResults(arrayList2, false, true);
    }

    private Collection<SinkRecord> prepareData(int i) {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, this.record, i2));
        }
        return arrayList;
    }

    private ElasticsearchWriter initWriter(JestClient jestClient, boolean z, boolean z2) {
        return initWriter(jestClient, z, Collections.emptySet(), z2, Collections.emptySet(), Collections.emptyMap(), false);
    }

    private ElasticsearchWriter initWriter(JestClient jestClient, boolean z, boolean z2, boolean z3) {
        return initWriter(jestClient, z, Collections.emptySet(), z2, Collections.emptySet(), Collections.emptyMap(), z3);
    }

    private ElasticsearchWriter initWriter(JestClient jestClient, boolean z, Set<String> set, boolean z2, Set<String> set2, Map<String, String> map, boolean z3) {
        ElasticsearchWriter build = new ElasticsearchWriter.Builder(jestClient).setType("kafka-connect").setIgnoreKey(z, set).setIgnoreSchema(z2, set2).setTopicToIndexMap(map).setFlushTimoutMs(10000L).setMaxBufferedRecords(10000).setMaxInFlightRequests(1).setBatchSize(2).setLingerMs(1000L).setRetryBackoffMs(1000L).setMaxRetry(3).setDropInvalidMessage(z3).build();
        build.start();
        build.createIndicesForTopics(Collections.singleton("topic"));
        return build;
    }

    private void writeDataAndRefresh(ElasticsearchWriter elasticsearchWriter, Collection<SinkRecord> collection) throws Exception {
        elasticsearchWriter.write(collection);
        elasticsearchWriter.flush();
        elasticsearchWriter.stop();
        refresh();
    }
}
