package dev.naoh.lettucef.streams;

import cats.effect.kernel.Async;
import cats.effect.kernel.Resource;
import cats.effect.package$;
import cats.effect.std.Dispatcher;
import cats.syntax.package$flatMap$;
import cats.syntax.package$functor$;
import dev.naoh.lettucef.api.models.pubsub.RedisPushed;
import dev.naoh.lettucef.core.RedisPubSubF;
import fs2.Stream;
import fs2.Stream$;
import fs2.concurrent.Channel;
import fs2.concurrent.Channel$;
import fs2.concurrent.SignallingRef;
import io.lettuce.core.pubsub.RedisPubSubListener;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;
import scala.runtime.BoxesRunTime;

/* compiled from: ManagedPubSubF.scala */
/* loaded from: input_file:dev/naoh/lettucef/streams/ManagedPubSubF.class */
public class ManagedPubSubF<F, K, V> {
    private final RedisPubSubF<F, K, V> underlying;
    public final Dispatcher<F> dev$naoh$lettucef$streams$ManagedPubSubF$$dispatcher;
    public final SignallingRef<F, Map<K, Tuple2<Object, Object>>> dev$naoh$lettucef$streams$ManagedPubSubF$$eState;
    public final SignallingRef<F, Map<K, Tuple2<Object, Object>>> dev$naoh$lettucef$streams$ManagedPubSubF$$pState;
    public final Async<F> dev$naoh$lettucef$streams$ManagedPubSubF$$evidence$1;

    /* compiled from: ManagedPubSubF.scala */
    /* loaded from: input_file:dev/naoh/lettucef/streams/ManagedPubSubF$VoidListener.class */
    public interface VoidListener<K, V> extends RedisPubSubListener<K, V> {
        default void message(K k, V v) {
        }

        default void message(K k, K k2, V v) {
        }

        default void subscribed(K k, long j) {
        }

        default void psubscribed(K k, long j) {
        }

        default void unsubscribed(K k, long j) {
        }

        default void punsubscribed(K k, long j) {
        }
    }

    public static <F, K, V> Resource<F, ManagedPubSubF<F, K, V>> create(Resource<F, RedisPubSubF<F, K, V>> resource, Dispatcher<F> dispatcher, Async<F> async) {
        return ManagedPubSubF$.MODULE$.create(resource, dispatcher, async);
    }

    public ManagedPubSubF(RedisPubSubF<F, K, V> redisPubSubF, Dispatcher<F> dispatcher, SignallingRef<F, Map<K, Tuple2<Object, Object>>> signallingRef, SignallingRef<F, Map<K, Tuple2<Object, Object>>> signallingRef2, Async<F> async) {
        this.underlying = redisPubSubF;
        this.dev$naoh$lettucef$streams$ManagedPubSubF$$dispatcher = dispatcher;
        this.dev$naoh$lettucef$streams$ManagedPubSubF$$eState = signallingRef;
        this.dev$naoh$lettucef$streams$ManagedPubSubF$$pState = signallingRef2;
        this.dev$naoh$lettucef$streams$ManagedPubSubF$$evidence$1 = async;
    }

    public Resource<F, Stream<F, RedisPushed.Message<K, V>>> subscribeAwait(Seq<K> seq) {
        Set<K> set = seq.toSet();
        return ManagedPubSubF$.MODULE$.dev$naoh$lettucef$streams$ManagedPubSubF$$$stream(set, this.dev$naoh$lettucef$streams$ManagedPubSubF$$eState, seq2 -> {
            return dev$naoh$lettucef$streams$ManagedPubSubF$$emitSubscribe(seq2);
        }, seq3 -> {
            return dev$naoh$lettucef$streams$ManagedPubSubF$$emitUnsubscribe(seq3);
        }, startPush(channel -> {
            return ManagedPubSubF$.MODULE$.dev$naoh$lettucef$streams$ManagedPubSubF$$$messageSender(set, channel, this.dev$naoh$lettucef$streams$ManagedPubSubF$$dispatcher);
        }), this.dev$naoh$lettucef$streams$ManagedPubSubF$$evidence$1);
    }

    public Stream<F, RedisPushed.Message<K, V>> subscribe(Seq<K> seq) {
        return Stream$.MODULE$.resource(subscribeAwait(seq), this.dev$naoh$lettucef$streams$ManagedPubSubF$$evidence$1).flatten($less$colon$less$.MODULE$.refl());
    }

