package io.confluent.connect.ibm.mq;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.connect.jms.core.source.BaseConnectorIT;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;

/**
 * Integration test for the IBM MQ source connector.
 *
 * <p>An IBM MQ container is started through a JUnit rule, text messages are produced to a queue
 * over a JMS session owned by this test, and the records the connector writes to
 * {@link #KAFKA_TOPIC} are consumed from the Connect cluster's Kafka and checked.
 */
@Category({IntegrationTest.class})
public class IbmMQSourceConnectorIT extends BaseConnectorIT {
    private static final Logger log = LoggerFactory.getLogger(IbmMQSourceConnectorIT.class);

    private static final int TASKS_MAX = 1;
    private static final int NUM_RECORDS = 10;
    private static final int RESTART_ROUNDS = 2;
    private static final int MQ_PORT = 1414;
    private static final long BROKER_AVAILABILITY_TIMEOUT_MS = 30_000L;
    private static final String KAFKA_TOPIC = "jms.test";
    private static final String CONNECTOR_NAME = "ibm-mq-source-connector";
    private static final String QUEUE_MANAGER = "QM1";
    private static final String DESTINATION_NAME_CONFIG = "jms.destination.name";

    private Map<String, String> settings;
    private IbmMQSourceConnectorConfig config;
    private IbmMQSourceTask task;
    private ConnectionFactory factory;
    private Connection connection;
    private Session session;
    private ObjectMapper objectMapper;

    // The host port is fixed because the connector settings point at localhost:MQ_PORT.
    @Rule
    public GenericContainer ibmMq = new FixedHostPortGenericContainer("ibmcom/mq").withFixedExposedPort(MQ_PORT, MQ_PORT).withEnv("LICENSE", "accept").withEnv("MQ_QMGR_NAME", QUEUE_MANAGER);

    /**
     * Starts the Connect cluster, builds the connector settings and opens the JMS session the
     * test uses to produce messages.
     *
     * <p>A task instance is started locally only to obtain its {@link ConnectionFactory}.
     *
     * @throws Exception if the broker does not become reachable within
     *     {@link #BROKER_AVAILABILITY_TIMEOUT_MS}, or any setup step fails
     */
    @Before
    public void setup() throws Exception {
        startConnect();
        this.settings = setupConfigs();
        this.config = new IbmMQSourceConnectorConfig(this.settings);
        this.task = new IbmMQSourceTask();
        this.task.start(this.settings);
        this.factory = this.task.connectionFactory();
        setupConnection();
        this.objectMapper = new ObjectMapper();
    }

    /**
     * Releases the JMS resources and the helper task created in {@link #setup()}, then stops the
     * Connect cluster. Cleanup failures are logged rather than thrown so they cannot mask the
     * outcome of the test itself.
     */
    @After
    public void close() {
        try {
            closeJmsResources();
            stopTaskQuietly();
        } finally {
            stopConnect();
        }
    }

