package io.confluent.connect.elasticsearch.integration;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import io.confluent.common.utils.IntegrationTest;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnector;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
import io.confluent.connect.elasticsearch.ElasticsearchSinkTask;
import io.confluent.connect.elasticsearch.helper.ElasticSearchMockUtil;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.storage.StringConverter;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import org.testcontainers.shaded.com.google.common.collect.ImmutableSet;

@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/connect/elasticsearch/integration/ElasticsearchSinkTaskIT.class */
public class ElasticsearchSinkTaskIT {

    // WireMock server used as the Elasticsearch endpoint in these tests. It listens on a
    // dynamically chosen port and registers BlockingTransformer (by class name) as an extension.
    // The second constructor argument (failOnUnmatchedRequests) is disabled.
    @Rule
    public WireMockRule wireMockRule = new WireMockRule(
        WireMockConfiguration.options()
            .dynamicPort()
            .extensions(BlockingTransformer.class.getName()),
        false);

    // Topic name used for every record and for the "topics" connector property.
    protected static final String TOPIC = "test";

    // Value written to the "tasks.max" connector property.
    protected static final int TASKS_MAX = 1;

    // Parameterized flag; forwarded to the "flush.synchronously" connector property.
    private final boolean synchronousFlush;

    /**
     * Supplies the parameter sets for the parameterized runner: one run with synchronous flush
     * enabled and one with it disabled.
     *
     * @return two single-element argument arrays, {@code true} first and {@code false} second
     */
    @Parameterized.Parameters(name = "{index}: syncFlush={0}")
    public static Collection<Object[]> data() {
        final Object[][] flushModes = {
            {Boolean.TRUE},
            {Boolean.FALSE},
        };
        return Arrays.asList(flushModes);
    }

    /**
     * Creates the test instance for one parameterized run.
     *
     * @param z whether the task under test is configured with synchronous flushing; the value is
     *          stored and later written to the {@code flush.synchronously} property
     */
    public ElasticsearchSinkTaskIT(boolean z) {
        synchronousFlush = z;
    }

    /**
     * Shorthand for the response builder produced by
     * {@code ElasticSearchMockUtil.basicEmptyOk()}.
     *
     * @return a fresh response definition builder obtained from the mock utility
     */
    private static ResponseDefinitionBuilder ok() {
        final ResponseDefinitionBuilder emptyOk = ElasticSearchMockUtil.basicEmptyOk();
        return emptyOk;
    }

    /**
     * Registers a catch-all stub (any method, any URL) at priority 10 that answers with the
     * default {@link #ok()} response before each test.
     */
    @Before
    public void setup() {
        final ResponseDefinitionBuilder defaultResponse = ok();
        WireMock.stubFor(
            WireMock.any(WireMock.anyUrl())
                .atPriority(10)
                .willReturn(defaultResponse));
    }

    /**
     * Verifies that {@code preCommit} reports no offsets while the stubbed bulk endpoint has not
     * answered: the bulk stub delays its response by 60000 ms, whereas the task is configured
     * with a 1000 ms read timeout.
     */
    @Test
    public void testOffsetCommit() {
        // POSTs to "/" are delayed by 10 s and POSTs to "/_bulk" by 60 s.
        this.wireMockRule.stubFor(
            WireMock.post(WireMock.urlPathEqualTo("/"))
                .willReturn(ok().withFixedDelay(10000)));
        this.wireMockRule.stubFor(
            WireMock.post(WireMock.urlPathEqualTo("/_bulk"))
                .willReturn(ok().withFixedDelay(60000)));

        final Map<String, String> props = createProps();
        props.put("read.timeout.ms", "1000");
        props.put("max.retries", "2");
        props.put("retry.backoff.ms", "10");
        props.put("batch.size", "1");

        final TopicPartition tp = new TopicPartition(TOPIC, 0);
        final SinkTaskContext context = Mockito.mock(SinkTaskContext.class);
        Mockito.when(context.assignment()).thenReturn(ImmutableSet.of(tp));

        final ElasticsearchSinkTask task = new ElasticsearchSinkTask();
        task.initialize(context);
        task.start(props);

        final List<SinkRecord> records = ImmutableList.of(sinkRecord(tp, 0L), sinkRecord(tp, 1L));
        task.put(records);

        // Nothing is expected to be reported as committable at this point.
        final Map<TopicPartition, OffsetAndMetadata> currentOffsets =
            ImmutableMap.of(tp, new OffsetAndMetadata(2L));
        Assertions.assertThat(task.preCommit(currentOffsets)).isEmpty();
    }

