package pl.allegro.tech.mongomigrationstream.infrastructure.detector;

import com.mongodb.client.MongoDatabase;
import io.github.oshai.kotlinlogging.KLogger;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import kotlin.Metadata;
import kotlin.NoWhenBranchMatchedException;
import kotlin.Result;
import kotlin.ResultKt;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.SourceDebugExtension;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import pl.allegro.tech.mongomigrationstream.configuration.ApplicationProperties;
import pl.allegro.tech.mongomigrationstream.configuration.GeneralProperties;
import pl.allegro.tech.mongomigrationstream.core.concurrency.MigrationExecutors;
import pl.allegro.tech.mongomigrationstream.core.detector.CollectionCountSynchronizationDetector;
import pl.allegro.tech.mongomigrationstream.core.detector.DetectionResult;
import pl.allegro.tech.mongomigrationstream.core.detector.HashSynchronizationDetector;
import pl.allegro.tech.mongomigrationstream.core.detector.QueueSizeSynchronizationDetector;
import pl.allegro.tech.mongomigrationstream.core.detector.SynchronizationDetector;
import pl.allegro.tech.mongomigrationstream.core.detector.handler.DetectionResultHandler;
import pl.allegro.tech.mongomigrationstream.core.mongo.SourceToDestination;
import pl.allegro.tech.mongomigrationstream.core.queue.EventQueue;
import pl.allegro.tech.mongomigrationstream.core.synchronization.ChangeEvent;
import pl.allegro.tech.mongomigrationstream.infrastructure.mongo.MongoDbClients;

/* compiled from: SynchronizationDetectorFactory.kt */
@Metadata(mv = {1, 8, 0}, k = 1, xi = 48, d1 = {"��\\\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010$\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\"\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0004\b��\u0018��2\u00020\u0001B7\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0018\u0010\u0006\u001a\u0014\u0012\u0004\u0012\u00020\b\u0012\n\u0012\b\u0012\u0004\u0012\u00020\n0\t0\u0007\u0012\u0006\u0010\u000b\u001a\u00020\f¢\u0006\u0002\u0010\rJ\u0016\u0010\u0014\u001a\b\u0012\u0004\u0012\u00020\u00160\u00152\u0006\u0010\u0002\u001a\u00020\u0003H\u0002J$\u0010\u0017\u001a\u00020\u00182\f\u0010\u0019\u001a\b\u0012\u0004\u0012\u00020\u00160\u00152\f\u0010\u001a\u001a\b\u0012\u0004\u0012\u00020\u001b0\u0015H\u0002J\b\u0010\u001c\u001a\u00020\u0018H\u0002J\u0006\u0010\u001d\u001a\u00020\u0018J\u0006\u0010\u001e\u001a\u00020\u0018R\u000e\u0010\u000e\u001a\u00020\u000fX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000b\u001a\u00020\fX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R \u0010\u0006\u001a\u0014\u0012\u0004\u0012\u00020\b\u0012\n\u0012\b\u0012\u0004\u0012\u00020\n0\t0\u0007X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0010\u001a\u00020\u0011X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0012\u001a\u00020\u0013X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u001f"}, d2 = {"Lpl/allegro/tech/mongomigrationstream/infrastructure/detector/SynchronizationDetectorFactory;", "", "properties", "Lpl/allegro/tech/mongomigrationstream/configuration/ApplicationProperties;", "mongoDbClients", "Lpl/allegro/tech/mongomigrationstream/infrastructure/mongo/MongoDbClients;", "queues", "", "Lpl/allegro/tech/mongomigrationstream/core/mongo/SourceToDestination;", "Lpl/allegro/tech/mongomigrationstream/core/queue/EventQueue;", "Lpl/allegro/tech/mongomigrationstream/core/synchronization/ChangeEvent;", "meterRegistry", "Lio/micrometer/core/instrument/MeterRegistry;", "(Lpl/allegro/tech/mongomigrationstream/configuration/ApplicationProperties;Lpl/allegro/tech/mongomigrationstream/infrastructure/mongo/MongoDbClients;Ljava/util/Map;Lio/micrometer/core/instrument/MeterRegistry;)V", "finishedTransfers", "Ljava/util/concurrent/atomic/AtomicInteger;", "scheduledSynchronizationDetectorExecutor", "Ljava/util/concurrent/ScheduledExecutorService;", "synchronizationDetectorExecutor", "Ljava/util/concurrent/ExecutorService;", "createDetectors", "", "Lpl/allegro/tech/mongomigrationstream/core/detector/SynchronizationDetector;", "detectAndHandleSynchronization", "", "detectors", "handlers", "Lpl/allegro/tech/mongomigrationstream/core/detector/handler/DetectionResultHandler;", "startDetectingSynchronization", "stopDetectingSynchronization", "tryToStartDetectingSynchronization", "mongo-migration-stream-core"})
@SourceDebugExtension({"SMAP\nSynchronizationDetectorFactory.kt\nKotlin\n*S Kotlin\n*F\n+ 1 SynchronizationDetectorFactory.kt\npl/allegro/tech/mongomigrationstream/infrastructure/detector/SynchronizationDetectorFactory\n+ 2 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n+ 3 fake.kt\nkotlin/jvm/internal/FakeKt\n*L\n1#1,116:1\n1549#2:117\n1620#2,3:118\n1360#2:121\n1446#2,5:122\n1855#2:128\n1855#2,2:129\n1856#2:131\n1549#2:132\n1620#2,3:133\n1#3:127\n*S KotlinDebug\n*F\n+ 1 SynchronizationDetectorFactory.kt\npl/allegro/tech/mongomigrationstream/infrastructure/detector/SynchronizationDetectorFactory\n*L\n66#1:117\n66#1:118,3\n67#1:121\n67#1:122,5\n70#1:128\n71#1:129,2\n70#1:131\n96#1:132\n96#1:133,3\n*E\n"})
/* loaded from: input_file:pl/allegro/tech/mongomigrationstream/infrastructure/detector/SynchronizationDetectorFactory.class */
public final class SynchronizationDetectorFactory {

