package io.confluent.connect.replicator;

import io.confluent.connect.replicator.metrics.ConfluentReplicatorTaskMetricsGroup;
import io.confluent.connect.replicator.offsets.ConsumerOffsetsTopicCommitter;
import io.confluent.connect.replicator.offsets.ConsumerOffsetsTranslator;
import io.confluent.connect.replicator.offsets.ConsumerTimestampsCommitter;
import io.confluent.connect.replicator.offsets.ConsumerTimestampsWriter;
import io.confluent.connect.replicator.util.ByteArrayConverter;
import io.confluent.connect.replicator.util.MockTime;
import io.confluent.connect.replicator.util.ReplicatorAdminClient;
import io.confluent.connect.replicator.util.TranslatorMonitor;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.theories.Theories;
import org.junit.runner.RunWith;

/**
 * Verifies which {@code Consumer#commitSync} calls a {@link ReplicatorSourceTask} wired with a
 * {@link ConsumerOffsetsTopicCommitter} issues as records returned from {@code poll()} are
 * acknowledged through {@code commitRecord}.
 *
 * <p>Every collaborator is an EasyMock mock created explicitly in {@link #setup()}. Nothing in
 * this class (runner, rule, or {@code injectMocks} call) would process EasyMock's {@code @Mock}
 * annotation, so the fields are intentionally left unannotated.
 */
@RunWith(Theories.class)
public class ReplicatorSourceTaskCommitOffsetTest extends EasyMockSupport {
    private static final TimestampType DEFAULT_TIMESTAMP_TYPE = TimestampType.CREATE_TIME;

    /** Task id handed to the {@link ReplicatorSourceTask} constructor. */
    private static final String TASK_ID = "replicator-1";
    private static final String SOURCE_TOPIC = "foo";
    private static final int PARTITION = 1;
    private static final String TOPIC_RENAME_FORMAT = "dc.${topic}";
    private static final int TOPIC_CREATE_BACKOFF_MS = 10000;
    private static final int TOPIC_CONFIG_SYNC_INTERVAL_MS = 120000;
    private static final String OFFSETS_TRANSLATOR_TOPIC = "__consumer_timestamps";

    /** Offset of the first record served by the mocked consumer in every test. */
    private static final long FIRST_OFFSET = 100L;

    /** Header key carried by ordinary records. */
    private static final String PLAIN_HEADER_KEY = "my_header";

    /**
     * Header key carried by records the task is expected to drop: the provenance tests assert that
     * {@code poll()} returns nothing for such records.
     * NOTE(review): presumably Replicator's provenance header; confirm against ReplicatorSourceTask.
     */
    private static final String REPLICATOR_ID_HEADER_KEY = "__replicator_id";

    private MockTime time;
    private SourceTaskContext context;
    private Consumer<byte[], byte[]> consumer;
    private ConsumerOffsetsTranslator offsetsReplicator;
    private ConfluentReplicatorTaskMetricsGroup metricsGroup;
    private ReplicatorAdminClient sourceClient;
    private ReplicatorAdminClient destClient;
    private TranslatorMonitor monitor;
    private ReplicatorSourceTaskConfig config;
    ConsumerOffsetsTopicCommitter committer;

    private final Converter byteArrayConverter = new ByteArrayConverter();
    private final TopicPartition topicPartition = new TopicPartition(SOURCE_TOPIC, PARTITION);
    /** Non-null producer metadata passed to {@code commitRecord} for records that were written. */
    private final RecordMetadata recordMetadata = new RecordMetadata(this.topicPartition, 0, 0, 0, 0L, 0, 0);
    private final HashMap<String, String> configOriginals = new HashMap<>();

    /**
     * Creates fresh mocks and a real {@link ConsumerOffsetsTopicCommitter} on top of the mocked
     * consumer, then records the expectations shared by all tests.
     */
    @Before
    public void setup() throws Exception {
        this.time = new MockTime();
        this.context = createMock(SourceTaskContext.class);
        this.consumer = createConsumerMock();
        this.offsetsReplicator = createMock(ConsumerOffsetsTranslator.class);
        this.metricsGroup = createMock(ConfluentReplicatorTaskMetricsGroup.class);
        this.sourceClient = createMock(ReplicatorAdminClient.class);
        this.destClient = createMock(ReplicatorAdminClient.class);
        this.monitor = createMock(TranslatorMonitor.class);
        this.config = createMock(ReplicatorSourceTaskConfig.class);
        this.committer = new ConsumerOffsetsTopicCommitter(this.consumer, true, this.time, 0);
        setupMocks();
    }

    /** Creates the consumer mock; a class literal can only describe the raw {@code Consumer} type. */
    @SuppressWarnings("unchecked")
    private Consumer<byte[], byte[]> createConsumerMock() {
        return createMock(Consumer.class);
    }