    /**
     * Checks how per-item failures in a bulk response are handled under both settings of
     * {@code behavior.on.malformed.documents}.
     *
     * <p>With {@code ignore}, all six records are expected to be reported as committed. With
     * {@code fail}, {@code put} is expected to throw a {@link ConnectException} whose message
     * contains "Indexing record failed", and the reported offset must not exceed 1.
     *
     * @throws JsonProcessingException declared for the helper that builds the stubbed bulk response
     */
    @Test
    public void testIndividualFailure() throws JsonProcessingException {
        // The third argument is the literal 1 (previously spelled TASKS_MAX, which only
        // coincidentally has the same value and is unrelated to the bulk response).
        this.wireMockRule.stubFor(
            WireMock.post(WireMock.urlPathEqualTo("/_bulk"))
                .willReturn(WireMock.okJson(
                    ElasticsearchConnectorNetworkIT.errorBulkResponse(
                        3, "strict_dynamic_mapping_exception", 1))));

        Map<String, String> props = createProps();
        props.put("read.timeout.ms", "1000");
        props.put("max.retries", "2");
        props.put("retry.backoff.ms", "10");
        props.put("max.in.flight.requests", "1");
        props.put("batch.size", "3");
        props.put("linger.ms", "10000");
        props.put("behavior.on.malformed.documents", "ignore");

        TopicPartition tp = new TopicPartition(TOPIC, 0);
        ElasticsearchSinkTask task = new ElasticsearchSinkTask();
        SinkTaskContext context = Mockito.mock(SinkTaskContext.class);
        Mockito.when(context.assignment()).thenReturn(ImmutableSet.of(tp));
        task.initialize(context);
        task.start(props);

        // Six records at offsets 0..5; with batch.size=3 this is two full batches.
        List<SinkRecord> records = IntStream.range(0, 6)
            .mapToObj(offset -> sinkRecord(tp, offset))
            .collect(Collectors.toList());
        task.put(records);

        // Failures are ignored, so every offset is expected to be committable.
        Map<TopicPartition, OffsetAndMetadata> currentOffsets =
            ImmutableMap.of(tp, new OffsetAndMetadata(6L));
        Assertions.assertThat(task.preCommit(currentOffsets)).isEqualTo(currentOffsets);

        // Restart the same task instance in "fail" mode.
        // NOTE(review): the task is restarted without an intervening stop() - confirm this is intended.
        props.put("behavior.on.malformed.documents", "fail");
        task.initialize(context);
        task.start(props);

        Assertions.assertThatThrownBy(() -> task.put(records))
            .isInstanceOf(ConnectException.class)
            .hasMessageContaining("Indexing record failed");

        // preCommit may report no entry for the partition; treat that as offset 0.
        Map<TopicPartition, OffsetAndMetadata> afterFailure =
            task.preCommit(ImmutableMap.of(tp, new OffsetAndMetadata(0L)));
        Assertions.assertThat(getOffsetOrZero(afterFailure, tp)).isLessThanOrEqualTo(1L);
    }

    /**
     * Looks up the offset recorded for a partition, defaulting to zero when the map has no
     * (or a {@code null}) entry for it.
     *
     * @param map offsets keyed by partition
     * @param topicPartition partition to look up
     * @return the stored offset, or {@code 0} if none is present
     */
    private long getOffsetOrZero(Map<TopicPartition, OffsetAndMetadata> map, TopicPartition topicPartition) {
        final OffsetAndMetadata committed = map.get(topicPartition);
        return committed != null ? committed.offset() : 0L;
    }

