package io.confluent.connect.replicator;

import io.confluent.connect.replicator.KafkaConfigs;
import io.confluent.connect.replicator.config.ConnectConfigProvider;
import io.confluent.connect.replicator.util.TrialPeriod;
import io.confluent.license.InvalidLicenseException;
import io.confluent.license.LicenseManager;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.zookeeper.data.Stat;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import scala.Tuple2;

/* loaded from: input_file:io/confluent/connect/replicator/ReplicatorSourceConnectorTest.class */
public class ReplicatorSourceConnectorTest extends EasyMockSupport {

    @Mock
    private ReplicatorSourceConnectorConfig config;

    @Mock
    private TopicMonitorThread monitorThread;

    @Mock
    private LicenseManager licenseManager;

    @Rule
    public ExpectedException exceptionRule = ExpectedException.none();

    @Before
    public void startup() {
        this.config = (ReplicatorSourceConnectorConfig) createMock(ReplicatorSourceConnectorConfig.class);
        this.monitorThread = (TopicMonitorThread) createMock(TopicMonitorThread.class);
        this.licenseManager = (LicenseManager) createMock(LicenseManager.class);
    }

    @Test
    public void testVersion() {
        String version = new ReplicatorSourceConnector(this.config, this.monitorThread, this.licenseManager).version();
        Assert.assertNotNull(version);
        Assert.assertFalse(version.isEmpty());
    }

    @Test
    public void testTaskConfigs() {
        HashMap hashMap = new HashMap();
        hashMap.put(KafkaConfigs.KafkaCluster.SOURCE.prefix() + "bootstrap.servers", "foo:9092");
        hashMap.put(KafkaConfigs.KafkaCluster.DESTINATION.prefix() + "bootstrap.servers", "bar:9092");
        hashMap.put("topic.regex", ".*");
        EasyMock.expect(this.config.originalsStrings()).andStubReturn(hashMap);
        EasyMock.expect(Integer.valueOf(this.config.getOffsetTranslatorTasksMax())).andStubReturn(-1);
        EasyMock.expect(Boolean.valueOf(this.config.areOffsetTranslatorTasksSeparate())).andStubReturn(false);
        HashMap hashMap2 = new HashMap();
        List singletonList = Collections.singletonList(new TopicPartition("topic", 0));
        List singletonList2 = Collections.singletonList(new TopicPartition("topic", 1));
        hashMap2.put("task1", new ConsumerPartitionAssignor.Assignment(singletonList));
        hashMap2.put("task2", new ConsumerPartitionAssignor.Assignment(singletonList2));
        EasyMock.expect(this.monitorThread.assignments(2)).andReturn(hashMap2);
        replayAll();
        List taskConfigs = new ReplicatorSourceConnector(this.config, this.monitorThread, this.licenseManager).taskConfigs(2);
        Assert.assertEquals(2L, taskConfigs.size());
        ReplicatorSourceTaskConfig replicatorSourceTaskConfig = new ReplicatorSourceTaskConfig((Map) taskConfigs.get(0));
        Assert.assertEquals(singletonList, replicatorSourceTaskConfig.getPartitionAssignment().partitions());
        Assert.assertEquals("task1", replicatorSourceTaskConfig.getTaskId());
        ReplicatorSourceTaskConfig replicatorSourceTaskConfig2 = new ReplicatorSourceTaskConfig((Map) taskConfigs.get(1));
        Assert.assertEquals(singletonList2, replicatorSourceTaskConfig2.getPartitionAssignment().partitions());
        Assert.assertEquals("task2", replicatorSourceTaskConfig2.getTaskId());
        verifyAll();
    }

