package fs2.concurrent;

import cats.UnorderedFoldable$;
import cats.effect.kernel.GenConcurrent;
import cats.effect.kernel.Ref;
import cats.effect.kernel.Resource;
import cats.effect.kernel.Resource$;
import cats.effect.std.Queue;
import cats.effect.std.Queue$;
import cats.syntax.FlatMapOps$;
import cats.syntax.FlattenOps$;
import cats.syntax.MonadOps$;
import cats.syntax.package$all$;
import fs2.Stream;
import fs2.Stream$;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.immutable.LongMap;
import scala.collection.immutable.LongMap$;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;

/* compiled from: Topic.scala */
/* loaded from: input_file:fs2/concurrent/Topic$.class */
/**
 * Decompiled companion object of {@code fs2.concurrent.Topic} (Scala source: Topic.scala).
 *
 * <p>The only public entry point is {@link #apply}, which builds a {@code Topic} whose
 * bookkeeping lives in two cells created by that call:
 * <ul>
 *   <li>a {@code Ref} holding a {@code (LongMap, long)} pair: the map associates a numeric
 *       subscriber id with that subscriber's {@code Queue}, the long is the id handed to the
 *       next subscriber;</li>
 *   <li>a {@code SignallingRef} holding the current number of subscribers.</li>
 * </ul>
 *
 * <p>NOTE(review): this is decompiler output; the raw types and unchecked casts mirror the
 * erased Scala signatures and are kept as emitted.
 */
public final class Topic$ {
    /** Singleton instance, following the Scala {@code object} encoding. */
    public static final Topic$ MODULE$ = new Topic$();

