package ch.sourcemotion.vertx.kinesis.consumer.orchestra.impl;

import ch.sourcemotion.vertx.kinesis.consumer.orchestra.ErrorHandling;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.FetcherOptions;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.LoadConfiguration;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.ShardIteratorStrategy;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.VertxKinesisConsumerOrchestraException;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.VertxKinesisOrchestraOptions;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.consumer.KinesisConsumerVerticleOptions;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.impl.ConsumerControlVerticle;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.impl.EventBusAddr;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.impl.cmd.StartConsumersCmd;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.impl.cmd.StopConsumerCmd;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.impl.ext.EventBusExtKt;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.impl.ext.StandardExtKt;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.impl.kinesis.KinesisAsyncClientFactory;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.impl.redis.RedisKeyFactory;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.impl.redis.lua.LuaExecutor;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.spi.ShardStatePersistenceServiceAsync;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.spi.ShardStatePersistenceServiceFactory;
import ch.sourcemotion.vertx.redis.client.heimdall.RedisHeimdall;
import ch.sourcemotion.vertx.redis.client.heimdall.RedisHeimdallOptions;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import io.vertx.core.shareddata.Shareable;
import io.vertx.kotlin.coroutines.CoroutineVerticle;
import io.vertx.redis.client.Redis;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import kotlin.Lazy;
import kotlin.LazyKt;
import kotlin.LazyThreadSafetyMode;
import kotlin.Metadata;
import kotlin.collections.CollectionsKt;
import kotlin.coroutines.CoroutineContext;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineStart;
import mu.KLogging;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