    @Test
    public void testInfersProducerConfig() throws InvalidLicenseException {
        HashMap hashMap = new HashMap();
        hashMap.put(KafkaConfigs.KafkaCluster.DESTINATION.prefix() + "client.id", "someClientId");
        HashMap hashMap2 = new HashMap();
        hashMap2.put("task1", new ConsumerPartitionAssignor.Assignment(Collections.singletonList(new TopicPartition("topic", 0))));
        EasyMock.expect(this.monitorThread.assignments(1)).andReturn(hashMap2);
        this.monitorThread.start();
        EasyMock.expectLastCall().asStub();
        EasyMock.expect(this.licenseManager.registerOrValidateLicense(EasyMock.anyString())).andReturn((Object) null);
        ConnectConfigProvider connectConfigProvider = (ConnectConfigProvider) mock(ConnectConfigProvider.class);
        EasyMock.expect(connectConfigProvider.getProducerConfig()).andReturn(new HashMap<String, String>() { // from class: io.confluent.connect.replicator.ReplicatorSourceConnectorTest.1
            {
                put("bootstrap.servers", "buzz:9092");
            }
        });
        replayAll();
        ReplicatorSourceConnector replicatorSourceConnector = new ReplicatorSourceConnector((ReplicatorSourceConnectorConfig) null, this.monitorThread, this.licenseManager);
        replicatorSourceConnector.setConnectConfigProvider(connectConfigProvider);
        replicatorSourceConnector.start(hashMap);
        ReplicatorSourceTaskConfig replicatorSourceTaskConfig = new ReplicatorSourceTaskConfig((Map) replicatorSourceConnector.taskConfigs(1).get(0));
        Assert.assertEquals(2L, replicatorSourceTaskConfig.dstAdminClientConfig().size());
        Assert.assertEquals("buzz:9092", replicatorSourceTaskConfig.dstAdminClientConfig().get("bootstrap.servers"));
        Assert.assertEquals("someClientId", replicatorSourceTaskConfig.dstAdminClientConfig().get("client.id"));
        verifyAll();
    }

    @Test
    public void testDestKafkaOverrides() throws InvalidLicenseException {
        HashMap hashMap = new HashMap();
        hashMap.put(KafkaConfigs.KafkaCluster.DESTINATION.prefix() + "security.protocol", "SASL_SSL");
        HashMap hashMap2 = new HashMap();
        hashMap2.put("task1", new ConsumerPartitionAssignor.Assignment(Collections.singletonList(new TopicPartition("topic", 0))));
        EasyMock.expect(this.monitorThread.assignments(1)).andReturn(hashMap2);
        this.monitorThread.start();
        EasyMock.expectLastCall().asStub();
        EasyMock.expect(this.licenseManager.registerOrValidateLicense(EasyMock.anyString())).andReturn((Object) null);
        ConnectConfigProvider connectConfigProvider = (ConnectConfigProvider) mock(ConnectConfigProvider.class);
        EasyMock.expect(connectConfigProvider.getProducerConfig()).andReturn(new HashMap<String, String>() { // from class: io.confluent.connect.replicator.ReplicatorSourceConnectorTest.2
            {
                put("security.protocol", "PLAINTEXT");
            }
        });
        replayAll();
        ReplicatorSourceConnector replicatorSourceConnector = new ReplicatorSourceConnector((ReplicatorSourceConnectorConfig) null, this.monitorThread, this.licenseManager);
        replicatorSourceConnector.setConnectConfigProvider(connectConfigProvider);
        replicatorSourceConnector.start(hashMap);
        Assert.assertEquals("SASL_SSL", new ReplicatorSourceTaskConfig((Map) replicatorSourceConnector.taskConfigs(1).get(0)).dstAdminClientConfig().get("security.protocol"));
        verifyAll();
    }

    @Test
    public void testNoConnectOverridesIfDestBootstrapServersExist() throws InvalidLicenseException {
        HashMap hashMap = new HashMap();
        hashMap.put(KafkaConfigs.KafkaCluster.DESTINATION.prefix() + "bootstrap.servers", "foo:9092");
        HashMap hashMap2 = new HashMap();
        hashMap2.put("task1", new ConsumerPartitionAssignor.Assignment(Collections.singletonList(new TopicPartition("topic", 0))));
        EasyMock.expect(this.monitorThread.assignments(1)).andReturn(hashMap2);
        this.monitorThread.start();
        EasyMock.expectLastCall().asStub();
        EasyMock.expect(this.licenseManager.registerOrValidateLicense(EasyMock.anyString())).andReturn((Object) null);
        ConnectConfigProvider connectConfigProvider = (ConnectConfigProvider) mock(ConnectConfigProvider.class);
        replayAll();
        ReplicatorSourceConnector replicatorSourceConnector = new ReplicatorSourceConnector((ReplicatorSourceConnectorConfig) null, this.monitorThread, this.licenseManager);
        replicatorSourceConnector.setConnectConfigProvider(connectConfigProvider);
        replicatorSourceConnector.start(hashMap);
        ReplicatorSourceTaskConfig replicatorSourceTaskConfig = new ReplicatorSourceTaskConfig((Map) replicatorSourceConnector.taskConfigs(1).get(0));
        Assert.assertEquals((Object) null, replicatorSourceTaskConfig.dstAdminClientConfig().get("security.protocol"));
        Assert.assertEquals("foo:9092", replicatorSourceTaskConfig.dstAdminClientConfig().get("bootstrap.servers"));
        verifyAll();
    }