    /**
     * Creates a new, empty topic inside the effect {@code F}.
     *
     * <p>The state {@code Ref} is initialised with {@code (LongMap.empty, 1L)} and the subscriber
     * counter with {@code 0}; both are then captured by the returned anonymous {@code Topic}.
     *
     * @param genConcurrent the {@code GenConcurrent} instance for {@code F}; used to allocate the
     *                      refs and as the type-class instance for every syntax call below
     * @param <F> the effect type
     * @param <A> the type of the values published through the topic
     * @return an {@code F} value that yields the topic (typed as {@code F} because of erasure)
     */
    public <F, A> F apply(GenConcurrent<F, Throwable> genConcurrent) {
        return (F) package$all$.MODULE$.toFunctorOps(package$all$.MODULE$.catsSyntaxSemigroupal(genConcurrent.ref(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(LongMap$.MODULE$.empty()), BoxesRunTime.boxToLong(1L))), genConcurrent).product(SignallingRef$.MODULE$.apply(BoxesRunTime.boxToInteger(0), genConcurrent)), genConcurrent).map(created -> {
            // Scala pattern match on the (stateRef, subscriberCountRef) pair produced by `product`.
            if (created == null) {
                throw new MatchError(created);
            }
            final Ref ref = (Ref) created._1();
            final SignallingRef signallingRef = (SignallingRef) created._2();
            return new Topic<F, A>(ref, genConcurrent, signallingRef) { // from class: fs2.concurrent.Topic$$anon$2
                /** Ref holding the (id -> queue map, next id) pair. */
                private final Ref state$1;
                /** Type-class instance for the effect type. */
                private final GenConcurrent F$1;
                /** Number of currently registered subscribers. */
                private final SignallingRef subscriberCount$1;

                /**
                 * Reads the current state and offers {@code a} to every queue found in the map.
                 *
                 * <p>The offers are folded into one effect: each step is
                 * {@code accumulated >> queue.offer(a)}, starting from {@code F.unit}.
                 *
                 * @param a the value to publish
                 * @return the combined effect
                 */
                @Override // fs2.concurrent.Topic
                public F publish1(A a) {
                    return (F) package$all$.MODULE$.toFlatMapOps(this.state$1.get(), this.F$1).flatMap(snapshot -> {
                        // Scala pattern `case (subs, _)`: only a null pair can fail the match.
                        if (snapshot == null) {
                            throw new MatchError(snapshot);
                        }
                        return ((LongMap) snapshot._1()).foldLeft(this.F$1.unit(), (pending, entry) -> {
                            // Scala pattern `case (op, (_, q))`: `entry` is the (id, queue) map entry.
                            Tuple2 subscriber = (Tuple2) entry;
                            if (subscriber == null) {
                                // The scrutinee of the failed match is the (accumulator, entry) pair.
                                throw new MatchError(new Tuple2(pending, entry));
                            }
                            Queue queue = (Queue) subscriber._2();
                            return FlatMapOps$.MODULE$.$greater$greater$extension(package$all$.MODULE$.catsSyntaxFlatMapOps(pending, this.F$1), () -> {
                                return queue.offer(a);
                            }, this.F$1);
                        });
                    });
                }

                /**
                 * Registers a new subscriber as a {@code Resource}.
                 *
                 * <p>Steps, in the order they are composed:
                 * <ol>
                 *   <li>create a bounded queue of capacity {@code i};</li>
                 *   <li>atomically store it in the state map under the current "next id",
                 *       advance the next id by one and return the id that was used;</li>
                 *   <li>increment the subscriber counter ({@code <*} keeps the id as the result);</li>
                 *   <li>pass {@code unsubscribe$1(id)} to {@code Resource.make} as its second
                 *       (release) argument;</li>
                 *   <li>replace the resource's value with
                 *       {@code Stream.fromQueueUnterminated(queue)}.</li>
                 * </ol>
                 *
                 * @param i capacity of the subscriber's bounded queue
                 * @return a resource whose value is the stream reading from the subscriber's queue
                 */
                @Override // fs2.concurrent.Topic
                public Resource<F, Stream<F, A>> subscribeAwait(int i) {
                    return cats.effect.package$.MODULE$.Resource().eval(Queue$.MODULE$.bounded(i, this.F$1)).flatMap(queue -> {
                        return (Resource) package$all$.MODULE$.toFunctorOps(cats.effect.package$.MODULE$.Resource().make(package$all$.MODULE$.catsSyntaxApply(this.state$1.modify(current -> {
                            if (current == null) {
                                throw new MatchError(current);
                            }
                            LongMap queues = (LongMap) current._1();
                            long id = current._2$mcJ$sp();
                            // (new state) -> (result): the new state registers the queue under `id`
                            // and bumps the next id; the result of modify is `id` itself.
                            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new Tuple2(queues.updated(id, queue), BoxesRunTime.boxToLong(id + 1))), BoxesRunTime.boxToLong(id));
                        }), this.F$1).$less$times(this.subscriberCount$1.update(count -> {
                            return count + 1;
                        })), subscriberId -> {
                            return this.unsubscribe$1(BoxesRunTime.unboxToLong(subscriberId));
                        }, this.F$1), Resource$.MODULE$.catsEffectConcurrentForResource(this.F$1)).as(Stream$.MODULE$.fromQueueUnterminated(queue, Stream$.MODULE$.fromQueueUnterminated$default$2(), this.F$1));
                    });
                }

                /**
                 * Returns a pipe that publishes every element of its input stream.
                 *
                 * @return a function mapping a stream to
                 *         {@code stream.evalMap(publish1).drain()}
                 */
                @Override // fs2.concurrent.Topic
                public Function1<Stream<F, A>, Stream<F, Nothing$>> publish() {
                    return stream -> {
                        return stream.evalMap(element -> {
                            return this.publish1(element);
                        }).drain();
                    };
                }

                /**
                 * Stream form of {@link #subscribeAwait(int)}: lifts the resource into a stream
                 * and flattens the stream it yields.
                 *
                 * @param i capacity of the subscriber's bounded queue
                 * @return the stream reading from the subscriber's queue
                 */
                @Override // fs2.concurrent.Topic
                public Stream<F, A> subscribe(int i) {
                    return Stream$.MODULE$.resource(subscribeAwait(i), this.F$1).flatten($less$colon$less$.MODULE$.refl());
                }

                /**
                 * Exposes the subscriber counter as a stream.
                 *
                 * @return {@code subscriberCount.discrete()}
                 */
                @Override // fs2.concurrent.Topic
                public Stream<F, Object> subscribers() {
                    return this.subscriberCount$1.discrete();
                }

                /**
                 * Builds the effect that empties the queue registered under id {@code j}, if any.
                 *
                 * <p>When {@code longMap.get(j)} is defined, the effect repeats
                 * {@code queue.tryTake()} until it yields an empty {@code Option}; when it is
                 * not defined, {@code traverse_} over the empty {@code Option} has no queue to
                 * act on.
                 *
                 * @param longMap the id -> queue map to look the subscriber up in
                 * @param j the subscriber id
                 * @return the draining effect
                 */
                private final Object drainQueue$1(LongMap longMap, long j) {
                    return package$all$.MODULE$.toFoldableOps(longMap.get(j), UnorderedFoldable$.MODULE$.catsTraverseForOption()).traverse_(queue -> {
                        return MonadOps$.MODULE$.iterateUntil$extension(package$all$.MODULE$.catsSyntaxMonad(queue.tryTake()), taken -> {
                            return BoxesRunTime.boxToBoolean(taken.isEmpty());
                        }, this.F$1);
                    }, this.F$1);
                }

                /**
                 * Builds the effect that removes subscriber {@code j}.
                 *
                 * <p>The state modification drops {@code j} from the map (the next id is left
                 * untouched) and returns the drain effect computed against the map as it was
                 * before removal; {@code flatten} then runs that drain effect, and afterwards
                 * ({@code >>}) the subscriber counter is decremented.
                 *
                 * @param j the subscriber id to remove
                 * @return the unsubscribe effect
                 */
                /* JADX INFO: Access modifiers changed from: private */
                public final Object unsubscribe$1(long j) {
                    return FlatMapOps$.MODULE$.$greater$greater$extension(package$all$.MODULE$.catsSyntaxFlatMapOps(FlattenOps$.MODULE$.flatten$extension(package$all$.MODULE$.catsSyntaxFlatten(this.state$1.modify(current -> {
                        if (current == null) {
                            throw new MatchError(current);
                        }
                        LongMap queues = (LongMap) current._1();
                        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new Tuple2(queues.$minus(BoxesRunTime.boxToLong(j)), BoxesRunTime.boxToLong(current._2$mcJ$sp()))), this.drainQueue$1(queues, j));
                    }), this.F$1), this.F$1), this.F$1), () -> {
                        return this.subscriberCount$1.update(count -> {
                            return count - 1;
                        });
                    }, this.F$1);
                }

                // Instance initializer: copies the captured values into the synthetic fields.
                {
                    this.state$1 = ref;
                    this.F$1 = genConcurrent;
                    this.subscriberCount$1 = signallingRef;
                }
            };
        });
    }

    /** Not instantiable from outside; the only instance is {@link #MODULE$}. */
    private Topic$() {
    }
}
