package io.confluent.connect.hdfs;

import io.confluent.connect.avro.AvroData;
import io.confluent.connect.hdfs.avro.AvroDataFileReader;
import io.confluent.connect.hdfs.storage.HdfsStorage;
import io.confluent.connect.hdfs.wal.WAL;
import io.confluent.connect.storage.StorageFactory;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:io/confluent/connect/hdfs/HdfsSinkTaskTest.class */
public class HdfsSinkTaskTest extends TestWithMiniDFSCluster {
    private static final String DIRECTORY1 = "test-topic/partition=" + String.valueOf(12);
    private static final String DIRECTORY2 = "test-topic/partition=" + String.valueOf(13);
    private static final String extension = ".avro";
    private static final String ZERO_PAD_FMT = "%010d";
    private final DataFileReader schemaFileReader = new AvroDataFileReader();

    @Test
    public void testSinkTaskStart() throws Exception {
        setUp();
        createCommittedFiles();
        HdfsSinkTask hdfsSinkTask = new HdfsSinkTask();
        hdfsSinkTask.initialize(this.context);
        hdfsSinkTask.start(this.properties);
        Map offsets = this.context.offsets();
        Assert.assertEquals(offsets.size(), 2L);
        Assert.assertTrue(offsets.containsKey(TOPIC_PARTITION));
        Assert.assertEquals(21L, ((Long) offsets.get(TOPIC_PARTITION)).longValue());
        Assert.assertTrue(offsets.containsKey(TOPIC_PARTITION2));
        Assert.assertEquals(46L, ((Long) offsets.get(TOPIC_PARTITION2)).longValue());
        hdfsSinkTask.stop();
    }

    @Test
    public void testSinkTaskFileSystemIsolation() throws Exception {
        setUp();
        createCommittedFiles();
        Schema createSchema = createSchema();
        Struct createRecord = createRecord(createSchema);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (TopicPartition topicPartition : this.context.assignment()) {
            long j = 0;
            while (true) {
                long j2 = j;
                if (j2 >= 7) {
                    break;
                }
                arrayList.add(new SinkRecord(topicPartition.topic(), topicPartition.partition(), Schema.STRING_SCHEMA, "key", createSchema, createRecord, j2));
                j = j2 + 1;
            }
            long j3 = 7;
            while (true) {
                long j4 = j3;
                if (j4 < 16) {
                    arrayList2.add(new SinkRecord(topicPartition.topic(), topicPartition.partition(), Schema.STRING_SCHEMA, "key", createSchema, createRecord, j4));
                    j3 = j4 + 1;
                }
            }
        }
        HdfsSinkTask hdfsSinkTask = new HdfsSinkTask();
        hdfsSinkTask.initialize(this.context);
        hdfsSinkTask.start(this.properties);
        hdfsSinkTask.put(arrayList);
        FileSystem.get(new URI(this.connectorConfig.getString("hdfs.url")), this.connectorConfig.getHadoopConfiguration()).close();
        hdfsSinkTask.put(arrayList2);
        hdfsSinkTask.stop();
        AvroData avroData = hdfsSinkTask.getAvroData();
        long[] jArr = {-1, 2, 5, 8, 11, 14};
        for (TopicPartition topicPartition2 : this.context.assignment()) {
            String str = topicPartition2.topic() + "/partition=" + String.valueOf(topicPartition2.partition());
            for (int i = 1; i < jArr.length; i++) {
                long j5 = jArr[i - 1] + 1;
                long j6 = jArr[i];
                Collection<Object> readData = this.schemaFileReader.readData(this.connectorConfig.getHadoopConfiguration(), new Path(FileUtils.committedFileName(this.url, this.topicsDir, str, topicPartition2, j5, j6, extension, ZERO_PAD_FMT)));
                Assert.assertEquals(readData.size(), (j6 - j5) + 1);
                Iterator<Object> it = readData.iterator();
                while (it.hasNext()) {
                    Assert.assertEquals(it.next(), avroData.fromConnectData(createSchema, createRecord));
                }
            }
        }
    }

    @Test
    public void testSinkTaskStartNoCommittedFiles() throws Exception {
        setUp();
        HdfsSinkTask hdfsSinkTask = new HdfsSinkTask();
        hdfsSinkTask.initialize(this.context);
        hdfsSinkTask.start(this.properties);
        Assert.assertEquals(0L, this.context.offsets().size());
        hdfsSinkTask.stop();
    }