    @Test
    public void testDestKafkaOverridesIgnoreInterceptors() throws InvalidLicenseException {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        hashMap2.put("task1", new ConsumerPartitionAssignor.Assignment(Collections.singletonList(new TopicPartition("topic", 0))));
        EasyMock.expect(this.monitorThread.assignments(1)).andReturn(hashMap2);
        this.monitorThread.start();
        EasyMock.expectLastCall().asStub();
        EasyMock.expect(this.licenseManager.registerOrValidateLicense(EasyMock.anyString())).andReturn((Object) null);
        ConnectConfigProvider connectConfigProvider = (ConnectConfigProvider) mock(ConnectConfigProvider.class);
        EasyMock.expect(connectConfigProvider.getProducerConfig()).andReturn(new HashMap<String, String>() { // from class: io.confluent.connect.replicator.ReplicatorSourceConnectorTest.3
            {
                put("interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");
            }
        });
        replayAll();
        ReplicatorSourceConnector replicatorSourceConnector = new ReplicatorSourceConnector((ReplicatorSourceConnectorConfig) null, this.monitorThread, this.licenseManager);
        replicatorSourceConnector.setConnectConfigProvider(connectConfigProvider);
        replicatorSourceConnector.start(hashMap);
        List taskConfigs = replicatorSourceConnector.taskConfigs(1);
        Assert.assertEquals(1L, taskConfigs.size());
        Assert.assertEquals(0L, new ReplicatorSourceTaskConfig((Map) taskConfigs.get(0)).dstAdminClientConfig().size());
        verifyAll();
    }