    @NotNull
    private final ApplicationProperties properties;

    @NotNull
    private final MongoDbClients mongoDbClients;

    @NotNull
    private final Map<SourceToDestination, EventQueue<ChangeEvent>> queues;

    @NotNull
    private final MeterRegistry meterRegistry;

    @NotNull
    private final AtomicInteger finishedTransfers;

    @NotNull
    private final ExecutorService synchronizationDetectorExecutor;

    @NotNull
    private final ScheduledExecutorService scheduledSynchronizationDetectorExecutor;

    /* JADX WARN: Multi-variable type inference failed */
    public SynchronizationDetectorFactory(@NotNull ApplicationProperties applicationProperties, @NotNull MongoDbClients mongoDbClients, @NotNull Map<SourceToDestination, ? extends EventQueue<ChangeEvent>> map, @NotNull MeterRegistry meterRegistry) {
        Intrinsics.checkNotNullParameter(applicationProperties, "properties");
        Intrinsics.checkNotNullParameter(mongoDbClients, "mongoDbClients");
        Intrinsics.checkNotNullParameter(map, "queues");
        Intrinsics.checkNotNullParameter(meterRegistry, "meterRegistry");
        this.properties = applicationProperties;
        this.mongoDbClients = mongoDbClients;
        this.queues = map;
        this.meterRegistry = meterRegistry;
        this.finishedTransfers = new AtomicInteger(0);
        this.synchronizationDetectorExecutor = MigrationExecutors.INSTANCE.createSynchronizationDetectorExecutor();
        this.scheduledSynchronizationDetectorExecutor = MigrationExecutors.INSTANCE.createScheduledSynchronizationDetectorExecutor();
    }

    public final void tryToStartDetectingSynchronization() {
        this.finishedTransfers.getAndUpdate((v1) -> {
            return tryToStartDetectingSynchronization$lambda$0(r1, v1);
        });
    }

    private final void startDetectingSynchronization() {
        KLogger kLogger;
        kLogger = SynchronizationDetectorFactoryKt.logger;
        kLogger.info(new Function0<Object>() { // from class: pl.allegro.tech.mongomigrationstream.infrastructure.detector.SynchronizationDetectorFactory$startDetectingSynchronization$1
            @Nullable
            public final Object invoke() {
                return "Starting detecting synchronization...";
            }
        });
        Set<SynchronizationDetector> createDetectors = createDetectors(this.properties);
        Set<DetectionResultHandler> synchronizationHandlers = this.properties.getGeneralProperties().getSynchronizationHandlers();
        this.scheduledSynchronizationDetectorExecutor.scheduleAtFixedRate(() -> {
            startDetectingSynchronization$lambda$1(r1, r2, r3);
        }, 0L, 10L, TimeUnit.SECONDS);
    }

