package io.confluent.connect.s3;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import io.confluent.connect.s3.format.bytearray.ByteArrayFormat;
import io.confluent.connect.s3.storage.CompressionType;
import io.confluent.connect.s3.storage.S3OutputStream;
import io.confluent.connect.s3.storage.S3Storage;
import io.confluent.connect.s3.util.FileUtils;
import io.confluent.connect.storage.partitioner.DefaultPartitioner;
import io.confluent.connect.storage.partitioner.Partitioner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.converters.ByteArrayConverter;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.powermock.api.mockito.PowerMockito;

/* loaded from: input_file:io/confluent/connect/s3/DataWriterByteArrayTest.class */
/**
 * Tests that drive {@link S3SinkTask} with {@link ByteArrayFormat} and schemaless byte-array
 * records, then check the object keys and object contents found in the test bucket.
 *
 * <p>Each test first stores its connector overrides in {@link #localProps} and only then calls
 * {@link #setUp()} explicitly, so that {@link #createProps()} can merge the overrides into the
 * base configuration.
 */
public class DataWriterByteArrayTest extends TestWithMockedS3 {
    /** Offset format passed to {@code FileUtils.fileKeyToCommit} and {@code readRecords}. */
    private static final String ZERO_PAD_FMT = "%010d";
    /** Name of the bucket that every test in this class writes to and reads from. */
    private static final String TEST_BUCKET = "kafka.bucket";
    /** Topic name stamped on every generated record. */
    private static final String TEST_TOPIC = "test-topic";

    private ByteArrayConverter converter;
    protected S3Storage storage;
    protected AmazonS3 s3;
    protected Partitioner<FieldSchema> partitioner;
    protected ByteArrayFormat format;
    protected S3SinkTask task;
    /** Per-test configuration overrides; cleared again in {@link #tearDown()}. */
    protected Map<String, String> localProps = new HashMap<>();

    /**
     * Builds the connector properties for a test.
     *
     * @return the properties of the superclass with every entry of {@link #localProps} applied
     *     on top
     */
    @Override
    public Map<String, String> createProps() {
        Map<String, String> props = super.createProps();
        props.putAll(this.localProps);
        return props;
    }

    /**
     * Initialises the S3 client, storage, partitioner and format, then creates the test bucket.
     *
     * <p>This override carries no JUnit annotation; the tests in this class invoke it themselves
     * after filling {@link #localProps}.
     *
     * @throws Exception if the superclass set-up or any of the initialisation steps fails
     */
    @Override
    public void setUp() throws Exception {
        super.setUp();
        this.converter = new ByteArrayConverter();
        this.s3 = newS3Client(this.connectorConfig);
        this.storage = new S3Storage(this.connectorConfig, this.url, TEST_BUCKET, this.s3);
        this.partitioner = new DefaultPartitioner<>();
        this.partitioner.configure(this.parsedConfig);
        this.format = new ByteArrayFormat(this.storage);
        this.s3.createBucket(TEST_BUCKET);
        Assert.assertTrue(this.s3.doesBucketExist(TEST_BUCKET));
    }

    /**
     * Runs the superclass tear-down and drops the per-test configuration overrides.
     *
     * @throws Exception if the superclass tear-down fails
     */
    @Override
    @After
    public void tearDown() throws Exception {
        super.tearDown();
        this.localProps.clear();
    }

    /**
     * Stubs the part size to 5, writes exactly 5 bytes to an {@link S3OutputStream} and then one
     * more byte. The test has no assertions; it passes when neither write throws.
     */
    @Test
    public void testBufferOverflowFix() throws Exception {
        this.localProps.put("format.class", ByteArrayFormat.class.getName());
        setUp();
        ((S3SinkConnectorConfig) PowerMockito.doReturn(5).when(this.connectorConfig)).getPartSize();
        S3OutputStream out = new S3OutputStream(TEST_BUCKET, this.connectorConfig, this.s3);
        out.write(new byte[]{65, 66, 67, 68, 69});
        out.write(70);
    }

    /** Writes schemaless records with the default settings and expects {@code .bin} objects. */
    @Test
    public void testNoSchema() throws Exception {
        this.localProps.put("format.class", ByteArrayFormat.class.getName());
        writeRecordsAndVerify(".bin");
    }