    public Resource<F, Stream<F, RedisPushed.PMessage<K, V>>> psubscribeAwait(Seq<K> seq) {
        Set<K> set = seq.toSet();
        return ManagedPubSubF$.MODULE$.dev$naoh$lettucef$streams$ManagedPubSubF$$$stream(set, this.dev$naoh$lettucef$streams$ManagedPubSubF$$pState, seq2 -> {
            return dev$naoh$lettucef$streams$ManagedPubSubF$$emitPSubscribe(seq2);
        }, seq3 -> {
            return dev$naoh$lettucef$streams$ManagedPubSubF$$emitPUnsubscribe(seq3);
        }, startPush(channel -> {
            return ManagedPubSubF$.MODULE$.dev$naoh$lettucef$streams$ManagedPubSubF$$$pmessageSender(set, channel, this.dev$naoh$lettucef$streams$ManagedPubSubF$$dispatcher);
        }), this.dev$naoh$lettucef$streams$ManagedPubSubF$$evidence$1);
    }

    public Stream<F, RedisPushed.PMessage<K, V>> psubscribe(Seq<K> seq) {
        return Stream$.MODULE$.resource(psubscribeAwait(seq), this.dev$naoh$lettucef$streams$ManagedPubSubF$$evidence$1).flatten($less$colon$less$.MODULE$.refl());
    }

    public F addListener(RedisPubSubListener<K, V> redisPubSubListener) {
        return (F) this.underlying.addListener(redisPubSubListener);
    }

    public F removeListener(RedisPubSubListener<K, V> redisPubSubListener) {
        return (F) this.underlying.removeListener(redisPubSubListener);
    }

    public Resource<F, F> setListener(RedisPubSubListener<K, V> redisPubSubListener) {
        return this.underlying.setListener(redisPubSubListener);
    }

    private <O> Resource<F, Stream<F, O>> startPush(Function1<Channel<F, O>, RedisPubSubListener<K, V>> function1) {
        return package$.MODULE$.Resource().make(Channel$.MODULE$.unbounded(this.dev$naoh$lettucef$streams$ManagedPubSubF$$evidence$1), channel -> {
            return package$functor$.MODULE$.toFunctorOps(channel.close(), this.dev$naoh$lettucef$streams$ManagedPubSubF$$evidence$1).void();
        }, this.dev$naoh$lettucef$streams$ManagedPubSubF$$evidence$1).flatMap(channel2 -> {
            return setListener((RedisPubSubListener) function1.apply(channel2)).map(obj -> {
                return channel2.stream().onFinalize(obj, this.dev$naoh$lettucef$streams$ManagedPubSubF$$evidence$1);
            });
        });
    }

    public Resource<F, ManagedPubSubF<F, K, V>> dev$naoh$lettucef$streams$ManagedPubSubF$$init() {
        return package$.MODULE$.Resource().eval(package$.MODULE$.Async().apply(this.dev$naoh$lettucef$streams$ManagedPubSubF$$evidence$1).delay(this::init$$anonfun$1)).flatMap(redisPubSubListener -> {
            return setListener(redisPubSubListener).map(obj -> {
                return this;
            });
        });
    }

