package zio.kafka.client;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import scala.Function2;
import scala.MatchError;
import scala.UninitializedFieldError;
import scala.runtime.BoxedUnit;
import scala.runtime.Nothing$;
import zio.Schedule;
import zio.Schedule$;
import zio.ZIO;
import zio.ZManaged;
import zio.clock.Clock;
import zio.kafka.client.diagnostics.Diagnostics;
import zio.kafka.client.diagnostics.Diagnostics$NoOp$;
import zio.kafka.client.internal.ConsumerAccess$;
import zio.kafka.client.internal.Runloop$;
import zio.kafka.client.internal.Runloop$Deps$;
import zio.kafka.client.serde.Deserializer;
import zio.stream.ZSink;
import zio.stream.ZSink$;
import zio.stream.ZStream$;
import zio.stream.ZStreamChunk;

/* compiled from: Consumer.scala */
/* loaded from: input_file:zio/kafka/client/Consumer$.class */
/**
 * Singleton holder for the module-level members of {@code Consumer}
 * (compiled from {@code Consumer.scala}).
 *
 * <p>The only instance is created by the static initializer and published
 * through {@link #MODULE$} by the private constructor.
 */
public final class Consumer$ {
    /** The single instance of this class; assigned inside the private constructor. */
    public static Consumer$ MODULE$;

    /** Sink built in the constructor by folding offsets into an {@code OffsetBatch}. */
    private final ZSink<Object, Nothing$, Nothing$, Offset, OffsetBatch> offsetBatches;

    /** Initialization bitmap: bit 0 is set once {@link #offsetBatches} has been assigned. */
    private volatile byte bitmap$init$0;

    static {
        // Instantiating the class is enough: the constructor stores the instance in MODULE$.
        new Consumer$();
    }

    /**
     * Returns the sink that accumulates {@code Offset}s into an {@code OffsetBatch}.
     *
     * @return the sink created in the constructor
     * @throws UninitializedFieldError if called before the constructor has
     *         assigned the field (bit 0 of the initialization bitmap is clear)
     */
    public ZSink<Object, Nothing$, Nothing$, Offset, OffsetBatch> offsetBatches() {
        if (((byte) (this.bitmap$init$0 & 1)) == 0) {
            throw new UninitializedFieldError("Uninitialized field: /root/project/src/main/scala/zio/kafka/client/Consumer.scala: 190");
        }
        return this.offsetBatches;
    }

    /**
     * Builds a managed {@code Consumer}.
     *
     * <p>Chains three steps: {@code ConsumerAccess$.make(consumerSettings)}, then
     * {@code Runloop$Deps$.make(...)} fed with the poll interval, poll timeout and
     * offset retrieval taken from {@code consumerSettings} plus {@code diagnostics},
     * then {@code Runloop$.apply(deps)}. The resulting consumer is constructed from
     * the consumer access, the settings and the run loop.
     *
     * @param consumerSettings settings forwarded to the consumer access, the run loop
     *                         dependencies and the resulting consumer
     * @param diagnostics      diagnostics handle forwarded to the run loop dependencies
     * @return a managed value producing the consumer
     */
    public ZManaged<Clock, Throwable, Consumer> make(ConsumerSettings consumerSettings, Diagnostics diagnostics) {
        return ConsumerAccess$.MODULE$.make(consumerSettings).flatMap(consumerAccess ->
            Runloop$Deps$.MODULE$.make(
                    consumerAccess,
                    consumerSettings.pollInterval(),
                    consumerSettings.pollTimeout(),
                    diagnostics,
                    consumerSettings.offsetRetrieval())
                .flatMap(deps ->
                    Runloop$.MODULE$.apply(deps).map(runloop ->
                        new Consumer(consumerAccess, consumerSettings, runloop))));
    }

    /**
     * Default value for the second parameter of
     * {@link #make(ConsumerSettings, Diagnostics)}.
     *
     * @return the {@code Diagnostics$NoOp$} singleton
     */
    public Diagnostics make$default$2() {
        return Diagnostics$NoOp$.MODULE$;
    }