    /** Writes schemaless records with GZIP compression and expects {@code .bin.gz} objects. */
    @Test
    public void testGzipCompression() throws Exception {
        CompressionType compressionType = CompressionType.GZIP;
        this.localProps.put("format.class", ByteArrayFormat.class.getName());
        this.localProps.put("s3.compression.type", compressionType.name);
        writeRecordsAndVerify(".bin.gz");
    }

    /**
     * Same as {@link #testGzipCompression()} but with {@code s3.compression.level} set to 9.
     */
    @Test
    public void testBestGzipCompression() throws Exception {
        CompressionType compressionType = CompressionType.GZIP;
        this.localProps.put("format.class", ByteArrayFormat.class.getName());
        this.localProps.put("s3.compression.type", compressionType.name);
        this.localProps.put("s3.compression.level", String.valueOf(9));
        writeRecordsAndVerify(".bin.gz");
    }

    /**
     * Configures a custom record separator and a custom file extension and expects objects that
     * carry the custom extension.
     */
    @Test
    public void testCustomExtensionAndLineSeparator() throws Exception {
        this.localProps.put("format.class", ByteArrayFormat.class.getName());
        this.localProps.put("format.bytearray.separator", "SEPARATOR");
        this.localProps.put("format.bytearray.extension", ".customExtensionForTest");
        writeRecordsAndVerify(".customExtensionForTest");
    }

    /**
     * Shared body of the sink-task tests: runs {@link #setUp()}, creates the task, puts 7 records
     * per assigned partition, closes and stops the task, and verifies the result against the
     * offset boundaries {@code {0, 3, 6}}.
     *
     * <p>The caller must have filled {@link #localProps} before calling this method.
     *
     * @param extension the file extension the written objects are expected to have
     * @throws Exception if set-up, the task, or verification fails
     */
    private void writeRecordsAndVerify(String extension) throws Exception {
        setUp();
        this.task = new S3SinkTask(this.connectorConfig, this.context, this.storage, this.partitioner, this.format, SYSTEM_TIME);
        List<SinkRecord> records = createByteArrayRecordsWithoutSchema(7 * this.context.assignment().size(), 0L, this.context.assignment());
        this.task.put(records);
        this.task.close(this.context.assignment());
        this.task.stop();
        verify(records, new long[]{0, 3, 6}, this.context.assignment(), extension);
    }

    /**
     * Creates schemaless records whose value is a fixed JSON-like text encoded as bytes.
     *
     * <p>Records are produced round-robin: one record per partition at the current offset, then
     * the offset is incremented, until {@code i} records exist in total.
     *
     * @param i total number of records to create
     * @param j offset assigned to the first round of records
     * @param set partitions to create records for
     * @return the created records in creation order; empty when {@code i <= 0} or {@code set} is
     *     empty
     */
    protected List<SinkRecord> createByteArrayRecordsWithoutSchema(int i, long j, Set<TopicPartition> set) {
        List<SinkRecord> records = new ArrayList<>();
        if (set.isEmpty()) {
            // With no partitions the count could never reach i; return instead of spinning.
            return records;
        }
        final int base = 12;
        final String payload = "{\"schema\":{\"type\":\"struct\",\"fields\":[ {\"type\":\"boolean\",\"optional\":true,\"field\":\"booleanField\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"intField\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"longField\"},{\"type\":\"string\",\"optional\":false,\"field\":\"stringField\"}],\"payload\":{\"booleanField\":\"true\",\"intField\":" + base + ",\"longField\":" + base + ",\"stringField\":str" + base + "}}";
        long offset = j;
        long created = 0;
        while (created < i) {
            for (TopicPartition topicPartition : set) {
                // Each record gets its own byte array.
                records.add(new SinkRecord(TEST_TOPIC, topicPartition.partition(), (Schema) null, "key", (Schema) null, payload.getBytes(), offset));
                created++;
                if (created >= i) {
                    break;
                }
            }
            offset++;
        }
        return records;
    }

    /**
     * Returns the partitioned directory for a topic partition.
     *
     * @param str topic name
     * @param i partition number
     * @return the path generated by the partitioner for {@code "partition=" + i}
     */
    protected String getDirectory(String str, int i) {
        return this.partitioner.generatePartitionedPath(str, "partition=" + String.valueOf(i));
    }