    @Test
    public void testSinkTaskStartSomeCommittedFiles() throws Exception {
        setUp();
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(FileUtils.tempFileName(this.url, this.topicsDir, DIRECTORY1, extension));
        arrayList.add(FileUtils.tempFileName(this.url, this.topicsDir, DIRECTORY1, extension));
        hashMap.put(TOPIC_PARTITION, arrayList);
        HashMap hashMap2 = new HashMap();
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(FileUtils.committedFileName(this.url, this.topicsDir, DIRECTORY1, TOPIC_PARTITION, 100L, 200L, extension, ZERO_PAD_FMT));
        arrayList2.add(FileUtils.committedFileName(this.url, this.topicsDir, DIRECTORY1, TOPIC_PARTITION, 201L, 300L, extension, ZERO_PAD_FMT));
        hashMap2.put(TOPIC_PARTITION, arrayList2);
        Iterator<TopicPartition> it = hashMap.keySet().iterator();
        while (it.hasNext()) {
            Iterator<String> it2 = hashMap.get(it.next()).iterator();
            while (it2.hasNext()) {
                this.fs.createNewFile(new Path(it2.next()));
            }
        }
        createWALs(hashMap, hashMap2);
        HdfsSinkTask hdfsSinkTask = new HdfsSinkTask();
        hdfsSinkTask.initialize(this.context);
        hdfsSinkTask.start(this.properties);
        Map offsets = this.context.offsets();
        Assert.assertEquals(1L, offsets.size());
        Assert.assertTrue(offsets.containsKey(TOPIC_PARTITION));
        Assert.assertEquals(301L, ((Long) offsets.get(TOPIC_PARTITION)).longValue());
        hdfsSinkTask.stop();
    }

    @Test
    public void testSinkTaskStartWithRecovery() throws Exception {
        setUp();
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(FileUtils.tempFileName(this.url, this.topicsDir, DIRECTORY1, extension));
        arrayList.add(FileUtils.tempFileName(this.url, this.topicsDir, DIRECTORY1, extension));
        hashMap.put(TOPIC_PARTITION, arrayList);
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(FileUtils.tempFileName(this.url, this.topicsDir, DIRECTORY2, extension));
        arrayList2.add(FileUtils.tempFileName(this.url, this.topicsDir, DIRECTORY2, extension));
        hashMap.put(TOPIC_PARTITION2, arrayList2);
        HashMap hashMap2 = new HashMap();
        ArrayList arrayList3 = new ArrayList();
        arrayList3.add(FileUtils.committedFileName(this.url, this.topicsDir, DIRECTORY1, TOPIC_PARTITION, 100L, 200L, extension, ZERO_PAD_FMT));
        arrayList3.add(FileUtils.committedFileName(this.url, this.topicsDir, DIRECTORY1, TOPIC_PARTITION, 201L, 300L, extension, ZERO_PAD_FMT));
        hashMap2.put(TOPIC_PARTITION, arrayList3);
        ArrayList arrayList4 = new ArrayList();
        arrayList4.add(FileUtils.committedFileName(this.url, this.topicsDir, DIRECTORY2, TOPIC_PARTITION2, 400L, 500L, extension, ZERO_PAD_FMT));
        arrayList4.add(FileUtils.committedFileName(this.url, this.topicsDir, DIRECTORY2, TOPIC_PARTITION2, 501L, 800L, extension, ZERO_PAD_FMT));
        hashMap2.put(TOPIC_PARTITION2, arrayList4);
        Iterator<TopicPartition> it = hashMap.keySet().iterator();
        while (it.hasNext()) {
            Iterator<String> it2 = hashMap.get(it.next()).iterator();
            while (it2.hasNext()) {
                this.fs.createNewFile(new Path(it2.next()));
            }
        }
        createWALs(hashMap, hashMap2);
        HdfsSinkTask hdfsSinkTask = new HdfsSinkTask();
        hdfsSinkTask.initialize(this.context);
        hdfsSinkTask.start(this.properties);
        Map offsets = this.context.offsets();
        Assert.assertEquals(2L, offsets.size());
        Assert.assertTrue(offsets.containsKey(TOPIC_PARTITION));
        Assert.assertEquals(301L, ((Long) offsets.get(TOPIC_PARTITION)).longValue());
        Assert.assertTrue(offsets.containsKey(TOPIC_PARTITION2));
        Assert.assertEquals(801L, ((Long) offsets.get(TOPIC_PARTITION2)).longValue());
        hdfsSinkTask.stop();
    }

