package io.confluent.connect.s3.integration;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import io.confluent.common.utils.IntegrationTest;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for S3 connector integration tests.
 *
 * <p>Starts an {@link EmbeddedConnectCluster} before every test and stops it afterwards, and
 * offers helpers for waiting on connector state and for inspecting the contents of an S3 bucket.
 *
 * <p>The bucket helpers use the static {@link #S3Client}, which is not assigned anywhere in this
 * class; it must be set (presumably by the concrete test class) before those helpers are called.
 */
@Category({IntegrationTest.class})
public abstract class BaseConnectorIT {
    private static final Logger log = LoggerFactory.getLogger(BaseConnectorIT.class);

    /** Maximum time to wait for a connector and its tasks to report RUNNING. */
    private static final long CONNECTOR_STARTUP_DURATION_MS = TimeUnit.MINUTES.toMillis(1);

    /** Maximum time to wait for the expected number of objects to appear in a bucket. */
    private static final long S3_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(60);

    protected static final int MAX_TASKS = 3;

    /** Bucket name made unique per JVM by appending the class-initialisation timestamp. */
    protected static final String TEST_BUCKET_NAME = "connect-s3-integration-testing-" + System.currentTimeMillis();

    /** S3 client used by the bucket helpers; not initialised by this class. */
    protected static AmazonS3 S3Client;

    protected EmbeddedConnectCluster connect;
    protected Map<String, String> props;

    /** Starts the embedded Connect cluster before each test. */
    @Before
    public void setup() {
        startConnect();
    }

    /** Stops the embedded Connect cluster after each test, if one was created. */
    @After
    public void close() {
        // JUnit still runs @After when @Before throws; without this guard a failed
        // startConnect() would add a NullPointerException on top of the real failure.
        if (this.connect != null) {
            this.connect.stop();
        }
    }

    /** Builds and starts the embedded Connect cluster named {@code s3-connect-cluster}. */
    protected void startConnect() {
        this.connect = new EmbeddedConnectCluster.Builder().name("s3-connect-cluster").build();
        this.connect.start();
    }

    /**
     * Blocks until the connector and at least {@code numTasks} of its tasks are RUNNING, or the
     * startup timeout elapses (in which case {@code TestUtils.waitForCondition} reports
     * "Connector tasks did not start in time.").
     *
     * @param connectorName name of the connector to poll
     * @param numTasks minimum number of tasks that must exist
     * @return the wall-clock time, in milliseconds, at which the wait completed
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public long waitForConnectorToStart(String connectorName, int numTasks) throws InterruptedException {
        TestUtils.waitForCondition(
                () -> assertConnectorAndTasksRunning(connectorName, numTasks).orElse(false),
                CONNECTOR_STARTUP_DURATION_MS,
                "Connector tasks did not start in time.");
        return System.currentTimeMillis();
    }

    /**
     * Blocks until the bucket holds exactly {@code numFiles} objects, or the S3 timeout elapses
     * (in which case {@code TestUtils.waitForCondition} reports
     * "Files not written to S3 bucket in time.").
     *
     * @param bucketName bucket to poll
     * @param numFiles exact number of objects expected
     * @return the wall-clock time, in milliseconds, at which the wait completed
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public long waitForFilesInBucket(String bucketName, int numFiles) throws InterruptedException {
        TestUtils.waitForCondition(
                () -> assertFileCountInBucket(bucketName, numFiles).orElse(false),
                S3_TIMEOUT_MS,
                "Files not written to S3 bucket in time.");
        return System.currentTimeMillis();
    }

    /**
     * Checks whether the connector is RUNNING, has at least {@code numTasks} tasks, and every
     * task is RUNNING.
     *
     * @param connectorName name of the connector to inspect
     * @param numTasks minimum number of tasks that must exist
     * @return the check result, or an empty {@link Optional} if the status could not be fetched
     */
    protected Optional<Boolean> assertConnectorAndTasksRunning(String connectorName, int numTasks) {
        try {
            ConnectorStateInfo info = this.connect.connectorStatus(connectorName);
            String running = AbstractStatus.State.RUNNING.toString();
            boolean allRunning = info != null
                    && info.tasks().size() >= numTasks
                    && info.connector().state().equals(running)
                    && info.tasks().stream().allMatch(task -> task.state().equals(running));
            return Optional.of(allRunning);
        } catch (Exception e) {
            // Called in a polling loop, so log only the exception summary (no stack trace),
            // but do log it: otherwise a permanent failure looks identical to "not ready yet".
            log.warn("Could not check connector state info for {}: {}", connectorName, e.toString());
            return Optional.empty();
        }
    }

    /**
     * Checks whether the bucket currently holds exactly {@code numFiles} objects.
     *
     * @param bucketName bucket to inspect
     * @param numFiles exact number of objects expected
     * @return the check result, or an empty {@link Optional} if the bucket could not be listed
     */
    public Optional<Boolean> assertFileCountInBucket(String bucketName, int numFiles) {
        try {
            return Optional.of(getBucketFileCount(bucketName) == numFiles);
        } catch (Exception e) {
            // Same reasoning as for the connector check: keep polling quiet, but surface the cause.
            log.warn("Could not check file count in bucket: {} ({})", bucketName, e.toString());
            return Optional.empty();
        }
    }

    /** Sums the key counts of every page of a {@code ListObjectsV2} listing of the bucket. */
    private int getBucketFileCount(String bucketName) {
        int total = 0;
        ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName);
        ListObjectsV2Result page;
        do {
            page = S3Client.listObjectsV2(request);
            total += page.getKeyCount();
            // Feed the token back so the next call returns the following page.
            request.setContinuationToken(page.getNextContinuationToken());
        } while (page.isTruncated());
        return total;
    }

    /**
     * Builds the object keys expected for one topic partition, one key per complete group of
     * {@code flushSize} records. Records left over after the last complete group produce no key.
     *
     * <p>Keys have the form
     * {@code topics/<topic>/partition=<partition>/<topic>+<partition>+<offset>.<extension>}, where
     * {@code offset} is zero-padded to 10 digits and advances by {@code flushSize} from 0.
     *
     * @param topic topic name embedded in the key
     * @param partition partition number embedded in the key
     * @param flushSize number of records per file (presumably the connector's flush size);
     *     must be positive
     * @param numRecords total number of records; values {@code <= 0} yield an empty list
     * @param extension file extension, without the leading dot
     * @return the expected keys in ascending offset order
     * @throws IllegalArgumentException if {@code flushSize} is not positive
     */
    public List<String> getExpectedFilenames(String topic, int partition, int flushSize, long numRecords, String extension) {
        if (flushSize <= 0) {
            // Zero would divide by zero; a negative step would never reach the loop bound.
            throw new IllegalArgumentException("flushSize must be positive: " + flushSize);
        }
        // Stay in long arithmetic: narrowing numRecords to int before dividing would silently
        // truncate large counts, and the offset bound could overflow an int.
        long fileCount = numRecords / flushSize;
        long endOffset = fileCount * flushSize;
        List<String> expectedFiles = new ArrayList<>();
        for (long offset = 0; offset < endOffset; offset += flushSize) {
            expectedFiles.add(String.format(
                    "topics/%s/partition=%d/%s+%d+%010d.%s",
                    topic, partition, topic, partition, offset, extension));
        }
        return expectedFiles;
    }

    /**
     * Compares the expected keys with the keys currently in the bucket.
     *
     * <p>The comparison uses {@link List#equals(Object)}, so both content and order must match
     * the order in which the bucket listing returns keys.
     *
     * @param bucketName bucket to inspect
     * @param expectedFiles expected keys; must not be {@code null}
     * @return {@code true} if the bucket listing equals {@code expectedFiles}
     */
    public boolean fileNamesValid(String bucketName, List<String> expectedFiles) {
        return expectedFiles.equals(getBucketFileNames(bucketName));
    }

    /** Collects the keys from every page of a {@code ListObjectsV2} listing of the bucket. */
    private List<String> getBucketFileNames(String bucketName) {
        List<String> keys = new ArrayList<>();
        ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName);
        ListObjectsV2Result page;
        do {
            page = S3Client.listObjectsV2(request);
            for (S3ObjectSummary summary : page.getObjectSummaries()) {
                keys.add(summary.getKey());
            }
            // Feed the token back so the next call returns the following page.
            request.setContinuationToken(page.getNextContinuationToken());
        } while (page.isTruncated());
        return keys;
    }
}