    private RedisPubSubListener<K, V> stateHandler() {
        return new VoidListener<K, V>(this) { // from class: dev.naoh.lettucef.streams.ManagedPubSubF$$anon$1
            private final ManagedPubSubF $outer;

            {
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }

            @Override // dev.naoh.lettucef.streams.ManagedPubSubF.VoidListener
            public /* bridge */ /* synthetic */ void message(Object obj, Object obj2) {
                message(obj, obj2);
            }

            @Override // dev.naoh.lettucef.streams.ManagedPubSubF.VoidListener
            public /* bridge */ /* synthetic */ void message(Object obj, Object obj2, Object obj3) {
                message(obj, obj2, obj3);
            }

            @Override // dev.naoh.lettucef.streams.ManagedPubSubF.VoidListener
            public void subscribed(Object obj, long j) {
                this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$dispatcher.unsafeRunSync(package$flatMap$.MODULE$.toFlatMapOps(this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$eState.modify(ManagedPubSubF$State$.MODULE$.subscribed(obj)), this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$evidence$1).flatMap(obj2 -> {
                    return subscribed$$anonfun$1(obj, BoxesRunTime.unboxToBoolean(obj2));
                }));
            }

            @Override // dev.naoh.lettucef.streams.ManagedPubSubF.VoidListener
            public void unsubscribed(Object obj, long j) {
                this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$dispatcher.unsafeRunSync(package$flatMap$.MODULE$.toFlatMapOps(this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$eState.modify(ManagedPubSubF$State$.MODULE$.unsubscribed(obj)), this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$evidence$1).flatMap(obj2 -> {
                    return unsubscribed$$anonfun$1(obj, BoxesRunTime.unboxToBoolean(obj2));
                }));
            }

            @Override // dev.naoh.lettucef.streams.ManagedPubSubF.VoidListener
            public void psubscribed(Object obj, long j) {
                this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$dispatcher.unsafeRunSync(package$flatMap$.MODULE$.toFlatMapOps(this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$pState.modify(ManagedPubSubF$State$.MODULE$.subscribed(obj)), this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$evidence$1).flatMap(obj2 -> {
                    return psubscribed$$anonfun$1(obj, BoxesRunTime.unboxToBoolean(obj2));
                }));
            }

            @Override // dev.naoh.lettucef.streams.ManagedPubSubF.VoidListener
            public void punsubscribed(Object obj, long j) {
                this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$dispatcher.unsafeRunSync(package$flatMap$.MODULE$.toFlatMapOps(this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$pState.modify(ManagedPubSubF$State$.MODULE$.unsubscribed(obj)), this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$evidence$1).flatMap(obj2 -> {
                    return punsubscribed$$anonfun$1(obj, BoxesRunTime.unboxToBoolean(obj2));
                }));
            }

            private final /* synthetic */ Object subscribed$$anonfun$1(Object obj, boolean z) {
                if (true == z) {
                    return package$.MODULE$.Async().apply(this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$evidence$1).unit();
                }
                if (false == z) {
                    return this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$emitUnsubscribe(scala.package$.MODULE$.Nil().$colon$colon(obj));
                }
                throw new MatchError(BoxesRunTime.boxToBoolean(z));
            }

            private final /* synthetic */ Object unsubscribed$$anonfun$1(Object obj, boolean z) {
                if (true == z) {
                    return package$.MODULE$.Async().apply(this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$evidence$1).unit();
                }
                if (false == z) {
                    return this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$emitSubscribe(scala.package$.MODULE$.Nil().$colon$colon(obj));
                }
                throw new MatchError(BoxesRunTime.boxToBoolean(z));
            }

            private final /* synthetic */ Object psubscribed$$anonfun$1(Object obj, boolean z) {
                if (true == z) {
                    return package$.MODULE$.Async().apply(this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$evidence$1).unit();
                }
                if (false == z) {
                    return this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$emitPUnsubscribe(scala.package$.MODULE$.Nil().$colon$colon(obj));
                }
                throw new MatchError(BoxesRunTime.boxToBoolean(z));
            }

            private final /* synthetic */ Object punsubscribed$$anonfun$1(Object obj, boolean z) {
                if (true == z) {
                    return package$.MODULE$.Async().apply(this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$evidence$1).unit();
                }
                if (false == z) {
                    return this.$outer.dev$naoh$lettucef$streams$ManagedPubSubF$$emitPSubscribe(scala.package$.MODULE$.Nil().$colon$colon(obj));
                }
                throw new MatchError(BoxesRunTime.boxToBoolean(z));
            }
        };
    }

    public F dev$naoh$lettucef$streams$ManagedPubSubF$$emitSubscribe(Seq<K> seq) {
        return (F) this.underlying.subscribe(seq);
    }

    public F dev$naoh$lettucef$streams$ManagedPubSubF$$emitUnsubscribe(Seq<K> seq) {
        return (F) this.underlying.unsubscribe(seq);
    }

    public F dev$naoh$lettucef$streams$ManagedPubSubF$$emitPSubscribe(Seq<K> seq) {
        return (F) this.underlying.psubscribe(seq);
    }

    public F dev$naoh$lettucef$streams$ManagedPubSubF$$emitPUnsubscribe(Seq<K> seq) {
        return (F) this.underlying.punsubscribe(seq);
    }

    private final RedisPubSubListener init$$anonfun$1() {
        return stateHandler();
    }
}
