package org.springframework.integration.rsocket;

import io.rsocket.Closeable;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.transport.netty.server.WebsocketServerTransport;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.util.Assert;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServer;

/* loaded from: input_file:org/springframework/integration/rsocket/ServerRSocketConnector.class */
public class ServerRSocketConnector extends AbstractRSocketConnector implements ApplicationEventPublisherAware {
    private final ServerTransport<? extends Closeable> serverTransport;
    private Consumer<RSocketFactory.ServerRSocketFactory> factoryConfigurer;
    private Mono<? extends Closeable> serverMono;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/rsocket/ServerRSocketConnector$ServerRSocketAcceptor.class */
    public static class ServerRSocketAcceptor extends IntegrationRSocketAcceptor implements SocketAcceptor {
        private static final Log LOGGER = LogFactory.getLog(ServerRSocketAcceptor.class);
        private final Map<Object, RSocketRequester> clientRSocketRequesters;
        private BiFunction<String, DataBuffer, Object> clientRSocketKeyStrategy;
        private ApplicationEventPublisher applicationEventPublisher;

        private ServerRSocketAcceptor() {
            this.clientRSocketRequesters = new HashMap();
            this.clientRSocketKeyStrategy = (str, dataBuffer) -> {
                return str;
            };
        }

        public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
            DataBuffer payloadToDataBuffer = IntegrationRSocket.payloadToDataBuffer(connectionSetupPayload, getRSocketStrategies().dataBufferFactory());
            int refCount = IntegrationRSocket.refCount(payloadToDataBuffer);
            return Mono.just(createRSocket(connectionSetupPayload, rSocket)).doOnNext(integrationRSocket -> {
                String destination = integrationRSocket.getDestination(connectionSetupPayload);
                this.clientRSocketRequesters.put(this.clientRSocketKeyStrategy.apply(destination, payloadToDataBuffer), integrationRSocket.getRequester());
                RSocketConnectedEvent rSocketConnectedEvent = new RSocketConnectedEvent(integrationRSocket, destination, payloadToDataBuffer, integrationRSocket.getRequester());
                if (this.applicationEventPublisher != null) {
                    this.applicationEventPublisher.publishEvent(rSocketConnectedEvent);
                } else if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("The RSocket has been connected: " + rSocketConnectedEvent);
                }
            }).cast(RSocket.class).doFinally(signalType -> {
                if (IntegrationRSocket.refCount(payloadToDataBuffer) == refCount) {
                    DataBufferUtils.release(payloadToDataBuffer);
                }
            });
        }
    }

    public ServerRSocketConnector(String str, int i) {
        this((ServerTransport<? extends Closeable>) TcpServerTransport.create(str, i));
    }

    public ServerRSocketConnector(HttpServer httpServer) {
        this((ServerTransport<? extends Closeable>) WebsocketServerTransport.create(httpServer));
    }

    public ServerRSocketConnector(ServerTransport<? extends Closeable> serverTransport) {
        super(new ServerRSocketAcceptor());
        this.factoryConfigurer = serverRSocketFactory -> {
        };
        Assert.notNull(serverTransport, "'serverTransport' must not be null");
        this.serverTransport = serverTransport;
    }

    public void setFactoryConfigurer(Consumer<RSocketFactory.ServerRSocketFactory> consumer) {
        Assert.notNull(consumer, "'factoryConfigurer' must not be null");
        this.factoryConfigurer = consumer;
    }

    public void setClientRSocketKeyStrategy(BiFunction<String, DataBuffer, Object> biFunction) {
        Assert.notNull(biFunction, "'clientRSocketKeyStrategy' must not be null");
        serverRSocketAcceptor().clientRSocketKeyStrategy = biFunction;
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        serverRSocketAcceptor().applicationEventPublisher = applicationEventPublisher;
    }

    @Override // org.springframework.integration.rsocket.AbstractRSocketConnector
    public void afterPropertiesSet() {
        super.afterPropertiesSet();
        RSocketFactory.ServerRSocketFactory receive = RSocketFactory.receive();
        this.factoryConfigurer.accept(receive);
        this.serverMono = receive.acceptor(serverRSocketAcceptor()).transport(this.serverTransport).start().cache();
    }

    public Map<Object, RSocketRequester> getClientRSocketRequesters() {
        return Collections.unmodifiableMap(serverRSocketAcceptor().clientRSocketRequesters);
    }

    @Nullable
    public RSocketRequester getClientRSocketRequester(Object obj) {
        return (RSocketRequester) serverRSocketAcceptor().clientRSocketRequesters.get(obj);
    }

    private ServerRSocketAcceptor serverRSocketAcceptor() {
        return (ServerRSocketAcceptor) this.rsocketAcceptor;
    }

    @Override // org.springframework.integration.rsocket.AbstractRSocketConnector
    protected void doStart() {
        this.serverMono.subscribe();
    }

    public void destroy() {
        this.serverMono.doOnNext((v0) -> {
            v0.dispose();
        }).subscribe();
    }
}
