package pl.allegro.tech.mongomigrationstream.core.synchronization;

import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.github.oshai.kotlinlogging.KLogger;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import kotlin.Metadata;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.internal.Intrinsics;
import org.bson.BsonDocument;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import pl.allegro.tech.mongomigrationstream.core.metrics.MigrationMetrics;
import pl.allegro.tech.mongomigrationstream.core.mongo.DbCollection;
import pl.allegro.tech.mongomigrationstream.core.mongo.SourceToDestination;
import pl.allegro.tech.mongomigrationstream.core.state.StateEvent;
import pl.allegro.tech.mongomigrationstream.core.state.StateInfo;

/* compiled from: ChangeStreamDocumentSubscriber.kt */
@Metadata(mv = {1, 8, 0}, k = 1, xi = 48, d1 = {"��P\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0010\u0003\n\u0002\b\u0005\b��\u0018��2\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00030\u00020\u0001B%\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t\u0012\u0006\u0010\n\u001a\u00020\u000b¢\u0006\u0002\u0010\fJ\b\u0010\u0014\u001a\u00020\u0015H\u0016J\u0010\u0010\u0016\u001a\u00020\u00152\u0006\u0010\u0017\u001a\u00020\u0018H\u0016J\u0016\u0010\u0019\u001a\u00020\u00152\f\u0010\u001a\u001a\b\u0012\u0004\u0012\u00020\u00030\u0002H\u0016J\u0010\u0010\u001b\u001a\u00020\u00152\u0006\u0010\u001c\u001a\u00020\u0013H\u0016R\u0016\u0010\r\u001a\n \u000f*\u0004\u0018\u00010\u000e0\u000eX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0010\u001a\u00020\u0011X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0012\u001a\u00020\u0013X\u0082.¢\u0006\u0002\n��¨\u0006\u001d"}, d2 = {"Lpl/allegro/tech/mongomigrationstream/core/synchronization/ChangeStreamDocumentSubscriber;", "Lorg/reactivestreams/Subscriber;", "Lcom/mongodb/client/model/changestream/ChangeStreamDocument;", "Lorg/bson/BsonDocument;", "sourceToDestination", "Lpl/allegro/tech/mongomigrationstream/core/mongo/SourceToDestination;", "stateInfo", "Lpl/allegro/tech/mongomigrationstream/core/state/StateInfo;", "eventConsumer", "Lpl/allegro/tech/mongomigrationstream/core/synchronization/EventConsumer;", "meterRegistry", "Lio/micrometer/core/instrument/MeterRegistry;", 
"(Lpl/allegro/tech/mongomigrationstream/core/mongo/SourceToDestination;Lpl/allegro/tech/mongomigrationstream/core/state/StateInfo;Lpl/allegro/tech/mongomigrationstream/core/synchronization/EventConsumer;Lio/micrometer/core/instrument/MeterRegistry;)V", "counter", "Lio/micrometer/core/instrument/Counter;", "kotlin.jvm.PlatformType", "dbCollection", "Lpl/allegro/tech/mongomigrationstream/core/mongo/DbCollection;", "subscription", "Lorg/reactivestreams/Subscription;", "onComplete", "", "onError", "cause", "", "onNext", "rawEvent", "onSubscribe", "s", "mongo-migration-stream-core"})
/* loaded from: input_file:pl/allegro/tech/mongomigrationstream/core/synchronization/ChangeStreamDocumentSubscriber.class */
/*
 * Reactive Streams subscriber for MongoDB change stream documents of a single
 * source collection.
 *
 * For every received document it increments a Micrometer counter (tagged with
 * the source database and collection name) and hands the converted
 * ChangeEvent to the EventConsumer. Stream failures are reported to StateInfo
 * as a StateEvent.FailedEvent, after which the subscription is cancelled.
 */
public final class ChangeStreamDocumentSubscriber implements Subscriber<ChangeStreamDocument<BsonDocument>> {

    @NotNull
    private final SourceToDestination sourceToDestination;

    @NotNull
    private final StateInfo stateInfo;

    @NotNull
    private final EventConsumer eventConsumer;

    /** Assigned in {@link #onSubscribe(Subscription)}; {@code null} until then (Kotlin {@code lateinit}). */
    private Subscription subscription;

    /** Source side of {@link #sourceToDestination}; used for log messages and metric tags. */
    @NotNull
    private final DbCollection dbCollection;