    @Test
    public void testSinkTaskPut() throws Exception {
        setUp();
        HdfsSinkTask hdfsSinkTask = new HdfsSinkTask();
        Schema createSchema = createSchema();
        Struct createRecord = createRecord(createSchema);
        ArrayList arrayList = new ArrayList();
        for (TopicPartition topicPartition : this.context.assignment()) {
            long j = 0;
            while (true) {
                long j2 = j;
                if (j2 < 7) {
                    arrayList.add(new SinkRecord(topicPartition.topic(), topicPartition.partition(), Schema.STRING_SCHEMA, "key", createSchema, createRecord, j2));
                    j = j2 + 1;
                }
            }
        }
        hdfsSinkTask.initialize(this.context);
        hdfsSinkTask.start(this.properties);
        hdfsSinkTask.put(arrayList);
        hdfsSinkTask.stop();
        AvroData avroData = hdfsSinkTask.getAvroData();
        long[] jArr = {-1, 2, 5};
        for (TopicPartition topicPartition2 : this.context.assignment()) {
            String str = topicPartition2.topic() + "/partition=" + String.valueOf(topicPartition2.partition());
            for (int i = 1; i < jArr.length; i++) {
                long j3 = jArr[i - 1] + 1;
                long j4 = jArr[i];
                Collection<Object> readData = this.schemaFileReader.readData(this.connectorConfig.getHadoopConfiguration(), new Path(FileUtils.committedFileName(this.url, this.topicsDir, str, topicPartition2, j3, j4, extension, ZERO_PAD_FMT)));
                Assert.assertEquals(readData.size(), (j4 - j3) + 1);
                Iterator<Object> it = readData.iterator();
                while (it.hasNext()) {
                    Assert.assertEquals(it.next(), avroData.fromConnectData(createSchema, createRecord));
                }
            }
        }
    }

    @Test
    public void testSinkTaskPutPrimitive() throws Exception {
        setUp();
        HdfsSinkTask hdfsSinkTask = new HdfsSinkTask();
        Schema schema = Schema.INT32_SCHEMA;
        ArrayList arrayList = new ArrayList();
        for (TopicPartition topicPartition : this.context.assignment()) {
            long j = 0;
            while (true) {
                long j2 = j;
                if (j2 < 7) {
                    arrayList.add(new SinkRecord(topicPartition.topic(), topicPartition.partition(), Schema.STRING_SCHEMA, "key", schema, 12, j2));
                    j = j2 + 1;
                }
            }
        }
        hdfsSinkTask.initialize(this.context);
        hdfsSinkTask.start(this.properties);
        hdfsSinkTask.put(arrayList);
        hdfsSinkTask.stop();
        long[] jArr = {-1, 2, 5};
        for (TopicPartition topicPartition2 : this.context.assignment()) {
            String str = topicPartition2.topic() + "/partition=" + String.valueOf(topicPartition2.partition());
            for (int i = 1; i < jArr.length; i++) {
                long j3 = jArr[i - 1] + 1;
                long j4 = jArr[i];
                Collection<Object> readData = this.schemaFileReader.readData(this.connectorConfig.getHadoopConfiguration(), new Path(FileUtils.committedFileName(this.url, this.topicsDir, str, topicPartition2, j3, j4, extension, ZERO_PAD_FMT)));
                Assert.assertEquals(readData.size(), (j4 - j3) + 1);
                Iterator<Object> it = readData.iterator();
                while (it.hasNext()) {
                    Assert.assertEquals(it.next(), 12);
                }
            }
        }
    }

    private void createCommittedFiles() throws IOException {
        String committedFileName = FileUtils.committedFileName(this.url, this.topicsDir, DIRECTORY1, TOPIC_PARTITION, 0L, 10L, extension, ZERO_PAD_FMT);
        String committedFileName2 = FileUtils.committedFileName(this.url, this.topicsDir, DIRECTORY1, TOPIC_PARTITION, 11L, 20L, extension, ZERO_PAD_FMT);
        String committedFileName3 = FileUtils.committedFileName(this.url, this.topicsDir, DIRECTORY1, TOPIC_PARTITION2, 21L, 40L, extension, ZERO_PAD_FMT);
        String committedFileName4 = FileUtils.committedFileName(this.url, this.topicsDir, DIRECTORY1, TOPIC_PARTITION2, 41L, 45L, extension, ZERO_PAD_FMT);
        this.fs.createNewFile(new Path(committedFileName));
        this.fs.createNewFile(new Path(committedFileName2));
        this.fs.createNewFile(new Path(committedFileName3));
        this.fs.createNewFile(new Path(committedFileName4));
    }

    private void createWALs(Map<TopicPartition, List<String>> map, Map<TopicPartition, List<String>> map2) throws Exception {
        HdfsStorage createStorage = StorageFactory.createStorage(this.connectorConfig.getClass("storage.class"), HdfsSinkConnectorConfig.class, this.connectorConfig, this.url);
        for (TopicPartition topicPartition : map.keySet()) {
            WAL wal = createStorage.wal(this.logsDir, topicPartition);
            List<String> list = map.get(topicPartition);
            List<String> list2 = map2.get(topicPartition);
            wal.append("BEGIN", "");
            for (int i = 0; i < list.size(); i++) {
                wal.append(list.get(i), list2.get(i));
            }
            wal.append("END", "");
            wal.close();
        }
    }
}