    /**
     * Checks handling of a record that cannot be converted (null key while {@code key.ignore} is
     * {@code false}) under both settings of {@code drop.invalid.message}.
     *
     * <p>With {@code true}, all three offsets are expected to be reported as committed. With
     * {@code false}, {@code put} is expected to throw a {@link DataException} and the reported
     * offset must not exceed 1.
     *
     * @throws JsonProcessingException declared for the helper that builds the stubbed bulk response
     */
    @Test
    public void testConvertDataException() throws JsonProcessingException {
        this.wireMockRule.stubFor(
            WireMock.post(WireMock.urlPathEqualTo("/_bulk"))
                .willReturn(WireMock.okJson(ElasticsearchConnectorNetworkIT.errorBulkResponse(3))));

        Map<String, String> props = createProps();
        props.put("read.timeout.ms", "1000");
        props.put("max.retries", "2");
        props.put("retry.backoff.ms", "10");
        props.put("max.in.flight.requests", "1");
        props.put("batch.size", "10");
        props.put("linger.ms", "10000");
        props.put("key.ignore", "false");
        props.put("drop.invalid.message", "true");

        TopicPartition tp = new TopicPartition(TOPIC, 0);
        ElasticsearchSinkTask task = new ElasticsearchSinkTask();
        SinkTaskContext context = Mockito.mock(SinkTaskContext.class);
        Mockito.when(context.assignment()).thenReturn(ImmutableSet.of(tp));
        task.initialize(context);
        task.start(props);

        // The middle record has a null key, which is invalid when the key is the document id.
        List<SinkRecord> records = ImmutableList.of(
            sinkRecord(tp, 0L),
            sinkRecord(tp, 1L, null, "value"),
            sinkRecord(tp, 2L));
        task.put(records);

        // The invalid record is dropped, so all offsets are expected to be committable.
        Map<TopicPartition, OffsetAndMetadata> currentOffsets =
            ImmutableMap.of(tp, new OffsetAndMetadata(3L));
        Assertions.assertThat(task.preCommit(currentOffsets)).isEqualTo(currentOffsets);

        // Restart the same task instance with dropping disabled.
        props.put("drop.invalid.message", "false");
        task.initialize(context);
        task.start(props);
        task.open(ImmutableList.of(tp));

        Assertions.assertThatThrownBy(() -> task.put(records))
            .isInstanceOf(DataException.class)
            .hasMessageContaining("Key is used as document id and can not be null");

        // Use the null-safe lookup: preCommit may return no entry for the partition, in which
        // case a direct get(tp).offset() would throw a NullPointerException instead of asserting.
        Map<TopicPartition, OffsetAndMetadata> afterFailure =
            task.preCommit(ImmutableMap.of(tp, new OffsetAndMetadata(0L)));
        Assertions.assertThat(getOffsetOrZero(afterFailure, tp)).isLessThanOrEqualTo(1L);
    }

    /**
     * Checks handling of a record with a null value under both settings of
     * {@code behavior.on.null.values}.
     *
     * <p>With {@code ignore}, all three offsets are expected to be reported as committed. With
     * {@code fail}, {@code put} is expected to throw a {@link DataException} whose message
     * contains "null value encountered", and the reported offset must not exceed 1.
     *
     * @throws JsonProcessingException declared for the helper that builds the stubbed bulk response
     */
    @Test
    public void testNullValue() throws JsonProcessingException {
        this.wireMockRule.stubFor(
            WireMock.post(WireMock.urlPathEqualTo("/_bulk"))
                .willReturn(WireMock.okJson(ElasticsearchConnectorNetworkIT.errorBulkResponse(3))));

        Map<String, String> props = createProps();
        props.put("read.timeout.ms", "1000");
        props.put("max.retries", "2");
        props.put("retry.backoff.ms", "10");
        props.put("max.in.flight.requests", "1");
        props.put("batch.size", "10");
        props.put("linger.ms", "10000");
        props.put("behavior.on.null.values", "ignore");

        TopicPartition tp = new TopicPartition(TOPIC, 0);
        ElasticsearchSinkTask task = new ElasticsearchSinkTask();
        SinkTaskContext context = Mockito.mock(SinkTaskContext.class);
        Mockito.when(context.assignment()).thenReturn(ImmutableSet.of(tp));
        task.initialize(context);
        task.start(props);

        // The middle record carries a key but a null value.
        List<SinkRecord> records = ImmutableList.of(
            sinkRecord(tp, 0L),
            sinkRecord(tp, 1L, "testKey", null),
            sinkRecord(tp, 2L));
        task.put(records);

        // The null-valued record is ignored, so all offsets are expected to be committable.
        Map<TopicPartition, OffsetAndMetadata> currentOffsets =
            ImmutableMap.of(tp, new OffsetAndMetadata(3L));
        Assertions.assertThat(task.preCommit(currentOffsets)).isEqualTo(currentOffsets);

        // Restart the same task instance in "fail" mode.
        props.put("behavior.on.null.values", "fail");
        task.initialize(context);
        task.start(props);

        Assertions.assertThatThrownBy(() -> task.put(records))
            .isInstanceOf(DataException.class)
            .hasMessageContaining("null value encountered");

        // Use the null-safe lookup: preCommit may return no entry for the partition, in which
        // case a direct get(tp).offset() would throw a NullPointerException instead of asserting.
        Map<TopicPartition, OffsetAndMetadata> afterFailure =
            task.preCommit(ImmutableMap.of(tp, new OffsetAndMetadata(0L)));
        Assertions.assertThat(getOffsetOrZero(afterFailure, tp)).isLessThanOrEqualTo(1L);
    }

