package software.amazon.kinesis.connectors.flink.util;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import software.amazon.kinesis.connectors.flink.FlinkKinesisException;
import software.amazon.kinesis.connectors.flink.config.ConsumerConfigConstants;
import software.amazon.kinesis.connectors.flink.internals.publisher.fanout.FanOutRecordPublisherConfiguration;
import software.amazon.kinesis.connectors.flink.internals.publisher.fanout.StreamConsumerRegistrar;
import software.amazon.kinesis.connectors.flink.proxy.FullJitterBackoff;
import software.amazon.kinesis.connectors.flink.proxy.KinesisProxyV2Factory;

@Internal
/* loaded from: input_file:software/amazon/kinesis/connectors/flink/util/StreamConsumerRegistrarUtil.class */
public class StreamConsumerRegistrarUtil {

    @Internal
    /* loaded from: input_file:software/amazon/kinesis/connectors/flink/util/StreamConsumerRegistrarUtil$FlinkKinesisStreamConsumerRegistrarException.class */
    public static class FlinkKinesisStreamConsumerRegistrarException extends FlinkKinesisException {
        public FlinkKinesisStreamConsumerRegistrarException(String str, Throwable th) {
            super(str, th);
        }
    }

    public static void eagerlyRegisterStreamConsumers(Properties properties, List<String> list) {
        if (AwsV2Util.isUsingEfoRecordPublisher(properties) && AwsV2Util.isEagerEfoRegistrationType(properties)) {
            registerStreamConsumers(properties, list);
        }
    }

    public static void lazilyRegisterStreamConsumers(Properties properties, List<String> list) {
        if (AwsV2Util.isUsingEfoRecordPublisher(properties) && AwsV2Util.isLazyEfoRegistrationType(properties)) {
            registerStreamConsumers(properties, list);
        }
    }

    public static void deregisterStreamConsumers(Properties properties, List<String> list) {
        if (!AwsV2Util.isUsingEfoRecordPublisher(properties) || AwsV2Util.isNoneEfoRegistrationType(properties)) {
            return;
        }
        StreamConsumerRegistrar createStreamConsumerRegistrar = createStreamConsumerRegistrar(properties, list);
        try {
            deregisterStreamConsumers(createStreamConsumerRegistrar, properties, list);
        } finally {
            createStreamConsumerRegistrar.close();
        }
    }

    private static void registerStreamConsumers(Properties properties, List<String> list) {
        StreamConsumerRegistrar createStreamConsumerRegistrar = createStreamConsumerRegistrar(properties, list);
        try {
            registerStreamConsumers(createStreamConsumerRegistrar, properties, list);
        } finally {
            createStreamConsumerRegistrar.close();
        }
    }

    @VisibleForTesting
    static void registerStreamConsumers(StreamConsumerRegistrar streamConsumerRegistrar, Properties properties, List<String> list) {
        String property = properties.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
        for (String str : list) {
            try {
                properties.setProperty(ConsumerConfigConstants.efoConsumerArn(str), streamConsumerRegistrar.registerStreamConsumer(str, property));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new FlinkKinesisStreamConsumerRegistrarException("Error registering stream: " + str, e);
            } catch (ExecutionException e2) {
                throw new FlinkKinesisStreamConsumerRegistrarException("Error registering stream: " + str, e2);
            }
        }
    }

    @VisibleForTesting
    static void deregisterStreamConsumers(StreamConsumerRegistrar streamConsumerRegistrar, Properties properties, List<String> list) {
        if (!AwsV2Util.isUsingEfoRecordPublisher(properties) || AwsV2Util.isNoneEfoRegistrationType(properties)) {
            return;
        }
        for (String str : list) {
            try {
                streamConsumerRegistrar.deregisterStreamConsumer(str);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new FlinkKinesisStreamConsumerRegistrarException("Error registering stream: " + str, e);
            } catch (ExecutionException e2) {
                throw new FlinkKinesisStreamConsumerRegistrarException("Error deregistering stream: " + str, e2);
            }
        }
    }

    private static StreamConsumerRegistrar createStreamConsumerRegistrar(Properties properties, List<String> list) {
        FullJitterBackoff fullJitterBackoff = new FullJitterBackoff();
        return new StreamConsumerRegistrar(KinesisProxyV2Factory.createKinesisProxyV2(properties), new FanOutRecordPublisherConfiguration(properties, list), fullJitterBackoff);
    }
}
