package io.confluent.connect.hdfs;

import io.confluent.common.utils.MockTime;
import io.confluent.connect.hdfs.utils.Data;
import io.confluent.connect.hdfs.utils.MemoryFormat;
import io.confluent.connect.hdfs.utils.MemoryRecordWriter;
import io.confluent.connect.hdfs.utils.MemoryStorage;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/confluent/connect/hdfs/FailureRecoveryTest.class */
/**
 * Exercises {@link DataWriter} when the in-memory storage or record writer used by the test
 * harness is told to fail, and checks that the writer asks the task context for a retry
 * back-off and resumes once the mock clock has advanced past it.
 *
 * <p>All tests run against {@link MemoryStorage} / {@link MemoryFormat} (see
 * {@link #createProps()}) and read results back through {@link Data#getData()}.
 */
public class FailureRecoveryTest extends HdfsSinkConnectorTestBase {
    private static final String ZERO_PAD_FMT = "%010d";
    private static final String EXTENSION = "";
    private static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
    private static final String ROTATE_SCHEDULE_INTERVAL_MS_CONFIG = "rotate.schedule.interval.ms";
    private static final String TEST_TOPIC = "test-topic";
    private static final String PARTITION_12_DIR = "partition=12";

    /** Per-test property overrides, merged on top of the base properties in {@link #createProps()}. */
    private final Map<String, String> localProps = new HashMap<>();
    private MockTime time;

    /**
     * Creates a fresh mock clock, advances it by the current wall-clock millis, then runs the
     * base-class set-up.
     *
     * @throws Exception if the base-class set-up fails
     */
    @Override // io.confluent.connect.hdfs.HdfsSinkConnectorTestBase
    @Before
    public void setUp() throws Exception {
        this.time = new MockTime();
        // NOTE(review): presumably this moves the mock clock from its initial value to "now" - confirm
        // against MockTime.
        this.time.sleep(System.currentTimeMillis());
        super.setUp();
    }

    /**
     * Returns the base properties with the storage and format classes switched to the in-memory
     * test implementations, followed by any overrides in {@code localProps}.
     *
     * @return the connector properties used by this test class
     */
    @Override // io.confluent.connect.hdfs.HdfsSinkConnectorTestBase
    public Map<String, String> createProps() {
        Map<String, String> props = super.createProps();
        props.put("storage.class", MemoryStorage.class.getName());
        props.put("format.class", MemoryFormat.class.getName());
        props.putAll(this.localProps);
        return props;
    }

    /**
     * With {@code appendFailure} set on the storage, the log file must stay absent until the
     * mock clock has been advanced by the requested timeout; afterwards it must hold 6 entries.
     */
    @Test
    public void testCommitFailure() {
        List<SinkRecord> records = createRecords(12, 0, 7);
        DataWriter writer = new DataWriter(this.connectorConfig, this.context, this.avroData, this.time);
        writer.getStorage().setFailure(MemoryStorage.Failure.appendFailure);

        writer.write(records);
        long retryBackoffMs = this.connectorConfig.getLong(RETRY_BACKOFF_MS_CONFIG);
        Assert.assertEquals(retryBackoffMs, this.context.timeout());

        Map<String, List<Object>> data = Data.getData();
        String logFile = FileUtils.logFileName(this.url, this.logsDir, TOPIC_PARTITION);
        Assert.assertNull(data.get(logFile));

        // The mock clock has not moved yet, so another write must not produce the log file.
        writer.write(new ArrayList<SinkRecord>());
        Assert.assertNull(data.get(logFile));

        this.time.sleep(this.context.timeout());
        writer.write(new ArrayList<SinkRecord>());
        Assert.assertEquals(6L, data.get(logFile).size());

        writer.close();
        writer.stop();
    }