    /**
     * Verifies that records whose bulk request did not complete can be put again and are then
     * reported as committed.
     *
     * <p>Initially the bulk request containing {@code doc_num} 0 is answered immediately while
     * the one containing {@code doc_num} 1 is delayed by 60000 ms (read timeout is 1000 ms). In
     * asynchronous mode the test first waits until only offset 1 is reported. The bulk stub is
     * then replaced by one that answers every request, the records are put again, and the test
     * waits until offset 2 is reported.
     *
     * @throws Exception declared for the stubbing helper and the Awaitility assertions
     */
    @Test
    public void testPutRetry() throws Exception {
        this.wireMockRule.stubFor(
            WireMock.post(WireMock.urlPathEqualTo("/_bulk"))
                .withRequestBody(WireMock.containing("{\"doc_num\":0}"))
                .willReturn(WireMock.okJson(ElasticsearchConnectorNetworkIT.errorBulkResponse())));
        this.wireMockRule.stubFor(
            WireMock.post(WireMock.urlPathEqualTo("/_bulk"))
                .withRequestBody(WireMock.containing("{\"doc_num\":1}"))
                .willReturn(WireMock.aResponse().withFixedDelay(60000)));

        Map<String, String> props = createProps();
        props.put("read.timeout.ms", "1000");
        props.put("max.retries", "2");
        props.put("retry.backoff.ms", "10");
        props.put("batch.size", "1");

        ElasticsearchSinkTask task = new ElasticsearchSinkTask();
        TopicPartition tp = new TopicPartition(TOPIC, 0);
        SinkTaskContext context = Mockito.mock(SinkTaskContext.class);
        Mockito.when(context.assignment()).thenReturn(ImmutableSet.of(tp));
        task.initialize(context);
        task.start(props);

        List<SinkRecord> records = ImmutableList.of(sinkRecord(tp, 0L), sinkRecord(tp, 1L));
        task.open(ImmutableList.of(tp));
        task.put(records);

        Map<TopicPartition, OffsetAndMetadata> currentOffsets =
            ImmutableMap.of(tp, new OffsetAndMetadata(2L));
        if (!this.synchronousFlush) {
            // Only the first record has been acknowledged, so offset 1 is expected.
            Map<TopicPartition, OffsetAndMetadata> firstRecordOnly =
                ImmutableMap.of(tp, new OffsetAndMetadata(1L));
            Awaitility.await().untilAsserted(
                () -> Assertions.assertThat(task.preCommit(currentOffsets)).isEqualTo(firstRecordOnly));
        }

        // Replace the bulk stubs with one that answers every request, then put the records again.
        this.wireMockRule.stubFor(
            WireMock.post(WireMock.urlPathEqualTo("/_bulk"))
                .willReturn(WireMock.okJson(ElasticsearchConnectorNetworkIT.errorBulkResponse())));
        task.put(records);

        Awaitility.await().untilAsserted(
            () -> Assertions.assertThat(task.preCommit(currentOffsets)).isEqualTo(currentOffsets));
    }