    /**
     * Produces {@link #NUM_RECORDS} messages, starts the connector and verifies the records
     * arrive in Kafka.
     */
    @Test
    public void testSource() throws Throwable {
        this.connect.kafka().createTopic(KAFKA_TOPIC, TASKS_MAX);
        sendMessages(NUM_RECORDS);
        this.connect.configureConnector(CONNECTOR_NAME, this.settings);
        waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);
        verifyRecords(NUM_RECORDS);
    }

    /**
     * Restarts the broker container {@link #RESTART_ROUNDS} times while the connector is running
     * and verifies records are delivered around each restart.
     */
    @Test
    public void testReconnectionRetry() throws Throwable {
        sendMessages(NUM_RECORDS);
        this.connect.kafka().createTopic(KAFKA_TOPIC, TASKS_MAX);
        this.connect.configureConnector(CONNECTOR_NAME, this.settings);
        waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);
        for (int round = 0; round < RESTART_ROUNDS; round++) {
            restartContainer();
            verifyRecords(NUM_RECORDS);
            sendMessages(NUM_RECORDS);
        }
        verifyRecords(NUM_RECORDS);
    }

    /**
     * Builds the connector configuration pointing at the local IBM MQ container.
     *
     * @return a mutable map of connector settings
     */
    private Map<String, String> setupConfigs() {
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", "IbmMQSourceConnector");
        props.put("tasks.max", Integer.toString(TASKS_MAX));
        props.put("key.converter", JsonConverter.class.getName());
        props.put("value.converter", JsonConverter.class.getName());
        props.put("confluent.topic.replication.factor", "1");
        props.put("confluent.topic.bootstrap.servers", this.connect.kafka().bootstrapServers());
        props.put("kafka.topic", KAFKA_TOPIC);
        props.put("mq.hostname", "localhost");
        props.put("mq.queue.manager", QUEUE_MANAGER);
        props.put("mq.channel", "DEV.APP.SVRCONN");
        props.put("mq.port", Integer.toString(MQ_PORT));
        props.put("mq.username", "app");
        props.put("mq.password", "");
        props.put(DESTINATION_NAME_CONFIG, "DEV.QUEUE.1");
        return props;
    }

    /**
     * Sends {@code numMessages} text messages to the configured queue, committing the transacted
     * session after each one.
     *
     * @param numMessages number of messages to produce
     * @throws JMSException if creating, sending or committing a message fails; failures are
     *     propagated so a broken producer fails the test at the point of the error
     */
    private void sendMessages(int numMessages) throws JMSException {
        MessageProducer producer = this.session.createProducer(
                this.session.createQueue(this.settings.get(DESTINATION_NAME_CONFIG)));
        try {
            for (int index = 0; index < numMessages; index++) {
                TextMessage message = this.session.createTextMessage("This is a test");
                message.setJMSMessageID(Integer.toString(index));
                producer.send(message);
                this.session.commit();
            }
        } finally {
            try {
                producer.close();
            } catch (JMSException e) {
                log.warn("Failed to close JMS message producer", e);
            }
        }
    }

    /**
     * Consumes {@code numRecords} records from {@link #KAFKA_TOPIC} and checks each record key.
     *
     * <p>NOTE(review): the assertion expects the key's {@code messageID} field to convert to 0
     * for every record. Presumably the provider-assigned message IDs are non-numeric and so
     * convert to the default of 0 - confirm this is the intended check.
     *
     * @param numRecords number of records to wait for
     * @throws IOException if a record key cannot be parsed as JSON
     */
    private void verifyRecords(int numRecords) throws IOException {
        for (ConsumerRecord<byte[], byte[]> record
                : this.connect.kafka().consume(numRecords, CONSUME_MAX_DURATION_MS, KAFKA_TOPIC)) {
            Assert.assertEquals(0, this.objectMapper.readTree(record.key()).findValue("messageID").asInt());
        }
    }

    /**
     * Stops and restarts the broker container, then re-establishes the test's JMS session.
     *
     * @throws Exception if the broker does not become reachable again in time
     */
    private void restartContainer() throws Exception {
        // Release the old session first; it cannot survive the restart and would otherwise leak.
        closeJmsResources();
        this.ibmMq.stop();
        this.ibmMq.start();
        setupConnection();
    }

    /**
     * Polls until a JMS connection and session can be opened against the broker.
     *
     * @throws Exception if no connection succeeds within {@link #BROKER_AVAILABILITY_TIMEOUT_MS}
     */
    private void setupConnection() throws Exception {
        TestUtils.waitForCondition(
                () -> assertBrokerIsAvailable().orElse(false),
                BROKER_AVAILABILITY_TIMEOUT_MS,
                "Container did not start in time");
    }

    /**
     * Makes a single attempt to open a connection and a transacted session.
     *
     * <p>On success the {@code connection} and {@code session} fields are updated. On failure
     * nothing is changed, so the caller can simply poll this method until its timeout expires.
     *
     * @return {@code Optional.of(true)} when the broker accepted the connection, otherwise an
     *     empty {@code Optional}
     */
    private Optional<Boolean> assertBrokerIsAvailable() {
        Connection candidate = null;
        try {
            candidate = this.factory.createConnection(this.config.username(), this.config.password());
            // The session is transacted; the acknowledge mode is kept at its original value.
            Session candidateSession = candidate.createSession(true, Session.CLIENT_ACKNOWLEDGE);
            this.connection = candidate;
            this.session = candidateSession;
            return Optional.of(true);
        } catch (Exception e) {
            log.debug("IBM MQ broker is not available yet", e);
            if (candidate != null) {
                try {
                    candidate.close();
                } catch (JMSException closeFailure) {
                    log.debug("Failed to close half-open JMS connection", closeFailure);
                }
            }
            return Optional.empty();
        }
    }

    /** Closes the current JMS session and connection, logging (not throwing) any failure. */
    private void closeJmsResources() {
        if (this.session != null) {
            try {
                this.session.close();
            } catch (JMSException e) {
                log.warn("Failed to close JMS session", e);
            }
            this.session = null;
        }
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (JMSException e) {
                log.warn("Failed to close JMS connection", e);
            }
            this.connection = null;
        }
    }

    /** Stops the helper task started in {@link #setup()}, logging (not throwing) any failure. */
    private void stopTaskQuietly() {
        if (this.task == null) {
            return;
        }
        try {
            this.task.stop();
        } catch (RuntimeException e) {
            log.warn("Failed to stop IBM MQ source task", e);
        }
        this.task = null;
    }
}
