package io.rsocket.micrometer;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameType;
import io.rsocket.plugins.DuplexConnectionInterceptor;
import java.util.Objects;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:BOOT-INF/lib/rsocket-micrometer-1.0.0.jar:io/rsocket/micrometer/MicrometerDuplexConnection.class */
final class MicrometerDuplexConnection implements DuplexConnection {
    private final Counter close;
    private final DuplexConnection delegate;
    private final Counter dispose;
    private final FrameCounters frameCounters;

    /* loaded from: input_file:BOOT-INF/lib/rsocket-micrometer-1.0.0.jar:io/rsocket/micrometer/MicrometerDuplexConnection$FrameCounters.class */
    private static final class FrameCounters implements Consumer<ByteBuf> {
        private final Logger logger;
        private final Counter cancel;
        private final Counter complete;
        private final Counter error;
        private final Counter extension;
        private final Counter keepalive;
        private final Counter lease;
        private final Counter metadataPush;
        private final Counter next;
        private final Counter nextComplete;
        private final Counter payload;
        private final Counter requestChannel;
        private final Counter requestFireAndForget;
        private final Counter requestN;
        private final Counter requestResponse;
        private final Counter requestStream;
        private final Counter resume;
        private final Counter resumeOk;
        private final Counter setup;
        private final Counter unknown;

        private FrameCounters(DuplexConnectionInterceptor.Type type, MeterRegistry meterRegistry, Tag... tagArr) {
            this.logger = LoggerFactory.getLogger(getClass());
            this.cancel = counter(type, meterRegistry, FrameType.CANCEL, tagArr);
            this.complete = counter(type, meterRegistry, FrameType.COMPLETE, tagArr);
            this.error = counter(type, meterRegistry, FrameType.ERROR, tagArr);
            this.extension = counter(type, meterRegistry, FrameType.EXT, tagArr);
            this.keepalive = counter(type, meterRegistry, FrameType.KEEPALIVE, tagArr);
            this.lease = counter(type, meterRegistry, FrameType.LEASE, tagArr);
            this.metadataPush = counter(type, meterRegistry, FrameType.METADATA_PUSH, tagArr);
            this.next = counter(type, meterRegistry, FrameType.NEXT, tagArr);
            this.nextComplete = counter(type, meterRegistry, FrameType.NEXT_COMPLETE, tagArr);
            this.payload = counter(type, meterRegistry, FrameType.PAYLOAD, tagArr);
            this.requestChannel = counter(type, meterRegistry, FrameType.REQUEST_CHANNEL, tagArr);
            this.requestFireAndForget = counter(type, meterRegistry, FrameType.REQUEST_FNF, tagArr);
            this.requestN = counter(type, meterRegistry, FrameType.REQUEST_N, tagArr);
            this.requestResponse = counter(type, meterRegistry, FrameType.REQUEST_RESPONSE, tagArr);
            this.requestStream = counter(type, meterRegistry, FrameType.REQUEST_STREAM, tagArr);
            this.resume = counter(type, meterRegistry, FrameType.RESUME, tagArr);
            this.resumeOk = counter(type, meterRegistry, FrameType.RESUME_OK, tagArr);
            this.setup = counter(type, meterRegistry, FrameType.SETUP, tagArr);
            this.unknown = counter(type, meterRegistry, "UNKNOWN", tagArr);
        }

        private static Counter counter(DuplexConnectionInterceptor.Type type, MeterRegistry meterRegistry, FrameType frameType, Tag... tagArr) {
            return counter(type, meterRegistry, frameType.name(), tagArr);
        }

        private static Counter counter(DuplexConnectionInterceptor.Type type, MeterRegistry meterRegistry, String str, Tag... tagArr) {
            return meterRegistry.counter("rsocket.frame", Tags.of(tagArr).and("connection.type", type.name()).and("frame.type", str));
        }

        @Override // java.util.function.Consumer
        public void accept(ByteBuf byteBuf) {
            FrameType frameType = FrameHeaderCodec.frameType(byteBuf);
            switch (frameType) {
                case SETUP:
                    this.setup.increment();
                    return;
                case LEASE:
                    this.lease.increment();
                    return;
                case KEEPALIVE:
                    this.keepalive.increment();
                    return;
                case REQUEST_RESPONSE:
                    this.requestResponse.increment();
                    return;
                case REQUEST_FNF:
                    this.requestFireAndForget.increment();
                    return;
                case REQUEST_STREAM:
                    this.requestStream.increment();
                    return;
                case REQUEST_CHANNEL:
                    this.requestChannel.increment();
                    return;
                case REQUEST_N:
                    this.requestN.increment();
                    return;
                case CANCEL:
                    this.cancel.increment();
                    return;
                case PAYLOAD:
                    this.payload.increment();
                    return;
                case ERROR:
                    this.error.increment();
                    return;
                case METADATA_PUSH:
                    this.metadataPush.increment();
                    return;
                case RESUME:
                    this.resume.increment();
                    return;
                case RESUME_OK:
                    this.resumeOk.increment();
                    return;
                case NEXT:
                    this.next.increment();
                    return;
                case COMPLETE:
                    this.complete.increment();
                    return;
                case NEXT_COMPLETE:
                    this.nextComplete.increment();
                    return;
                case EXT:
                    this.extension.increment();
                    return;
                default:
                    this.logger.debug("Skipping count of unknown frame type: {}", frameType);
                    this.unknown.increment();
                    return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MicrometerDuplexConnection(DuplexConnectionInterceptor.Type type, DuplexConnection duplexConnection, MeterRegistry meterRegistry, Tag... tagArr) {
        Objects.requireNonNull(type, "connectionType must not be null");
        this.delegate = (DuplexConnection) Objects.requireNonNull(duplexConnection, "delegate must not be null");
        Objects.requireNonNull(meterRegistry, "meterRegistry must not be null");
        this.close = meterRegistry.counter("rsocket.duplex.connection.close", Tags.of(tagArr).and("connection.type", type.name()));
        this.dispose = meterRegistry.counter("rsocket.duplex.connection.dispose", Tags.of(tagArr).and("connection.type", type.name()));
        this.frameCounters = new FrameCounters(type, meterRegistry, tagArr);
    }

    @Override // io.rsocket.DuplexConnection
    public ByteBufAllocator alloc() {
        return this.delegate.alloc();
    }

    @Override // reactor.core.Disposable
    public void dispose() {
        this.delegate.dispose();
        this.dispose.increment();
    }

    @Override // io.rsocket.Closeable
    public Mono<Void> onClose() {
        Mono<Void> onClose = this.delegate.onClose();
        Counter counter = this.close;
        counter.getClass();
        return onClose.doAfterTerminate(counter::increment);
    }

    @Override // io.rsocket.DuplexConnection
    public Flux<ByteBuf> receive() {
        return this.delegate.receive().doOnNext(this.frameCounters);
    }

    @Override // io.rsocket.DuplexConnection
    public Mono<Void> send(Publisher<ByteBuf> publisher) {
        Objects.requireNonNull(publisher, "frames must not be null");
        return this.delegate.send(Flux.from(publisher).doOnNext(this.frameCounters));
    }
}
