package org.springframework.cloud.gateway.rsocket.cluster;

import io.rsocket.SocketAcceptor;
import java.math.BigInteger;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.cloud.gateway.rsocket.actuate.BrokerActuator;
import org.springframework.cloud.gateway.rsocket.actuate.BrokerInfo;
import org.springframework.cloud.gateway.rsocket.autoconfigure.BrokerProperties;
import org.springframework.cloud.gateway.rsocket.common.autoconfigure.Broker;
import org.springframework.cloud.gateway.rsocket.common.metadata.Forwarding;
import org.springframework.cloud.gateway.rsocket.common.metadata.RouteSetup;
import org.springframework.cloud.gateway.rsocket.common.metadata.TagsMetadata;
import org.springframework.cloud.gateway.rsocket.core.GatewayRSocketFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/* loaded from: input_file:org/springframework/cloud/gateway/rsocket/cluster/ClusterJoinListener.class */
public class ClusterJoinListener implements ApplicationListener<ApplicationReadyEvent> {
    private final ClusterService clusterService;
    private final BrokerProperties properties;
    private final RSocketStrategies strategies;
    private final GatewayRSocketFactory gatewayRSocketFactory;

    public ClusterJoinListener(ClusterService clusterService, BrokerProperties brokerProperties, RSocketStrategies rSocketStrategies, GatewayRSocketFactory gatewayRSocketFactory) {
        this.clusterService = clusterService;
        this.properties = brokerProperties;
        this.strategies = rSocketStrategies;
        this.gatewayRSocketFactory = gatewayRSocketFactory;
    }

    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        for (Broker broker : this.properties.getBrokers()) {
            RSocketRequester.builder().rsocketStrategies(this.strategies).setupMetadata(RouteSetup.of(this.properties.getRouteId(), this.properties.getServiceName()).build(), RouteSetup.ROUTE_SETUP_MIME_TYPE).rsocketFactory(clientRSocketFactory -> {
                clientRSocketFactory.acceptor(brokerSocketAcceptor());
            }).connectTcp(broker.getHost(), broker.getPort()).flatMap(this::callBrokerInfo).subscribe(this::registerOutgoing);
        }
    }

    SocketAcceptor brokerSocketAcceptor() {
        return (connectionSetupPayload, rSocket) -> {
            TagsMetadata.Builder builder = TagsMetadata.builder();
            builder.serviceName(this.properties.getServiceName()).routeId(this.properties.getRouteId().toString());
            return Mono.just(this.gatewayRSocketFactory.create(builder.build()));
        };
    }

    Mono<Tuple2<BigInteger, RSocketRequester>> callBrokerInfo(RSocketRequester rSocketRequester) {
        return rSocketRequester.route(BrokerActuator.BROKER_INFO_PATH, new Object[0]).metadata(Forwarding.of(this.properties.getRouteId()).serviceName("gateway").disableProxy().build(), Forwarding.FORWARDING_MIME_TYPE).data(BrokerInfo.of(this.properties.getRouteId()).build()).retrieveMono(BigInteger.class).map(bigInteger -> {
            return Tuples.of(bigInteger, rSocketRequester);
        });
    }

    boolean registerOutgoing(Tuple2<BigInteger, RSocketRequester> tuple2) {
        return this.clusterService.registerOutgoing(((BigInteger) tuple2.getT1()).toString(), (RSocketRequester) tuple2.getT2());
    }
}