    /**
     * Consumes records and commits their offsets in batches.
     *
     * <p>The pipeline, in order:
     * <ol>
     *   <li>creates a consumer via {@link #make(ConsumerSettings, Diagnostics)} using
     *       the default diagnostics from {@link #make$default$2()};</li>
     *   <li>subscribes it with {@code subscription};</li>
     *   <li>for every element of {@code partitionedStream(deserializer, deserializer2)},
     *       applies {@code function2} to each record's key and value and emits that
     *       record's offset once the effect completes;</li>
     *   <li>aggregates the emitted offsets with {@link #offsetBatches()};</li>
     *   <li>commits every batch with {@code commitOrRetry(schedule)};</li>
     *   <li>drains the resulting stream.</li>
     * </ol>
     *
     * @param consumerSettings settings used to create the consumer; also supplies
     *                         {@code perPartitionChunkPrefetch()}
     * @param subscription     subscription passed to {@code consumer.subscribe}
     * @param deserializer     first deserializer passed to {@code partitionedStream}
     *                         (typed on {@code K})
     * @param deserializer2    second deserializer passed to {@code partitionedStream}
     *                         (typed on {@code V})
     * @param schedule         schedule handed to {@code commitOrRetry} for each batch
     * @param function2        effectful handler applied to each record's key and value
     * @return an effect that runs the whole pipeline
     * @throws MatchError inside the stream callbacks when a partition tuple or a
     *         committable record is {@code null}
     */
    public <R, R1, K, V> ZIO<R, Throwable, BoxedUnit> consumeWith(ConsumerSettings consumerSettings, Subscription subscription, Deserializer<R1, K> deserializer, Deserializer<R1, V> deserializer2, Schedule<Clock, Object, Object> schedule, Function2<K, V, ZIO<R, Nothing$, BoxedUnit>> function2) {
        return ZStream$.MODULE$.managed(make(consumerSettings, make$default$2())).flatMap(consumer ->
            ZStream$.MODULE$.fromEffect(consumer.subscribe(subscription)).flatMap(ignored ->
                // NOTE(review): the two leading flatMapPar arguments are presumably the
                // parallelism bound and the output buffer size - confirm against ZStream.
                consumer.partitionedStream(deserializer, deserializer2).flatMapPar(Integer.MAX_VALUE, consumerSettings.perPartitionChunkPrefetch(), tuple2 -> {
                    if (tuple2 == null) {
                        throw new MatchError(tuple2);
                    }
                    // Only the second tuple component (the per-partition chunked stream) is used.
                    return ((ZStreamChunk) tuple2._2()).mapM(committableRecord -> {
                        if (committableRecord == null) {
                            throw new MatchError(committableRecord);
                        }
                        ConsumerRecord record = committableRecord.record();
                        Offset offset = committableRecord.offset();
                        // Run the user handler, then replace its result with the record's offset.
                        return ((ZIO) function2.apply(record.key(), record.value())).as(() -> offset);
                    }).flattenChunks();
                })
            )
        ).aggregateAsync(offsetBatches())
            .mapM(offsetBatch -> offsetBatch.commitOrRetry(schedule))
            .runDrain();
    }

    /**
     * Default value for the {@code schedule} parameter of {@code consumeWith}.
     *
     * <p>Combines, via {@code &&}, an exponential schedule based on
     * {@code durationInt(1).second()} (with the library's default second argument)
     * and {@code recurs(3)}.
     *
     * @return the combined schedule
     */
    public <R, R1, K, V> Schedule<Clock, Object, Object> consumeWith$default$5() {
        return Schedule$.MODULE$.exponential(zio.duration.package$.MODULE$.durationInt(1).second(), Schedule$.MODULE$.exponential$default$2()).$amp$amp(Schedule$.MODULE$.recurs(3));
    }

    /**
     * Publishes the instance through {@link #MODULE$}, builds the offset-batching
     * sink (a left fold starting from {@code OffsetBatch$.MODULE$.empty()} that
     * merges every incoming offset), and finally marks the field as initialized.
     */
    private Consumer$() {
        MODULE$ = this;
        this.offsetBatches = ZSink$.MODULE$.foldLeft(
            OffsetBatch$.MODULE$.empty(),
            (offsetBatch, offset) -> offsetBatch.merge(offset));
        // Set bit 0 only after the field is assigned so offsetBatches() can detect early access.
        this.bitmap$init$0 = (byte) (this.bitmap$init$0 | 1);
    }
}