    /**
     * Writes offsets 0-3, advances the clock by twice the scheduled rotation interval, injects an
     * append failure for one write, then clears it and writes offsets 4-6. Expects committed
     * files covering offsets [0,2], [3,3] and [4,6].
     *
     * @throws Exception if re-running {@link #setUp()} fails
     */
    @Test
    public void testRotateAppendFailure() throws Exception {
        this.localProps.put(
                ROTATE_SCHEDULE_INTERVAL_MS_CONFIG, String.valueOf(TimeUnit.MINUTES.toMillis(10L)));
        // Re-run set-up after changing localProps (createProps() merges them into the properties).
        setUp();

        Schema schema = createSchema();
        Struct value = createRecord(schema);
        List<SinkRecord> firstBatch = new ArrayList<>();
        List<SinkRecord> secondBatch = new ArrayList<>();
        for (long offset = 0; offset < 7; offset++) {
            List<SinkRecord> target = offset < 4 ? firstBatch : secondBatch;
            target.add(new SinkRecord(TEST_TOPIC, 12, Schema.STRING_SCHEMA, "key", schema, value, offset));
        }

        DataWriter writer = new DataWriter(this.connectorConfig, this.context, this.avroData, this.time);
        MemoryStorage storage = writer.getStorage();
        writer.recover(TOPIC_PARTITION);
        writer.write(firstBatch);

        this.time.sleep(2 * this.connectorConfig.getLong(ROTATE_SCHEDULE_INTERVAL_MS_CONFIG));
        storage.setFailure(MemoryStorage.Failure.appendFailure);
        Data.logContents("Before failure");
        writer.write(new ArrayList<SinkRecord>());
        Data.logContents("After failure");

        long retryBackoffMs = this.connectorConfig.getLong(RETRY_BACKOFF_MS_CONFIG);
        Assert.assertEquals(retryBackoffMs, this.context.timeout());

        this.time.sleep(this.context.timeout());
        storage.setFailure(null);
        writer.write(secondBatch);
        Data.logContents("After test");

        // Consecutive entries delimit the committed files: (previous + 1) .. current, inclusive.
        long[] validOffsets = {-1, 2, 3, 6};
        for (int i = 1; i < validOffsets.length; i++) {
            long startOffset = validOffsets[i - 1] + 1;
            long endOffset = validOffsets[i];
            String path = FileUtils.committedFileName(
                    this.url,
                    this.topicsDir.getOrDefault(TEST_TOPIC, "topics"),
                    "test-topic/partition=12",
                    TOPIC_PARTITION,
                    startOffset,
                    endOffset,
                    EXTENSION,
                    ZERO_PAD_FMT);
            long expectedSize = (endOffset - startOffset) + 1;
            List<Object> committed = Data.getData().get(path);
            Assert.assertNotNull(path + " should have been created", committed);
            Assert.assertEquals(
                    path + " should contain a full batch of records", expectedSize, committed.size());
        }

        writer.close();
        writer.stop();
    }

    /**
     * Injects a write failure into the record writer of partition 12 while partition 13 keeps
     * receiving records. Partition 13 must still produce committed files for offsets [0,2] and
     * [3,5]; partition 12's temp file must hold 1 record until the back-off elapses and 3 after
     * the records are re-delivered.
     */
    @Test
    public void testWriterFailureMultiPartitions() {
        List<SinkRecord> records = new ArrayList<>();
        records.add(createRecord(12, 0));
        records.add(createRecord(13, 0));
        DataWriter writer = new DataWriter(this.connectorConfig, this.context, this.avroData, this.time);
        writer.write(records);

        records.clear();
        records.addAll(createRecords(12, 1, 6));
        records.addAll(createRecords(13, 1, 6));
        MemoryRecordWriter failingWriter =
                (MemoryRecordWriter) writer.getWriters(TOPIC_PARTITION).get(PARTITION_12_DIR);
        failingWriter.setFailure(MemoryRecordWriter.Failure.writeFailure);
        writer.write(records);

        long retryBackoffMs = this.connectorConfig.getLong(RETRY_BACKOFF_MS_CONFIG);
        Assert.assertEquals(retryBackoffMs, this.context.timeout());

        Map<String, List<Object>> data = Data.getData();
        // Consecutive entries delimit the committed files: (previous + 1) .. current, inclusive.
        long[] validOffsets = {-1, 2, 5};
        for (int i = 1; i < validOffsets.length; i++) {
            long startOffset = validOffsets[i - 1] + 1;
            long endOffset = validOffsets[i];
            String path = FileUtils.committedFileName(
                    this.url,
                    this.topicsDir.get(TOPIC_PARTITION2.topic()),
                    "test-topic/partition=13",
                    TOPIC_PARTITION2,
                    startOffset,
                    endOffset,
                    EXTENSION,
                    ZERO_PAD_FMT);
            Assert.assertEquals((endOffset - startOffset) + 1, data.get(path).size());
        }

        writer.write(new ArrayList<SinkRecord>());
        Assert.assertEquals(retryBackoffMs, this.context.timeout());

        String tempFile = (String) writer.getTempFileNames(TOPIC_PARTITION).get(PARTITION_12_DIR);
        List<Object> tempContent = data.get(tempFile);
        Assert.assertEquals(1L, tempContent.size());
        for (int i = 0; i < tempContent.size(); i++) {
            Assert.assertEquals(createRecord(12, i), tempContent.get(i));
        }

        this.time.sleep(this.context.timeout());
        // Elements 6..11 of the list are the partition-13 records added above.
        writer.write(records.subList(6, 12));
        // The same list instance is re-checked on purpose: it is expected to have grown in place.
        Assert.assertEquals(3L, tempContent.size());
        for (int i = 0; i < tempContent.size(); i++) {
            Assert.assertEquals(createRecord(12, i), tempContent.get(i));
        }

        writer.write(new ArrayList<SinkRecord>());
        writer.close();
        writer.stop();
    }