    /**
     * Records the expectations every test relies on: stubbed translator and admin-client answers,
     * plus exactly one {@code assign}, {@code setInterestedTopics} and
     * {@code setInterestedTranslators} call, and any number of metrics recordings.
     */
    public void setupMocks() throws Exception {
        setupTaskConfigMock();

        EasyMock.expect(this.offsetsReplicator.isDestinationReady()).andStubReturn(true);
        EasyMock.expect(this.offsetsReplicator.translateCollectedRecords()).andStubReturn(Collections.emptyList());
        EasyMock.expect(this.offsetsReplicator.nextDeadline()).andStubReturn(Long.MAX_VALUE);
        EasyMock.expect(this.offsetsReplicator.topic()).andStubReturn(OFFSETS_TRANSLATOR_TOPIC);
        EasyMock.expect(this.sourceClient.clusterId()).andStubReturn("srcClusterId");
        EasyMock.expect(this.destClient.clusterId()).andStubReturn("destClusterId");

        this.consumer.assign(EasyMock.<List<TopicPartition>>anyObject());
        EasyMock.expectLastCall().once();
        this.destClient.setInterestedTopics(
            EasyMock.anyObject(Set.class),
            EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall().once();
        this.monitor.setInterestedTranslators(
            EasyMock.anyObject(Collection.class),
            EasyMock.anyObject(TranslatorMonitor.TranslatorListener.class));
        EasyMock.expectLastCall().once();
        this.metricsGroup.recordTaskTopicPartitionMetrics(EasyMock.anyObject(SourceRecord.class));
        EasyMock.expectLastCall().anyTimes();
    }

    /** Stubs every {@link ReplicatorSourceTaskConfig} getter with the fixed values used by this suite. */
    private void setupTaskConfigMock() {
        EasyMock.expect(this.config.getTopicRenameFormat()).andStubReturn(TOPIC_RENAME_FORMAT);
        EasyMock.expect(this.config.getTopicPreservePartitions()).andStubReturn(true);
        EasyMock.expect(this.config.getTopicCreateBackoffMs()).andStubReturn(TOPIC_CREATE_BACKOFF_MS);
        EasyMock.expect(this.config.getTopicAutoCreate()).andStubReturn(true);
        EasyMock.expect(this.config.getTopicConfigSync()).andStubReturn(false);
        EasyMock.expect(this.config.getTopicConfigSyncIntervalMs()).andStubReturn(TOPIC_CONFIG_SYNC_INTERVAL_MS);
        EasyMock.expect(this.config.getTopicTimestampType()).andStubReturn(DEFAULT_TIMESTAMP_TYPE.toString());
        EasyMock.expect(this.config.getOffsetTopicCommitBatchPeriodMs()).andStubReturn(0);
        EasyMock.expect(this.config.isProvenanceHeaderEnabled()).andStubReturn(true);
        // NOTE(review): differs from TASK_ID ("replicator-1") passed to the task constructor;
        // kept as found - confirm whether the mismatch is intentional.
        EasyMock.expect(this.config.getTaskId()).andStubReturn("replicator-0");
        EasyMock.expect(this.config.getSourceKeyConverter()).andStubReturn(new ByteArrayConverter());
        EasyMock.expect(this.config.getSourceValueConverter()).andStubReturn(new ByteArrayConverter());
        EasyMock.expect(this.config.getSourceHeaderConverter()).andStubReturn(new ByteArrayConverter());
        EasyMock.expect(this.config.originalsStrings()).andStubReturn(null);
        EasyMock.expect(this.config.getString(EasyMock.anyString())).andStubReturn(null);
        EasyMock.expect(this.config.isOffsetTimestampsCommitEnabled()).andStubReturn(false);
        EasyMock.expect(this.config.isOffsetTopicCommitEnabled()).andStubReturn(true);
        EasyMock.expect(this.config.getSchemaRegistryTopic()).andStubReturn(null);
        EasyMock.expect(this.config.getOffsetTranslatorBatchPeriodMs()).andStubReturn(0);
        EasyMock.expect(this.config.getOffsetTranslatorBatchSize()).andStubReturn(0);
        EasyMock.expect(this.config.getPartitionAssignment())
            .andStubReturn(new ConsumerPartitionAssignor.Assignment(Collections.emptyList()));
    }

    /**
     * Builds a batch of {@code count} ordinary records on the test partition with consecutive
     * offsets starting at {@code firstOffset}.
     */
    private ConsumerRecords<byte[], byte[]> createConsumerRecords(long firstOffset, int count) {
        return createConsumerRecords(firstOffset, Collections.nCopies(count, false));
    }

    /**
     * Builds a batch on the test partition with consecutive offsets starting at {@code firstOffset}.
     *
     * @param firstOffset offset of the first record in the batch
     * @param replicatedFlags one entry per record; {@code true} gives the record the
     *     {@code __replicator_id} header, {@code false} the ordinary {@code my_header} header
     */
    private ConsumerRecords<byte[], byte[]> createConsumerRecords(long firstOffset, List<Boolean> replicatedFlags) {
        Headers plainHeaders = headersWith(PLAIN_HEADER_KEY);
        Headers replicatedHeaders = headersWith(REPLICATOR_ID_HEADER_KEY);
        List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(replicatedFlags.size());
        for (int i = 0; i < replicatedFlags.size(); i++) {
            Headers headers = replicatedFlags.get(i) ? replicatedHeaders : plainHeaders;
            records.add(newRecord(firstOffset + i, headers));
        }
        return new ConsumerRecords<>(Collections.singletonMap(this.topicPartition, records));
    }

    /** Returns headers holding a single entry {@code key -> "some_value"} (UTF-8 encoded). */
    private static Headers headersWith(String key) {
        Headers headers = new RecordHeaders();
        headers.add(new RecordHeader(key, "some_value".getBytes(StandardCharsets.UTF_8)));
        return headers;
    }

    /** Creates an empty-payload record at {@code offset} on the test topic and partition. */
    private static ConsumerRecord<byte[], byte[]> newRecord(long offset, Headers headers) {
        return new ConsumerRecord<>(
            SOURCE_TOPIC, PARTITION, offset, -1L, TimestampType.CREATE_TIME, 0L, 0, 0,
            new byte[0], new byte[0], headers);
    }

    /** Expects one consumer poll that answers with {@code batch}. */
    private void expectPoll(ConsumerRecords<byte[], byte[]> batch) {
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong())).andReturn(batch);
    }

    /** Makes every consumer poll beyond the explicitly expected ones return an empty batch. */
    private void stubEmptyPolls() {
        EasyMock.expect(this.consumer.poll(EasyMock.anyLong()))
            .andStubReturn(ConsumerRecords.<byte[], byte[]>empty());
    }

    /** Expects exactly {@code times} calls to {@code commitSync} with an empty offset map. */
    private void expectEmptyCommit(int times) {
        this.consumer.commitSync(Collections.<TopicPartition, OffsetAndMetadata>emptyMap());
        EasyMock.expectLastCall().times(times);
    }

    /**
     * Expects exactly {@code times} calls to {@code commitSync} that commit {@code nextOffset}
     * (with empty metadata) for the test partition.
     */
    private void expectCommit(long nextOffset, int times) {
        this.consumer.commitSync(
            Collections.singletonMap(this.topicPartition, new OffsetAndMetadata(nextOffset, "")));
        EasyMock.expectLastCall().times(times);
    }

    /** Switches all mocks to replay mode, then builds and starts the task under test. */
    private ReplicatorSourceTask replayAndStartTask() throws Exception {
        replayAll();
        ReplicatorSourceTask task = new ReplicatorSourceTask(
            this.config, this.context, TASK_ID, this.time, this.consumer, this.offsetsReplicator,
            this.sourceClient, this.destClient, this.monitor,
            this.byteArrayConverter, this.byteArrayConverter,
            (ConsumerTimestampsCommitter) null, this.committer, this.metricsGroup);
        task.start(this.configOriginals);
        return task;
    }

    /** One record acknowledged with producer metadata: expects one empty commit and one commit of offset 101. */
    @Test
    public void testSingleCommittedRecord() throws Exception {
        expectPoll(createConsumerRecords(FIRST_OFFSET, 1));
        stubEmptyPolls();
        expectEmptyCommit(1);
        expectCommit(FIRST_OFFSET + 1, 1);

        ReplicatorSourceTask task = replayAndStartTask();
        List<SourceRecord> records = task.poll();
        Assert.assertEquals(1, records.size());
        task.commitRecord(records.get(0), this.recordMetadata);
        task.poll();
        verifyAll();
    }

    /**
     * One record acknowledged with {@code null} metadata (what Connect passes for a record it did
     * not produce): expects the same commits as for a produced record.
     */
    @Test
    public void testSingleRecordSkippedByConnect() throws Exception {
        expectPoll(createConsumerRecords(FIRST_OFFSET, 1));
        stubEmptyPolls();
        expectEmptyCommit(1);
        expectCommit(FIRST_OFFSET + 1, 1);

        ReplicatorSourceTask task = replayAndStartTask();
        List<SourceRecord> records = task.poll();
        Assert.assertEquals(1, records.size());
        task.commitRecord(records.get(0), (RecordMetadata) null);
        task.poll();
        verifyAll();
    }

    /** Two records, both acknowledged with metadata: expects an empty commit, then 101 and 102 once each. */
    @Test
    public void testTwoRecordsVariation1() throws Exception {
        expectPoll(createConsumerRecords(FIRST_OFFSET, 2));
        stubEmptyPolls();
        expectEmptyCommit(1);
        expectCommit(FIRST_OFFSET + 1, 1);
        expectCommit(FIRST_OFFSET + 2, 1);

        ReplicatorSourceTask task = replayAndStartTask();
        List<SourceRecord> records = task.poll();
        Assert.assertEquals(2, records.size());
        task.commitRecord(records.get(0), this.recordMetadata);
        task.poll();
        task.commitRecord(records.get(1), this.recordMetadata);
        task.poll();
        verifyAll();
    }

    /** Two records, both acknowledged with {@code null} metadata: expects two empty commits and one commit of 102. */
    @Test
    public void testTwoRecordsVariation2() throws Exception {
        expectPoll(createConsumerRecords(FIRST_OFFSET, 2));
        stubEmptyPolls();
        expectEmptyCommit(2);
        expectCommit(FIRST_OFFSET + 2, 1);

        ReplicatorSourceTask task = replayAndStartTask();
        List<SourceRecord> records = task.poll();
        Assert.assertEquals(2, records.size());
        task.commitRecord(records.get(0), (RecordMetadata) null);
        task.poll();
        task.commitRecord(records.get(1), (RecordMetadata) null);
        task.poll();
        verifyAll();
    }

    /**
     * Two records, the first acknowledged with {@code null} metadata and the second with metadata:
     * expects two empty commits and one commit of 102.
     */
    @Test
    public void testTwoRecordsVariation3() throws Exception {
        expectPoll(createConsumerRecords(FIRST_OFFSET, 2));
        stubEmptyPolls();
        expectEmptyCommit(2);
        expectCommit(FIRST_OFFSET + 2, 1);

        ReplicatorSourceTask task = replayAndStartTask();
        List<SourceRecord> records = task.poll();
        Assert.assertEquals(2, records.size());
        task.commitRecord(records.get(0), (RecordMetadata) null);
        task.poll();
        task.commitRecord(records.get(1), this.recordMetadata);
        task.poll();
        verifyAll();
    }

    /**
     * Two records, the first acknowledged with metadata and the second with {@code null} metadata:
     * expects an empty commit, then 101 and 102 once each.
     */
    @Test
    public void testTwoRecordsVariation4() throws Exception {
        expectPoll(createConsumerRecords(FIRST_OFFSET, 2));
        stubEmptyPolls();
        expectEmptyCommit(1);
        expectCommit(FIRST_OFFSET + 1, 1);
        expectCommit(FIRST_OFFSET + 2, 1);

        ReplicatorSourceTask task = replayAndStartTask();
        List<SourceRecord> records = task.poll();
        Assert.assertEquals(2, records.size());
        task.commitRecord(records.get(0), this.recordMetadata);
        task.poll();
        task.commitRecord(records.get(1), (RecordMetadata) null);
        task.poll();
        verifyAll();
    }

    /**
     * A single record carrying the {@code __replicator_id} header: the task must return nothing
     * from {@code poll()}, and offset 101 is still expected to be committed once.
     */
    @Test
    public void testSingleProvenanceRecord() throws Exception {
        expectPoll(createConsumerRecords(FIRST_OFFSET, Collections.singletonList(true)));
        stubEmptyPolls();
        expectEmptyCommit(1);
        expectCommit(FIRST_OFFSET + 1, 1);

        ReplicatorSourceTask task = replayAndStartTask();
        Assert.assertEquals(0, task.poll().size());
        task.poll();
        verifyAll();
    }

    /**
     * A plain record followed by a {@code __replicator_id} record in the first batch, an empty
     * batch, then one more {@code __replicator_id} record. Only the plain record is returned from
     * {@code poll()}; expects one empty commit, offset 101 committed twice and offset 103 once.
     */
    @Test
    public void testProvenanceRecordBlockedByUncommittedRecord() throws Exception {
        expectPoll(createConsumerRecords(FIRST_OFFSET, Arrays.asList(false, true)));
        expectPoll(ConsumerRecords.<byte[], byte[]>empty());
        expectPoll(createConsumerRecords(FIRST_OFFSET + 2, Collections.singletonList(true)));
        stubEmptyPolls();
        expectEmptyCommit(1);
        expectCommit(FIRST_OFFSET + 1, 2);
        expectCommit(FIRST_OFFSET + 3, 1);

        ReplicatorSourceTask task = replayAndStartTask();
        List<SourceRecord> records = task.poll();
        Assert.assertEquals(1, records.size());
        task.commitRecord(records.get(0), this.recordMetadata);
        Assert.assertEquals(0, task.poll().size());
        Assert.assertEquals(0, task.poll().size());
        Assert.assertEquals(0, task.poll().size());
        verifyAll();
    }
}
