/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.gateway.rsocket.core;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.ResponderRSocket;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.logging.Level;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.cloud.gateway.rsocket.autoconfigure.GatewayRSocketProperties;
import org.springframework.cloud.gateway.rsocket.core.GatewayExchange;
import org.springframework.cloud.gateway.rsocket.core.GatewayFilterChain;
import org.springframework.cloud.gateway.rsocket.core.PendingRequestRSocket;
import org.springframework.cloud.gateway.rsocket.registry.LoadBalancedRSocket;
import org.springframework.cloud.gateway.rsocket.registry.Registry;
import org.springframework.cloud.gateway.rsocket.route.Route;
import org.springframework.cloud.gateway.rsocket.route.Routes;
import org.springframework.cloud.gateway.rsocket.support.Metadata;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

public class GatewayRSocket
extends AbstractRSocket
implements ResponderRSocket {
    private static final Log log = LogFactory.getLog(GatewayRSocket.class);
    private final Registry registry;
    private final Routes routes;
    private final MeterRegistry meterRegistry;
    private final GatewayRSocketProperties properties;
    private final Metadata metadata;

    GatewayRSocket(Registry registry, Routes routes, MeterRegistry meterRegistry, GatewayRSocketProperties properties, Metadata metadata) {
        this.registry = registry;
        this.routes = routes;
        this.meterRegistry = meterRegistry;
        this.properties = properties;
        this.metadata = metadata;
        this.onClose().doOnSuccess(v -> registry.deregister(metadata)).doOnError(t -> {
            if (log.isErrorEnabled()) {
                log.error((Object)("Error received, deregistering " + metadata), t);
            }
            registry.deregister(metadata);
        }).subscribe();
    }

    protected Registry getRegistry() {
        return this.registry;
    }

    protected Routes getRoutes() {
        return this.routes;
    }

    public Mono<Void> fireAndForget(Payload payload) {
        GatewayExchange exchange = this.createExchange(GatewayExchange.Type.FIRE_AND_FORGET, payload);
        return this.findRSocketOrCreatePending(exchange).flatMap(rSocket -> rSocket.fireAndForget(payload)).doOnError(t -> this.count(exchange, "error")).doFinally(s -> this.count(exchange, ""));
    }

    private GatewayExchange createExchange(GatewayExchange.Type type, Payload payload) {
        GatewayExchange exchange = GatewayExchange.fromPayload(type, payload);
        Tags tags = this.getTags(exchange);
        exchange.setTags(tags);
        return exchange;
    }

    private Tags getTags(GatewayExchange exchange) {
        String requesterName = this.metadata.getName();
        String requesterId = this.metadata.get("id");
        String responderName = exchange.getRoutingMetadata().getName();
        Assert.hasText((String)responderName, (String)"responderName must not be empty");
        Assert.hasText((String)requesterId, (String)"requesterId must not be empty");
        Assert.hasText((String)requesterName, (String)"requesterName must not be empty");
        return Tags.of((String[])new String[]{"requester.name", requesterName, "responder.name", responderName, "requester.id", requesterId, "gateway.id", this.properties.getId()});
    }

    public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
        GatewayExchange exchange = this.createExchange(GatewayExchange.Type.REQUEST_CHANNEL, payload);
        Tags responderTags = Tags.of((String)"source", (String)"responder");
        return this.findRSocketOrCreatePending(exchange).flatMapMany(rSocket -> {
            Tags requesterTags = Tags.of((String)"source", (String)"requester");
            Flux flux = Flux.from((Publisher)payloads).doOnNext(s -> this.count(exchange, "payload", requesterTags)).doOnError(t -> this.count(exchange, "error", requesterTags)).doFinally(s -> this.count(exchange, requesterTags));
            if (rSocket instanceof ResponderRSocket) {
                ResponderRSocket socket = (ResponderRSocket)rSocket;
                return socket.requestChannel(payload, (Publisher)flux).log(GatewayRSocket.class.getName() + ".request-channel", Level.FINEST, new SignalType[0]);
            }
            return rSocket.requestChannel((Publisher)flux);
        }).doOnNext(s -> this.count(exchange, "payload", responderTags)).doOnError(t -> this.count(exchange, "error", responderTags)).doFinally(s -> this.count(exchange, responderTags));
    }

    private void count(GatewayExchange exchange, String suffix) {
        this.count(exchange, suffix, Tags.empty());
    }

    private void count(GatewayExchange exchange, Tags additionalTags) {
        this.count(exchange, null, additionalTags);
    }

    private void count(GatewayExchange exchange, String suffix, Tags additionalTags) {
        Tags tags = exchange.getTags().and((Iterable)additionalTags);
        String name = this.getMetricName(exchange, suffix);
        this.meterRegistry.counter(name, (Iterable)tags).increment();
    }

    private String getMetricName(GatewayExchange exchange) {
        return this.getMetricName(exchange, null);
    }

    private String getMetricName(GatewayExchange exchange, String suffix) {
        StringBuilder name = new StringBuilder("forward.");
        name.append(exchange.getType().getKey());
        if (StringUtils.hasLength((String)suffix)) {
            name.append(".");
            name.append(suffix);
        }
        return name.toString();
    }

    public Mono<Payload> requestResponse(Payload payload) {
        AtomicReference timer = new AtomicReference();
        GatewayExchange exchange = this.createExchange(GatewayExchange.Type.REQUEST_RESPONSE, payload);
        return this.findRSocketOrCreatePending(exchange).flatMap(rSocket -> rSocket.requestResponse(payload)).doOnSubscribe(s -> timer.set(Timer.start((MeterRegistry)this.meterRegistry))).doOnError(t -> this.count(exchange, "error")).doFinally(s -> ((Timer.Sample)timer.get()).stop(this.meterRegistry.timer(this.getMetricName(exchange), (Iterable)exchange.getTags())));
    }

    public Flux<Payload> requestStream(Payload payload) {
        GatewayExchange exchange = this.createExchange(GatewayExchange.Type.REQUEST_STREAM, payload);
        return this.findRSocketOrCreatePending(exchange).flatMapMany(rSocket -> rSocket.requestStream(payload)).doOnNext(s -> this.count(exchange, "payload")).doOnError(t -> this.count(exchange, "error")).doFinally(s -> this.count(exchange, Tags.empty()));
    }

    private Mono<RSocket> findRSocketOrCreatePending(GatewayExchange exchange) {
        return this.findRSocket(exchange).switchIfEmpty(this.createPendingRSocket(exchange));
    }

    private Mono<RSocket> createPendingRSocket(GatewayExchange exchange) {
        if (log.isDebugEnabled()) {
            log.debug((Object)("creating pending RSocket for " + exchange.getRoutingMetadata()));
        }
        PendingRequestRSocket pending = this.constructPendingRSocket(exchange);
        Disposable disposable = this.registry.addListener(pending);
        pending.setSubscriptionDisposable(disposable);
        return Mono.just((Object)pending);
    }

    PendingRequestRSocket constructPendingRSocket(GatewayExchange exchange) {
        Function<Registry.RegisteredEvent, Mono<Route>> routeFinder = registeredEvent -> this.getRouteMono((Registry.RegisteredEvent)registeredEvent, exchange);
        return new PendingRequestRSocket(routeFinder, map -> {
            Tags tags = exchange.getTags().and("responder.id", map.get("id"));
            exchange.setTags(tags);
        });
    }

    protected Mono<Route> getRouteMono(Registry.RegisteredEvent registeredEvent, GatewayExchange exchange) {
        return this.findRoute(exchange).log(PendingRequestRSocket.class.getName() + ".find route pending", Level.FINEST, new SignalType[0]).flatMap(route -> this.matchRoute((Route)route, registeredEvent.getRoutingMetadata()));
    }

    private Mono<Route> findRoute(GatewayExchange exchange) {
        Mono<Route> routeMono = this.routes.findRoute(exchange);
        return routeMono;
    }

    private Mono<Route> matchRoute(Route route, Metadata annoucementMetadata) {
        Metadata targetMetadata = route.getTargetMetadata();
        if (targetMetadata.matches(annoucementMetadata)) {
            return Mono.just((Object)route);
        }
        return Mono.empty();
    }

    private Mono<RSocket> findRSocket(GatewayExchange exchange) {
        return this.routes.findRoute(exchange).log(GatewayRSocket.class.getName() + ".find route", Level.FINEST, new SignalType[0]).flatMap(route -> {
            exchange.getAttributes().put("__route_attr_", route);
            return GatewayFilterChain.executeFilterChain(route.getFilters(), exchange).flatMap(success -> {
                LoadBalancedRSocket loadBalancedRSocket = this.registry.getRegistered(exchange.getRoutingMetadata());
                return loadBalancedRSocket.choose();
            }).map(enrichedRSocket -> {
                Metadata metadata = enrichedRSocket.getMetadata();
                Tags tags = exchange.getTags().and("responder.id", metadata.get("id"));
                exchange.setTags(tags);
                return enrichedRSocket;
            }).cast(RSocket.class).switchIfEmpty(this.doOnEmpty(exchange));
        });
    }

    private Mono<RSocket> doOnEmpty(GatewayExchange exchange) {
        if (log.isDebugEnabled()) {
            log.debug((Object)("Unable to find destination RSocket for " + exchange.getRoutingMetadata()));
        }
        return Mono.empty();
    }

    public static class Factory {
        private final Registry registry;
        private final Routes routes;
        private final MeterRegistry meterRegistry;
        private final GatewayRSocketProperties properties;

        public Factory(Registry registry, Routes routes, MeterRegistry meterRegistry, GatewayRSocketProperties properties) {
            this.registry = registry;
            this.routes = routes;
            this.meterRegistry = meterRegistry;
            this.properties = properties;
        }

        public GatewayRSocket create(Metadata metadata) {
            return new GatewayRSocket(this.registry, this.routes, this.meterRegistry, this.properties, metadata);
        }
    }
}