    /**
     * Computes the object keys expected for one partition.
     *
     * @param jArr offset boundaries; every element except the last is used as a file start offset
     * @param topicPartition partition the files belong to
     * @param str file extension
     * @return one key per start offset, in boundary order
     */
    protected List<String> getExpectedFiles(long[] jArr, TopicPartition topicPartition, String str) {
        List<String> expectedFiles = new ArrayList<>();
        String directory = getDirectory(topicPartition.topic(), topicPartition.partition());
        for (int k = 1; k < jArr.length; k++) {
            long startOffset = jArr[k - 1];
            expectedFiles.add(FileUtils.fileKeyToCommit(this.topicsDir, directory, topicPartition, startOffset, str, ZERO_PAD_FMT));
        }
        return expectedFiles;
    }

    /**
     * Asserts that the bucket holds exactly the keys expected for all given partitions.
     *
     * @param jArr offset boundaries, see {@link #getExpectedFiles(long[], TopicPartition, String)}
     * @param set partitions to expect files for
     * @param str file extension
     * @throws IOException declared for the listing call
     */
    protected void verifyFileListing(long[] jArr, Set<TopicPartition> set, String str) throws IOException {
        List<String> expectedFiles = new ArrayList<>();
        for (TopicPartition topicPartition : set) {
            expectedFiles.addAll(getExpectedFiles(jArr, topicPartition, str));
        }
        verifyFileListing(expectedFiles);
    }

    /**
     * Asserts that the keys listed in the test bucket equal the expected keys, ignoring order.
     *
     * @param list expected keys; note that this list is sorted in place
     * @throws IOException declared for the listing call
     */
    protected void verifyFileListing(List<String> list) throws IOException {
        List<S3ObjectSummary> summaries = listObjects(TEST_BUCKET, null, this.s3);
        List<String> actualFiles = new ArrayList<>();
        for (S3ObjectSummary summary : summaries) {
            actualFiles.add(summary.getKey());
        }
        Collections.sort(actualFiles);
        Collections.sort(list);
        Assert.assertThat(actualFiles, CoreMatchers.is(list));
    }

    /**
     * Asserts that each read-back record equals the value of the matching expected record.
     *
     * @param list expected records
     * @param i index into {@code list} of the record matching the first element of
     *     {@code collection}; following elements are matched against following indices
     * @param collection records read back, each expected to be a {@code byte[]}
     * @throws IOException declared for overriding implementations; not thrown here
     */
    protected void verifyContents(List<SinkRecord> list, int i, Collection<Object> collection) throws IOException {
        int index = i;
        for (Object actual : collection) {
            Assert.assertArrayEquals((byte[]) list.get(index).value(), (byte[]) actual);
            index++;
        }
    }

    /**
     * Verifies the file listing and the contents of every expected file.
     *
     * @param list expected records
     * @param jArr offset boundaries
     * @param set partitions that were written
     * @param str file extension
     * @throws IOException if listing or reading fails
     */
    protected void verify(List<SinkRecord> list, long[] jArr, Set<TopicPartition> set, String str) throws IOException {
        verify(list, jArr, set, str, false);
    }

    /**
     * Verifies the contents of every expected file and, unless skipped, the file listing.
     *
     * <p>For each partition the files are read in boundary order. The index into {@code list}
     * starts at 0 for every partition and advances by the distance between consecutive
     * boundaries after each file.
     *
     * @param list expected records
     * @param jArr offset boundaries; consecutive pairs delimit one file each
     * @param set partitions that were written
     * @param str file extension
     * @param z when {@code true}, the file-listing check is skipped
     * @throws IOException if listing or reading fails
     */
    protected void verify(List<SinkRecord> list, long[] jArr, Set<TopicPartition> set, String str, boolean z) throws IOException {
        if (!z) {
            verifyFileListing(jArr, set, str);
        }
        for (TopicPartition topicPartition : set) {
            String directory = getDirectory(topicPartition.topic(), topicPartition.partition());
            int recordIndex = 0;
            for (int k = 1; k < jArr.length; k++) {
                long startOffset = jArr[k - 1];
                long span = jArr[k] - startOffset;
                Collection<Object> actual = readRecords(this.topicsDir, directory, topicPartition, startOffset, str, ZERO_PAD_FMT, TEST_BUCKET, this.s3);
                verifyContents(list, recordIndex, actual);
                recordIndex = (int) (recordIndex + span);
            }
        }
    }
}
