package io.confluent.connect.replicator;

import io.confluent.connect.replicator.ReplicatorSourceConnectorConfig;
import io.confluent.connect.replicator.metrics.ConfluentReplicatorTaskMetricsGroup;
import io.confluent.connect.replicator.offsets.ConsumerOffsetsTopicCommitter;
import io.confluent.connect.replicator.offsets.ConsumerOffsetsTranslator;
import io.confluent.connect.replicator.offsets.ConsumerTimestampsCommitter;
import io.confluent.connect.replicator.offsets.ConsumerTimestampsWriter;
import io.confluent.connect.replicator.util.ByteArrayConverter;
import io.confluent.connect.replicator.util.MockTime;
import io.confluent.connect.replicator.util.ReplicatorAdminClient;
import io.confluent.connect.replicator.util.TopicMetadata;
import io.confluent.connect.replicator.util.TranslatorMonitor;
import io.confluent.connect.replicator.util.Utils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.IAnswer;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.theories.DataPoint;
import org.junit.experimental.theories.Theories;
import org.junit.experimental.theories.Theory;
import org.junit.runner.RunWith;

@RunWith(Theories.class)
/* loaded from: input_file:io/confluent/connect/replicator/ReplicatorSourceTaskTest.class */
public class ReplicatorSourceTaskTest extends EasyMockSupport {
    // Mock clock passed to every ReplicatorSourceTask built in this class; recreated in setup().
    private MockTime time;
    // Timestamp type handed to setupTaskConfigMock(...) by tests that do not vary it.
    private static final TimestampType DEFAULT_TIMESTAMP_TYPE = TimestampType.CREATE_TIME;

    // The collaborators below are assigned explicitly in setup() through createMock(...).
    // NOTE(review): the class runs with the Theories runner, so the @Mock annotations are
    // presumably not what populates these fields - confirm before removing either mechanism.
    @Mock
    private SourceTaskContext context;

    // Declared with the raw Consumer type, so expectations on it are unchecked.
    @Mock
    private Consumer consumer;

    @Mock
    private OffsetStorageReader offsetReader;

    // Offsets translator mock; tests expect topic() on it to return "__consumer_timestamps".
    @Mock
    private ConsumerOffsetsTranslator offsetsReplicator;

    @Mock
    private ConsumerOffsetsTopicCommitter offsetsCommiter;

    @Mock
    private ConsumerTimestampsWriter writer;

    @Mock
    private ConfluentReplicatorTaskMetricsGroup metricsGroup;
    // Task id; the tests pass the same literal to the ReplicatorSourceTask constructor.
    private final String taskId = "replicator-1";
    // Converter instance supplied twice to the ReplicatorSourceTask constructor.
    private final Converter byteArrayConverter = new ByteArrayConverter();
    // Presumably the backoff and sync-interval values given to setupTaskConfigMock(...)
    // (the tests pass the same numbers literally) - TODO confirm against that helper.
    private final int topicCreateBackoffMs = 10000;
    private final int topicConfigSyncIntervalMs = 120000;
    // Source topic name, the rename format, and the destination name the tests expect for it.
    private final String sourceTopic = "foo";
    private final String topicRenameFormat = "dc.${topic}";
    private final String destTopic = "dc.foo";
    /**
     * Returns the three timestamp types a consumed record may carry, offered as a
     * candidate value for the theories in this class.
     *
     * <p>NOTE(review): the method returns an array but is annotated with the singular
     * {@code @DataPoint}; confirm whether {@code @DataPoints} was intended.
     *
     * @return {@code NO_TIMESTAMP_TYPE}, {@code CREATE_TIME} and {@code LOG_APPEND_TIME}, in that order
     */
    @DataPoint
    public static TimestampType[] typeParams() {
        final TimestampType[] recordTimestampTypes = {
            TimestampType.NO_TIMESTAMP_TYPE,
            TimestampType.CREATE_TIME,
            TimestampType.LOG_APPEND_TIME
        };
        return recordTimestampTypes;
    }

    /**
     * Returns the candidate timestamp-type overrides, including {@code null}, offered as a
     * candidate value for the theories in this class.
     *
     * <p>NOTE(review): the method returns an array but is annotated with the singular
     * {@code @DataPoint}; confirm whether {@code @DataPoints} was intended.
     *
     * @return {@code CREATE_TIME}, {@code LOG_APPEND_TIME} and {@code null}, in that order
     */
    @DataPoint
    public static TimestampType[] userDefinedTypeParams() {
        final TimestampType[] overrideTypes = {
            TimestampType.CREATE_TIME,
            TimestampType.LOG_APPEND_TIME,
            null
        };
        return overrideTypes;
    }

    /**
     * Creates a fresh mock clock and fresh EasyMock collaborators before each test.
     * All mocks are registered with this {@link EasyMockSupport} instance, so
     * {@code replayAll()} and {@code verifyAll()} cover them.
     */
    @Before
    public void setup() {
        time = new MockTime();
        context = createMock(SourceTaskContext.class);
        consumer = createMock(Consumer.class);
        offsetReader = createMock(OffsetStorageReader.class);
        offsetsReplicator = createMock(ConsumerOffsetsTranslator.class);
        offsetsCommiter = createMock(ConsumerOffsetsTopicCommitter.class);
        metricsGroup = createMock(ConfluentReplicatorTaskMetricsGroup.class);
        writer = createMock(ConsumerTimestampsWriter.class);
    }

    /**
     * A task built purely from mocked collaborators must report a version string
     * that is neither {@code null} nor empty.
     */
    @Test
    public void testVersion() {
        ReplicatorSourceTaskConfig config = createMock(ReplicatorSourceTaskConfig.class);
        ReplicatorAdminClient sourceAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = createMock(ReplicatorAdminClient.class);
        TranslatorMonitor monitor = createMock(TranslatorMonitor.class);

        // Stubbed rather than strictly expected: the call count is not asserted.
        EasyMock.expect(config.getTopicPreservePartitions()).andStubReturn(true);
        EasyMock.expect(config.getTopicAutoCreate()).andStubReturn(false);
        EasyMock.expect(offsetsReplicator.topic()).andReturn("__consumer_timestamps");
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(
                config, context, "replicator-1", time, consumer, offsetsReplicator,
                sourceAdmin, destAdmin, monitor,
                byteArrayConverter, byteArrayConverter, metricsGroup);
        String reportedVersion = task.version();

        Assert.assertNotNull(reportedVersion);
        Assert.assertFalse(reportedVersion.isEmpty());
        verifyAll();
    }

