package io.confluent.connect.jdbc.source.integration;

import ch.vorburger.mariadb4j.DBConfigurationBuilder;
import ch.vorburger.mariadb4j.junit.MariaDB4jRule;
import io.confluent.common.utils.IntegrationTest;
import io.confluent.connect.jdbc.JdbcSourceConnector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/connect/jdbc/source/integration/PauseResumeIT.class */
public class PauseResumeIT {
    private static final String CONNECTOR_NAME = "JdbcSourceConnector";

    @Rule
    public MariaDB4jRule dbRule = new MariaDB4jRule(DBConfigurationBuilder.newBuilder().setPort(0).build(), "testdb", (String) null);
    EmbeddedConnectCluster connect;
    Map<String, String> props;
    private static final long CONNECTOR_STARTUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
    private static final long POLLING_INTERVAL_MS = TimeUnit.SECONDS.toMillis(10);
    private static Logger log = LoggerFactory.getLogger(PauseResumeIT.class);

    @Before
    public void before() throws Exception {
        this.props = new HashMap();
        this.props.put("connector.class", JdbcSourceConnector.class.getName());
        this.props.put("tasks.max", "1");
        this.props.put("connection.url", this.dbRule.getURL());
        this.props.put("connection.user", "root");
        this.props.put("mode", "incrementing");
        this.props.put("incrementing.column.name", "id");
        this.props.put("poll.interval.ms", Long.toString(POLLING_INTERVAL_MS));
        this.props.put("topic.prefix", "topic_");
        this.connect = new EmbeddedConnectCluster.Builder().name("connect-cluster").numWorkers(1).brokerProps(new Properties()).build();
        log.debug("Starting embedded Connect worker, Kafka broker, and ZK");
        this.connect.start();
    }

    @After
    public void after() {
        if (this.connect != null) {
            this.connect.stop();
        }
    }

    @Test
    public void testPauseResume() throws Exception {
        Connection connection = getConnection();
        Throwable th = null;
        try {
            Statement createStatement = connection.createStatement();
            Throwable th2 = null;
            try {
                try {
                    createStatement.executeUpdate("CREATE TABLE accounts(id INTEGER AUTO_INCREMENT NOT NULL, name VARCHAR(255), PRIMARY KEY (id))");
                    if (createStatement != null) {
                        if (0 != 0) {
                            try {
                                createStatement.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    this.connect.configureConnector(CONNECTOR_NAME, this.props);
                    waitForConnectorToStart(CONNECTOR_NAME, 1);
                    Thread.sleep(POLLING_INTERVAL_MS);
                    this.connect.requestPut(this.connect.endpointForResource(String.format("connectors/%s/pause", CONNECTOR_NAME)), "");
                    waitForConnectorState(CONNECTOR_NAME, 1, 3 * POLLING_INTERVAL_MS, AbstractStatus.State.PAUSED);
                    this.connect.requestPut(this.connect.endpointForResource(String.format("connectors/%s/resume", CONNECTOR_NAME)), "");
                    waitForConnectorState(CONNECTOR_NAME, 1, 3 * POLLING_INTERVAL_MS, AbstractStatus.State.RUNNING);
                } finally {
                }
            } catch (Throwable th4) {
                if (createStatement != null) {
                    if (th2 != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (connection != null) {
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    connection.close();
                }
            }
        }
    }

    protected Optional<Boolean> assertConnectorAndTasksStatus(String str, int i, AbstractStatus.State state) {
        try {
            ConnectorStateInfo connectorStatus = this.connect.connectorStatus(str);
            return Optional.of(Boolean.valueOf(connectorStatus != null && connectorStatus.tasks().size() >= i && connectorStatus.connector().state().equals(state.toString()) && connectorStatus.tasks().stream().allMatch(taskState -> {
                return taskState.state().equals(state.toString());
            })));
        } catch (Exception e) {
            log.debug("Could not check connector state info.", e);
            return Optional.empty();
        }
    }

    protected long waitForConnectorToStart(String str, int i) throws InterruptedException {
        return waitForConnectorState(str, i, CONNECTOR_STARTUP_DURATION_MS, AbstractStatus.State.RUNNING);
    }

    protected long waitForConnectorState(String str, int i, long j, AbstractStatus.State state) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            return assertConnectorAndTasksStatus(str, i, state).orElse(false).booleanValue();
        }, j, "Connector tasks did not transition to state " + state + " in time");
        return System.currentTimeMillis();
    }

    private Connection getConnection() throws SQLException {
        return DriverManager.getConnection(this.dbRule.getURL(), "root", "");
    }
}