    /** Counts every change event delivered to {@link #onNext(ChangeStreamDocument)}. */
    private final Counter counter;

    /**
     * Creates the subscriber and registers its change-event counter.
     *
     * @param sourceToDestination source/destination pair; its source collection is the one being observed
     * @param stateInfo           receiver of the failure notification emitted from {@link #onError(Throwable)}
     * @param eventConsumer       receiver of every converted change event
     * @param meterRegistry       registry in which the change-event counter is registered
     */
    public ChangeStreamDocumentSubscriber(@NotNull SourceToDestination sourceToDestination, @NotNull StateInfo stateInfo, @NotNull EventConsumer eventConsumer, @NotNull MeterRegistry meterRegistry) {
        Intrinsics.checkNotNullParameter(sourceToDestination, "sourceToDestination");
        Intrinsics.checkNotNullParameter(stateInfo, "stateInfo");
        Intrinsics.checkNotNullParameter(eventConsumer, "eventConsumer");
        Intrinsics.checkNotNullParameter(meterRegistry, "meterRegistry");
        this.sourceToDestination = sourceToDestination;
        this.stateInfo = stateInfo;
        this.eventConsumer = eventConsumer;
        this.dbCollection = sourceToDestination.getSource();
        this.counter = Counter.builder(MigrationMetrics.CHANGE_EVENT_COUNTER)
                .description("Stores how many change event were processed by ChangeStreamDocumentSubscriber from source Mongo change stream")
                .tags("database", this.dbCollection.getDbName(), "collection", this.dbCollection.getCollectionName())
                .register(meterRegistry);
    }

    /**
     * Stores the subscription and requests {@code Long.MAX_VALUE} elements from it.
     *
     * @param subscription the subscription handed over by the publisher
     */
    @Override
    public void onSubscribe(@NotNull Subscription subscription) {
        Intrinsics.checkNotNullParameter(subscription, "s");
        ChangeStreamDocumentSubscriberKt.logger.info(
                () -> "Starting subscriber of dbCollection: [" + this.dbCollection + "]");
        this.subscription = subscription;
        // The parameter was null-checked above, so no lateinit guard is needed here.
        subscription.request(Long.MAX_VALUE);
    }

    /**
     * Counts the event and forwards it, converted to a {@code ChangeEvent}, to the event consumer.
     *
     * @param changeStreamDocument raw change stream document received from the publisher
     */
    @Override
    public void onNext(@NotNull ChangeStreamDocument<BsonDocument> changeStreamDocument) {
        Intrinsics.checkNotNullParameter(changeStreamDocument, "rawEvent");
        this.counter.increment();
        ChangeEvent event = ChangeEvent.Companion.fromMongoChangeStreamDocument(changeStreamDocument);
        this.eventConsumer.saveEventToLocalQueue$mongo_migration_stream_core(event);
    }

    /**
     * Logs the failure, notifies {@link StateInfo} with a {@code FailedEvent} and then runs
     * {@link #onComplete()} to cancel the subscription.
     *
     * @param th the error that terminated the stream
     */
    @Override
    public void onError(@NotNull final Throwable th) {
        Intrinsics.checkNotNullParameter(th, "cause");
        // Pass the throwable itself so the stack trace is not lost; the message text is unchanged.
        ChangeStreamDocumentSubscriberKt.logger.error(th, () -> th.getMessage());
        this.stateInfo.notifyStateChange$mongo_migration_stream_core(new StateEvent.FailedEvent(this.sourceToDestination, th));
        onComplete();
    }

    /**
     * Logs the cancellation and cancels the stored subscription.
     *
     * NOTE(review): Reactive Streams rule 2.3 discourages calling Subscription methods from
     * terminal signals; the cancel() call is kept to preserve existing behaviour - confirm
     * whether it is still required.
     *
     * @throws kotlin.UninitializedPropertyAccessException if invoked before {@link #onSubscribe(Subscription)}
     */
    @Override
    public void onComplete() {
        ChangeStreamDocumentSubscriberKt.logger.info(
                () -> "Cancelling subscriber of dbCollection: [" + this.dbCollection + "]");
        Subscription current = this.subscription;
        if (current == null) {
            // Always throws: mirrors Kotlin's lateinit access check.
            Intrinsics.throwUninitializedPropertyAccessException("subscription");
        }
        current.cancel();
    }
}