    /**
     * Injects a write failure into the record writer of partition 12 and checks the temp file
     * content before (record at offset 0) and after (record at offset 6) the back-off elapses.
     */
    @Test
    public void testWriterFailure() {
        HdfsSinkConnectorConfig config = new HdfsSinkConnectorConfig(this.properties);
        List<SinkRecord> firstRecords = createRecords(12, 0, 1);
        DataWriter writer = new DataWriter(config, this.context, this.avroData, this.time);
        writer.write(firstRecords);

        List<SinkRecord> laterRecords = createRecords(12, 1, 6);
        MemoryRecordWriter failingWriter =
                (MemoryRecordWriter) writer.getWriters(TOPIC_PARTITION).get(PARTITION_12_DIR);
        failingWriter.setFailure(MemoryRecordWriter.Failure.writeFailure);
        writer.write(laterRecords);

        long retryBackoffMs = config.getLong(RETRY_BACKOFF_MS_CONFIG);
        Assert.assertEquals(retryBackoffMs, this.context.timeout());

        // The mock clock has not moved yet; the temp file must still hold only offset 0.
        writer.write(new ArrayList<SinkRecord>());
        Map<String, List<Object>> data = Data.getData();
        String tempFileBefore = (String) writer.getTempFileNames(TOPIC_PARTITION).get(PARTITION_12_DIR);
        List<Object> contentBefore = data.get(tempFileBefore);
        Assert.assertEquals(1L, contentBefore.size());
        Assert.assertEquals(createRecord(12, 0), contentBefore.get(0));

        this.time.sleep(this.context.timeout());
        writer.write(new ArrayList<SinkRecord>());
        String tempFileAfter = (String) writer.getTempFileNames(TOPIC_PARTITION).get(PARTITION_12_DIR);
        List<Object> contentAfter = data.get(tempFileAfter);
        Assert.assertEquals(1L, contentAfter.size());
        Assert.assertEquals(createRecord(12, 6), contentAfter.get(0));

        writer.write(new ArrayList<SinkRecord>());
        writer.close();
        writer.stop();
    }

    /**
     * Injects a close failure into the record writer of partition 12 (twice) and checks the temp
     * file content and the committed offset reported for {@code test-topic} partition 12.
     *
     * @throws Exception declared by the original test signature
     */
    @Test
    public void testCloseFailure() throws Exception {
        HdfsSinkConnectorConfig config = new HdfsSinkConnectorConfig(this.properties);
        List<SinkRecord> firstRecords = createRecords(12, 0, 1);
        DataWriter writer = new DataWriter(config, this.context, this.avroData, this.time);
        writer.write(firstRecords);

        List<SinkRecord> laterRecords = createRecords(12, 1, 6);
        // Wildcard map: the declared value type of getWriters() is not needed, only a cast on lookup.
        Map<?, ?> writers = writer.getWriters(TOPIC_PARTITION);
        ((MemoryRecordWriter) writers.get(PARTITION_12_DIR))
                .setFailure(MemoryRecordWriter.Failure.closeFailure);
        writer.write(laterRecords);

        long retryBackoffMs = config.getLong(RETRY_BACKOFF_MS_CONFIG);
        Assert.assertEquals(retryBackoffMs, this.context.timeout());

        writer.write(createRecords(12, 0, 7));
        this.time.sleep(this.context.timeout());
        writer.write(new ArrayList<SinkRecord>());

        String tempFile = (String) writer.getTempFileNames(TOPIC_PARTITION).get(PARTITION_12_DIR);
        List<Object> tempContent = Data.getData().get(tempFile);
        Assert.assertEquals(1L, tempContent.size());
        Assert.assertEquals(createRecord(12, 6), tempContent.get(0));

        ((MemoryRecordWriter) writers.get(PARTITION_12_DIR))
                .setFailure(MemoryRecordWriter.Failure.closeFailure);
        writer.write(createRecords(12, 7, 2));
        this.time.sleep(this.context.timeout());
        writer.write(new ArrayList<SinkRecord>());

        Long committedOffset = (Long) writer.getCommittedOffsets().get(new TopicPartition(TEST_TOPIC, 12));
        Assert.assertEquals(6L, committedOffset.longValue());

        writer.close();
        writer.stop();
    }

    /**
     * Builds one sink record for {@code test-topic} with a string key {@code "key"} and a value
     * created from the base-class schema.
     *
     * @param partition the Kafka partition number of the record
     * @param offset the Kafka offset of the record
     * @return the new record
     */
    private SinkRecord createRecord(int partition, int offset) {
        Schema schema = createSchema();
        return new SinkRecord(
                TEST_TOPIC, partition, Schema.STRING_SCHEMA, "key", schema, createRecord(schema), offset);
    }

    /**
     * Builds {@code count} consecutive records for one partition.
     *
     * @param partition the Kafka partition number of every record
     * @param startOffset the offset of the first record
     * @param count how many records to create; offsets run from {@code startOffset} to
     *     {@code startOffset + count - 1}
     * @return the records in ascending offset order
     */
    private List<SinkRecord> createRecords(int partition, int startOffset, int count) {
        List<SinkRecord> records = new ArrayList<>();
        for (int offset = startOffset; offset < startOffset + count; offset++) {
            records.add(createRecord(partition, offset));
        }
        return records;
    }
}
