package io.confluent.connect.jdbc.sink.integration;

import io.confluent.connect.jdbc.integration.BaseConnectorIT;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.StringConverter;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.FixedHostPortGenericContainer;

/* loaded from: input_file:io/confluent/connect/jdbc/sink/integration/MicrosoftSqlServerSinkIT.class */
public class MicrosoftSqlServerSinkIT extends BaseConnectorIT {
    private static final String CONNECTOR_NAME = "jdbc-sink-connector";
    private static final int TASKS_MAX = 3;
    private static final String MSSQL_URL = "jdbc:sqlserver://0.0.0.0:1433";
    private Map<String, String> props;
    private Connection connection;
    private JsonConverter jsonConverter;
    private static final String USER = "sa";
    private static final Logger log = LoggerFactory.getLogger(MicrosoftSqlServerSinkIT.class);
    private static final String MSSQL_Table = "example_table";
    private static final List<String> KAFKA_TOPICS = Collections.singletonList(MSSQL_Table);
    private static final String PASS = "reallyStrongPwd123";

    @ClassRule
    public static final FixedHostPortGenericContainer mssqlServer = new FixedHostPortGenericContainer("microsoft/mssql-server-linux:latest").withEnv("ACCEPT_EULA", "Y").withEnv("SA_PASSWORD", PASS).withFixedExposedPort(1433, 1433);

    @Before
    public void setup() throws Exception {
        Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
        this.connection = DriverManager.getConnection(MSSQL_URL, USER, PASS);
        startConnect();
        this.jsonConverter = jsonConverter();
    }

    @After
    public void close() throws SQLException {
        this.connect.deleteConnector(CONNECTOR_NAME);
        this.connection.close();
        stopConnect();
    }

    @Test
    public void verifyConnectorFailsWhenTableNameS() throws Exception {
        this.props = configProperties();
        executeSQL(this.connection.prepareStatement("CREATE TABLE guest.example_table (id int NULL, last_name VARCHAR(50), created_at DATETIME2 NOT NULL);"));
        KAFKA_TOPICS.forEach(str -> {
            this.connect.kafka().createTopic(str, 1);
        });
        configureAndWaitForConnector();
        Timestamp from = Timestamp.from(ZonedDateTime.of(2017, 12, 8, 19, 34, 56, 0, ZoneId.of("UTC")).toInstant());
        Schema build = SchemaBuilder.struct().name("com.example.Person").field("id", Schema.INT32_SCHEMA).field("last_name", Schema.STRING_SCHEMA).field("created_at", org.apache.kafka.connect.data.Timestamp.SCHEMA).build();
        this.connect.kafka().produce(MSSQL_Table, (String) null, new String(this.jsonConverter.fromConnectData(MSSQL_Table, build, new Struct(build).put("id", 1).put("last_name", "Brams").put("created_at", from))));
        Thread.sleep(Duration.ofSeconds(30L).toMillis());
        assertTasksFailedWithTrace(CONNECTOR_NAME, Math.min(KAFKA_TOPICS.size(), TASKS_MAX), "Table \"dbo\".\"example_table\" is missing and auto-creation is disabled");
    }

    private Map<String, String> configProperties() {
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", "JdbcSinkConnector");
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", JsonConverter.class.getName());
        hashMap.put("confluent.topic.bootstrap.servers", this.connect.kafka().bootstrapServers());
        hashMap.put("confluent.topic.replication.factor", "1");
        hashMap.put("connection.url", MSSQL_URL);
        hashMap.put("connection.user", USER);
        hashMap.put("connection.password", PASS);
        hashMap.put("pk.mode", "none");
        hashMap.put("topics", MSSQL_Table);
        return hashMap;
    }

    private void executeSQL(PreparedStatement preparedStatement) throws Exception {
        try {
            preparedStatement.executeUpdate();
        } catch (Exception e) {
            log.error("Could not execute SQL: " + preparedStatement.toString());
            throw e;
        }
    }

    private void configureAndWaitForConnector() throws Exception {
        this.connect.configureConnector(CONNECTOR_NAME, this.props);
        waitForConnectorToStart(CONNECTOR_NAME, Math.min(KAFKA_TOPICS.size(), TASKS_MAX));
    }
}