    private final void detectAndHandleSynchronization(Set<? extends SynchronizationDetector> set, Set<? extends DetectionResultHandler> set2) {
        Object obj;
        Object obj2;
        KLogger kLogger;
        KLogger kLogger2;
        try {
            Result.Companion companion = Result.Companion;
            SynchronizationDetectorFactory synchronizationDetectorFactory = this;
            Set<? extends SynchronizationDetector> set3 = set;
            ArrayList arrayList = new ArrayList(CollectionsKt.collectionSizeOrDefault(set3, 10));
            for (SynchronizationDetector synchronizationDetector : set3) {
                arrayList.add(CompletableFuture.supplyAsync(() -> {
                    return detectAndHandleSynchronization$lambda$5$lambda$3$lambda$2(r0);
                }, synchronizationDetectorFactory.synchronizationDetectorExecutor));
            }
            ArrayList arrayList2 = arrayList;
            ArrayList arrayList3 = new ArrayList();
            Iterator it = arrayList2.iterator();
            while (it.hasNext()) {
                Object join = ((CompletableFuture) it.next()).join();
                Intrinsics.checkNotNullExpressionValue(join, "it.join()");
                CollectionsKt.addAll(arrayList3, (Set) join);
            }
            obj = Result.constructor-impl(arrayList3);
        } catch (Throwable th) {
            Result.Companion companion2 = Result.Companion;
            obj = Result.constructor-impl(ResultKt.createFailure(th));
        }
        Object obj3 = obj;
        Throwable th2 = Result.exceptionOrNull-impl(obj3);
        if (th2 != null) {
            kLogger2 = SynchronizationDetectorFactoryKt.logger;
            kLogger2.warn(th2, new Function0<Object>() { // from class: pl.allegro.tech.mongomigrationstream.infrastructure.detector.SynchronizationDetectorFactory$detectAndHandleSynchronization$2$1
                @Nullable
                public final Object invoke() {
                    return "Cannot detect synchronization";
                }
            });
        }
        if (Result.isSuccess-impl(obj3)) {
            Result.Companion companion3 = Result.Companion;
            List list = (List) obj3;
            for (DetectionResultHandler detectionResultHandler : set2) {
                Iterator it2 = list.iterator();
                while (it2.hasNext()) {
                    detectionResultHandler.handle((DetectionResult) it2.next());
                }
            }
            obj2 = Result.constructor-impl(Unit.INSTANCE);
        } else {
            obj2 = Result.constructor-impl(obj3);
        }
        Throwable th3 = Result.exceptionOrNull-impl(obj2);
        if (th3 != null) {
            kLogger = SynchronizationDetectorFactoryKt.logger;
            kLogger.warn(th3, new Function0<Object>() { // from class: pl.allegro.tech.mongomigrationstream.infrastructure.detector.SynchronizationDetectorFactory$detectAndHandleSynchronization$4$1
                @Nullable
                public final Object invoke() {
                    return "Cannot handle synchronization";
                }
            });
        }
    }

    private final Set<SynchronizationDetector> createDetectors(ApplicationProperties applicationProperties) {
        SynchronizationDetector synchronizationDetector;
        MongoDatabase sourceDatabase = this.mongoDbClients.getSourceDatabase();
        MongoDatabase destinationDatabase = this.mongoDbClients.getDestinationDatabase();
        Set<SourceToDestination> sourceToDestinationMapping = applicationProperties.getSourceToDestinationMapping();
        SynchronizationDetector hashSynchronizationDetector = new HashSynchronizationDetector(sourceDatabase, destinationDatabase, sourceToDestinationMapping, this.synchronizationDetectorExecutor);
        SynchronizationDetector queueSizeSynchronizationDetector = new QueueSizeSynchronizationDetector(sourceToDestinationMapping, this.queues, this.synchronizationDetectorExecutor, this.meterRegistry);
        SynchronizationDetector collectionCountSynchronizationDetector = new CollectionCountSynchronizationDetector(sourceDatabase, destinationDatabase, sourceToDestinationMapping, this.synchronizationDetectorExecutor, this.meterRegistry);
        Set<GeneralProperties.SynchronizationDetectorType> synchronizationDetectors = applicationProperties.getGeneralProperties().getSynchronizationDetectors();
        ArrayList arrayList = new ArrayList(CollectionsKt.collectionSizeOrDefault(synchronizationDetectors, 10));
        for (GeneralProperties.SynchronizationDetectorType synchronizationDetectorType : synchronizationDetectors) {
            if (Intrinsics.areEqual(synchronizationDetectorType, GeneralProperties.DbHashSynchronizationDetectorType.INSTANCE)) {
                synchronizationDetector = hashSynchronizationDetector;
            } else if (Intrinsics.areEqual(synchronizationDetectorType, GeneralProperties.CollectionCountSynchronizationDetectorType.INSTANCE)) {
                synchronizationDetector = collectionCountSynchronizationDetector;
            } else {
                if (!Intrinsics.areEqual(synchronizationDetectorType, GeneralProperties.QueueSizeSynchronizationDetectorType.INSTANCE)) {
                    throw new NoWhenBranchMatchedException();
                }
                synchronizationDetector = queueSizeSynchronizationDetector;
            }
            arrayList.add(synchronizationDetector);
        }
        return CollectionsKt.toSet(arrayList);
    }