    /**
     * Verifies that, in asynchronous mode, the task pauses the partition while a bulk response
     * is being held back and resumes it after the response is released.
     *
     * <p>The bulk request containing {@code doc_num} 0 is routed through the
     * {@code BlockingTransformer} (presumably holding the response until {@code release} is
     * called - see that class). With {@code max.buffered.records} set to 2, one hundred records
     * are put. In synchronous mode only the calls themselves are exercised; no pause/resume
     * verification takes place.
     *
     * @throws Exception declared for the stubbing helper and the Awaitility assertions
     */
    @Test
    public void testOffsetsBackpressure() throws Exception {
        // Generic bulk stub first, then the more specific blocking stub for doc_num 0.
        this.wireMockRule.stubFor(
            WireMock.post(WireMock.urlPathEqualTo("/_bulk"))
                .willReturn(WireMock.okJson(ElasticsearchConnectorNetworkIT.errorBulkResponse())));
        this.wireMockRule.stubFor(
            WireMock.post(WireMock.urlPathEqualTo("/_bulk"))
                .withRequestBody(WireMock.containing("{\"doc_num\":0}"))
                .willReturn(WireMock.okJson(ElasticsearchConnectorNetworkIT.errorBulkResponse())
                    .withTransformers(BlockingTransformer.NAME)));

        Map<String, String> props = createProps();
        props.put("read.timeout.ms", "1000");
        props.put("max.retries", "2");
        props.put("retry.backoff.ms", "10");
        props.put("batch.size", "1");
        props.put("max.buffered.records", "2");

        ElasticsearchSinkTask task = new ElasticsearchSinkTask();
        TopicPartition tp = new TopicPartition(TOPIC, 0);
        SinkTaskContext context = Mockito.mock(SinkTaskContext.class);
        Mockito.when(context.assignment()).thenReturn(ImmutableSet.of(tp));
        task.initialize(context);
        task.start(props);
        task.open(ImmutableList.of(tp));

        // One hundred records at offsets 0..99. The step is a plain increment; it was previously
        // written as TASKS_MAX, which is unrelated and would skip offsets if that constant changed.
        List<SinkRecord> records = new ArrayList<>(100);
        for (int offset = 0; offset < 100; offset++) {
            records.add(sinkRecord(tp, offset));
        }
        task.put(records);

        if (!this.synchronousFlush) {
            Mockito.verify(context).pause(tp);
        }

        // Let one held-back response through.
        BlockingTransformer.getInstance(this.wireMockRule).release(1);

        Awaitility.await().untilAsserted(() -> {
            // An empty put is issued on every poll before checking for the resume call.
            task.put(Collections.emptyList());
            if (!this.synchronousFlush) {
                Mockito.verify(context).resume(tp);
            }
        });
    }

    /**
     * Verifies the offsets reported by {@code preCommit} before and after a partition is closed
     * and another one is opened. Skipped when synchronous flushing is enabled.
     *
     * <p>Partition 0 receives two records, of which the request containing {@code doc_num} 1 is
     * delayed by 60000 ms; partition 1 receives one record. The test first waits for offset 1 on
     * both partitions, then closes partition 0, opens partition 2, narrows the mocked assignment
     * to partition 1 and waits until only partition 1 (offset 1) is reported.
     *
     * @throws Exception declared for the stubbing helper and the Awaitility assertions
     */
    @Test
    public void testRebalance() throws Exception {
        Assume.assumeFalse(this.synchronousFlush);

        this.wireMockRule.stubFor(
            WireMock.post(WireMock.urlPathEqualTo("/_bulk"))
                .withRequestBody(WireMock.containing("{\"doc_num\":0}"))
                .willReturn(WireMock.okJson(ElasticsearchConnectorNetworkIT.errorBulkResponse())));
        this.wireMockRule.stubFor(
            WireMock.post(WireMock.urlPathEqualTo("/_bulk"))
                .withRequestBody(WireMock.containing("{\"doc_num\":1}"))
                .willReturn(WireMock.aResponse().withFixedDelay(60000)));

        Map<String, String> props = createProps();
        props.put("read.timeout.ms", "1000");
        props.put("max.retries", "2");
        props.put("retry.backoff.ms", "10");
        props.put("batch.size", "1");

        ElasticsearchSinkTask task = new ElasticsearchSinkTask();
        TopicPartition tp0 = new TopicPartition(TOPIC, 0);
        // Partition number 1 as a literal; it was previously spelled TASKS_MAX, which is unrelated.
        TopicPartition tp1 = new TopicPartition(TOPIC, 1);
        SinkTaskContext context = Mockito.mock(SinkTaskContext.class);
        Mockito.when(context.assignment()).thenReturn(ImmutableSet.of(tp0, tp1));
        task.initialize(context);
        task.start(props);

        task.put(ImmutableList.of(sinkRecord(tp0, 0L), sinkRecord(tp0, 1L), sinkRecord(tp1, 0L)));

        Map<TopicPartition, OffsetAndMetadata> currentOffsets = ImmutableMap.of(
            tp0, new OffsetAndMetadata(2L),
            tp1, new OffsetAndMetadata(1L));

        // Before the rebalance: offset 1 is expected on both partitions.
        Map<TopicPartition, OffsetAndMetadata> beforeRebalance = ImmutableMap.of(
            tp0, new OffsetAndMetadata(1L),
            tp1, new OffsetAndMetadata(1L));
        Awaitility.await().untilAsserted(
            () -> Assertions.assertThat(task.preCommit(currentOffsets)).isEqualTo(beforeRebalance));

        // Rebalance: partition 0 is closed, partition 2 is opened, assignment reports partition 1 only.
        task.close(ImmutableList.of(tp0));
        task.open(ImmutableList.of(new TopicPartition(TOPIC, 2)));
        Mockito.when(context.assignment()).thenReturn(ImmutableSet.of(tp1));

        Map<TopicPartition, OffsetAndMetadata> afterRebalance =
            ImmutableMap.of(tp1, new OffsetAndMetadata(1L));
        Awaitility.await().untilAsserted(
            () -> Assertions.assertThat(task.preCommit(currentOffsets)).isEqualTo(afterRebalance));
    }

