package io.confluent.connect.jdbc.integration;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.storage.ConverterType;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for JDBC connector integration tests.
 *
 * <p>Owns an {@link EmbeddedConnectCluster} plus a Kafka {@link Admin} client created from it, and
 * offers helpers for polling connector/task state and the offsets committed by a connector's
 * consumer group.
 */
@Category({IntegrationTest.class})
public abstract class BaseConnectorIT {
    public static final String DLQ_TOPIC_NAME = "dlq-topic";
    private static final Logger log = LoggerFactory.getLogger(BaseConnectorIT.class);
    protected static final long CONSUME_MAX_DURATION_MS = TimeUnit.SECONDS.toMillis(10);
    protected static final long CONNECTOR_STARTUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
    protected static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(10);
    protected static final long OFFSETS_READ_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);

    /** Admin client used to read committed consumer-group offsets; {@code null} until started. */
    private Admin kafkaAdminClient;

    /** Embedded Connect cluster under test; {@code null} until {@link #startConnect()} runs. */
    protected EmbeddedConnectCluster connect;

    /**
     * Builds and starts the embedded Connect cluster named {@code jdbc-connect-cluster}, then
     * creates the admin client against the cluster's Kafka.
     */
    public void startConnect() {
        connect = new EmbeddedConnectCluster.Builder().name("jdbc-connect-cluster").build();
        connect.start();
        kafkaAdminClient = connect.kafka().createAdminClient();
    }

    /**
     * Creates a {@link JsonConverter} configured as a value converter.
     *
     * @return a freshly configured converter
     */
    public JsonConverter jsonConverter() {
        JsonConverter converter = new JsonConverter();
        converter.configure(
            Collections.singletonMap("converter.type", ConverterType.VALUE.getName()));
        return converter;
    }

    /**
     * Returns the connector properties shared by the sink tests. Requires that
     * {@link #startConnect()} has been called, because the bootstrap servers are read from the
     * running cluster.
     *
     * @return a new mutable map of base sink connector properties
     */
    public Map<String, String> baseSinkProps() {
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", "JdbcSinkConnector");
        props.put("key.converter", StringConverter.class.getName());
        props.put("value.converter", JsonConverter.class.getName());
        props.put("confluent.topic.bootstrap.servers", connect.kafka().bootstrapServers());
        props.put("confluent.topic.replication.factor", "1");
        return props;
    }

    /**
     * Closes the admin client (if any) and stops the embedded Connect cluster (if any).
     *
     * <p>The cluster is stopped even when closing the admin client throws, so a failed close does
     * not leave the embedded cluster running.
     */
    public void stopConnect() {
        try {
            if (kafkaAdminClient != null) {
                kafkaAdminClient.close();
            }
        } finally {
            kafkaAdminClient = null;
            if (connect != null) {
                connect.stop();
            }
        }
    }

    /**
     * Blocks until the connector and at least {@code numTasks} of its tasks are running, or until
     * {@link #CONNECTOR_STARTUP_DURATION_MS} elapses.
     *
     * @param connectorName name of the connector to poll
     * @param numTasks minimum number of tasks that must be present
     * @return the wall-clock time, in epoch milliseconds, at which the wait completed
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public long waitForConnectorToStart(String connectorName, int numTasks)
            throws InterruptedException {
        TestUtils.waitForCondition(
            () -> assertConnectorAndTasksRunning(connectorName, numTasks).orElse(false),
            CONNECTOR_STARTUP_DURATION_MS,
            "Connector tasks did not start in time.");
        return System.currentTimeMillis();
    }

    /**
     * Checks whether the connector is RUNNING, has at least {@code numTasks} tasks, and all of its
     * tasks are RUNNING.
     *
     * @param connectorName name of the connector to inspect
     * @param numTasks minimum number of tasks that must be present
     * @return the result of the check, or an empty {@link Optional} when the status could not be
     *     retrieved
     */
    protected Optional<Boolean> assertConnectorAndTasksRunning(String connectorName, int numTasks) {
        try {
            ConnectorStateInfo status = connect.connectorStatus(connectorName);
            String running = AbstractStatus.State.RUNNING.toString();
            boolean allRunning = status != null
                && status.tasks().size() >= numTasks
                && status.connector().state().equals(running)
                && status.tasks().stream().allMatch(task -> task.state().equals(running));
            return Optional.of(allRunning);
        } catch (Exception e) {
            // Keep the cause in the log so a persistent status failure can be diagnosed.
            log.warn("Could not check connector state info.", e);
            return Optional.empty();
        }
    }

    /**
     * Blocks until the connector's consumer group has committed at least {@code expectedRecords}
     * offsets across {@code topics}, failing fast if the connector or a task stops running.
     *
     * @param connectorName name of the connector whose commits are tracked
     * @param topics topics whose committed offsets are summed
     * @param expectedRecords minimum total committed offset to wait for
     * @param numTasks minimum number of tasks expected to be running while waiting
     * @param maxWaitMs maximum time to wait, in milliseconds
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void waitForCommittedRecords(
            String connectorName,
            Collection<String> topics,
            long expectedRecords,
            int numTasks,
            long maxWaitMs) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            long committed = totalCommittedRecords(connectorName, topics);
            if (committed >= expectedRecords) {
                return true;
            }
            try {
                Assert.assertTrue(
                    "Connector or one of its tasks failed during testing",
                    assertConnectorAndTasksRunning(connectorName, numTasks).orElse(false));
            } catch (AssertionError e) {
                // A failed connector will not recover; abort the wait instead of retrying.
                throw new NoRetryException(e);
            }
            log.debug(
                "Connector has only committed {} records for topics {} so far; {} expected",
                committed, topics, expectedRecords);
            // Pause for half a commit interval before the next poll of the committed offsets.
            Thread.sleep(OFFSET_COMMIT_INTERVAL_MS / 2);
            return false;
        }, maxWaitMs, "Either the connector failed, or the message commit duration expired without all expected messages committed");
    }

    /**
     * Sums the offsets committed by the consumer group {@code connect-<connectorName>} over all
     * partitions that belong to one of {@code topics}.
     *
     * @param connectorName name of the connector whose consumer group is queried
     * @param topics topics to include in the sum
     * @return the sum of committed offsets for the matching partitions
     * @throws TimeoutException if the offsets are not returned within
     *     {@link #OFFSETS_READ_TIMEOUT_MS}
     * @throws ExecutionException if the admin request fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    protected synchronized long totalCommittedRecords(
            String connectorName, Collection<String> topics)
            throws TimeoutException, ExecutionException, InterruptedException {
        Map<TopicPartition, OffsetAndMetadata> offsets = kafkaAdminClient
            .listConsumerGroupOffsets("connect-" + connectorName)
            .partitionsToOffsetAndMetadata()
            .get(OFFSETS_READ_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        log.trace("Connector {} has so far committed offsets {}", connectorName, offsets);
        return offsets.entrySet().stream()
            .filter(entry -> topics.contains(entry.getKey().topic()))
            // Partitions without a committed offset are reported with a null value; skip them.
            .filter(entry -> entry.getValue() != null)
            .mapToLong(entry -> entry.getValue().offset())
            .sum();
    }

    /**
     * Checks whether the connector is RUNNING with exactly {@code numTasks} tasks, all of which
     * are FAILED with a trace containing {@code expectedTrace}.
     *
     * @param connectorName name of the connector to inspect
     * @param numTasks exact number of tasks expected
     * @param expectedTrace text that every failed task's trace must contain
     * @return {@code true} if the condition holds; {@code false} otherwise, including when the
     *     status could not be retrieved
     */
    protected boolean assertConnectorIsRunningButTasksFailedWith(
            String connectorName, int numTasks, String expectedTrace) {
        try {
            ConnectorStateInfo status = connect.connectorStatus(connectorName);
            if (status == null
                    || status.tasks().size() != numTasks
                    || !status.connector().state().equals(AbstractStatus.State.RUNNING.toString())) {
                return false;
            }
            String failed = AbstractStatus.State.FAILED.toString();
            return status.tasks().stream().allMatch(
                task -> task.state().equals(failed) && task.trace().contains(expectedTrace));
        } catch (Exception e) {
            log.error("Could not check connector state info.", e);
            return false;
        }
    }

    /**
     * Blocks until the connector is RUNNING and all {@code numTasks} tasks have FAILED with a
     * trace containing {@code expectedTrace}, or until {@link #CONNECTOR_STARTUP_DURATION_MS}
     * elapses.
     *
     * @param connectorName name of the connector to poll
     * @param numTasks exact number of tasks expected
     * @param expectedTrace text that every failed task's trace must contain
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws AssertionError if the condition is not met in time
     */
    public void assertTasksFailedWithTrace(String connectorName, int numTasks, String expectedTrace)
            throws InterruptedException {
        try {
            TestUtils.waitForCondition(
                () -> assertConnectorIsRunningButTasksFailedWith(
                    connectorName, numTasks, expectedTrace),
                CONNECTOR_STARTUP_DURATION_MS,
                "Either the connector is not running or not all the " + numTasks + " tasks have failed.");
        } catch (AssertionError e) {
            throw new AssertionError("failed to verify that tasks have failed", e);
        }
    }
}