/* compiled from: ConsumerControlVerticle.kt */
@Metadata(mv = {1, 4, 2}, bv = {1, VertxKinesisOrchestraOptions.DEFAULT_FETCHER_METRICS_ENABLED, 3}, k = 1, d1 = {"��\u0088\u0001\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0010\b\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0002\b\u0004\b��\u0018�� ?2\u00020\u0001:\u0002?@B\u0005¢\u0006\u0002\u0010\u0002J\b\u0010#\u001a\u00020$H\u0002J\u0018\u0010%\u001a\u00020&2\u0006\u0010'\u001a\u00020\u00052\u0006\u0010(\u001a\u00020)H\u0002J\b\u0010*\u001a\u00020+H\u0002J\b\u0010,\u001a\u00020+H\u0002J\u0016\u0010-\u001a\u00020+2\f\u0010.\u001a\b\u0012\u0004\u0012\u0002000/H\u0002J\u0016\u00101\u001a\u00020+2\f\u0010.\u001a\b\u0012\u0004\u0012\u0002020/H\u0002J\u0011\u00103\u001a\u00020+H\u0094@ø\u0001��¢\u0006\u0002\u00104J)\u00105\u001a\u00020+2\u0006\u00106\u001a\u00020\u00062\u0006\u00107\u001a\u0002082\u0006\u00109\u001a\u00020&H\u0082@ø\u0001��¢\u0006\u0002\u0010:J)\u0010;\u001a\f\u0012\u0004\u0012\u00020\u00050<j\u0002`=*\f\u0012\u0004\u0012\u00020\u00050<j\u0002`=H\u0082@ø\u0001��¢\u0006\u0002\u0010>R*\u0010\u0003\u001a\u001e\u0012\u0004\u0012\u00020\u0005\u0012\u0004\u0012\u00020\u00060\u0004j\u000e\u0012\u0004\u0012\u00020\u0005\u0012\u0004\u0012\u00020\u0006`\u0007X\u0082\u0004¢\u0006\u0002\n��R\u001b\u0010\b\u001a\u00020\t8BX\u0082\u0084\u0002¢\u0006\f\n\u0004\b\f\u0010\r\u001a\u0004\b\n\u0010\u000bR\u001b\u0010\u000e\u001a\u00020\u000f8BX\u0082\u0084\u0002¢\u0006\f\n\u0004\b\u0012\u0010\r\u001a\u0004\b\u0010\u0010\u0011R#\u0010\u0013\u001a\n 
\u0015*\u0004\u0018\u00010\u00140\u00148BX\u0082\u0084\u0002¢\u0006\f\n\u0004\b\u0018\u0010\r\u001a\u0004\b\u0016\u0010\u0017R\u001b\u0010\u0019\u001a\u00020\u001a8BX\u0082\u0084\u0002¢\u0006\f\n\u0004\b\u001d\u0010\r\u001a\u0004\b\u001b\u0010\u001cR\u001b\u0010\u001e\u001a\u00020\u001f8BX\u0082\u0084\u0002¢\u0006\f\n\u0004\b\"\u0010\r\u001a\u0004\b \u0010!\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006A"}, d2 = {"Lch/sourcemotion/vertx/kinesis/consumer/orchestra/impl/ConsumerControlVerticle;", "Lio/vertx/kotlin/coroutines/CoroutineVerticle;", "()V", "consumerDeploymentIds", "Ljava/util/HashMap;", "Lch/sourcemotion/vertx/kinesis/consumer/orchestra/impl/ShardId;", "", "Lkotlin/collections/HashMap;", "consumerDeploymentLock", "Lch/sourcemotion/vertx/kinesis/consumer/orchestra/impl/ConsumerDeploymentLock;", "getConsumerDeploymentLock", "()Lch/sourcemotion/vertx/kinesis/consumer/orchestra/impl/ConsumerDeploymentLock;", "consumerDeploymentLock$delegate", "Lkotlin/Lazy;", "kinesisClient", "Lsoftware/amazon/awssdk/services/kinesis/KinesisAsyncClient;", "getKinesisClient", "()Lsoftware/amazon/awssdk/services/kinesis/KinesisAsyncClient;", "kinesisClient$delegate", "options", "Lch/sourcemotion/vertx/kinesis/consumer/orchestra/impl/ConsumerControlVerticle$Options;", "kotlin.jvm.PlatformType", "getOptions", "()Lch/sourcemotion/vertx/kinesis/consumer/orchestra/impl/ConsumerControlVerticle$Options;", "options$delegate", "redis", "Lio/vertx/redis/client/Redis;", "getRedis", "()Lio/vertx/redis/client/Redis;", "redis$delegate", "shardStatePersistence", "Lch/sourcemotion/vertx/kinesis/consumer/orchestra/spi/ShardStatePersistenceServiceAsync;", "getShardStatePersistence", "()Lch/sourcemotion/vertx/kinesis/consumer/orchestra/spi/ShardStatePersistenceServiceAsync;", "shardStatePersistence$delegate", "consumerCapacity", "", "createConsumerVerticleOptions", "Lch/sourcemotion/vertx/kinesis/consumer/orchestra/consumer/KinesisConsumerVerticleOptions;", "shardId", "shardIteratorStrategy", 
"Lch/sourcemotion/vertx/kinesis/consumer/orchestra/ShardIteratorStrategy;", "logAndNotifyAboutActiveConsumers", "", "notifyAboutActiveConsumers", "onStartConsumersCmd", "msg", "Lio/vertx/core/eventbus/Message;", "Lch/sourcemotion/vertx/kinesis/consumer/orchestra/impl/cmd/StartConsumersCmd;", "onStopConsumerCmd", "Lch/sourcemotion/vertx/kinesis/consumer/orchestra/impl/cmd/StopConsumerCmd;", "start", "(Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "startConsumer", "consumerVerticleClass", "consumerVerticleConfig", "Lio/vertx/core/json/JsonObject;", "consumerOptions", "(Ljava/lang/String;Lio/vertx/core/json/JsonObject;Lch/sourcemotion/vertx/kinesis/consumer/orchestra/consumer/KinesisConsumerVerticleOptions;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "filterUnavailableShardIds", "", "Lch/sourcemotion/vertx/kinesis/consumer/orchestra/impl/ShardIdList;", "(Ljava/util/List;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "Companion", "Options", "vertx-kinesis-consumer-orchestra"})
/* loaded from: input_file:ch/sourcemotion/vertx/kinesis/consumer/orchestra/impl/ConsumerControlVerticle.class */
public final class ConsumerControlVerticle extends CoroutineVerticle {
    // Maps a shard id to a Vert.x deployment id. onStopConsumerCmd removes the entry for a shard
    // and undeploys the stored id; the key set is what gets logged as "currently consumed" shards.
    private final HashMap<ShardId, String> consumerDeploymentIds = new HashMap<>();
    // Lazy holder for the verticle options: on first access the verticle config is mapped onto
    // Options via JsonObject.mapTo. Created with LazyThreadSafetyMode.NONE.
    private final Lazy options$delegate = LazyKt.lazy(LazyThreadSafetyMode.NONE, new Function0<Options>() {
        @Override
        public Options invoke() {
            // mapTo(Class<T>) already returns T, so no cast is needed.
            return ConsumerControlVerticle.this.getConfig().mapTo(ConsumerControlVerticle.Options.class);
        }
    });
    // Lazy holder for the Redis client: on first access a "light" RedisHeimdall client is created
    // from this verticle's Vertx instance and the configured RedisHeimdallOptions.
    private final Lazy redis$delegate = LazyKt.lazy(LazyThreadSafetyMode.NONE, new Function0<RedisHeimdall>() {
        @Override
        @NotNull
        public RedisHeimdall invoke() {
            final RedisHeimdall.Companion factory = RedisHeimdall.Companion;
            final Vertx vertx = ConsumerControlVerticle.this.getVertx();
            final RedisHeimdallOptions heimdallOptions = ConsumerControlVerticle.this.getOptions().getRedisHeimdallOptions();
            return factory.createLight(vertx, heimdallOptions);
        }
    });
    // Lazy holder for the async shard state persistence service; obtained on first access from
    // ShardStatePersistenceServiceFactory for this verticle's Vertx instance.
    private final Lazy shardStatePersistence$delegate = LazyKt.lazy(LazyThreadSafetyMode.NONE, new Function0<ShardStatePersistenceServiceAsync>() {
        @Override
        @NotNull
        public ShardStatePersistenceServiceAsync invoke() {
            final Vertx vertx = ConsumerControlVerticle.this.getVertx();
            return ShardStatePersistenceServiceFactory.INSTANCE.createAsyncShardStatePersistenceService$vertx_kinesis_consumer_orchestra(vertx);
        }
    });
    // Lazy holder for the Kinesis client. On first access the shared KinesisAsyncClientFactory is
    // looked up in the local shared map under KinesisAsyncClientFactory.SHARED_DATA_REF and asked
    // to create a client for this verticle's context.
    // Throws VertxKinesisConsumerOrchestraException when no factory is registered under that key.
    private final Lazy kinesisClient$delegate = LazyKt.lazy(LazyThreadSafetyMode.NONE, new Function0<KinesisAsyncClient>() {
        @Override
        @NotNull
        public KinesisAsyncClient invoke() {
            final Shareable shareable = (Shareable) SharedData.INSTANCE.getLocalSharedMap(ConsumerControlVerticle.this.getVertx()).get(KinesisAsyncClientFactory.SHARED_DATA_REF);
            if (shareable == null) {
                throw new VertxKinesisConsumerOrchestraException("No shared instance of " + KinesisAsyncClientFactory.class.getName() + " under reference: \"" + KinesisAsyncClientFactory.SHARED_DATA_REF + "\" found", null, 2, null);
            }
            Intrinsics.checkNotNullExpressionValue(shareable, "getLocalSharedMap<T>(ver…ce\\\" found\"\n            )");
            final Context context = ConsumerControlVerticle.this.getContext();
            return ((KinesisAsyncClientFactory) shareable).createKinesisAsyncClient(context);
        }
    });
    // Lazy holder for the consumer deployment lock. On first access the lock is built from the
    // Redis client, a LuaExecutor on that same client, a RedisKeyFactory for the configured
    // cluster name, and the configured lock expiration / retry interval (both read as milliseconds).
    private final Lazy consumerDeploymentLock$delegate = LazyKt.lazy(LazyThreadSafetyMode.NONE, new Function0<ConsumerDeploymentLock>() {
        @Override
        @NotNull
        public ConsumerDeploymentLock invoke() {
            // getRedis() is backed by a lazy delegate, so one lookup serves both the lock and the executor.
            final Redis redis = ConsumerControlVerticle.this.getRedis();
            final LuaExecutor luaExecutor = new LuaExecutor(redis);
            final ConsumerControlVerticle.Options options = ConsumerControlVerticle.this.getOptions();
            final RedisKeyFactory redisKeyFactory = new RedisKeyFactory(options.getClusterName());
            final Duration lockExpiration = Duration.ofMillis(options.getConsumerDeploymentLockExpiration());
            Intrinsics.checkNotNullExpressionValue(lockExpiration, "Duration.ofMillis(option…DeploymentLockExpiration)");
            final Duration lockRetryInterval = Duration.ofMillis(options.getConsumerDeploymentLockRetryInterval());
            Intrinsics.checkNotNullExpressionValue(lockRetryInterval, "Duration.ofMillis(option…loymentLockRetryInterval)");
            return new ConsumerDeploymentLock(redis, luaExecutor, redisKeyFactory, lockExpiration, lockRetryInterval);
        }
    });

    // Single instance of the nested Companion class; Companion.getLogger() is how this class logs.
    @NotNull
    private static final Companion Companion = new Companion(null);

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: ConsumerControlVerticle.kt */
    @Metadata(mv = {1, 4, 2}, bv = {1, VertxKinesisOrchestraOptions.DEFAULT_FETCHER_METRICS_ENABLED, 3}, k = 1, d1 = {"��\f\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\b\u0082\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002¨\u0006\u0003"}, d2 = {"Lch/sourcemotion/vertx/kinesis/consumer/orchestra/impl/ConsumerControlVerticle$Companion;", "Lmu/KLogging;", "()V", "vertx-kinesis-consumer-orchestra"})
    /* loaded from: input_file:ch/sourcemotion/vertx/kinesis/consumer/orchestra/impl/ConsumerControlVerticle$Companion.class */
    // Companion object of ConsumerControlVerticle. It adds no members of its own; it extends
    // KLogging, whose getLogger() is used for logging in the enclosing class.
    public static final class Companion extends KLogging {
        // Not instantiable directly; the only instance is created through the synthetic constructor below.
        private Companion() {
        }

        // Compiler-generated bridge constructor; the marker argument is ignored.
        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    /* compiled from: ConsumerControlVerticle.kt */
    @JsonIgnoreProperties(ignoreUnknown = true)
    @Metadata(mv = {1, 4, 2}, bv = {1, VertxKinesisOrchestraOptions.DEFAULT_FETCHER_METRICS_ENABLED, 3}, k = 1, d1 = {"��D\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\t\n\u0002\b\u0002\n\u0002\u0010\u000e\n��\n\u0002\u0010$\n\u0002\b\u0019\b\u0001\u0018��2\u00020\u0001Bu\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t\u0012\u0006\u0010\n\u001a\u00020\u000b\u0012\u0006\u0010\f\u001a\u00020\r\u0012\u0006\u0010\u000e\u001a\u00020\u000f\u0012\u0006\u0010\u0010\u001a\u00020\u000f\u0012\u0006\u0010\u0011\u001a\u00020\u0012\u0012\u0012\u0010\u0013\u001a\u000e\u0012\u0004\u0012\u00020\u0012\u0012\u0004\u0012\u00020\u00010\u0014\u0012\n\b\u0002\u0010\u0015\u001a\u0004\u0018\u00010\u0012\u0012\u0006\u0010\u0016\u001a\u00020\u000f¢\u0006\u0002\u0010\u0017R\u0011\u0010\u0002\u001a\u00020\u0003¢\u0006\b\n��\u001a\u0004\b\u0018\u0010\u0019R\u0011\u0010\u000e\u001a\u00020\u000f¢\u0006\b\n��\u001a\u0004\b\u001a\u0010\u001bR\u0011\u0010\u0010\u001a\u00020\u000f¢\u0006\b\n��\u001a\u0004\b\u001c\u0010\u001bR\u0011\u0010\u0011\u001a\u00020\u0012¢\u0006\b\n��\u001a\u0004\b\u001d\u0010\u001eR\u001d\u0010\u0013\u001a\u000e\u0012\u0004\u0012\u00020\u0012\u0012\u0004\u0012\u00020\u00010\u0014¢\u0006\b\n��\u001a\u0004\b\u001f\u0010 
R\u0011\u0010\f\u001a\u00020\r¢\u0006\b\n��\u001a\u0004\b!\u0010\"R\u0011\u0010\u0006\u001a\u00020\u0007¢\u0006\b\n��\u001a\u0004\b#\u0010$R\u0011\u0010\n\u001a\u00020\u000b¢\u0006\b\n��\u001a\u0004\b%\u0010&R\u0011\u0010\u0004\u001a\u00020\u0005¢\u0006\b\n��\u001a\u0004\b'\u0010(R\u0013\u0010\u0015\u001a\u0004\u0018\u00010\u0012¢\u0006\b\n��\u001a\u0004\b)\u0010\u001eR\u0011\u0010\b\u001a\u00020\t¢\u0006\b\n��\u001a\u0004\b*\u0010+R\u0011\u0010\u0016\u001a\u00020\u000f¢\u0006\b\n��\u001a\u0004\b,\u0010\u001b¨\u0006-"}, d2 = {"Lch/sourcemotion/vertx/kinesis/consumer/orchestra/impl/ConsumerControlVerticle$Options;", "", "clusterName", "Lch/sourcemotion/vertx/kinesis/consumer/orchestra/impl/OrchestraClusterName;", "redisHeimdallOptions", "Lch/sourcemotion/vertx/redis/client/heimdall/RedisHeimdallOptions;", "fetcherOptions", "Lch/sourcemotion/vertx/kinesis/consumer/orchestra/FetcherOptions;", "shardIteratorStrategy", "Lch/sourcemotion/vertx/kinesis/consumer/orchestra/ShardIteratorStrategy;", "loadConfiguration", "Lch/sourcemotion/vertx/kinesis/consumer/orchestra/LoadConfiguration;", "errorHandling", "Lch/sourcemotion/vertx/kinesis/consumer/orchestra/ErrorHandling;", "consumerDeploymentLockExpiration", "", "consumerDeploymentLockRetryInterval", "consumerVerticleClass", "", "consumerVerticleConfig", "", "sequenceNumberImportAddress", "shardProgressExpirationMillis", "(Lch/sourcemotion/vertx/kinesis/consumer/orchestra/impl/OrchestraClusterName;Lch/sourcemotion/vertx/redis/client/heimdall/RedisHeimdallOptions;Lch/sourcemotion/vertx/kinesis/consumer/orchestra/FetcherOptions;Lch/sourcemotion/vertx/kinesis/consumer/orchestra/ShardIteratorStrategy;Lch/sourcemotion/vertx/kinesis/consumer/orchestra/LoadConfiguration;Lch/sourcemotion/vertx/kinesis/consumer/orchestra/ErrorHandling;JJLjava/lang/String;Ljava/util/Map;Ljava/lang/String;J)V", "getClusterName", "()Lch/sourcemotion/vertx/kinesis/consumer/orchestra/impl/OrchestraClusterName;", "getConsumerDeploymentLockExpiration", 
"()J", "getConsumerDeploymentLockRetryInterval", "getConsumerVerticleClass", "()Ljava/lang/String;", "getConsumerVerticleConfig", "()Ljava/util/Map;", "getErrorHandling", "()Lch/sourcemotion/vertx/kinesis/consumer/orchestra/ErrorHandling;", "getFetcherOptions", "()Lch/sourcemotion/vertx/kinesis/consumer/orchestra/FetcherOptions;", "getLoadConfiguration", "()Lch/sourcemotion/vertx/kinesis/consumer/orchestra/LoadConfiguration;", "getRedisHeimdallOptions", "()Lch/sourcemotion/vertx/redis/client/heimdall/RedisHeimdallOptions;", "getSequenceNumberImportAddress", "getShardIteratorStrategy", "()Lch/sourcemotion/vertx/kinesis/consumer/orchestra/ShardIteratorStrategy;", "getShardProgressExpirationMillis", "vertx-kinesis-consumer-orchestra"})
    /* loaded from: input_file:ch/sourcemotion/vertx/kinesis/consumer/orchestra/impl/ConsumerControlVerticle$Options.class */
    /*
     * Immutable configuration of ConsumerControlVerticle. The enclosing verticle obtains an
     * instance by mapping its JSON config onto this class (JsonObject.mapTo).
     */
    public static final class Options {

        @NotNull
        private final OrchestraClusterName clusterName;

        @NotNull
        private final RedisHeimdallOptions redisHeimdallOptions;

        @NotNull
        private final FetcherOptions fetcherOptions;

        @NotNull
        private final ShardIteratorStrategy shardIteratorStrategy;

        @NotNull
        private final LoadConfiguration loadConfiguration;

        @NotNull
        private final ErrorHandling errorHandling;

        // Passed to Duration.ofMillis by the enclosing verticle, i.e. interpreted as milliseconds.
        private final long consumerDeploymentLockExpiration;

        // Passed to Duration.ofMillis by the enclosing verticle, i.e. interpreted as milliseconds.
        private final long consumerDeploymentLockRetryInterval;

        @NotNull
        private final String consumerVerticleClass;

        @NotNull
        private final Map<String, Object> consumerVerticleConfig;

        // The only optional value; may be null.
        @Nullable
        private final String sequenceNumberImportAddress;

        // Milliseconds according to its name; the unit is not otherwise visible in this class.
        private final long shardProgressExpirationMillis;

        @NotNull
        public final OrchestraClusterName getClusterName() {
            return this.clusterName;
        }

        @NotNull
        public final RedisHeimdallOptions getRedisHeimdallOptions() {
            return this.redisHeimdallOptions;
        }

        @NotNull
        public final FetcherOptions getFetcherOptions() {
            return this.fetcherOptions;
        }

        @NotNull
        public final ShardIteratorStrategy getShardIteratorStrategy() {
            return this.shardIteratorStrategy;
        }

        @NotNull
        public final LoadConfiguration getLoadConfiguration() {
            return this.loadConfiguration;
        }

        @NotNull
        public final ErrorHandling getErrorHandling() {
            return this.errorHandling;
        }

        public final long getConsumerDeploymentLockExpiration() {
            return this.consumerDeploymentLockExpiration;
        }

        public final long getConsumerDeploymentLockRetryInterval() {
            return this.consumerDeploymentLockRetryInterval;
        }

        @NotNull
        public final String getConsumerVerticleClass() {
            return this.consumerVerticleClass;
        }

        @NotNull
        public final Map<String, Object> getConsumerVerticleConfig() {
            return this.consumerVerticleConfig;
        }

        @Nullable
        public final String getSequenceNumberImportAddress() {
            return this.sequenceNumberImportAddress;
        }

        public final long getShardProgressExpirationMillis() {
            return this.shardProgressExpirationMillis;
        }

        /*
         * Primary constructor. Every reference argument except sequenceNumberImportAddress is
         * null-checked via Intrinsics.checkNotNullParameter before any field is assigned.
         */
        public Options(@NotNull OrchestraClusterName clusterName, @NotNull RedisHeimdallOptions redisHeimdallOptions, @NotNull FetcherOptions fetcherOptions, @NotNull ShardIteratorStrategy shardIteratorStrategy, @NotNull LoadConfiguration loadConfiguration, @NotNull ErrorHandling errorHandling, long consumerDeploymentLockExpiration, long consumerDeploymentLockRetryInterval, @NotNull String consumerVerticleClass, @NotNull Map<String, ? extends Object> consumerVerticleConfig, @Nullable String sequenceNumberImportAddress, long shardProgressExpirationMillis) {
            Intrinsics.checkNotNullParameter(clusterName, "clusterName");
            Intrinsics.checkNotNullParameter(redisHeimdallOptions, "redisHeimdallOptions");
            Intrinsics.checkNotNullParameter(fetcherOptions, "fetcherOptions");
            Intrinsics.checkNotNullParameter(shardIteratorStrategy, "shardIteratorStrategy");
            Intrinsics.checkNotNullParameter(loadConfiguration, "loadConfiguration");
            Intrinsics.checkNotNullParameter(errorHandling, "errorHandling");
            Intrinsics.checkNotNullParameter(consumerVerticleClass, "consumerVerticleClass");
            Intrinsics.checkNotNullParameter(consumerVerticleConfig, "consumerVerticleConfig");
            this.clusterName = clusterName;
            this.redisHeimdallOptions = redisHeimdallOptions;
            this.fetcherOptions = fetcherOptions;
            this.shardIteratorStrategy = shardIteratorStrategy;
            this.loadConfiguration = loadConfiguration;
            this.errorHandling = errorHandling;
            this.consumerDeploymentLockExpiration = consumerDeploymentLockExpiration;
            this.consumerDeploymentLockRetryInterval = consumerDeploymentLockRetryInterval;
            this.consumerVerticleClass = consumerVerticleClass;
            // Map<String, ? extends Object> is not assignable to Map<String, Object> in Java.
            // The cast is safe: every value is an Object, and the same map instance is stored
            // (no copy), exactly as the plain assignment intended.
            @SuppressWarnings("unchecked")
            final Map<String, Object> verticleConfig = (Map<String, Object>) consumerVerticleConfig;
            this.consumerVerticleConfig = verticleConfig;
            this.sequenceNumberImportAddress = sequenceNumberImportAddress;
            this.shardProgressExpirationMillis = shardProgressExpirationMillis;
        }

        /*
         * Compiler-generated default-arguments constructor. Bit 1024 (bit 10) of defaultsMask
         * selects the default for parameter 10, sequenceNumberImportAddress, which is null.
         * The marker argument is ignored.
         */
        public /* synthetic */ Options(OrchestraClusterName clusterName, RedisHeimdallOptions redisHeimdallOptions, FetcherOptions fetcherOptions, ShardIteratorStrategy shardIteratorStrategy, LoadConfiguration loadConfiguration, ErrorHandling errorHandling, long consumerDeploymentLockExpiration, long consumerDeploymentLockRetryInterval, String consumerVerticleClass, Map consumerVerticleConfig, String sequenceNumberImportAddress, long shardProgressExpirationMillis, int defaultsMask, DefaultConstructorMarker defaultConstructorMarker) {
            this(clusterName, redisHeimdallOptions, fetcherOptions, shardIteratorStrategy, loadConfiguration, errorHandling, consumerDeploymentLockExpiration, consumerDeploymentLockRetryInterval, consumerVerticleClass, consumerVerticleConfig, (defaultsMask & 1024) != 0 ? null : sequenceNumberImportAddress, shardProgressExpirationMillis);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    // Returns the verticle options held by the lazy options delegate.
    public final Options getOptions() {
        final Object resolved = this.options$delegate.getValue();
        return (Options) resolved;
    }

    /* JADX INFO: Access modifiers changed from: private */
    // Returns the Redis client held by the lazy redis delegate.
    public final Redis getRedis() {
        final Object resolved = this.redis$delegate.getValue();
        return (Redis) resolved;
    }

    // Returns the shard state persistence service held by its lazy delegate.
    private final ShardStatePersistenceServiceAsync getShardStatePersistence() {
        final Object resolved = this.shardStatePersistence$delegate.getValue();
        return (ShardStatePersistenceServiceAsync) resolved;
    }

    // Returns the Kinesis client held by its lazy delegate.
    private final KinesisAsyncClient getKinesisClient() {
        final Object resolved = this.kinesisClient$delegate.getValue();
        return (KinesisAsyncClient) resolved;
    }

    /* JADX INFO: Access modifiers changed from: private */
    // Returns the consumer deployment lock held by its lazy delegate.
    public final ConsumerDeploymentLock getConsumerDeploymentLock() {
        final Object resolved = this.consumerDeploymentLock$delegate.getValue();
        return (ConsumerDeploymentLock) resolved;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Failed to find 'out' block for switch in B:7:0x0042. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:15:0x010e  */
    /* JADX WARN: Removed duplicated region for block: B:19:0x00b1  */
    /* JADX WARN: Removed duplicated region for block: B:20:0x0111  */
    /* JADX WARN: Removed duplicated region for block: B:21:0x0154  */
    /* JADX WARN: Removed duplicated region for block: B:8:0x005c  */
    @org.jetbrains.annotations.Nullable
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    // Placeholder for the verticle's suspend start function (hence the Continuation parameter).
    // The decompiler could not reconstruct the body, so this stub always throws
    // UnsupportedOperationException; the real start-up logic is not visible in this file.
    public java.lang.Object start(@org.jetbrains.annotations.NotNull kotlin.coroutines.Continuation<? super kotlin.Unit> r7) {
        /*
            Method dump skipped, instructions count: 350
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: ch.sourcemotion.vertx.kinesis.consumer.orchestra.impl.ConsumerControlVerticle.start(kotlin.coroutines.Continuation):java.lang.Object");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles a {@link StopConsumerCmd}: undeploys the consumer verticle registered for the
     * command's shard and replies on the message.
     *
     * <ul>
     *   <li>No (or a blank) deployment id registered for the shard: logs the fact and fails the
     *       message with code 1 ("Unknown consumer").</li>
     *   <li>Undeploy fails: fails the message with code 2 and the cause's message.</li>
     *   <li>Undeploy succeeds: logs the stop and acknowledges the message.</li>
     * </ul>
     *
     * The active-consumer log/notification is issued right after the undeploy has been
     * requested, i.e. without waiting for its result.
     *
     * @param message the event bus message carrying the stop command
     */
    public final void onStopConsumerCmd(final Message<StopConsumerCmd> message) {
        final ShardId shardId = message.body().getShardId();
        // The bookkeeping entry is removed before the undeploy result is known.
        // NOTE(review): if the undeploy fails the entry is not restored - confirm this is intended.
        final String deploymentId = this.consumerDeploymentIds.remove(shardId);
        if (StandardExtKt.isNotNullOrBlank(deploymentId)) {
            getVertx().undeploy(deploymentId, new Handler<AsyncResult<Void>>() {
                @Override
                public void handle(AsyncResult<Void> asyncResult) {
                    if (!asyncResult.succeeded()) {
                        message.fail(2, asyncResult.cause().getMessage());
                        return;
                    }
                    Companion.getLogger().info(new Function0<Object>() {
                        @Override
                        @Nullable
                        public Object invoke() {
                            return "Consumer for shard \"" + shardId + "\" stopped";
                        }
                    });
                    EventBusExtKt.ack(message);
                }
            });
        } else {
            Companion.getLogger().info(new Function0<Object>() {
                @Override
                @Nullable
                public Object invoke() {
                    return "Unable to stop consumer for shard \"" + shardId + "\", because no known consumer for this shard";
                }
            });
            message.fail(1, "Unknown consumer");
        }
        logAndNotifyAboutActiveConsumers();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles a {@link StartConsumersCmd}.
     *
     * <ul>
     *   <li>Empty shard id list: the message is acknowledged immediately.</li>
     *   <li>Capacity left ({@code consumerCapacity() > 0}): a coroutine is launched on this
     *       verticle's scope and handed the command and the message; replying is left to that
     *       coroutine, whose body is not visible in this file.</li>
     *   <li>No capacity left: logs a warning, logs/notifies about the active consumers and fails
     *       the message with code 1 ("No consumer capacity left").</li>
     * </ul>
     *
     * @param message the event bus message carrying the start command
     */
    public final void onStartConsumersCmd(Message<StartConsumersCmd> message) {
        final StartConsumersCmd startConsumersCmd = message.body();
        if (startConsumersCmd.getShardIds().isEmpty()) {
            EventBusExtKt.ack(message);
            return;
        }
        if (consumerCapacity() > 0) {
            BuildersKt.launch$default(this, (CoroutineContext) null, (CoroutineStart) null, new ConsumerControlVerticle$onStartConsumersCmd$2(this, startConsumersCmd, message, null), 3, (Object) null);
            return;
        }
        Companion.getLogger().warn(new Function0<Object>() {
            @Override
            @Nullable
            public Object invoke() {
                return "Consumer control unable to start consumer(s) for shards " + CollectionsKt.joinToString$default(startConsumersCmd.getShardIds(), (CharSequence) null, (CharSequence) null, (CharSequence) null, 0, (CharSequence) null, (Function1) null, 63, (Object) null) + " because no consumer capacity left";
            }
        });
        logAndNotifyAboutActiveConsumers();
        message.fail(1, "No consumer capacity left");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Can't wrap try/catch for region: R(13:1|(2:3|(11:5|6|7|8|16|17|(1:19)|20|(1:22)|23|24))|32|6|7|8|16|17|(0)|20|(0)|23|24) */
    /* JADX WARN: Code restructure failed: missing block: B:30:0x00e4, code lost:
    
        r25 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:31:0x00e6, code lost:
    
        r0 = kotlin.Result.Companion;
        r24 = kotlin.Result.constructor-impl(kotlin.ResultKt.createFailure(r25));
     */
    /* JADX WARN: Removed duplicated region for block: B:19:0x010a  */
    /* JADX WARN: Removed duplicated region for block: B:22:0x0152  */
    /* JADX WARN: Removed duplicated region for block: B:26:0x00b5  */
    /* JADX WARN: Removed duplicated region for block: B:28:0x018a  */
    /* JADX WARN: Removed duplicated region for block: B:9:0x005c  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    // Placeholder for the suspend function that starts a consumer from a verticle class name,
    // a verticle config and consumer options (per the parameter types). The decompiler could not
    // reconstruct the body, so this stub always throws UnsupportedOperationException; the real
    // logic is not visible in this file.
    public final /* synthetic */ java.lang.Object startConsumer(java.lang.String r18, io.vertx.core.json.JsonObject r19, ch.sourcemotion.vertx.kinesis.consumer.orchestra.consumer.KinesisConsumerVerticleOptions r20, kotlin.coroutines.Continuation<? super kotlin.Unit> r21) {
        /*
            Method dump skipped, instructions count: 404
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: ch.sourcemotion.vertx.kinesis.consumer.orchestra.impl.ConsumerControlVerticle.startConsumer(java.lang.String, io.vertx.core.json.JsonObject, ch.sourcemotion.vertx.kinesis.consumer.orchestra.consumer.KinesisConsumerVerticleOptions, kotlin.coroutines.Continuation):java.lang.Object");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Logs (at info level) which shards currently have a registered consumer and how much
     * consumer capacity is left, then calls {@code notifyAboutActiveConsumers()}.
     *
     * The log messages are built inside the supplied functions, so the shard set, stream name
     * and capacity are read when the logger invokes them.
     */
    public final void logAndNotifyAboutActiveConsumers() {
        if (this.consumerDeploymentIds.isEmpty()) {
            Companion.getLogger().info(new Function0<Object>() {
                @Override
                @Nullable
                public Object invoke() {
                    final String streamName = ConsumerControlVerticle.this.getOptions().getClusterName().getStreamName();
                    final int capacity = ConsumerControlVerticle.this.consumerCapacity();
                    return "Currently no shards get consumed on stream \"" + streamName + "\". " + "This VKCO instance has to capacity to consume " + capacity + " shards";
                }
            });
        } else {
            Companion.getLogger().info(new Function0<Object>() {
                @Override
                @Nullable
                public Object invoke() {
                    final Set<ShardId> shardIds = ConsumerControlVerticle.this.consumerDeploymentIds.keySet();
                    Intrinsics.checkNotNullExpressionValue(shardIds, "consumerDeploymentIds.keys");
                    final String joinedShardIds = CollectionsKt.joinToString$default(shardIds, (CharSequence) null, (CharSequence) null, (CharSequence) null, 0, (CharSequence) null, (Function1) null, 63, (Object) null);
                    final String streamName = ConsumerControlVerticle.this.getOptions().getClusterName().getStreamName();
                    final int capacity = ConsumerControlVerticle.this.consumerCapacity();
                    return "Currently the shards " + joinedShardIds + ' ' + "are consumed on stream \"" + streamName + "\". " + "This VKCO instance has to capacity to consume " + capacity + " further shards";
                }
            });
        }
        notifyAboutActiveConsumers();
    }

    /**
     * Sends the number of entries in {@code consumerDeploymentIds} over the Vert.x event bus to
     * the {@code consumedShardCountNotification} address.
     */
    private final void notifyAboutActiveConsumers() {
        final EventBus bus = getVertx().eventBus();
        // Return value is unused; the call is retained exactly as it appears in the decompiled output.
        EventBusAddr.INSTANCE.getDetection();
        final Integer consumedShardCount = Integer.valueOf(this.consumerDeploymentIds.size());
        bus.send(EventBusAddr.Detection.consumedShardCountNotification, consumedShardCount);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Failed to find 'out' block for switch in B:7:0x0043. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:16:0x00e1 A[LOOP:0: B:14:0x00d7->B:16:0x00e1, LOOP_END] */
    /* JADX WARN: Removed duplicated region for block: B:20:0x013b  */
    /* JADX WARN: Removed duplicated region for block: B:24:0x018e  */
    /* JADX WARN: Removed duplicated region for block: B:29:0x01e8  */
    /* JADX WARN: Removed duplicated region for block: B:39:0x0093  */
    /* JADX WARN: Removed duplicated region for block: B:40:0x013e  */
    /* JADX WARN: Removed duplicated region for block: B:41:0x0191  */
    /* JADX WARN: Removed duplicated region for block: B:42:0x0221  */
    /* JADX WARN: Removed duplicated region for block: B:8:0x0060  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Decompiler placeholder for {@code filterUnavailableShardIds}.
     *
     * <p>The decompiler could not reconstruct the original body, so this stub unconditionally
     * throws. The real filtering logic is only available in the compiled class / original
     * source.</p>
     *
     * <p>NOTE(review): the trailing {@code Continuation} parameter and the {@code Object} return
     * type suggest this was compiled from a Kotlin {@code suspend} function that yields a list of
     * {@code ShardId} — confirm against the original source. Which shard ids count as
     * "unavailable" cannot be determined from this stub.</p>
     *
     * @param r7 list of shard ids passed in by the caller
     * @param r8 continuation receiving a list of shard ids
     * @return never returns normally in this decompiled form
     * @throws UnsupportedOperationException always, because the body was not decompiled
     */
    public final /* synthetic */ java.lang.Object filterUnavailableShardIds(java.util.List<ch.sourcemotion.vertx.kinesis.consumer.orchestra.impl.ShardId> r7, kotlin.coroutines.Continuation<? super java.util.List<ch.sourcemotion.vertx.kinesis.consumer.orchestra.impl.ShardId>> r8) {
        /*
            Method dump skipped, instructions count: 555
            To view this dump add '--comments-level debug' option
        */
        // Stub emitted by the decompiler in place of the real implementation.
        throw new UnsupportedOperationException("Method not decompiled: ch.sourcemotion.vertx.kinesis.consumer.orchestra.impl.ConsumerControlVerticle.filterUnavailableShardIds(java.util.List, kotlin.coroutines.Continuation):java.lang.Object");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Computes the remaining consumer capacity: the configured maximum shard count from the load
     * configuration minus the number of entries currently held in {@code consumerDeploymentIds}.
     *
     * @return configured max shards count minus the number of tracked consumer deployments
     */
    public final int consumerCapacity() {
        final int maxShardsCount = getOptions().getLoadConfiguration().getMaxShardsCount();
        final int trackedDeployments = this.consumerDeploymentIds.size();
        return maxShardsCount - trackedDeployments;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the options for a consumer verticle of the given shard by combining the supplied
     * shard id and iterator strategy with values taken from this verticle's own options.
     *
     * @param shardId               shard the options are created for
     * @param shardIteratorStrategy iterator strategy handed through to the consumer options
     * @return a new {@link KinesisConsumerVerticleOptions} instance
     */
    public final KinesisConsumerVerticleOptions createConsumerVerticleOptions(ShardId shardId, ShardIteratorStrategy shardIteratorStrategy) {
        final ConsumerControlVerticle.Options controlOptions = getOptions();
        return new KinesisConsumerVerticleOptions(
                shardId,
                controlOptions.getClusterName(),
                shardIteratorStrategy,
                controlOptions.getErrorHandling(),
                controlOptions.getSequenceNumberImportAddress(),
                controlOptions.getShardProgressExpirationMillis(),
                controlOptions.getFetcherOptions());
    }
}