    /**
     * Builds a record for the given partition and offset by delegating to the
     * topic/partition/offset overload.
     *
     * @param topicPartition topic and partition of the record
     * @param j Kafka offset of the record
     * @return the new record
     */
    private SinkRecord sinkRecord(TopicPartition topicPartition, long j) {
        final String topic = topicPartition.topic();
        final int partition = topicPartition.partition();
        return sinkRecord(topic, partition, j);
    }

    /**
     * Builds a record with an explicit key and value for the given partition and offset.
     *
     * @param topicPartition topic and partition of the record
     * @param j Kafka offset of the record
     * @param str record key (callers in this class also pass {@code null})
     * @param obj record value (callers in this class also pass {@code null})
     * @return the new record
     */
    private SinkRecord sinkRecord(TopicPartition topicPartition, long j, String str, Object obj) {
        final String topic = topicPartition.topic();
        final int partition = topicPartition.partition();
        return sinkRecord(topic, partition, j, str, obj);
    }

    /**
     * Builds a record with the key {@code "testKey"} and a single-entry map value that stores
     * the offset under {@code "doc_num"}.
     *
     * @param str topic name
     * @param i partition number
     * @param j Kafka offset, also used as the {@code doc_num} payload
     * @return the new record
     */
    private SinkRecord sinkRecord(String str, int i, long j) {
        final Map<String, Long> payload = ImmutableMap.of("doc_num", j);
        return sinkRecord(str, i, j, "testKey", payload);
    }

    /**
     * Builds a schemaless record: both the key schema and the value schema are {@code null}.
     *
     * @param str topic name
     * @param i partition number
     * @param j Kafka offset
     * @param str2 record key
     * @param obj record value
     * @return the new record
     */
    private SinkRecord sinkRecord(String str, int i, long j, String str2, Object obj) {
        final Schema keySchema = null;
        final Schema valueSchema = null;
        return new SinkRecord(str, i, keySchema, str2, valueSchema, obj, j);
    }

    /**
     * Builds the base connector configuration shared by all tests. The connection URL points at
     * the WireMock server and {@code flush.synchronously} reflects the current test parameter.
     *
     * @return a new mutable map, so that individual tests can add or override properties
     */
    protected Map<String, String> createProps() {
        // Parameterized map instead of a raw HashMap, which caused an unchecked conversion on return.
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", ElasticsearchSinkConnector.class.getName());
        props.put("topics", TOPIC);
        props.put("tasks.max", Integer.toString(TASKS_MAX));
        props.put("key.converter", StringConverter.class.getName());
        props.put("value.converter", JsonConverter.class.getName());
        props.put("value.converter.schemas.enable", "false");
        props.put("connection.url", this.wireMockRule.url("/"));
        props.put("key.ignore", "true");
        props.put("schema.ignore", "true");
        props.put("write.method", ElasticsearchSinkConnectorConfig.WriteMethod.UPSERT.toString());
        props.put("flush.synchronously", Boolean.toString(this.synchronousFlush));
        return props;
    }
}
