package ch.sourcemotion.vertx.kinesis.consumer.orchestra.consumer.fetching;

import ch.sourcemotion.vertx.kinesis.consumer.orchestra.EnhancedFanOutOptions;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.FetcherOptions;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.VertxKinesisConsumerOrchestraException;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.VertxKinesisOrchestraOptions;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.consumer.FetchPosition;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.impl.OrchestraClusterName;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.impl.ShardId;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.impl.SharedData;
import ch.sourcemotion.vertx.kinesis.consumer.orchestra.impl.kinesis.NettyKinesisAsyncClientFactory;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Tags;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.shareddata.Shareable;
import io.vertx.micrometer.backends.BackendRegistries;
import kotlin.Metadata;
import kotlin.Result;
import kotlin.ResultKt;
import kotlin.Unit;
import kotlin.coroutines.Continuation;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.CoroutineScope;
import mu.KLogging;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

/* compiled from: Fetcher.kt */
@Metadata(mv = {1, 4, 2}, bv = {1, VertxKinesisOrchestraOptions.DEFAULT_FETCHER_METRICS_ENABLED, 3}, k = 1, d1 = {"��\u001a\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u0002\n\u0002\b\u0004\b`\u0018�� \n2\u00020\u0001:\u0001\nJ\u0011\u0010\u0006\u001a\u00020\u0007H¦@ø\u0001��¢\u0006\u0002\u0010\bJ\u0011\u0010\t\u001a\u00020\u0007H¦@ø\u0001��¢\u0006\u0002\u0010\bR\u0012\u0010\u0002\u001a\u00020\u0003X¦\u0004¢\u0006\u0006\u001a\u0004\b\u0004\u0010\u0005\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006\u000b"}, d2 = {"Lch/sourcemotion/vertx/kinesis/consumer/orchestra/consumer/fetching/Fetcher;", "", "streamReader", "Lch/sourcemotion/vertx/kinesis/consumer/orchestra/consumer/fetching/RecordBatchStreamReader;", "getStreamReader", "()Lch/sourcemotion/vertx/kinesis/consumer/orchestra/consumer/fetching/RecordBatchStreamReader;", "start", "", "(Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "stop", "Companion", "vertx-kinesis-consumer-orchestra"})
/* loaded from: input_file:ch/sourcemotion/vertx/kinesis/consumer/orchestra/consumer/fetching/Fetcher.class */
public interface Fetcher {

    /**
     * Java-visible handle on the Kotlin companion object of {@code Fetcher}.
     * It is initialised from the companion's own singleton field and exposes the
     * {@code of(...)} factory method.
     */
    @NotNull
    public static final Companion Companion = Companion.$$INSTANCE;

    /* compiled from: Fetcher.kt */
    @Metadata(mv = {1, 4, 2}, bv = {1, VertxKinesisOrchestraOptions.DEFAULT_FETCHER_METRICS_ENABLED, 3}, k = 1, d1 = {"��L\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002J\u0018\u0010\u0003\u001a\u00020\u00042\u0006\u0010\u0005\u001a\u00020\u00062\u0006\u0010\u0007\u001a\u00020\u0004H\u0002J>\u0010\b\u001a\u00020\t2\u0006\u0010\u0005\u001a\u00020\u00062\u0006\u0010\n\u001a\u00020\u000b2\u0006\u0010\f\u001a\u00020\r2\u0006\u0010\u000e\u001a\u00020\u000f2\u0006\u0010\u0010\u001a\u00020\u00112\u0006\u0010\u0012\u001a\u00020\u00132\u0006\u0010\u0014\u001a\u00020\u0004J\u001e\u0010\u0015\u001a\u0004\u0018\u00010\u0016*\u00020\u000b2\u0006\u0010\u0017\u001a\u00020\u00182\u0006\u0010\u0012\u001a\u00020\u0013H\u0002¨\u0006\u0019"}, d2 = {"Lch/sourcemotion/vertx/kinesis/consumer/orchestra/consumer/fetching/Fetcher$Companion;", "Lmu/KLogging;", "()V", "getFinalClient", "Lsoftware/amazon/awssdk/services/kinesis/KinesisAsyncClient;", "vertx", "Lio/vertx/core/Vertx;", "defaultClient", "of", "Lch/sourcemotion/vertx/kinesis/consumer/orchestra/consumer/fetching/Fetcher;", "fetcherOptions", "Lch/sourcemotion/vertx/kinesis/consumer/orchestra/FetcherOptions;", "clusterName", "Lch/sourcemotion/vertx/kinesis/consumer/orchestra/impl/OrchestraClusterName;", "startFetchPosition", "Lch/sourcemotion/vertx/kinesis/consumer/orchestra/consumer/FetchPosition;", "scope", "Lkotlinx/coroutines/CoroutineScope;", "shardId", "Lch/sourcemotion/vertx/kinesis/consumer/orchestra/impl/ShardId;", "vertxClient", "metricsCounterOf", "Lio/micrometer/core/instrument/Counter;", "streamName", "", "vertx-kinesis-consumer-orchestra"})
    /* loaded from: input_file:ch/sourcemotion/vertx/kinesis/consumer/orchestra/consumer/fetching/Fetcher$Companion.class */
    public static final class Companion extends KLogging {
        // The single Companion instance; the constructor is private, so this is the only place one is created in this class.
        static final /* synthetic */ Companion $$INSTANCE = new Companion();

        /**
         * Creates the fetcher implementation that matches the given options.
         *
         * <p>The Kinesis client to use is resolved first via {@code getFinalClient}. When
         * {@code fetcherOptions.getEnhancedFanOut()} is non-null an {@code EnhancedFanoutFetcher}
         * is built (it additionally receives the start sequence number and the counter returned
         * by {@code metricsCounterOf}); otherwise a {@code DynamicRecordFetcher} is built.
         *
         * @param vertx                Vert.x instance, used to resolve the Kinesis client
         * @param fetcherOptions       fetcher configuration; decides which implementation is created
         * @param orchestraClusterName cluster name; its stream name is handed to the fetcher / counter
         * @param fetchPosition        position at which fetching starts
         * @param coroutineScope       coroutine scope handed to the created fetcher
         * @param shardId              shard the fetcher is created for
         * @param kinesisAsyncClient   client passed to {@code getFinalClient}, which may return it unchanged
         * @return the newly created fetcher, never {@code null}
         */
        @NotNull
        public final Fetcher of(@NotNull Vertx vertx, @NotNull FetcherOptions fetcherOptions, @NotNull OrchestraClusterName orchestraClusterName, @NotNull FetchPosition fetchPosition, @NotNull CoroutineScope coroutineScope, @NotNull ShardId shardId, @NotNull KinesisAsyncClient kinesisAsyncClient) {
            // Kotlin-generated non-null parameter checks; the names are the original Kotlin parameter names.
            Intrinsics.checkNotNullParameter(vertx, "vertx");
            Intrinsics.checkNotNullParameter(fetcherOptions, "fetcherOptions");
            Intrinsics.checkNotNullParameter(orchestraClusterName, "clusterName");
            Intrinsics.checkNotNullParameter(fetchPosition, "startFetchPosition");
            Intrinsics.checkNotNullParameter(coroutineScope, "scope");
            Intrinsics.checkNotNullParameter(shardId, "shardId");
            Intrinsics.checkNotNullParameter(kinesisAsyncClient, "vertxClient");

            KinesisAsyncClient clientToUse = getFinalClient(vertx, kinesisAsyncClient);
            EnhancedFanOutOptions fanOutOptions = fetcherOptions.getEnhancedFanOut();

            // No enhanced fan-out configuration: use the dynamic record fetcher.
            if (fanOutOptions == null) {
                return new DynamicRecordFetcher(
                        fetcherOptions,
                        fetchPosition,
                        coroutineScope,
                        orchestraClusterName.getStreamName(),
                        shardId,
                        clientToUse);
            }

            // Enhanced fan-out configured: this variant also gets the (possibly null) metrics counter.
            return new EnhancedFanoutFetcher(
                    fetcherOptions,
                    fanOutOptions,
                    orchestraClusterName,
                    fetchPosition.getSequenceNumber(),
                    coroutineScope,
                    shardId,
                    clientToUse,
                    metricsCounterOf(fetcherOptions, orchestraClusterName.getStreamName(), shardId));
        }

        /**
         * Resolves the Kinesis client the fetcher should use.
         *
         * <p>First tries to obtain the {@link NettyKinesisAsyncClientFactory} stored in the local
         * shared map under {@code NettyKinesisAsyncClientFactory.SHARED_DATA_REF} and to create a
         * client from it for the current (or a newly created) Vert.x context. If anything in that
         * attempt fails, including the factory not being present, the supplied client is
         * returned instead. Which client was chosen is logged at info level.
         *
         * <p>The failure is deliberately not propagated: the class this was compiled from wrapped
         * the attempt in a Kotlin {@code Result} and fell back on any {@code Throwable}.
         *
         * @param vertx              Vert.x instance used to look up the shared factory and the context
         * @param kinesisAsyncClient fallback client, returned when no Netty client could be created
         * @return the Netty based client when it could be created, otherwise {@code kinesisAsyncClient}
         */
        private final KinesisAsyncClient getFinalClient(Vertx vertx, KinesisAsyncClient kinesisAsyncClient) {
            try {
                Shareable shareable = (Shareable) SharedData.INSTANCE.getLocalSharedMap(vertx).get(NettyKinesisAsyncClientFactory.SHARED_DATA_REF);
                if (shareable == null) {
                    // Thrown inside the try on purpose: a missing factory must lead to the fallback below.
                    // The trailing (null, 2, null) arguments are reproduced unchanged from the compiled call;
                    // presumably the Kotlin default-argument constructor -- confirm against the exception class.
                    throw new VertxKinesisConsumerOrchestraException("No shared instance of " + NettyKinesisAsyncClientFactory.class.getName() + " under reference: \"" + NettyKinesisAsyncClientFactory.SHARED_DATA_REF + "\" found", null, 2, null);
                }
                Context orCreateContext = vertx.getOrCreateContext();
                Intrinsics.checkNotNullExpressionValue(orCreateContext, "vertx.orCreateContext");
                KinesisAsyncClient nettyClient = ((NettyKinesisAsyncClientFactory) shareable).createKinesisAsyncClient(orCreateContext);
                getLogger().info(new Function0<Object>() {
                    @Override
                    @Nullable
                    public final Object invoke() {
                        return "Using AWS Netty Kinesis client to fetch records";
                    }
                });
                return nettyClient;
            } catch (Throwable ignored) {
                // Best-effort by design: any failure to build the Netty client selects the supplied client.
                getLogger().info(new Function0<Object>() {
                    @Override
                    @Nullable
                    public final Object invoke() {
                        return "Using Vert.x Kinesis client to fetch records";
                    }
                });
                return kinesisAsyncClient;
            }
        }

        /**
         * Builds the counter for the given stream and shard, if metrics are requested.
         *
         * <p>A counter is only created when {@code fetcherOptions.getMetricsEnabled()} is true and a
         * metric name is configured. The registry is looked up by
         * {@code fetcherOptions.getMetricRegistryName()} when that name is set, otherwise the default
         * backend registry is used. The counter is tagged with {@code stream} and {@code shard}.
         *
         * <p>NOTE(review): the registry lookups are dereferenced without a null check; confirm that
         * a registry is always registered whenever metrics are enabled.
         *
         * @param fetcherOptions options carrying the metrics switch, metric name and registry name
         * @param str            stream name, used as the value of the {@code stream} tag
         * @param shardId        shard id; its string form is the value of the {@code shard} tag
         * @return the counter, or {@code null} when metrics are disabled or no metric name is set
         */
        private final Counter metricsCounterOf(FetcherOptions fetcherOptions, String str, ShardId shardId) {
            boolean counterRequested = fetcherOptions.getMetricsEnabled() && fetcherOptions.getMetricName() != null;
            if (!counterRequested) {
                return null;
            }

            Tags counterTags = Tags.of("stream", str, "shard", String.valueOf(shardId));

            // No explicit registry name configured: fall back to the default backend registry.
            if (fetcherOptions.getMetricRegistryName() == null) {
                return BackendRegistries.getDefaultNow().counter(fetcherOptions.getMetricName(), counterTags);
            }
            return BackendRegistries.getNow(fetcherOptions.getMetricRegistryName()).counter(fetcherOptions.getMetricName(), counterTags);
        }

        // Private: instances are only created through the $$INSTANCE field of this class.
        private Companion() {
        }
    }

    /**
     * Returns the {@code RecordBatchStreamReader} of this fetcher.
     *
     * @return the stream reader, never {@code null}
     */
    @NotNull
    RecordBatchStreamReader getStreamReader();

    /**
     * Starts this fetcher. What starting involves is defined by the implementations, which are
     * not part of this file.
     *
     * <p>Compiled form of a Kotlin {@code suspend} function returning {@code Unit}: completion is
     * reported through the supplied continuation.
     *
     * @param continuation continuation receiving the {@code Unit} result
     * @return a possibly {@code null} object; presumably the usual Kotlin suspend return value
     *         (result or suspension marker) -- confirm before relying on it from Java
     */
    @Nullable
    Object start(@NotNull Continuation<? super Unit> continuation);

    /**
     * Stops this fetcher. What stopping involves is defined by the implementations, which are
     * not part of this file.
     *
     * <p>Compiled form of a Kotlin {@code suspend} function returning {@code Unit}: completion is
     * reported through the supplied continuation.
     *
     * @param continuation continuation receiving the {@code Unit} result
     * @return a possibly {@code null} object; presumably the usual Kotlin suspend return value
     *         (result or suspension marker) -- confirm before relying on it from Java
     */
    @Nullable
    Object stop(@NotNull Continuation<? super Unit> continuation);
}