    /**
     * Starts a task whose partition assignment is empty and whose two admin clients report
     * different cluster ids, then checks that polling an empty consumer batch yields no
     * source records.
     *
     * @throws Exception if starting or polling the task fails
     */
    @Test
    public void testTaskStart() throws Exception {
        Map<String, String> props = new HashMap<>();
        ReplicatorAdminClient sourceAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = createMock(ReplicatorAdminClient.class);
        TranslatorMonitor monitor = createMock(TranslatorMonitor.class);
        EasyMock.expect(sourceAdmin.clusterId()).andReturn("srcClusterId");
        EasyMock.expect(destAdmin.clusterId()).andReturn("destClusterId");

        ReplicatorSourceTaskConfig config = createMock(ReplicatorSourceTaskConfig.class);
        ConsumerPartitionAssignor.Assignment assignment =
                new ConsumerPartitionAssignor.Assignment(Collections.emptyList());
        EasyMock.expect(config.getTaskId()).andReturn("replicator-0");
        EasyMock.expect(config.getSourceKeyConverter()).andReturn(new ByteArrayConverter());
        EasyMock.expect(config.getSourceValueConverter()).andReturn(new ByteArrayConverter());
        EasyMock.expect(config.getSourceHeaderConverter()).andReturn(new ByteArrayConverter());
        // A bare null literal is used so the argument matches whatever type the mocked
        // method returns; a null cast to Object does not type-check against typed expectations.
        EasyMock.expect(config.originalsStrings()).andReturn(null);
        EasyMock.expect(config.getString(EasyMock.anyString())).andReturn(null).times(3);
        EasyMock.expect(config.isOffsetTimestampsCommitEnabled()).andReturn(false);
        EasyMock.expect(config.isOffsetTopicCommitEnabled()).andReturn(false);
        EasyMock.expect(config.getSchemaRegistryTopic()).andReturn(null);
        EasyMock.expect(offsetsReplicator.topic()).andReturn("__consumer_timestamps");
        EasyMock.expect(config.getOffsetTranslatorBatchPeriodMs()).andReturn(0);
        EasyMock.expect(config.getOffsetTranslatorBatchSize()).andReturn(0);
        EasyMock.expect(config.getPartitionAssignment()).andReturn(assignment);

        consumer.assign(EasyMock.anyObject(List.class));
        EasyMock.expectLastCall();
        destAdmin.setInterestedTopics(
                EasyMock.anyObject(Set.class),
                EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        monitor.setInterestedTranslators(
                EasyMock.anyObject(Collection.class),
                EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall();
        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(config.getTopicPreservePartitions()).andStubReturn(true);
        EasyMock.expect(config.isProvenanceHeaderEnabled()).andReturn(true);
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(
                config, context, "replicator-1", time, consumer, offsetsReplicator,
                sourceAdmin, destAdmin, monitor,
                byteArrayConverter, byteArrayConverter, metricsGroup);
        task.start(props);

        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * Same scenario as a plain task start, except that both admin clients report the
     * identical cluster id {@code "srcClusterId"}; the task must still start and an
     * empty consumer batch must yield no source records.
     *
     * @throws Exception if starting or polling the task fails
     */
    @Test
    public void testTaskStartSameClusterId() throws Exception {
        Map<String, String> props = new HashMap<>();
        ReplicatorAdminClient sourceAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = createMock(ReplicatorAdminClient.class);
        TranslatorMonitor monitor = createMock(TranslatorMonitor.class);
        // Both clusters deliberately share one id.
        EasyMock.expect(sourceAdmin.clusterId()).andReturn("srcClusterId");
        EasyMock.expect(destAdmin.clusterId()).andReturn("srcClusterId");

        ReplicatorSourceTaskConfig config = createMock(ReplicatorSourceTaskConfig.class);
        ConsumerPartitionAssignor.Assignment assignment =
                new ConsumerPartitionAssignor.Assignment(Collections.emptyList());
        EasyMock.expect(config.getTaskId()).andReturn("replicator-0");
        EasyMock.expect(config.getSourceKeyConverter()).andReturn(new ByteArrayConverter());
        EasyMock.expect(config.getSourceValueConverter()).andReturn(new ByteArrayConverter());
        EasyMock.expect(config.getSourceHeaderConverter()).andReturn(new ByteArrayConverter());
        // A bare null literal is used so the argument matches whatever type the mocked
        // method returns; a null cast to Object does not type-check against typed expectations.
        EasyMock.expect(config.originalsStrings()).andReturn(null);
        EasyMock.expect(config.getString(EasyMock.anyString())).andReturn(null).times(3);
        EasyMock.expect(config.isOffsetTimestampsCommitEnabled()).andReturn(false);
        EasyMock.expect(config.isOffsetTopicCommitEnabled()).andReturn(false);
        EasyMock.expect(config.getSchemaRegistryTopic()).andReturn(null);
        EasyMock.expect(offsetsReplicator.topic()).andReturn("__consumer_timestamps");
        EasyMock.expect(config.getOffsetTranslatorBatchPeriodMs()).andReturn(0);
        EasyMock.expect(config.getOffsetTranslatorBatchSize()).andReturn(0);
        EasyMock.expect(config.getPartitionAssignment()).andReturn(assignment);

        consumer.assign(EasyMock.anyObject(List.class));
        EasyMock.expectLastCall();
        destAdmin.setInterestedTopics(
                EasyMock.anyObject(Set.class),
                EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        monitor.setInterestedTranslators(
                EasyMock.anyObject(Collection.class),
                EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall();
        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(config.getTopicPreservePartitions()).andStubReturn(true);
        EasyMock.expect(config.isProvenanceHeaderEnabled()).andReturn(true);
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(
                config, context, "replicator-1", time, consumer, offsetsReplicator,
                sourceAdmin, destAdmin, monitor,
                byteArrayConverter, byteArrayConverter, metricsGroup);
        task.start(props);

        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * A consumed record whose timestamp is {@code -1} must be turned into exactly one
     * source record whose timestamp is {@code null}.
     *
     * @throws Exception if the receive helper fails
     */
    @Test
    public void testConsumerRecordWithNoTimestamp() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);

        List<SourceRecord> received = expectSimpleReceive(
                "foo",
                sourcePartition,
                "dc.${topic}",
                new ConsumerRecords<>(Collections.singletonMap(
                        sourcePartition,
                        Collections.singletonList(
                                // -1L is the record timestamp under test.
                                new ConsumerRecord("foo", 0, 0L, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, new byte[0], new byte[0])))),
                null);

        Assert.assertEquals(1L, received.size());
        SourceRecord only = received.get(0);
        Assert.assertNull(only.timestamp());
        verifyAll();
    }

    /**
     * Passes {@code false} as the final argument of the receive helper and checks that the
     * single resulting source record carries no Kafka partition.
     *
     * @throws Exception if the receive helper fails
     */
    @Test
    public void testPartitionPreservationDisabled() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);

        List<SourceRecord> received = expectSimpleReceive(
                "foo",
                sourcePartition,
                "dc.${topic}",
                new ConsumerRecords<>(Collections.singletonMap(
                        sourcePartition,
                        Collections.singletonList(
                                new ConsumerRecord("foo", 0, 0L, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, new byte[0], new byte[0])))),
                null,
                false);

        Assert.assertEquals(1L, received.size());
        SourceRecord only = received.get(0);
        Assert.assertNull(only.kafkaPartition());
        verifyAll();
    }

    /**
     * Polls one consumed record that carries a {@code my_header} header and checks that the
     * resulting source record has two headers, the first of which is {@code my_header} with
     * its original bytes.
     *
     * <p>NOTE(review): the second header is presumably the provenance header, since
     * {@code isProvenanceHeaderEnabled()} is mocked to return {@code true} - confirm.
     *
     * @throws Exception if starting or polling the task fails
     */
    @Test
    public void testHeaderReplication() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        Map<String, String> props = new HashMap<>();
        ReplicatorAdminClient sourceAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = createMock(ReplicatorAdminClient.class);
        TranslatorMonitor monitor = createMock(TranslatorMonitor.class);
        EasyMock.expect(sourceAdmin.clusterId()).andReturn("srcClusterId");
        EasyMock.expect(destAdmin.clusterId()).andReturn("destClusterId");

        ReplicatorSourceTaskConfig config = createMock(ReplicatorSourceTaskConfig.class);
        ConsumerPartitionAssignor.Assignment assignment =
                new ConsumerPartitionAssignor.Assignment(Collections.emptyList());
        EasyMock.expect(config.getTaskId()).andReturn("replicator-0");
        EasyMock.expect(config.getSourceKeyConverter()).andReturn(new ByteArrayConverter());
        EasyMock.expect(config.getTopicPreservePartitions()).andReturn(true);
        EasyMock.expect(config.getTopicRenameFormat()).andReturn("dc.${topic}");
        EasyMock.expect(config.getSourceValueConverter()).andReturn(new ByteArrayConverter());
        EasyMock.expect(config.getSourceHeaderConverter()).andReturn(new ByteArrayConverter());
        // A bare null literal is used so the argument matches whatever type the mocked
        // method returns; a null cast to Object does not type-check against typed expectations.
        EasyMock.expect(config.originalsStrings()).andReturn(null);
        EasyMock.expect(config.getString(EasyMock.anyString())).andReturn(null).times(3);
        EasyMock.expect(config.isOffsetTimestampsCommitEnabled()).andReturn(false);
        EasyMock.expect(config.isOffsetTopicCommitEnabled()).andReturn(false);
        EasyMock.expect(config.getSchemaRegistryTopic()).andReturn(null);
        EasyMock.expect(offsetsReplicator.topic()).andReturn("__consumer_timestamps");
        EasyMock.expect(config.getOffsetTranslatorBatchPeriodMs()).andReturn(0);
        EasyMock.expect(config.getOffsetTranslatorBatchSize()).andReturn(0);
        EasyMock.expect(config.getPartitionAssignment()).andReturn(assignment);

        consumer.assign(EasyMock.anyObject(List.class));
        EasyMock.expectLastCall();
        destAdmin.setInterestedTopics(
                EasyMock.anyObject(Set.class),
                EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        monitor.setInterestedTranslators(
                EasyMock.anyObject(Collection.class),
                EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall();

        // The single consumed record carries one user header.
        RecordHeaders sourceHeaders = new RecordHeaders();
        sourceHeaders.add(new RecordHeader("my_header", "some_value".getBytes()));
        ConsumerRecord consumed = new ConsumerRecord(
                "foo", 0, 0L, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, new byte[0], new byte[0], sourceHeaders);
        ConsumerRecords polled = new ConsumerRecords(
                Collections.singletonMap(sourcePartition, Collections.singletonList(consumed)));
        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(polled);
        EasyMock.expect(config.isProvenanceHeaderEnabled()).andReturn(true).times(3);
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(
                config, context, "replicator-1", time, consumer, offsetsReplicator,
                sourceAdmin, destAdmin, monitor,
                byteArrayConverter, byteArrayConverter, metricsGroup);
        task.start(props);
        List<SourceRecord> produced = task.poll();

        Assert.assertEquals(1L, produced.size());
        SourceRecord replicated = produced.get(0);
        Assert.assertEquals(2L, replicated.headers().size());
        Header firstHeader = replicated.headers().iterator().next();
        Assert.assertEquals("my_header", firstHeader.key());
        Assert.assertArrayEquals("some_value".getBytes(), (byte[]) firstHeader.value());
        verifyAll();
    }

    /**
     * Polls one consumed record that already carries a {@code __replicator_id} header and
     * checks that the task returns no source records for it.
     *
     * @throws Exception if starting or polling the task fails
     */
    @Test
    public void testProvenanceHeaderFilter() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        Map<String, String> props = new HashMap<>();
        ReplicatorAdminClient sourceAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = createMock(ReplicatorAdminClient.class);
        TranslatorMonitor monitor = createMock(TranslatorMonitor.class);
        EasyMock.expect(sourceAdmin.clusterId()).andReturn("srcClusterId");
        EasyMock.expect(destAdmin.clusterId()).andReturn("destClusterId");

        ReplicatorSourceTaskConfig config = createMock(ReplicatorSourceTaskConfig.class);
        ConsumerPartitionAssignor.Assignment assignment =
                new ConsumerPartitionAssignor.Assignment(Collections.emptyList());
        EasyMock.expect(config.getTaskId()).andReturn("replicator-0");
        EasyMock.expect(config.getSourceKeyConverter()).andReturn(new ByteArrayConverter());
        EasyMock.expect(config.getTopicPreservePartitions()).andReturn(true);
        EasyMock.expect(config.getTopicRenameFormat()).andReturn("dc.${topic}");
        EasyMock.expect(config.getSourceValueConverter()).andReturn(new ByteArrayConverter());
        EasyMock.expect(config.getSourceHeaderConverter()).andReturn(new ByteArrayConverter());
        // A bare null literal is used so the argument matches whatever type the mocked
        // method returns; a null cast to Object does not type-check against typed expectations.
        EasyMock.expect(config.originalsStrings()).andReturn(null);
        EasyMock.expect(config.getString(EasyMock.anyString())).andReturn(null).times(3);
        EasyMock.expect(config.isOffsetTimestampsCommitEnabled()).andReturn(false);
        // Unlike the other start-up tests, offset topic commit is switched on here.
        EasyMock.expect(config.isOffsetTopicCommitEnabled()).andReturn(true);
        EasyMock.expect(config.getOffsetTopicCommitBatchPeriodMs()).andReturn(-1);
        EasyMock.expect(config.getSchemaRegistryTopic()).andReturn(null);
        EasyMock.expect(offsetsReplicator.topic()).andReturn("__consumer_timestamps");
        EasyMock.expect(config.getOffsetTranslatorBatchPeriodMs()).andReturn(0);
        EasyMock.expect(config.getOffsetTranslatorBatchSize()).andReturn(0);
        EasyMock.expect(config.getPartitionAssignment()).andReturn(assignment);

        consumer.assign(EasyMock.anyObject(List.class));
        EasyMock.expectLastCall();
        destAdmin.setInterestedTopics(
                EasyMock.anyObject(Set.class),
                EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        monitor.setInterestedTranslators(
                EasyMock.anyObject(Collection.class),
                EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall();

        // The consumed record is already tagged with the header this test expects to be filtered on.
        RecordHeaders sourceHeaders = new RecordHeaders();
        sourceHeaders.add(new RecordHeader("__replicator_id", "some_value".getBytes()));
        ConsumerRecord consumed = new ConsumerRecord(
                "foo", 0, 0L, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, new byte[0], new byte[0], sourceHeaders);
        ConsumerRecords polled = new ConsumerRecords(
                Collections.singletonMap(sourcePartition, Collections.singletonList(consumed)));
        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(polled);
        EasyMock.expect(config.isProvenanceHeaderEnabled()).andReturn(true).times(2);
        replayAll();

        // The task is bound to a local so it can be both started and polled.
        ReplicatorSourceTask task = new ReplicatorSourceTask(
                config, context, "replicator-1", time, consumer, offsetsReplicator,
                sourceAdmin, destAdmin, monitor,
                byteArrayConverter, byteArrayConverter, metricsGroup);
        task.start(props);

        Assert.assertEquals(0L, task.poll().size());
        verifyAll();
    }

    /**
     * Theory: for a record stamped with {@code timestampType} at time 500, a simple receive
     * with {@code timestampType2} as the helper's last argument must satisfy every mock
     * expectation. The helper's return value is not inspected here.
     *
     * @param timestampType  timestamp type set on the consumed record
     * @param timestampType2 value forwarded as the final argument of the receive helper
     * @throws Exception if the receive helper fails
     */
    @Theory
    public void testTimestampTypeOverride(TimestampType timestampType, TimestampType timestampType2) throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);

        expectSimpleReceive(
                "foo",
                sourcePartition,
                "dc.${topic}",
                new ConsumerRecords<>(Collections.singletonMap(
                        sourcePartition,
                        Collections.singletonList(
                                new ConsumerRecord("foo", 0, 0L, 500L, timestampType, 0L, 0, 0, new byte[0], new byte[0])))),
                timestampType2);

        verifyAll();
    }

    /**
     * Receives one record from {@code foo} with rename format {@code dc.${topic}} and checks
     * that the source record targets {@code dc.foo}, keeps partition 0 and timestamp 500,
     * and that its source partition map still names the original topic and partition.
     *
     * @throws Exception if the receive helper fails
     */
    @Test
    public void testTopicRename() throws Exception {
        TopicPartition sourceTp = new TopicPartition("foo", 0);

        List<SourceRecord> received = expectSimpleReceive(
                "foo",
                sourceTp,
                "dc.${topic}",
                new ConsumerRecords<>(Collections.singletonMap(
                        sourceTp,
                        Collections.singletonList(
                                new ConsumerRecord("foo", 0, 0L, 500L, TimestampType.CREATE_TIME, 0L, 0, 0, new byte[0], new byte[0])))),
                null);
        Assert.assertEquals(1L, received.size());

        // Destination-side attributes.
        SourceRecord renamed = received.get(0);
        Assert.assertEquals("dc.foo", renamed.topic());
        Assert.assertEquals(0L, renamed.kafkaPartition().intValue());
        Assert.assertEquals(500L, renamed.timestamp().longValue());

        // Source-side coordinates recorded with the record.
        Map<String, ?> origin = renamed.sourcePartition();
        Assert.assertEquals("foo", origin.get("topic"));
        Assert.assertEquals(0, origin.get("partition"));
        verifyAll();
    }

    /**
     * Starts a task on partitions {@code foo-0} and {@code bar-0} when the destination admin
     * client reports no metadata and no partition for {@code dc.foo} and {@code dc.bar}.
     * Expects each source partition to be rewound to the beginning and paused, both
     * destination topics to go through {@code expectTopicCreation}, and the following poll
     * to return no records.
     *
     * <p>NOTE(review): the meaning of the positional arguments given to
     * {@code setupTaskConfigMock} is not visible here - confirm against that helper.
     *
     * @throws Exception if starting or polling the task fails
     */
    @Test
    public void testNonExistingTopicCreation() throws Exception {
        TopicPartition fooSource = new TopicPartition("foo", 0);
        TopicPartition fooDest = new TopicPartition("dc.foo", 0);
        TopicPartition barSource = new TopicPartition("bar", 0);
        TopicPartition barDest = new TopicPartition("dc.bar", 0);
        EasyMock.expect(context.offsetStorageReader()).andStubReturn(offsetReader);
        ReplicatorAdminClient sourceAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = createMock(ReplicatorAdminClient.class);
        TranslatorMonitor monitor = createMock(TranslatorMonitor.class);
        ReplicatorSourceTaskConfig config =
                setupTaskConfigMock("dc.${topic}", true, true, 10000, true, 120000, DEFAULT_TIMESTAMP_TYPE);
        EasyMock.expect(offsetsReplicator.topic()).andReturn("__consumer_timestamps");

        List<TopicPartition> assigned = Arrays.asList(fooSource, barSource);
        consumer.assign(assigned);
        EasyMock.expectLastCall();
        Set<String> destTopics = new HashSet<>(Arrays.asList("dc.foo", "dc.bar"));
        destAdmin.setInterestedTopics(
                EasyMock.eq(destTopics),
                EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        monitor.setInterestedTranslators(
                EasyMock.anyObject(Collection.class),
                EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall();

        Node broker = new Node(0, "localhost", 9092);
        EasyMock.expect(consumer.partitionsFor("foo")).andStubReturn(Collections.singletonList(
                new PartitionInfo("foo", 0, broker, new Node[]{broker}, new Node[]{broker})));
        EasyMock.expect(consumer.partitionsFor("bar")).andStubReturn(Collections.singletonList(
                new PartitionInfo("bar", 0, broker, new Node[]{broker}, new Node[]{broker})));
        Properties topicProps = new Properties();

        // Partition foo-0: destination topic and partition are absent, no stored or committed offset.
        // Bare null literals are used so the arguments match the mocked return types;
        // a null cast to Object does not type-check against typed expectations.
        EasyMock.expect(sourceAdmin.topicConfig("foo")).andStubReturn(topicProps);
        EasyMock.expect(destAdmin.topicMetadata("dc.foo")).andReturn(null);
        EasyMock.expect(destAdmin.partitionExists(fooDest)).andReturn(false);
        EasyMock.expect(config.getOffsetStart()).andReturn(ReplicatorSourceConnectorConfig.OffsetStart.CONNECT);
        EasyMock.expect(offsetReader.offset(Utils.toConnectPartition("foo", 0))).andReturn(null);
        EasyMock.expect(consumer.committed(fooSource)).andReturn(null);
        consumer.seekToBeginning(Collections.singleton(fooSource));
        EasyMock.expectLastCall();
        consumer.pause(Collections.singleton(fooSource));
        EasyMock.expectLastCall();

        // Partition bar-0: same situation as foo-0.
        EasyMock.expect(sourceAdmin.topicConfig("bar")).andStubReturn(topicProps);
        EasyMock.expect(destAdmin.topicMetadata("dc.bar")).andReturn(null);
        EasyMock.expect(destAdmin.partitionExists(barDest)).andReturn(false);
        EasyMock.expect(config.getOffsetStart()).andReturn(ReplicatorSourceConnectorConfig.OffsetStart.CONNECT);
        EasyMock.expect(offsetReader.offset(Utils.toConnectPartition("bar", 0))).andReturn(null);
        EasyMock.expect(consumer.committed(barSource)).andReturn(null);
        consumer.seekToBeginning(Collections.singleton(barSource));
        EasyMock.expectLastCall();
        consumer.pause(Collections.singleton(barSource));
        EasyMock.expectLastCall();

        expectTopicCreation("dc.foo", topicProps, destAdmin);
        expectTopicCreation("dc.bar", topicProps, destAdmin);

        // Expectations for the single poll() that follows start-up.
        offsetsCommiter.checkCommit();
        EasyMock.expect(offsetsReplicator.nextDeadline()).andReturn(Long.MAX_VALUE);
        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(offsetsReplicator.isDestinationReady()).andReturn(true);
        // Element type of the translated batch is not visible here, so the list stays raw.
        ArrayList translated = new ArrayList();
        EasyMock.expect(offsetsReplicator.translateCollectedRecords()).andReturn(translated);
        offsetsCommiter.commitRecords(translated);
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(
                config, context, "replicator-1", time, consumer, offsetsReplicator,
                sourceAdmin, destAdmin, monitor,
                byteArrayConverter, byteArrayConverter,
                (ConsumerTimestampsCommitter) null, offsetsCommiter, metricsGroup);
        task.doStart(assigned);

        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * Fires the destination topic-metadata listener from inside the mocked
     * {@code consumer.partitionsFor("foo")} call, i.e. while {@code doStart} is still
     * running, and verifies that start-up completes with every expectation satisfied.
     *
     * <p>NOTE(review): the meaning of the positional arguments given to
     * {@code setupTaskConfigMock} is not visible here - confirm against that helper.
     *
     * @throws Exception if starting the task fails
     */
    @Test
    public void testMetadataUpdateDuringStartup() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        TopicPartition destPartition = new TopicPartition("dc.foo", 0);
        List<TopicPartition> assigned = Collections.singletonList(sourcePartition);
        EasyMock.expect(context.offsetStorageReader()).andStubReturn(offsetReader);
        ReplicatorAdminClient sourceAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = createMock(ReplicatorAdminClient.class);
        TranslatorMonitor monitor = createMock(TranslatorMonitor.class);
        ReplicatorSourceTaskConfig config =
                setupTaskConfigMock("dc.${topic}", true, true, 10000, true, 120000, DEFAULT_TIMESTAMP_TYPE);
        EasyMock.expect(offsetsReplicator.topic()).andReturn("__consumer_timestamps");
        consumer.assign(assigned);
        EasyMock.expectLastCall();

        // Capture the listener the task registers so the test can invoke it later.
        final Capture<ReplicatorAdminClient.TopicMetadataListener> listenerCapture = Capture.newInstance();
        Set<String> destTopics = Collections.singleton("dc.foo");
        destAdmin.setInterestedTopics(EasyMock.eq(destTopics), EasyMock.capture(listenerCapture));
        EasyMock.expectLastCall();
        monitor.setInterestedTranslators(
                EasyMock.anyObject(Collection.class),
                EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall();

        final Node broker = new Node(0, "localhost", 9092);
        EasyMock.expect(consumer.partitionsFor("foo")).andAnswer(new IAnswer<List<PartitionInfo>>() {
            @Override
            public List<PartitionInfo> answer() throws Throwable {
                // Simulate a metadata refresh arriving in the middle of start-up.
                listenerCapture.getValue().onTopicMetadataRefresh();
                return Collections.singletonList(
                        new PartitionInfo("foo", 0, broker, new Node[]{broker}, new Node[]{broker}));
            }
        });
        EasyMock.expect(config.getOffsetStart()).andReturn(ReplicatorSourceConnectorConfig.OffsetStart.CONNECT);
        // Bare null literals are used so the arguments match the mocked return types;
        // a null cast to Object does not type-check against typed expectations.
        EasyMock.expect(offsetReader.offset(Utils.toConnectPartition("foo", 0))).andReturn(null);
        EasyMock.expect(consumer.committed(sourcePartition)).andReturn(null);
        consumer.seekToBeginning(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        EasyMock.expect(destAdmin.topicMetadata("dc.foo")).andStubReturn(new TopicMetadata("dc.foo", 1));
        EasyMock.expect(destAdmin.aliveBrokers()).andStubReturn(1);
        EasyMock.expect(destAdmin.partitionExists(destPartition)).andReturn(true);
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(
                config, context, "replicator-1", time, consumer, offsetsReplicator,
                sourceAdmin, destAdmin, monitor,
                byteArrayConverter, byteArrayConverter, metricsGroup);
        task.doStart(assigned);
        verifyAll();
    }

    /**
     * Checks that a source topic setting missing on the destination is pushed there.
     *
     * <p>The source admin client reports {@code max.message.bytes=2000000} for {@code "foo"}, the
     * destination admin client reports an empty config for {@code "dc.foo"}, and the test expects
     * one {@code changeTopicConfig("dc.foo", ...)} call with properties equal to the source config.
     *
     * @throws Exception if the task under test fails unexpectedly
     */
    @Test
    public void testUpdateTopicConfiguration() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        TopicPartition destPartition = new TopicPartition("dc.foo", 0);
        List<TopicPartition> assignment = Collections.singletonList(sourcePartition);
        EasyMock.expect(this.context.offsetStorageReader()).andStubReturn(this.offsetReader);
        ReplicatorAdminClient sourceAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = createMock(ReplicatorAdminClient.class);
        TranslatorMonitor translatorMonitor = createMock(TranslatorMonitor.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", true, true, 10000, true, 120000, DEFAULT_TIMESTAMP_TYPE);
        EasyMock.expect(this.offsetsReplicator.topic()).andReturn("__consumer_timestamps");
        this.consumer.assign(assignment);
        EasyMock.expectLastCall();

        // A single poll cycle that yields no records.
        EasyMock.expect(this.offsetsReplicator.nextDeadline()).andReturn(Long.MAX_VALUE);
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(this.offsetsReplicator.isDestinationReady()).andReturn(true);
        EasyMock.expect(this.offsetsReplicator.translateCollectedRecords()).andReturn(new ArrayList());

        destAdmin.setInterestedTopics((Set) EasyMock.eq(Collections.singleton("dc.foo")), EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        translatorMonitor.setInterestedTranslators(EasyMock.anyObject(Collection.class), EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall();
        Node node = new Node(0, "localhost", 9092);
        EasyMock.expect(this.consumer.partitionsFor("foo")).andStubReturn(Collections.singletonList(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})));

        // Source-side topic config that the destination does not have yet.
        Properties sourceTopicConfig = new Properties();
        sourceTopicConfig.put("max.message.bytes", "2000000");

        EasyMock.expect(taskConfig.getOffsetStart()).andReturn(ReplicatorSourceConnectorConfig.OffsetStart.CONNECT);
        // Neither a stored Connect offset nor a committed consumer offset is available.
        EasyMock.expect(this.offsetReader.offset(Utils.toConnectPartition("foo", 0))).andReturn(null);
        EasyMock.expect(this.consumer.committed(sourcePartition)).andReturn(null);
        this.consumer.seekToBeginning(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        EasyMock.expect(destAdmin.topicMetadata("dc.foo")).andStubReturn(new TopicMetadata("dc.foo", 1));
        EasyMock.expect(destAdmin.aliveBrokers()).andStubReturn(1);
        EasyMock.expect(destAdmin.partitionExists(destPartition)).andReturn(true);
        EasyMock.expect(destAdmin.topicExists("dc.foo")).andReturn(true);
        EasyMock.expect(sourceAdmin.topicConfig("foo")).andReturn(sourceTopicConfig);
        EasyMock.expect(destAdmin.topicConfig("dc.foo")).andReturn(new Properties());
        // The assertion of interest: the source config must be applied to the destination topic.
        destAdmin.changeTopicConfig("dc.foo", sourceTopicConfig);
        EasyMock.expectLastCall();
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", this.time, this.consumer, this.offsetsReplicator, sourceAdmin, destAdmin, translatorMonitor, this.byteArrayConverter, this.byteArrayConverter, this.metricsGroup);
        task.doStart(assignment);
        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * Checks that no topic configuration is fetched or changed when the task config mock is built
     * with its fifth argument set to {@code false}.
     *
     * <p>NOTE(review): that argument is presumably the topic-config-sync flag; confirm against
     * {@code setupTaskConfigMock}. No {@code topicConfig}, {@code topicExists} or
     * {@code changeTopicConfig} expectation is recorded on either admin mock, so the test relies on
     * such a call being rejected as an unexpected invocation.
     *
     * @throws Exception if the task under test fails unexpectedly
     */
    @Test
    public void testConfigNotQueriedWhenAutoSyncDisabled() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        TopicPartition destPartition = new TopicPartition("dc.foo", 0);
        List<TopicPartition> assignment = Collections.singletonList(sourcePartition);
        EasyMock.expect(this.context.offsetStorageReader()).andStubReturn(this.offsetReader);
        ReplicatorAdminClient sourceAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = createMock(ReplicatorAdminClient.class);
        TranslatorMonitor translatorMonitor = createMock(TranslatorMonitor.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        EasyMock.expect(this.offsetsReplicator.topic()).andReturn("__consumer_timestamps");
        this.consumer.assign(assignment);
        EasyMock.expectLastCall();

        // A single poll cycle that yields no records.
        EasyMock.expect(this.offsetsReplicator.nextDeadline()).andReturn(Long.MAX_VALUE);
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(this.offsetsReplicator.isDestinationReady()).andReturn(true);
        EasyMock.expect(this.offsetsReplicator.translateCollectedRecords()).andReturn(new ArrayList());

        destAdmin.setInterestedTopics((Set) EasyMock.eq(Collections.singleton("dc.foo")), EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        translatorMonitor.setInterestedTranslators(EasyMock.anyObject(Collection.class), EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall();
        Node node = new Node(0, "localhost", 9092);
        EasyMock.expect(this.consumer.partitionsFor("foo")).andStubReturn(Collections.singletonList(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})));
        EasyMock.expect(taskConfig.getOffsetStart()).andReturn(ReplicatorSourceConnectorConfig.OffsetStart.CONNECT);
        // Neither a stored Connect offset nor a committed consumer offset is available.
        EasyMock.expect(this.offsetReader.offset(Utils.toConnectPartition("foo", 0))).andReturn(null);
        EasyMock.expect(this.consumer.committed(sourcePartition)).andReturn(null);
        this.consumer.seekToBeginning(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        EasyMock.expect(destAdmin.topicMetadata("dc.foo")).andStubReturn(new TopicMetadata("dc.foo", 1));
        EasyMock.expect(destAdmin.partitionExists(destPartition)).andReturn(true);
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", this.time, this.consumer, this.offsetsReplicator, sourceAdmin, destAdmin, translatorMonitor, this.byteArrayConverter, this.byteArrayConverter, this.metricsGroup);
        task.doStart(assignment);
        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * Checks that the destination topic is not created when the destination reports fewer alive
     * brokers than the source partition has replicas.
     *
     * <p>The source partition is described with two replica nodes while the destination admin
     * client reports one alive broker. No {@code createTopic} expectation is recorded, and the
     * consumer is expected to pause the source partition.
     *
     * @throws Exception if the task under test fails unexpectedly
     */
    @Test
    public void testSkipTopicCreationIfNotEnoughBrokers() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        TopicPartition destPartition = new TopicPartition("dc.foo", 0);
        List<TopicPartition> assignment = Collections.singletonList(sourcePartition);
        EasyMock.expect(this.context.offsetStorageReader()).andStubReturn(this.offsetReader);
        ReplicatorAdminClient sourceAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = createMock(ReplicatorAdminClient.class);
        TranslatorMonitor translatorMonitor = createMock(TranslatorMonitor.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", true, true, 10000, true, 120000, DEFAULT_TIMESTAMP_TYPE);
        EasyMock.expect(this.offsetsReplicator.topic()).andReturn("__consumer_timestamps");
        this.consumer.assign(assignment);
        EasyMock.expectLastCall();

        // A single poll cycle that yields no records.
        EasyMock.expect(this.offsetsReplicator.nextDeadline()).andReturn(Long.MAX_VALUE);
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(this.offsetsReplicator.isDestinationReady()).andReturn(true);
        EasyMock.expect(this.offsetsReplicator.translateCollectedRecords()).andReturn(new ArrayList());

        destAdmin.setInterestedTopics((Set) EasyMock.eq(Collections.singleton("dc.foo")), EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        translatorMonitor.setInterestedTranslators(EasyMock.anyObject(Collection.class), EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall();

        // The source partition has two replicas ...
        Node leader = new Node(0, "localhost", 9092);
        Node follower = new Node(0, "localhost", 9093);
        EasyMock.expect(this.consumer.partitionsFor("foo")).andStubReturn(Collections.singletonList(new PartitionInfo("foo", 0, leader, new Node[]{leader, follower}, new Node[]{leader, follower})));
        EasyMock.expect(sourceAdmin.topicConfig("foo")).andReturn(new Properties());
        EasyMock.expect(taskConfig.getTopicTimestampType()).andStubReturn(DEFAULT_TIMESTAMP_TYPE.toString());
        // ... while the destination topic does not exist and only one broker is alive there.
        EasyMock.expect(destAdmin.topicMetadata("dc.foo")).andStubReturn(null);
        EasyMock.expect(destAdmin.aliveBrokers()).andReturn(1);
        EasyMock.expect(destAdmin.partitionExists(destPartition)).andReturn(false);
        EasyMock.expect(destAdmin.topicExists("dc.foo")).andReturn(false);

        EasyMock.expect(taskConfig.getOffsetStart()).andReturn(ReplicatorSourceConnectorConfig.OffsetStart.CONNECT);
        // Neither a stored Connect offset nor a committed consumer offset is available.
        EasyMock.expect(this.offsetReader.offset(Utils.toConnectPartition("foo", 0))).andReturn(null);
        EasyMock.expect(this.consumer.committed(sourcePartition)).andReturn(null);
        this.consumer.pause(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        this.consumer.seekToBeginning(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", this.time, this.consumer, this.offsetsReplicator, sourceAdmin, destAdmin, translatorMonitor, this.byteArrayConverter, this.byteArrayConverter, this.metricsGroup);
        task.doStart(assignment);
        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * Checks that a failed destination topic creation is retried only after the create backoff.
     *
     * <p>The test runs three record/replay phases against the same task:
     * <ol>
     *   <li>Startup: {@code createTopic} throws, and the consumer is expected to pause the source
     *       partition.</li>
     *   <li>After sleeping half the backoff: no {@code createTopic} expectation is recorded, so a
     *       retry in this phase would be an unexpected call.</li>
     *   <li>After sleeping a further full backoff: {@code createTopic} returns {@code true}, the
     *       destination partition is reported as existing, and the consumer is expected to resume
     *       the paused partition.</li>
     * </ol>
     *
     * @throws Exception if the task under test fails unexpectedly
     */
    @Test
    public void testTopicCreationRetry() throws Exception {
        // Value returned by the getTopicCreateBackoffMs stubs; also drives the sleeps below.
        final int backoffMs = 5000;
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        TopicPartition destPartition = new TopicPartition("dc.foo", 0);
        List<TopicPartition> assignment = Collections.singletonList(sourcePartition);

        // ---- Phase 1: startup, topic creation fails ----
        EasyMock.expect(this.context.offsetStorageReader()).andStubReturn(this.offsetReader);
        ReplicatorAdminClient sourceAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = createMock(ReplicatorAdminClient.class);
        TranslatorMonitor translatorMonitor = createMock(TranslatorMonitor.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", true, true, 5000, true, 10000, DEFAULT_TIMESTAMP_TYPE);
        EasyMock.expect(this.offsetsReplicator.topic()).andReturn("__consumer_timestamps");
        this.consumer.assign(assignment);
        EasyMock.expectLastCall();
        destAdmin.setInterestedTopics((Set) EasyMock.eq(Collections.singleton("dc.foo")), EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        translatorMonitor.setInterestedTranslators(EasyMock.anyObject(Collection.class), EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall();
        this.consumer.seekToBeginning(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        EasyMock.expect(taskConfig.getOffsetStart()).andReturn(ReplicatorSourceConnectorConfig.OffsetStart.CONNECT);
        // Neither a stored Connect offset nor a committed consumer offset is available.
        EasyMock.expect(this.offsetReader.offset(Utils.toConnectPartition("foo", 0))).andReturn(null);
        EasyMock.expect(this.consumer.committed(sourcePartition)).andReturn(null);
        this.consumer.pause(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        Node node = new Node(0, "localhost", 9092);
        EasyMock.expect(this.consumer.partitionsFor("foo")).andStubReturn(Collections.singletonList(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})));
        Properties sourceTopicConfig = new Properties();
        // The first creation attempt blows up.
        EasyMock.expect(destAdmin.createTopic("dc.foo", 1, (short) 1, sourceTopicConfig)).andThrow(new RuntimeException());
        EasyMock.expect(taskConfig.getTopicCreateBackoffMs()).andStubReturn(backoffMs);
        EasyMock.expect(destAdmin.topicExists("dc.foo")).andStubReturn(false);
        EasyMock.expect(sourceAdmin.topicConfig("foo")).andStubReturn(sourceTopicConfig);
        EasyMock.expect(destAdmin.topicMetadata("dc.foo")).andStubReturn(null);
        EasyMock.expect(destAdmin.aliveBrokers()).andStubReturn(1);
        EasyMock.expect(destAdmin.partitionExists(destPartition)).andStubReturn(false);
        EasyMock.expect(taskConfig.getTopicConfigSyncIntervalMs()).andReturn(10000);
        EasyMock.expect(this.offsetsReplicator.nextDeadline()).andReturn(Long.MAX_VALUE);
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(this.offsetsReplicator.isDestinationReady()).andReturn(true);
        ArrayList noTranslatedRecords = new ArrayList();
        EasyMock.expect(this.offsetsReplicator.translateCollectedRecords()).andReturn(noTranslatedRecords);
        EasyMock.expect(taskConfig.getTopicPreservePartitions()).andStubReturn(true);
        replayAll();
        ReplicatorSourceTask task = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", this.time, this.consumer, this.offsetsReplicator, sourceAdmin, destAdmin, translatorMonitor, this.byteArrayConverter, this.byteArrayConverter, this.metricsGroup);
        task.doStart(assignment);
        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();

        // ---- Phase 2: half the backoff has elapsed, no createTopic call is expected ----
        resetAll();
        EasyMock.expect(taskConfig.getTopicCreateBackoffMs()).andStubReturn(backoffMs);
        EasyMock.expect(taskConfig.getTopicRenameFormat()).andStubReturn("dc.${topic}");
        EasyMock.expect(taskConfig.getTopicConfigSyncIntervalMs()).andStubReturn(120000);
        EasyMock.expect(taskConfig.getTopicAutoCreate()).andStubReturn(true);
        EasyMock.expect(taskConfig.getTopicConfigSync()).andStubReturn(true);
        EasyMock.expect(taskConfig.getTopicTimestampType()).andStubReturn(DEFAULT_TIMESTAMP_TYPE.toString());
        EasyMock.expect(taskConfig.getTopicPreservePartitions()).andStubReturn(true);
        EasyMock.expect(destAdmin.partitionExists(destPartition)).andStubReturn(false);
        EasyMock.expect(destAdmin.topicExists("dc.foo")).andStubReturn(false);
        EasyMock.expect(this.offsetsReplicator.nextDeadline()).andReturn(Long.MAX_VALUE);
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(this.offsetsReplicator.isDestinationReady()).andReturn(true);
        EasyMock.expect(this.offsetsReplicator.translateCollectedRecords()).andReturn(noTranslatedRecords);
        replayAll();
        this.time.sleep(backoffMs / 2);
        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();

        // ---- Phase 3: a further full backoff has elapsed, creation succeeds and the consumer resumes ----
        resetAll();
        EasyMock.expect(destAdmin.partitionExists(destPartition)).andStubReturn(false);
        this.time.sleep(backoffMs);
        EasyMock.expect(this.context.offsetStorageReader()).andStubReturn(this.offsetReader);
        EasyMock.expect(taskConfig.getTopicCreateBackoffMs()).andStubReturn(0);
        EasyMock.expect(taskConfig.getTopicRenameFormat()).andStubReturn("dc.${topic}");
        EasyMock.expect(taskConfig.getTopicAutoCreate()).andStubReturn(true);
        EasyMock.expect(this.consumer.partitionsFor("foo")).andStubReturn(Collections.singletonList(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})));
        EasyMock.expect(sourceAdmin.topicConfig("foo")).andStubReturn(sourceTopicConfig);
        EasyMock.expect(taskConfig.getTopicTimestampType()).andStubReturn(DEFAULT_TIMESTAMP_TYPE.toString());
        EasyMock.expect(destAdmin.aliveBrokers()).andStubReturn(1);
        EasyMock.expect(destAdmin.createTopic("dc.foo", 1, (short) 1, sourceTopicConfig)).andReturn(true);
        EasyMock.expect(destAdmin.topicMetadata("dc.foo")).andReturn(null);
        EasyMock.expect(this.consumer.paused()).andReturn(Collections.singleton(sourcePartition));
        // Recorded in addition to the stub above: one explicit "exists" answer for this phase.
        EasyMock.expect(destAdmin.partitionExists(destPartition)).andReturn(true);
        this.consumer.resume(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        EasyMock.expect(this.consumer.paused()).andReturn(Collections.emptySet());
        EasyMock.expect(this.offsetsReplicator.nextDeadline()).andReturn(Long.MAX_VALUE);
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(this.offsetsReplicator.isDestinationReady()).andReturn(true);
        EasyMock.expect(this.offsetsReplicator.translateCollectedRecords()).andReturn(noTranslatedRecords);
        EasyMock.expect(taskConfig.getTopicPreservePartitions()).andStubReturn(true);
        replayAll();
        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * Checks that a stored Connect offset makes the task seek to the position right after it.
     *
     * <p>The offset reader returns the Connect offset for 50 and the test expects
     * {@code consumer.seek(partition, 51)}.
     *
     * @throws Exception if the task under test fails unexpectedly
     */
    @Test
    public void testSeekCommittedOffset() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 1);
        TopicPartition destPartition = new TopicPartition("dc.foo", 1);
        List<TopicPartition> assignment = Collections.singletonList(sourcePartition);
        EasyMock.expect(this.context.offsetStorageReader()).andStubReturn(this.offsetReader);
        ReplicatorAdminClient sourceAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = createMock(ReplicatorAdminClient.class);
        TranslatorMonitor translatorMonitor = createMock(TranslatorMonitor.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        EasyMock.expect(this.offsetsReplicator.topic()).andReturn("__consumer_timestamps");
        this.consumer.assign(assignment);
        EasyMock.expectLastCall();
        destAdmin.setInterestedTopics((Set) EasyMock.eq(Collections.singleton("dc.foo")), EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        translatorMonitor.setInterestedTranslators(EasyMock.anyObject(Collection.class), EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall();
        EasyMock.expect(destAdmin.partitionExists(destPartition)).andStubReturn(true);
        EasyMock.expect(taskConfig.getOffsetStart()).andReturn(ReplicatorSourceConnectorConfig.OffsetStart.CONNECT);

        // Connect has offset 50 stored; the consumer group has nothing committed.
        EasyMock.expect(this.offsetReader.offset(Utils.toConnectPartition(sourcePartition))).andReturn(Utils.toConnectOffset(50L));
        EasyMock.expect(this.consumer.committed(sourcePartition)).andReturn(null);
        this.consumer.seek(sourcePartition, 51L);
        EasyMock.expectLastCall();

        // A single poll cycle that yields no records.
        EasyMock.expect(this.offsetsReplicator.nextDeadline()).andReturn(Long.MAX_VALUE);
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(this.offsetsReplicator.isDestinationReady()).andReturn(true);
        EasyMock.expect(this.offsetsReplicator.translateCollectedRecords()).andReturn(new ArrayList());
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", this.time, this.consumer, this.offsetsReplicator, sourceAdmin, destAdmin, translatorMonitor, this.byteArrayConverter, this.byteArrayConverter, this.metricsGroup);
        task.doStart(assignment);
        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * Checks startup with {@code OffsetStart.CONSUMER} when the consumer group already has a
     * committed offset.
     *
     * <p>The offset reader returns nothing, {@code consumer.committed} returns offset 50, and no
     * {@code seek} or {@code seekToBeginning} expectation is recorded on the consumer mock.
     *
     * @throws Exception if the task under test fails unexpectedly
     */
    @Test
    public void testUseConsumerOffset() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 1);
        TopicPartition destPartition = new TopicPartition("dc.foo", 1);
        List<TopicPartition> assignment = Collections.singletonList(sourcePartition);
        EasyMock.expect(this.context.offsetStorageReader()).andStubReturn(this.offsetReader);
        ReplicatorAdminClient sourceAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = createMock(ReplicatorAdminClient.class);
        TranslatorMonitor translatorMonitor = createMock(TranslatorMonitor.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        EasyMock.expect(this.offsetsReplicator.topic()).andReturn("__consumer_timestamps");
        this.consumer.assign(assignment);
        EasyMock.expectLastCall();
        destAdmin.setInterestedTopics((Set) EasyMock.eq(Collections.singleton("dc.foo")), EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        translatorMonitor.setInterestedTranslators(EasyMock.anyObject(Collection.class), EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall();
        EasyMock.expect(destAdmin.partitionExists(destPartition)).andStubReturn(true);
        EasyMock.expect(taskConfig.getOffsetStart()).andReturn(ReplicatorSourceConnectorConfig.OffsetStart.CONSUMER);

        // Connect has no stored offset; the consumer group has offset 50 committed.
        EasyMock.expect(this.offsetReader.offset(Utils.toConnectPartition(sourcePartition))).andReturn(null);
        EasyMock.expect(this.consumer.committed(sourcePartition)).andReturn(new OffsetAndMetadata(50L, ""));

        // A single poll cycle that yields no records.
        EasyMock.expect(this.offsetsReplicator.nextDeadline()).andReturn(Long.MAX_VALUE);
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(this.offsetsReplicator.isDestinationReady()).andReturn(true);
        EasyMock.expect(this.offsetsReplicator.translateCollectedRecords()).andReturn(new ArrayList());
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", this.time, this.consumer, this.offsetsReplicator, sourceAdmin, destAdmin, translatorMonitor, this.byteArrayConverter, this.byteArrayConverter, this.metricsGroup);
        task.doStart(assignment);
        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * Checks that {@code poll()} returns an empty result when the consumer throws
     * {@link OffsetOutOfRangeException}.
     *
     * <p>{@code consumer.poll} is set up to throw for the assigned partition. Across startup and
     * the poll, the test expects two {@code seekToBeginning} calls and one {@code committed}
     * lookup (which returns nothing) for that partition.
     *
     * @throws Exception if the task under test fails unexpectedly
     */
    @Test
    public void testOffsetOutOfRangeHandling() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 1);
        TopicPartition destPartition = new TopicPartition("dc.foo", 1);
        List<TopicPartition> assignment = Collections.singletonList(sourcePartition);
        EasyMock.expect(this.context.offsetStorageReader()).andStubReturn(this.offsetReader);
        ReplicatorAdminClient sourceAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = createMock(ReplicatorAdminClient.class);
        TranslatorMonitor translatorMonitor = createMock(TranslatorMonitor.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        EasyMock.expect(this.offsetsReplicator.topic()).andReturn("__consumer_timestamps");
        this.consumer.assign(assignment);
        EasyMock.expectLastCall();
        destAdmin.setInterestedTopics((Set) EasyMock.eq(Collections.singleton("dc.foo")), EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        translatorMonitor.setInterestedTranslators(EasyMock.anyObject(Collection.class), EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall();
        EasyMock.expect(destAdmin.partitionExists(destPartition)).andStubReturn(true);
        EasyMock.expect(taskConfig.getOffsetStart()).andReturn(ReplicatorSourceConnectorConfig.OffsetStart.CONNECT);
        EasyMock.expect(this.offsetReader.offset(Utils.toConnectPartition(sourcePartition))).andReturn(null);
        this.consumer.seekToBeginning(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        EasyMock.expect(this.offsetsReplicator.nextDeadline()).andReturn(Long.MAX_VALUE);
        // The poll fails because offset 51 is out of range for the partition.
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andThrow(new OffsetOutOfRangeException(Collections.singletonMap(sourcePartition, 51L)));
        EasyMock.expect(this.offsetsReplicator.isDestinationReady()).andReturn(true);
        EasyMock.expect(this.offsetsReplicator.translateCollectedRecords()).andReturn(new ArrayList());
        // NOTE(review): this expectLastCall() follows a value-returning expectation rather than a
        // void mock call and looks redundant; kept as-is, confirm before removing.
        EasyMock.expectLastCall();
        EasyMock.expect(this.consumer.committed(sourcePartition)).andReturn(null);
        this.consumer.seekToBeginning(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", this.time, this.consumer, this.offsetsReplicator, sourceAdmin, destAdmin, translatorMonitor, this.byteArrayConverter, this.byteArrayConverter, this.metricsGroup);
        task.doStart(assignment);
        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * Verifies destination partition expansion. The consumer reports two partitions for the
     * source topic {@code foo}, the destination metadata is {@code new TopicMetadata("dc.foo", 1)},
     * and the destination admin client must receive {@code addPartitions("dc.foo", 2)} for
     * {@code verifyAll()} to pass. The single poll is expected to return no records.
     */
    @Test
    public void testExpandingTopicPartitions() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        TopicPartition destPartition = new TopicPartition("dc.foo", 0);
        List<TopicPartition> assignment = Collections.singletonList(sourcePartition);
        EasyMock.expect(this.context.offsetStorageReader()).andStubReturn(this.offsetReader);
        // sourceAdmin is queried for "foo", destAdmin for "dc.foo" (see the topicConfig calls below).
        ReplicatorAdminClient sourceAdmin = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        TranslatorMonitor translatorMonitor = (TranslatorMonitor) createMock(TranslatorMonitor.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", true, true, 10000, true, 120000, DEFAULT_TIMESTAMP_TYPE);
        EasyMock.expect(this.offsetsReplicator.topic()).andReturn("__consumer_timestamps");

        // Consumer and offsets-replicator interactions for one empty poll.
        this.consumer.assign(assignment);
        EasyMock.expectLastCall();
        EasyMock.expect(this.consumer.paused()).andStubReturn(Collections.emptySet());
        EasyMock.expect(this.offsetsReplicator.nextDeadline()).andReturn(Long.MAX_VALUE);
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(this.offsetsReplicator.isDestinationReady()).andReturn(true);
        EasyMock.expect(this.offsetsReplicator.translateCollectedRecords()).andReturn(new ArrayList());

        // Listener registration on the destination admin client and the translator monitor.
        destAdmin.setInterestedTopics((Set) EasyMock.eq(Collections.singleton("dc.foo")), (ReplicatorAdminClient.TopicMetadataListener) EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        translatorMonitor.setInterestedTranslators((Collection) EasyMock.anyObject(Collection.class), (TranslatorMonitor.TranslatorListener) EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall();

        // Source reports two partitions; destination metadata is built with 1
        // (presumably its partition count -- confirm against TopicMetadata).
        Node node = new Node(0, "localhost", 9092);
        EasyMock.expect(this.consumer.partitionsFor("foo")).andStubReturn(Arrays.asList(
                new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}),
                new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node})));
        EasyMock.expect(destAdmin.topicMetadata("dc.foo")).andStubReturn(new TopicMetadata("dc.foo", 1));
        destAdmin.addPartitions("dc.foo", 2);
        EasyMock.expectLastCall();
        EasyMock.expect(destAdmin.partitionExists(destPartition)).andReturn(true);
        EasyMock.expect(destAdmin.topicExists("dc.foo")).andReturn(true);

        // No stored Connect offset and no committed consumer offset: expect a seek to the beginning.
        EasyMock.expect(taskConfig.getOffsetStart()).andReturn(ReplicatorSourceConnectorConfig.OffsetStart.CONNECT);
        EasyMock.expect(this.offsetReader.offset(Utils.toConnectPartition(sourcePartition))).andReturn(null);
        EasyMock.expect(this.consumer.committed(sourcePartition)).andReturn(null);
        this.consumer.seekToBeginning(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();

        // Topic config sync: the destination is expected to receive only the timestamp-type override.
        Properties expectedDestConfig = new Properties();
        EasyMock.expect(sourceAdmin.topicConfig("foo")).andReturn(new Properties());
        overrideTimestampType(DEFAULT_TIMESTAMP_TYPE, expectedDestConfig);
        destAdmin.changeTopicConfig("dc.foo", expectedDestConfig);
        EasyMock.expectLastCall();
        EasyMock.expect(taskConfig.getTopicTimestampType()).andStubReturn(DEFAULT_TIMESTAMP_TYPE.toString());
        EasyMock.expect(destAdmin.topicConfig("dc.foo")).andReturn(new Properties());

        replayAll();
        ReplicatorSourceTask task = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", this.time, this.consumer, this.offsetsReplicator, sourceAdmin, destAdmin, translatorMonitor, this.byteArrayConverter, this.byteArrayConverter, this.metricsGroup);
        task.doStart(assignment);
        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * Same partition mismatch as the expansion test (two source partitions reported for
     * {@code foo}, destination metadata {@code new TopicMetadata("dc.foo", 1)}), but the config
     * mock is created with preserve-partitions {@code false}. No {@code addPartitions}
     * expectation is recorded on the destination admin mock, and the single poll must be empty.
     */
    @Test
    public void testSkipPartitionExpansionIfNotPartitionsPreserved() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        List<TopicPartition> assignment = Collections.singletonList(sourcePartition);
        EasyMock.expect(this.context.offsetStorageReader()).andStubReturn(this.offsetReader);
        ReplicatorAdminClient sourceAdmin = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        TranslatorMonitor translatorMonitor = (TranslatorMonitor) createMock(TranslatorMonitor.class);
        // Second argument (preserve partitions) and fifth argument (config sync) are both false here.
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", false, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        EasyMock.expect(this.offsetsReplicator.topic()).andReturn("__consumer_timestamps");

        // Consumer and offsets-replicator interactions for one empty poll.
        this.consumer.assign(assignment);
        EasyMock.expectLastCall();
        EasyMock.expect(this.consumer.paused()).andStubReturn(Collections.emptySet());
        EasyMock.expect(this.offsetsReplicator.nextDeadline()).andReturn(Long.MAX_VALUE);
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(this.offsetsReplicator.isDestinationReady()).andReturn(true);
        EasyMock.expect(this.offsetsReplicator.translateCollectedRecords()).andReturn(new ArrayList());

        // Listener registration on the destination admin client and the translator monitor.
        destAdmin.setInterestedTopics((Set) EasyMock.eq(Collections.singleton("dc.foo")), (ReplicatorAdminClient.TopicMetadataListener) EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        translatorMonitor.setInterestedTranslators((Collection) EasyMock.anyObject(Collection.class), (TranslatorMonitor.TranslatorListener) EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall();

        // Topic metadata is only stubbed; nothing on destAdmin is expected to modify the topic.
        Node node = new Node(0, "localhost", 9092);
        EasyMock.expect(this.consumer.partitionsFor("foo")).andStubReturn(Arrays.asList(
                new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}),
                new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node})));
        EasyMock.expect(destAdmin.topicMetadata("dc.foo")).andStubReturn(new TopicMetadata("dc.foo", 1));
        EasyMock.expect(destAdmin.topicExists("dc.foo")).andStubReturn(true);

        // No stored Connect offset and no committed consumer offset: expect a seek to the beginning.
        EasyMock.expect(taskConfig.getOffsetStart()).andReturn(ReplicatorSourceConnectorConfig.OffsetStart.CONNECT);
        EasyMock.expect(this.offsetReader.offset(Utils.toConnectPartition(sourcePartition))).andReturn(null);
        EasyMock.expect(this.consumer.committed(sourcePartition)).andReturn(null);
        this.consumer.seekToBeginning(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();

        replayAll();
        ReplicatorSourceTask task = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", this.time, this.consumer, this.offsetsReplicator, sourceAdmin, destAdmin, translatorMonitor, this.byteArrayConverter, this.byteArrayConverter, this.metricsGroup);
        task.doStart(assignment);
        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * Two-phase test around a destination partition that does not exist yet.
     *
     * <p>Phase 1: {@code partitionExists(dc.foo-1)} is stubbed to {@code false}; the consumer is
     * expected to be paused on {@code foo-1} and the poll returns nothing.
     *
     * <p>Phase 2: after {@code resetAll()} and {@code time.sleep(5000L)}, {@code partitionExists}
     * returns {@code true}, {@code consumer.paused()} first reports {@code foo-1}, and the
     * consumer is expected to be resumed on it; the poll again returns nothing.
     */
    @Test
    public void testNonExistingTopicPaused() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 1);
        TopicPartition destPartition = new TopicPartition("dc.foo", 1);
        List<TopicPartition> assignment = Collections.singletonList(sourcePartition);
        EasyMock.expect(this.context.offsetStorageReader()).andStubReturn(this.offsetReader);
        ReplicatorAdminClient sourceAdmin = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        TranslatorMonitor translatorMonitor = (TranslatorMonitor) createMock(TranslatorMonitor.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        EasyMock.expect(this.offsetsReplicator.topic()).andReturn("__consumer_timestamps");

        // ---- Phase 1: destination partition missing, consumer gets paused ----
        this.consumer.assign(assignment);
        EasyMock.expectLastCall();
        // The captured listener is never read back; capture(...) only serves as a matcher here.
        destAdmin.setInterestedTopics((Set) EasyMock.eq(Collections.singleton("dc.foo")), (ReplicatorAdminClient.TopicMetadataListener) EasyMock.capture(Capture.newInstance()));
        EasyMock.expectLastCall();
        translatorMonitor.setInterestedTranslators((Collection) EasyMock.anyObject(Collection.class), (TranslatorMonitor.TranslatorListener) EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall();
        EasyMock.expect(destAdmin.partitionExists(destPartition)).andStubReturn(false);
        EasyMock.expect(taskConfig.getOffsetStart()).andReturn(ReplicatorSourceConnectorConfig.OffsetStart.CONNECT);
        EasyMock.expect(this.offsetReader.offset(Utils.toConnectPartition("foo", 1))).andReturn(null);
        EasyMock.expect(this.consumer.committed(sourcePartition)).andReturn(null);
        this.consumer.pause(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        this.consumer.seekToBeginning(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        EasyMock.expect(this.offsetsReplicator.nextDeadline()).andReturn(Long.MAX_VALUE);
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(this.offsetsReplicator.isDestinationReady()).andReturn(true);
        // The same empty list instance is returned in both phases.
        ArrayList noTranslatedRecords = new ArrayList();
        EasyMock.expect(this.offsetsReplicator.translateCollectedRecords()).andReturn(noTranslatedRecords);
        EasyMock.expect(taskConfig.getTopicPreservePartitions()).andStubReturn(true);
        replayAll();
        ReplicatorSourceTask task = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", this.time, this.consumer, this.offsetsReplicator, sourceAdmin, destAdmin, translatorMonitor, this.byteArrayConverter, this.byteArrayConverter, this.metricsGroup);
        task.doStart(assignment);
        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();

        // ---- Phase 2: destination partition appears, consumer gets resumed ----
        // resetAll() clears every recorded expectation and stub, so the ones still needed are re-recorded.
        resetAll();
        this.time.sleep(5000L);
        EasyMock.expect(destAdmin.partitionExists(destPartition)).andReturn(true);
        EasyMock.expect(this.context.offsetStorageReader()).andStubReturn(this.offsetReader);
        EasyMock.expect(taskConfig.getTopicRenameFormat()).andStubReturn("dc.${topic}");
        // Order matters: the first paused() call reports foo-1, the second reports nothing paused.
        EasyMock.expect(this.consumer.paused()).andReturn(Collections.singleton(sourcePartition));
        this.consumer.resume(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        EasyMock.expect(this.consumer.paused()).andReturn(Collections.emptySet());
        EasyMock.expect(this.offsetsReplicator.nextDeadline()).andReturn(Long.MAX_VALUE);
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(this.offsetsReplicator.isDestinationReady()).andReturn(true);
        EasyMock.expect(this.offsetsReplicator.translateCollectedRecords()).andReturn(noTranslatedRecords);
        EasyMock.expect(taskConfig.getTopicPreservePartitions()).andStubReturn(true);
        replayAll();
        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * Two-phase test with topic auto-create disabled (third {@code setupTaskConfigMock} argument
     * is {@code false}) and a destination topic that does not exist.
     *
     * <p>Phase 1: {@code topicMetadata("dc.foo")} is stubbed to {@code null} and both
     * {@code partitionExists} and {@code topicExists} to {@code false}; the consumer is expected
     * to be paused on {@code foo-0}. No topic-creation expectation is recorded on the destination
     * admin mock.
     *
     * <p>Phase 2: after {@code resetAll()} and {@code time.sleep(5000L)}, {@code partitionExists}
     * returns {@code true} and the consumer is expected to be resumed on {@code foo-0}.
     */
    @Test
    public void testNonExistentPartitionsPausedWhenAutoCreateDisabled() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        TopicPartition destPartition = new TopicPartition("dc.foo", 0);
        List<TopicPartition> assignment = Collections.singletonList(sourcePartition);
        EasyMock.expect(this.context.offsetStorageReader()).andStubReturn(this.offsetReader);
        ReplicatorAdminClient sourceAdmin = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        TranslatorMonitor translatorMonitor = (TranslatorMonitor) createMock(TranslatorMonitor.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", true, false, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        EasyMock.expect(this.offsetsReplicator.topic()).andReturn("__consumer_timestamps");

        // ---- Phase 1: destination topic/partition missing, consumer gets paused ----
        Node node = new Node(0, "localhost", 9092);
        EasyMock.expect(this.consumer.partitionsFor("foo")).andStubReturn(Collections.singletonList(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})));
        EasyMock.expect(destAdmin.topicMetadata("dc.foo")).andStubReturn(null);
        EasyMock.expect(destAdmin.partitionExists(destPartition)).andStubReturn(false);
        EasyMock.expect(destAdmin.topicExists("dc.foo")).andStubReturn(false);
        this.consumer.assign(assignment);
        EasyMock.expectLastCall();
        // The captured listener is never read back; capture(...) only serves as a matcher here.
        destAdmin.setInterestedTopics((Set) EasyMock.eq(Collections.singleton("dc.foo")), (ReplicatorAdminClient.TopicMetadataListener) EasyMock.capture(Capture.newInstance()));
        EasyMock.expectLastCall();
        translatorMonitor.setInterestedTranslators((Collection) EasyMock.anyObject(Collection.class), (TranslatorMonitor.TranslatorListener) EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall();
        EasyMock.expect(taskConfig.getOffsetStart()).andReturn(ReplicatorSourceConnectorConfig.OffsetStart.CONNECT);
        EasyMock.expect(this.offsetReader.offset(Utils.toConnectPartition("foo", 0))).andReturn(null);
        EasyMock.expect(this.consumer.committed(sourcePartition)).andReturn(null);
        this.consumer.pause(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        this.consumer.seekToBeginning(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        EasyMock.expect(this.offsetsReplicator.nextDeadline()).andReturn(Long.MAX_VALUE);
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(this.offsetsReplicator.isDestinationReady()).andReturn(true);
        // The same empty list instance is returned in both phases.
        ArrayList noTranslatedRecords = new ArrayList();
        EasyMock.expect(this.offsetsReplicator.translateCollectedRecords()).andReturn(noTranslatedRecords);
        EasyMock.expect(taskConfig.getTopicPreservePartitions()).andStubReturn(true);
        replayAll();
        ReplicatorSourceTask task = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", this.time, this.consumer, this.offsetsReplicator, sourceAdmin, destAdmin, translatorMonitor, this.byteArrayConverter, this.byteArrayConverter, this.metricsGroup);
        task.doStart(assignment);
        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();

        // ---- Phase 2: destination partition appears, consumer gets resumed ----
        // resetAll() clears every recorded expectation and stub, so the ones still needed are re-recorded.
        resetAll();
        this.time.sleep(5000L);
        EasyMock.expect(destAdmin.partitionExists(destPartition)).andReturn(true);
        EasyMock.expect(this.context.offsetStorageReader()).andStubReturn(this.offsetReader);
        EasyMock.expect(taskConfig.getTopicRenameFormat()).andStubReturn("dc.${topic}");
        // Order matters: the first paused() call reports foo-0, the second reports nothing paused.
        EasyMock.expect(this.consumer.paused()).andReturn(Collections.singleton(sourcePartition));
        this.consumer.resume(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        EasyMock.expect(this.consumer.paused()).andReturn(Collections.emptySet());
        EasyMock.expect(this.offsetsReplicator.nextDeadline()).andReturn(Long.MAX_VALUE);
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(this.offsetsReplicator.isDestinationReady()).andReturn(true);
        EasyMock.expect(this.offsetsReplicator.translateCollectedRecords()).andReturn(noTranslatedRecords);
        EasyMock.expect(taskConfig.getTopicPreservePartitions()).andStubReturn(true);
        replayAll();
        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * Passes one {@code SourceRecord} with timestamp {@code 100L} to
     * {@code ReplicatorSourceTask.commitRecord} on a task built with a
     * {@link ConsumerTimestampsCommitter}, and expects the metrics group to be told about that
     * record.
     */
    @Test
    public void testCommitRecord() throws Exception {
        ReplicatorAdminClient sourceAdmin = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        TranslatorMonitor monitor = (TranslatorMonitor) createMock(TranslatorMonitor.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        EasyMock.expect(this.offsetsReplicator.topic()).andReturn("__consumer_timestamps");

        TopicPartition partition = new TopicPartition("mytopic", 1);
        // NOTE(review): the committer is created with `new`, not as a mock, so the commitRecord
        // call below runs its real code before replayAll() -- confirm that this is intended.
        ConsumerTimestampsCommitter timestampsCommitter = new ConsumerTimestampsCommitter("mygroup", this.writer);
        SourceRecord committedRecord = new SourceRecord(
                Utils.toConnectPartition(partition.topic(), partition.partition()),
                (Map) null,
                (String) null, (Integer) null,
                (Schema) null, (Object) null,
                (Schema) null, (Object) null,
                100L,
                (Iterable) null);
        timestampsCommitter.commitRecord(committedRecord);
        EasyMock.expectLastCall();
        this.metricsGroup.recordTaskTopicPartitionMetrics(committedRecord);
        EasyMock.expectLastCall();
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", this.time, this.consumer, this.offsetsReplicator, sourceAdmin, destAdmin, monitor, this.byteArrayConverter, this.byteArrayConverter, timestampsCommitter, this.metricsGroup);
        task.commitRecord(committedRecord, (RecordMetadata) null);
        verifyAll();
    }

    /**
     * Exercises the offsets-topic commit path: a record whose Connect offset is built from
     * {@code 1000L} is committed through a task wired with a {@link ConsumerOffsetsTopicCommitter}
     * (and a {@code null} timestamps committer), and the mocked consumer is expected to receive
     * {@code commitSync} for {@code mytopic-1} with offset {@code 1000 + 1} and empty metadata.
     */
    @Test
    public void testCommitSourceOffset() throws Exception {
        ReplicatorAdminClient replicatorAdminClient = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient replicatorAdminClient2 = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        TranslatorMonitor translatorMonitor = (TranslatorMonitor) createMock(TranslatorMonitor.class);
        ReplicatorSourceTaskConfig replicatorSourceTaskConfig = setupTaskConfigMock("dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        EasyMock.expect(this.offsetsReplicator.topic()).andReturn("__consumer_timestamps");
        TopicPartition topicPartition = new TopicPartition("mytopic", 1);
        // The committer is a real instance (created with `new`) wrapping the mocked consumer.
        ConsumerOffsetsTopicCommitter consumerOffsetsTopicCommitter = new ConsumerOffsetsTopicCommitter(this.consumer);
        SourceRecord sourceRecord = new SourceRecord(Utils.toConnectPartition(topicPartition.topic(), topicPartition.partition()), Utils.toConnectOffset(1000L), (String) null, (Integer) null, (Schema) null, (Object) null, (Schema) null, (Object) null, 100L, (Iterable) null);
        // NOTE(review): these two calls hit the real committer before replayAll(); presumably any
        // consumer calls it makes are thereby recorded as expectations on the mock -- confirm
        // against ConsumerOffsetsTopicCommitter. Do not reorder relative to the lines below.
        consumerOffsetsTopicCommitter.commitRecord(sourceRecord, (RecordMetadata) null);
        EasyMock.expectLastCall();
        consumerOffsetsTopicCommitter.commit();
        EasyMock.expectLastCall();
        this.metricsGroup.recordTaskTopicPartitionMetrics(sourceRecord);
        EasyMock.expectLastCall();
        // Expected committed position is the record's source offset plus one.
        this.consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(1000 + 1, "")));
        EasyMock.expectLastCall();
        replayAll();
        // 14-argument constructor: timestamps committer is null, offsets-topic committer is supplied.
        ReplicatorSourceTask replicatorSourceTask = new ReplicatorSourceTask(replicatorSourceTaskConfig, this.context, "replicator-1", this.time, this.consumer, this.offsetsReplicator, replicatorAdminClient, replicatorAdminClient2, translatorMonitor, this.byteArrayConverter, this.byteArrayConverter, (ConsumerTimestampsCommitter) null, consumerOffsetsTopicCommitter, this.metricsGroup);
        replicatorSourceTask.commitRecord(sourceRecord, (RecordMetadata) null);
        replicatorSourceTask.commit();
        consumerOffsetsTopicCommitter.checkCommit();
        verifyAll();
    }

    /**
     * Variant of the commit-record test in which the {@code SourceRecord} is built with a
     * {@code null} timestamp. {@code commitRecord} must complete and the metrics group must
     * still be told about the record.
     */
    @Test
    public void testCommitRecordWhenRecordHasNoTimestamp() throws Exception {
        ReplicatorAdminClient sourceAdmin = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        TranslatorMonitor monitor = (TranslatorMonitor) createMock(TranslatorMonitor.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        EasyMock.expect(this.offsetsReplicator.topic()).andReturn("__consumer_timestamps");

        TopicPartition partition = new TopicPartition("mytopic", 1);
        // NOTE(review): the committer is created with `new`, not as a mock, so the commitRecord
        // call below runs its real code before replayAll() -- confirm that this is intended.
        ConsumerTimestampsCommitter timestampsCommitter = new ConsumerTimestampsCommitter("mygroup", this.writer);
        SourceRecord untimedRecord = new SourceRecord(
                Utils.toConnectPartition(partition.topic(), partition.partition()),
                (Map) null,
                (String) null, (Integer) null,
                (Schema) null, (Object) null,
                (Schema) null, (Object) null,
                (Long) null,
                (Iterable) null);
        timestampsCommitter.commitRecord(untimedRecord);
        EasyMock.expectLastCall();
        this.metricsGroup.recordTaskTopicPartitionMetrics(untimedRecord);
        EasyMock.expectLastCall();
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", this.time, this.consumer, this.offsetsReplicator, sourceAdmin, destAdmin, monitor, this.byteArrayConverter, this.byteArrayConverter, timestampsCommitter, this.metricsGroup);
        task.commitRecord(untimedRecord, (RecordMetadata) null);
        verifyAll();
    }

    /**
     * Calls {@code ReplicatorSourceTask.commit()} on a task built with a
     * {@link ConsumerTimestampsCommitter} that has had no records committed to it, and verifies
     * that no unexpected mock interaction occurs.
     */
    @Test
    public void testCommit() throws Exception {
        ReplicatorAdminClient sourceAdmin = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        TranslatorMonitor translatorMonitor = (TranslatorMonitor) createMock(TranslatorMonitor.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        EasyMock.expect(this.offsetsReplicator.topic()).andReturn("__consumer_timestamps");
        // NOTE(review): the committer is created with `new`, not as a mock, so this commit() call
        // runs its real code before replayAll() -- confirm that this is intended.
        ConsumerTimestampsCommitter timestampsCommitter = new ConsumerTimestampsCommitter("mygroup", this.writer);
        timestampsCommitter.commit();
        EasyMock.expectLastCall();
        replayAll();
        ReplicatorSourceTask task = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", this.time, this.consumer, this.offsetsReplicator, sourceAdmin, destAdmin, translatorMonitor, this.byteArrayConverter, this.byteArrayConverter, timestampsCommitter, this.metricsGroup);
        task.commit();
        verifyAll();
    }

    /**
     * Convenience overload that delegates to the six-argument {@code expectSimpleReceive},
     * passing {@code true} as the final argument (the value that overload hands to
     * {@code setupTaskConfigMock} as the preserve-partitions setting).
     */
    private List<SourceRecord> expectSimpleReceive(String sourceTopic, TopicPartition topicPartition, String renameFormat, ConsumerRecords<byte[], byte[]> consumerRecords, TimestampType timestampType) throws InterruptedException, ExecutionException {
        final boolean preservePartitions = true;
        return expectSimpleReceive(sourceTopic, topicPartition, renameFormat, consumerRecords, timestampType, preservePartitions);
    }

    /**
     * Records every mock expectation needed to start a task on {@code topicPartition} and poll
     * it once, replays all mocks, runs {@code doStart} plus one {@code poll}, and returns the
     * polled records. Callers are responsible for calling {@code verifyAll()} afterwards.
     *
     * <p>The destination partition, the stored-offset lookup and the reported source
     * {@code PartitionInfo} are all hard-coded to partition {@code 0}.
     *
     * @param sourceTopic source topic name used for {@code partitionsFor}, {@code topicConfig}
     *     and as input to {@code Utils.renameTopic}
     * @param topicPartition partition assigned to the consumer and passed to {@code doStart}
     * @param renameFormat rename format given to {@code Utils.renameTopic} and the config mock
     * @param consumerRecords records the mocked {@code consumer.poll} returns
     * @param timestampType timestamp type expected in the destination topic config; when
     *     {@code null}, {@code DEFAULT_TIMESTAMP_TYPE} is used
     * @param preservePartitions value stubbed for {@code getTopicPreservePartitions()}; when
     *     {@code true} a {@code partitionExists} call on the destination is expected as well
     * @return the result of the single {@code poll()} call
     */
    private List<SourceRecord> expectSimpleReceive(String sourceTopic, TopicPartition topicPartition, String renameFormat, ConsumerRecords<byte[], byte[]> consumerRecords, TimestampType timestampType, boolean preservePartitions) throws InterruptedException, ExecutionException {
        String destTopic = Utils.renameTopic(renameFormat, sourceTopic);
        TopicPartition destPartition = new TopicPartition(destTopic, 0);
        List<TopicPartition> assignment = Collections.singletonList(topicPartition);
        EasyMock.expect(this.context.offsetStorageReader()).andStubReturn(this.offsetReader);
        ReplicatorAdminClient sourceAdmin = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        TranslatorMonitor translatorMonitor = (TranslatorMonitor) createMock(TranslatorMonitor.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock(renameFormat, preservePartitions, true, 10000, true, 120000, DEFAULT_TIMESTAMP_TYPE);
        EasyMock.expect(this.offsetsReplicator.topic()).andReturn("__consumer_timestamps");

        // Listener registration and consumer assignment.
        destAdmin.setInterestedTopics((Set) EasyMock.eq(Collections.singleton(destTopic)), (ReplicatorAdminClient.TopicMetadataListener) EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        translatorMonitor.setInterestedTranslators((Collection) EasyMock.anyObject(Collection.class), (TranslatorMonitor.TranslatorListener) EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall();
        this.consumer.assign(assignment);
        EasyMock.expectLastCall();

        // Source reports one partition; the destination topic already exists.
        Node node = new Node(0, "localhost", 9092);
        EasyMock.expect(this.consumer.partitionsFor(sourceTopic)).andReturn(Collections.singletonList(new PartitionInfo(sourceTopic, 0, node, new Node[]{node}, new Node[]{node})));
        EasyMock.expect(destAdmin.topicMetadata(destTopic)).andReturn(new TopicMetadata(destTopic, 1));
        if (preservePartitions) {
            EasyMock.expect(destAdmin.partitionExists(destPartition)).andReturn(true);
        }
        EasyMock.expect(destAdmin.topicExists(destTopic)).andReturn(true);

        // Topic config sync: the destination is expected to receive only the timestamp-type override.
        EasyMock.expect(sourceAdmin.topicConfig(sourceTopic)).andReturn(new Properties());
        Properties expectedDestConfig = new Properties();
        TimestampType effectiveTimestampType = timestampType != null ? timestampType : DEFAULT_TIMESTAMP_TYPE;
        overrideTimestampType(effectiveTimestampType, expectedDestConfig);
        destAdmin.changeTopicConfig(destTopic, expectedDestConfig);
        EasyMock.expectLastCall();
        EasyMock.expect(taskConfig.getTopicTimestampType()).andReturn(effectiveTimestampType.toString());
        EasyMock.expect(destAdmin.topicConfig(destTopic)).andReturn(new Properties());

        // No stored Connect offset and no committed consumer offset: expect a seek to the beginning.
        EasyMock.expect(taskConfig.getOffsetStart()).andReturn(ReplicatorSourceConnectorConfig.OffsetStart.CONNECT);
        EasyMock.expect(this.offsetReader.offset(Utils.toConnectPartition(sourceTopic, 0))).andReturn(null);
        EasyMock.expect(this.consumer.committed(topicPartition)).andReturn(null);
        this.consumer.seekToBeginning(Collections.singleton(topicPartition));
        EasyMock.expectLastCall();

        // The poll itself. One extra translateCollectedRecords() call is expected for every
        // polled partition that belongs to the "__consumer_timestamps" topic.
        EasyMock.expect(this.offsetsReplicator.nextDeadline()).andReturn(Long.MAX_VALUE);
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andReturn(consumerRecords);
        for (TopicPartition polledPartition : consumerRecords.partitions()) {
            if (polledPartition.topic().equals("__consumer_timestamps")) {
                EasyMock.expect(this.offsetsReplicator.translateCollectedRecords()).andReturn(new ArrayList());
            }
        }
        EasyMock.expect(this.offsetsReplicator.isDestinationReady()).andReturn(true);
        EasyMock.expect(this.offsetsReplicator.translateCollectedRecords()).andReturn(new ArrayList());
        EasyMock.expect(taskConfig.isProvenanceHeaderEnabled()).andReturn(true).times(2);

        replayAll();
        ReplicatorSourceTask task = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", this.time, this.consumer, this.offsetsReplicator, sourceAdmin, destAdmin, translatorMonitor, this.byteArrayConverter, this.byteArrayConverter, this.metricsGroup);
        task.doStart(assignment);
        return task.poll();
    }

    /**
     * Stores {@code timestampType.toString()} under the {@code message.timestamp.type} key of
     * the given properties, replacing any previous value for that key.
     *
     * @param timestampType timestamp type whose string form is written
     * @param properties properties object that is modified in place
     */
    private void overrideTimestampType(TimestampType timestampType, Properties properties) {
        final String timestampTypeName = timestampType.toString();
        properties.setProperty("message.timestamp.type", timestampTypeName);
    }

    /**
     * Creates a {@code ReplicatorSourceTaskConfig} mock (still in record state) whose topic
     * related getters are stubbed with the supplied values.
     *
     * @param renameFormat value for {@code getTopicRenameFormat()}
     * @param preservePartitions value for {@code getTopicPreservePartitions()}
     * @param autoCreate value for {@code getTopicAutoCreate()}
     * @param createBackoffMs value for {@code getTopicCreateBackoffMs()}
     * @param configSync value for {@code getTopicConfigSync()}
     * @param configSyncIntervalMs value for {@code getTopicConfigSyncIntervalMs()}
     * @param timestampType its {@code toString()} is the value for {@code getTopicTimestampType()}
     * @return the stubbed mock; the caller is expected to replay it
     */
    private ReplicatorSourceTaskConfig setupTaskConfigMock(String renameFormat, boolean preservePartitions, boolean autoCreate, int createBackoffMs, boolean configSync, int configSyncIntervalMs, TimestampType timestampType) {
        ReplicatorSourceTaskConfig config = (ReplicatorSourceTaskConfig) createMock(ReplicatorSourceTaskConfig.class);

        // Topic naming and partitioning.
        EasyMock.expect(config.getTopicRenameFormat()).andStubReturn(renameFormat);
        EasyMock.expect(config.getTopicPreservePartitions()).andStubReturn(preservePartitions);

        // Topic creation.
        EasyMock.expect(config.getTopicCreateBackoffMs()).andStubReturn(createBackoffMs);
        EasyMock.expect(config.getTopicAutoCreate()).andStubReturn(autoCreate);

        // Topic config synchronisation.
        EasyMock.expect(config.getTopicConfigSync()).andStubReturn(configSync);
        EasyMock.expect(config.getTopicConfigSyncIntervalMs()).andStubReturn(configSyncIntervalMs);
        EasyMock.expect(config.getTopicTimestampType()).andStubReturn(timestampType.toString());

        return config;
    }

    /**
     * Records, on the given admin-client mock, the call sequence for creating a topic that does
     * not exist yet: metadata lookup returns {@code null}, one broker is alive,
     * {@code createTopic(topic, 1, (short) 1, properties)} succeeds, and afterwards the topic
     * exists and reports {@code properties} as its config. Each call is expected exactly once.
     *
     * @param topic name of the topic expected to be created
     * @param properties config expected in {@code createTopic} and returned by {@code topicConfig}
     * @param replicatorAdminClient mock (in record state) that receives the expectations
     */
    private void expectTopicCreation(String topic, Properties properties, ReplicatorAdminClient replicatorAdminClient) throws InterruptedException, ExecutionException {
        EasyMock.expect(replicatorAdminClient.topicMetadata(topic)).andReturn(null);
        EasyMock.expect(replicatorAdminClient.aliveBrokers()).andReturn(1);
        EasyMock.expect(replicatorAdminClient.createTopic(topic, 1, (short) 1, properties)).andReturn(true);
        EasyMock.expect(replicatorAdminClient.topicExists(topic)).andReturn(true);
        EasyMock.expect(replicatorAdminClient.topicConfig(topic)).andReturn(properties);
    }

    /**
     * Verifies that a {@code TimeoutException} thrown by
     * {@code changeTopicConfig("dc.foo", ...)} on the destination admin client does not escape
     * the task: {@code doStart} and one {@code poll()} must complete, and the poll must return
     * no records.
     */
    @Test
    public void testUpdateTopicConfigurationTimeouts() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        TopicPartition destPartition = new TopicPartition("dc.foo", 0);
        List<TopicPartition> assignment = Collections.singletonList(sourcePartition);
        EasyMock.expect(this.context.offsetStorageReader()).andStubReturn(this.offsetReader);
        ReplicatorAdminClient sourceAdmin = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destAdmin = (ReplicatorAdminClient) createMock(ReplicatorAdminClient.class);
        TranslatorMonitor translatorMonitor = (TranslatorMonitor) createMock(TranslatorMonitor.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", true, true, 10000, true, 120000, DEFAULT_TIMESTAMP_TYPE);
        EasyMock.expect(this.offsetsReplicator.topic()).andReturn("__consumer_timestamps");

        // Consumer and offsets-replicator interactions for one empty poll.
        this.consumer.assign(assignment);
        EasyMock.expectLastCall();
        EasyMock.expect(this.offsetsReplicator.nextDeadline()).andReturn(Long.MAX_VALUE);
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(this.offsetsReplicator.isDestinationReady()).andReturn(true);
        EasyMock.expect(this.offsetsReplicator.translateCollectedRecords()).andReturn(new ArrayList());

        // Listener registration on the destination admin client and the translator monitor.
        destAdmin.setInterestedTopics((Set) EasyMock.eq(Collections.singleton("dc.foo")), (ReplicatorAdminClient.TopicMetadataListener) EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        translatorMonitor.setInterestedTranslators((Collection) EasyMock.anyObject(Collection.class), (TranslatorMonitor.TranslatorListener) EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall();

        Node node = new Node(0, "localhost", 9092);
        EasyMock.expect(this.consumer.partitionsFor("foo")).andStubReturn(Collections.singletonList(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})));

        // Source topic config that differs from the (empty) destination config.
        Properties sourceTopicConfig = new Properties();
        sourceTopicConfig.put("max.message.bytes", "2000000");

        // No stored Connect offset and no committed consumer offset: expect a seek to the beginning.
        EasyMock.expect(taskConfig.getOffsetStart()).andReturn(ReplicatorSourceConnectorConfig.OffsetStart.CONNECT);
        EasyMock.expect(this.offsetReader.offset(Utils.toConnectPartition("foo", 0))).andReturn(null);
        EasyMock.expect(this.consumer.committed(sourcePartition)).andReturn(null);
        this.consumer.seekToBeginning(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();

        // Destination topic already exists with one partition (per TopicMetadata's second argument,
        // presumably the partition count -- confirm against TopicMetadata).
        EasyMock.expect(destAdmin.topicMetadata("dc.foo")).andStubReturn(new TopicMetadata("dc.foo", 1));
        EasyMock.expect(destAdmin.aliveBrokers()).andStubReturn(1);
        EasyMock.expect(destAdmin.partitionExists(destPartition)).andReturn(true);
        EasyMock.expect(destAdmin.topicExists("dc.foo")).andReturn(true);

        // The config update on the destination is made to fail with a Kafka TimeoutException.
        EasyMock.expect(sourceAdmin.topicConfig("foo")).andReturn(sourceTopicConfig);
        EasyMock.expect(destAdmin.topicConfig("dc.foo")).andReturn(new Properties());
        destAdmin.changeTopicConfig("dc.foo", sourceTopicConfig);
        EasyMock.expectLastCall().andThrow(new TimeoutException());

        replayAll();
        ReplicatorSourceTask task = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", this.time, this.consumer, this.offsetsReplicator, sourceAdmin, destAdmin, translatorMonitor, this.byteArrayConverter, this.byteArrayConverter, this.metricsGroup);
        task.doStart(assignment);
        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * Exercises {@code ReplicatorSourceTask.stop()} on a task that was never started.
     *
     * <p>The recorded expectations are: the consumer is woken up and closed, both admin
     * clients are closed, the timestamps writer and the translator monitor are closed, and
     * the metrics group is asked to stop its metrics. {@code verifyAll()} then checks the
     * recorded expectations against what {@code stop()} actually invoked.
     */
    @Test
    public void testStop() throws Exception {
        // Collaborators that exist only for this test.
        ReplicatorAdminClient firstAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient secondAdmin = createMock(ReplicatorAdminClient.class);
        TranslatorMonitor monitor = createMock(TranslatorMonitor.class);
        ReplicatorSourceTaskConfig taskConfig =
                setupTaskConfigMock("dc.${topic}", true, true, 10000, true, 120000, DEFAULT_TIMESTAMP_TYPE);

        EasyMock.expect(offsetsReplicator.topic()).andReturn("__consumer_timestamps");

        // Mock built through the real (String, ConsumerTimestampsWriter) constructor so that it
        // is wired to the shared writer mock.
        Class<?>[] committerCtorSignature = {String.class, ConsumerTimestampsWriter.class};
        Object[] committerCtorArgs = {"dummy-group", writer};
        ConsumerTimestampsCommitter committer = EasyMock.createMockBuilder(ConsumerTimestampsCommitter.class)
                .withConstructor(committerCtorSignature)
                .withArgs(committerCtorArgs)
                .createMock();

        // Record the calls expected during stop(), in the same order as before.
        consumer.wakeup();
        EasyMock.expectLastCall();
        consumer.close();
        EasyMock.expectLastCall();

        firstAdmin.close();
        EasyMock.expectLastCall();
        secondAdmin.close();
        EasyMock.expectLastCall();

        writer.close();
        EasyMock.expectLastCall();
        monitor.close();
        EasyMock.expectLastCall();

        metricsGroup.stopMetrics();
        EasyMock.expectLastCall();

        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(
                taskConfig,
                context,
                "replicator-1",
                time,
                consumer,
                offsetsReplicator,
                firstAdmin,
                secondAdmin,
                monitor,
                byteArrayConverter,
                byteArrayConverter,
                committer,
                metricsGroup);
        task.stop();

        verifyAll();
    }
}