    public final void stopDetectingSynchronization() {
        KLogger kLogger;
        KLogger kLogger2;
        KLogger kLogger3;
        KLogger kLogger4;
        try {
            try {
                this.scheduledSynchronizationDetectorExecutor.shutdown();
                this.synchronizationDetectorExecutor.shutdown();
                kLogger4 = SynchronizationDetectorFactoryKt.logger;
                kLogger4.info(new Function0<Object>() { // from class: pl.allegro.tech.mongomigrationstream.infrastructure.detector.SynchronizationDetectorFactory$stopDetectingSynchronization$2
                    @Nullable
                    public final Object invoke() {
                        return "Shut down SynchronizationDetectorFactory";
                    }
                });
            } catch (Throwable th) {
                kLogger = SynchronizationDetectorFactoryKt.logger;
                kLogger.warn(th, new Function0<Object>() { // from class: pl.allegro.tech.mongomigrationstream.infrastructure.detector.SynchronizationDetectorFactory$stopDetectingSynchronization$1
                    @Nullable
                    public final Object invoke() {
                        return "Exception while shutting down SynchronizationDetectorFactory";
                    }
                });
                kLogger2 = SynchronizationDetectorFactoryKt.logger;
                kLogger2.info(new Function0<Object>() { // from class: pl.allegro.tech.mongomigrationstream.infrastructure.detector.SynchronizationDetectorFactory$stopDetectingSynchronization$2
                    @Nullable
                    public final Object invoke() {
                        return "Shut down SynchronizationDetectorFactory";
                    }
                });
            }
        } catch (Throwable th2) {
            kLogger3 = SynchronizationDetectorFactoryKt.logger;
            kLogger3.info(new Function0<Object>() { // from class: pl.allegro.tech.mongomigrationstream.infrastructure.detector.SynchronizationDetectorFactory$stopDetectingSynchronization$2
                @Nullable
                public final Object invoke() {
                    return "Shut down SynchronizationDetectorFactory";
                }
            });
            throw th2;
        }
    }

    private static final int tryToStartDetectingSynchronization$lambda$0(SynchronizationDetectorFactory synchronizationDetectorFactory, int i) {
        Intrinsics.checkNotNullParameter(synchronizationDetectorFactory, "this$0");
        if (i + 1 == synchronizationDetectorFactory.properties.getSourceToDestinationMapping().size()) {
            synchronizationDetectorFactory.startDetectingSynchronization();
        }
        return i + 1;
    }

    private static final void startDetectingSynchronization$lambda$1(SynchronizationDetectorFactory synchronizationDetectorFactory, Set set, Set set2) {
        Intrinsics.checkNotNullParameter(synchronizationDetectorFactory, "this$0");
        Intrinsics.checkNotNullParameter(set, "$detectors");
        Intrinsics.checkNotNullParameter(set2, "$handlers");
        synchronizationDetectorFactory.detectAndHandleSynchronization(set, set2);
    }

    private static final Set detectAndHandleSynchronization$lambda$5$lambda$3$lambda$2(SynchronizationDetector synchronizationDetector) {
        Intrinsics.checkNotNullParameter(synchronizationDetector, "$it");
        return synchronizationDetector.detect();
    }
}
