package dev.naoh.lettucef.streams;

import cats.effect.kernel.Async;
import cats.effect.kernel.GenConcurrent;
import cats.effect.kernel.Resource;
import cats.effect.package$;
import cats.syntax.FlatMapOps$;
import cats.syntax.package$flatMap$;
import cats.syntax.package$functor$;
import fs2.Compiler$;
import fs2.Compiler$Target$;
import fs2.Stream;
import fs2.concurrent.SignallingRef;
import scala.Function1;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;
import scala.runtime.BoxesRunTime;

/* compiled from: RedisAutoSubscriber.scala */
/* loaded from: input_file:dev/naoh/lettucef/streams/SubscribeStreamHelper$.class */
public final class SubscribeStreamHelper$ {
    public static final SubscribeStreamHelper$ MODULE$ = new SubscribeStreamHelper$();

    public <F, K, O> Resource<F, Stream<F, O>> resource(Set<K> set, SignallingRef<F, Map<K, Tuple2<Object, Object>>> signallingRef, Function1<Seq<K>, F> function1, Function1<Seq<K>, F> function12, Resource<F, Stream<F, O>> resource, Async<F> async) {
        return package$.MODULE$.Resource().eval(package$.MODULE$.Async().apply(async).memoize(package$functor$.MODULE$.toFunctorOps(update1$1(RedisAutoSubscriber$State$.MODULE$.unsubscribe1(set), function12, signallingRef, async), async).void())).flatMap(obj -> {
            return package$.MODULE$.Resource().make(FlatMapOps$.MODULE$.$greater$greater$extension(package$flatMap$.MODULE$.catsSyntaxFlatMapOps(update1$1(RedisAutoSubscriber$State$.MODULE$.subscribe1(set), function1, signallingRef, async), async), () -> {
                return MODULE$.await(set, signallingRef, async);
            }, async), boxedUnit -> {
                return obj;
            }, async).flatMap(boxedUnit2 -> {
                return resource.map(stream -> {
                    return stream.onFinalize(obj, async);
                });
            });
        });
    }

    public <F, K> F await(Set<K> set, SignallingRef<F, Map<K, Tuple2<Object, Object>>> signallingRef, GenConcurrent<F, Throwable> genConcurrent) {
        Stream discrete = signallingRef.discrete();
        return (F) discrete.takeWhile(map -> {
            return BoxesRunTime.boxToBoolean($anonfun$await$1(set, map));
        }, discrete.takeWhile$default$2()).compile(Compiler$.MODULE$.target(Compiler$Target$.MODULE$.forConcurrent(genConcurrent))).drain();
    }

    private static final Object update1$1(Function1 function1, Function1 function12, SignallingRef signallingRef, Async async) {
        return package$flatMap$.MODULE$.toFlatMapOps(signallingRef.modify(function1), async).flatMap(seq -> {
            return seq.isEmpty() ? package$.MODULE$.Async().apply(async).unit() : function12.apply(seq);
        });
    }

    public static final /* synthetic */ boolean $anonfun$await$3(Tuple2 tuple2) {
        return tuple2._1$mcI$sp() == RedisAutoSubscriber$State$.MODULE$.Subscribed();
    }

    public static final /* synthetic */ boolean $anonfun$await$2(Map map, Object obj) {
        return map.get(obj).exists(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$await$3(tuple2));
        });
    }

    public static final /* synthetic */ boolean $anonfun$await$1(Set set, Map map) {
        return !set.forall(obj -> {
            return BoxesRunTime.boxToBoolean($anonfun$await$2(map, obj));
        });
    }

    private SubscribeStreamHelper$() {
    }
}