    @Test
    public void testConnectorStart() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(KafkaConfigs.KafkaCluster.SOURCE.prefix() + "bootstrap.servers", "foo:9092");
        hashMap.put(KafkaConfigs.KafkaCluster.DESTINATION.prefix() + "bootstrap.servers", "bar:9092");
        hashMap.put("topic.regex", ".*");
        hashMap.put("confluent.topic", "_confluent_custom_topic");
        EasyMock.expect(this.config.originalsStrings()).andStubReturn(hashMap);
        EasyMock.expect(this.config.getName()).andReturn("replicator");
        EasyMock.expect(this.config.getString("confluent.topic")).andReturn("_confluent_custom_topic");
        EasyMock.expect(this.config.getString(EasyMock.anyString())).andReturn("").times(2);
        EasyMock.expect(this.config.originalsWithPrefix(EasyMock.anyString())).andReturn(new HashMap());
        EasyMock.expect(this.config.originalsWithPrefix(EasyMock.anyString())).andReturn(new HashMap());
        EasyMock.expect(this.config.originalsWithPrefix(EasyMock.anyString())).andReturn(new HashMap());
        EasyMock.expect(this.config.originalsWithPrefix(EasyMock.anyString())).andReturn(new HashMap());
        EasyMock.expect(this.config.getSourceConsumerConfigs()).andStubReturn(new HashMap());
        EasyMock.expect(this.licenseManager.registerOrValidateLicense(EasyMock.anyString())).andReturn((Object) null);
        EasyMock.expect(this.config.getString(EasyMock.anyString())).andReturn((Object) null);
        this.monitorThread.start();
        EasyMock.expectLastCall();
        replayAll();
        new ReplicatorSourceConnector(this.config, this.monitorThread, this.licenseManager).start(hashMap);
        verifyAll();
    }

    @Test
    public void testConnectorStartWithSourceZk() throws InvalidLicenseException {
        HashMap hashMap = new HashMap();
        hashMap.put(KafkaConfigs.KafkaCluster.SOURCE.prefix() + "bootstrap.servers", "foo:9092");
        hashMap.put(KafkaConfigs.KafkaCluster.DESTINATION.prefix() + "bootstrap.servers", "bar:9092");
        hashMap.put("topic.regex", ".*");
        EasyMock.expect(this.config.originalsStrings()).andStubReturn(hashMap);
        EasyMock.expect(this.config.getName()).andReturn("replicator");
        EasyMock.expect(this.config.getString(EasyMock.anyString())).andReturn("");
        EasyMock.expect(this.config.getString(EasyMock.anyString())).andReturn("foo:2181");
        EasyMock.expect(this.config.originalsWithPrefix(KafkaConfigs.KafkaCluster.DESTINATION.prefix())).andReturn(Collections.singletonMap("bootstrap.servers", "bar:9092"));
        EasyMock.expect(this.config.originalsWithPrefix("confluent.topic.")).andReturn(Collections.singletonMap("bootstrap.servers", "bar:9092"));
        EasyMock.expect(this.config.originalsWithPrefix("confluent.topic.consumer.")).andReturn(Collections.singletonMap("group.id", "foobar"));
        EasyMock.expect(this.config.originalsWithPrefix("confluent.topic.producer.")).andReturn(Collections.singletonMap("client.id", "foobar"));
        EasyMock.expect(this.config.getString("confluent.topic")).andReturn("_confluent-command");
        EasyMock.expect(this.config.getString("confluent.license")).andReturn("");
        EasyMock.expect(this.licenseManager.registerOrValidateLicense("")).andThrow(new InvalidLicenseException(""));
        EasyMock.expect(Boolean.valueOf(this.config.isTrial())).andReturn(true);
        KafkaZkClient kafkaZkClient = (KafkaZkClient) createMock(KafkaZkClient.class);
        EasyMock.expect(this.config.buildDestKafkaZkClient(Time.SYSTEM)).andReturn(kafkaZkClient);
        kafkaZkClient.close();
        EasyMock.expectLastCall();
        EasyMock.expect(kafkaZkClient.defaultAcls("/confluent-replicator")).andReturn((Object) null);
        EasyMock.expect(Boolean.valueOf(kafkaZkClient.pathExists("/confluent-replicator"))).andReturn(true);
        kafkaZkClient.makeSurePersistentPathExists(Mockito.anyString());
        EasyMock.expectLastCall().andVoid();
        Tuple2 tuple2 = (Tuple2) createMock(Tuple2.class);
        EasyMock.expect(kafkaZkClient.getDataAndStat(EasyMock.anyString())).andReturn(tuple2);
        Stat stat = (Stat) createMock(Stat.class);
        EasyMock.expect(tuple2._2()).andReturn(stat);
        EasyMock.expect(Long.valueOf(stat.getCtime())).andReturn(Long.valueOf(System.currentTimeMillis() - (TrialPeriod.TRIAL_LIMIT_MILLIS + 1000000)));
        replayAll();
        this.exceptionRule.expect(ConnectException.class);
        this.exceptionRule.expectMessage("Trial period expired");
        new ReplicatorSourceConnector(this.config, this.monitorThread, this.licenseManager).start(hashMap);
        verifyAll();
    }

    @Test
    public void testConnectorStop() {
        HashMap hashMap = new HashMap();
        hashMap.put(KafkaConfigs.KafkaCluster.SOURCE.prefix() + "bootstrap.servers", "foo:9092");
        hashMap.put(KafkaConfigs.KafkaCluster.DESTINATION.prefix() + "bootstrap.servers", "bar:9092");
        hashMap.put("topic.regex", ".*");
        EasyMock.expect(this.config.originalsStrings()).andStubReturn(hashMap);
        this.licenseManager.stop();
        EasyMock.expectLastCall();
        EasyMock.expect(this.config.getName()).andReturn("replicator");
        this.monitorThread.shutdown();
        EasyMock.expectLastCall();
        replayAll();
        new ReplicatorSourceConnector(this.config, this.monitorThread, this.licenseManager).stop();
        verifyAll();
    }
}
