package org.springframework.messaging.simp.stomp;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import reactor.core.Environment;
import reactor.core.composable.Promise;
import reactor.function.Consumer;
import reactor.tcp.TcpClient;
import reactor.tcp.TcpConnection;
import reactor.tcp.encoding.DelimitedCodec;
import reactor.tcp.encoding.StandardCodecs;
import reactor.tcp.netty.NettyTcpClient;
import reactor.tcp.spec.TcpClientSpec;

/* loaded from: input_file:org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.class */
public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLifecycle {
    private static final Log logger = LogFactory.getLog(StompBrokerRelayMessageHandler.class);
    private static final String STOMP_RELAY_SYSTEM_SESSION_ID = "stompRelaySystemSessionId";
    private final MessageChannel messageChannel;
    private final String[] destinationPrefixes;
    private Environment environment;
    private TcpClient<String, String> tcpClient;
    private String relayHost = "127.0.0.1";
    private int relayPort = 61613;
    private String systemLogin = "guest";
    private String systemPasscode = "guest";
    private final StompMessageConverter stompMessageConverter = new StompMessageConverter();
    private final Map<String, RelaySession> relaySessions = new ConcurrentHashMap();
    private Object lifecycleMonitor = new Object();
    private boolean running = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler$RelaySession.class */
    public class RelaySession {
        private final String sessionId;
        private Promise<TcpConnection<String, String>> promise;
        private final BlockingQueue<Message<?>> messageQueue = new LinkedBlockingQueue(50);
        private volatile boolean isConnected = false;
        private final Object monitor = new Object();

        public RelaySession(String str) {
            Assert.notNull(str, "sessionId is required");
            this.sessionId = str;
        }

        public void open(final Message<?> message) {
            Assert.notNull(message, "message is required");
            this.promise = StompBrokerRelayMessageHandler.this.tcpClient.open();
            this.promise.consume(new Consumer<TcpConnection<String, String>>() { // from class: org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.RelaySession.1
                public void accept(TcpConnection<String, String> tcpConnection) {
                    tcpConnection.in().consume(new Consumer<String>() { // from class: org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.RelaySession.1.1
                        public void accept(String str) {
                            RelaySession.this.readStompFrame(str);
                        }
                    });
                    RelaySession.this.forwardInternal(message, tcpConnection);
                }
            });
            this.promise.onError(new Consumer<Throwable>() { // from class: org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.RelaySession.2
                public void accept(Throwable th) {
                    StompBrokerRelayMessageHandler.this.relaySessions.remove(RelaySession.this.sessionId);
                    StompBrokerRelayMessageHandler.logger.error("Failed to connect to broker", th);
                    RelaySession.this.sendError(RelaySession.this.sessionId, "Failed to connect to message broker " + th.toString());
                }
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void readStompFrame(String str) {
            if (StringUtils.isEmpty(str)) {
                return;
            }
            Message<?> message = StompBrokerRelayMessageHandler.this.stompMessageConverter.toMessage(str);
            if (StompBrokerRelayMessageHandler.logger.isTraceEnabled()) {
                StompBrokerRelayMessageHandler.logger.trace("Reading message " + message);
            }
            StompHeaderAccessor wrap = StompHeaderAccessor.wrap(message);
            if (StompCommand.CONNECTED != wrap.getCommand()) {
                wrap.setSessionId(this.sessionId);
                sendMessageToClient(MessageBuilder.withPayloadAndHeaders(message.getPayload(), wrap).build());
            } else {
                synchronized (this.monitor) {
                    this.isConnected = true;
                    flushMessages((TcpConnection) this.promise.get());
                }
            }
        }

        protected void sendMessageToClient(Message<?> message) {
            StompBrokerRelayMessageHandler.this.messageChannel.send(message);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void sendError(String str, String str2) {
            StompHeaderAccessor create = StompHeaderAccessor.create(StompCommand.ERROR);
            create.setSessionId(str);
            create.setMessage(str2);
            sendMessageToClient(MessageBuilder.withPayloadAndHeaders(new byte[0], create).build());
        }

        public void forward(Message<?> message) {
            if (!this.isConnected) {
                synchronized (this.monitor) {
                    if (!this.isConnected) {
                        this.messageQueue.add(message);
                        if (StompBrokerRelayMessageHandler.logger.isTraceEnabled()) {
                            StompBrokerRelayMessageHandler.logger.trace("Not connected yet, message queued, queue size=" + this.messageQueue.size());
                        }
                        return;
                    }
                }
            }
            TcpConnection<String, String> tcpConnection = (TcpConnection) this.promise.get();
            if (this.messageQueue.isEmpty()) {
                forwardInternal(message, tcpConnection);
            } else {
                this.messageQueue.add(message);
                flushMessages(tcpConnection);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean forwardInternal(Message<?> message, TcpConnection<String, String> tcpConnection) {
            if (StompBrokerRelayMessageHandler.logger.isTraceEnabled()) {
                StompBrokerRelayMessageHandler.logger.trace("Forwarding message to STOMP broker, message id=" + message.getHeaders().getId());
            }
            tcpConnection.send(new String(StompBrokerRelayMessageHandler.this.stompMessageConverter.fromMessage(message), Charset.forName("UTF-8")));
            return true;
        }

        private void flushMessages(TcpConnection<String, String> tcpConnection) {
            ArrayList arrayList = new ArrayList();
            this.messageQueue.drainTo(arrayList);
            Iterator it = arrayList.iterator();
            while (it.hasNext() && forwardInternal((Message) it.next(), tcpConnection)) {
            }
        }
    }

    public StompBrokerRelayMessageHandler(MessageChannel messageChannel, Collection<String> collection) {
        Assert.notNull(messageChannel, "messageChannel is required");
        Assert.notNull(collection, "destinationPrefixes is required");
        this.messageChannel = messageChannel;
        this.destinationPrefixes = (String[]) collection.toArray(new String[collection.size()]);
    }

    public void setRelayHost(String str) {
        Assert.hasText(str, "relayHost must not be empty");
        this.relayHost = str;
    }

    public String getRelayHost() {
        return this.relayHost;
    }

    public void setRelayPort(int i) {
        this.relayPort = i;
    }

    public int getRelayPort() {
        return this.relayPort;
    }

    public void setSystemLogin(String str) {
        Assert.hasText(str, "systemLogin must not be empty");
        this.systemLogin = str;
    }

    public String getSystemLogin() {
        return this.systemLogin;
    }

    public void setSystemPasscode(String str) {
        this.systemPasscode = str;
    }

    public String getSystemPasscode() {
        return this.systemPasscode;
    }

    public String[] getDestinationPrefixes() {
        return this.destinationPrefixes;
    }

    public boolean isAutoStartup() {
        return true;
    }

    public int getPhase() {
        return Integer.MAX_VALUE;
    }

    public boolean isRunning() {
        boolean z;
        synchronized (this.lifecycleMonitor) {
            z = this.running;
        }
        return z;
    }

    public void start() {
        synchronized (this.lifecycleMonitor) {
            if (logger.isDebugEnabled()) {
                logger.debug("Starting STOMP broker relay");
            }
            this.environment = new Environment();
            this.tcpClient = (TcpClient) new TcpClientSpec(NettyTcpClient.class).env(this.environment).codec(new DelimitedCodec((byte) 0, true, StandardCodecs.STRING_CODEC)).connect(this.relayHost, this.relayPort).get();
            openSystemSession();
            this.running = true;
        }
    }

    private void openSystemSession() {
        RelaySession relaySession = new RelaySession(STOMP_RELAY_SYSTEM_SESSION_ID) { // from class: org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.1
            @Override // org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.RelaySession
            protected void sendMessageToClient(Message<?> message) {
            }
        };
        this.relaySessions.put(STOMP_RELAY_SYSTEM_SESSION_ID, relaySession);
        StompHeaderAccessor create = StompHeaderAccessor.create(StompCommand.CONNECT);
        create.setAcceptVersion("1.1,1.2");
        create.setLogin(this.systemLogin);
        create.setPasscode(this.systemPasscode);
        create.setHeartbeat(0L, 0L);
        if (logger.isDebugEnabled()) {
            logger.debug("Sending STOMP CONNECT frame to initialize \"system\" TCP connection");
        }
        relaySession.open(MessageBuilder.withPayloadAndHeaders(new byte[0], create).build());
    }

    public void stop() {
        synchronized (this.lifecycleMonitor) {
            if (logger.isDebugEnabled()) {
                logger.debug("Stopping STOMP broker relay");
            }
            this.running = false;
            try {
                this.tcpClient.close().await();
            } catch (Throwable th) {
                logger.error("Failed to close reactor TCP client", th);
            }
            try {
                this.environment.shutdown();
            } catch (Throwable th2) {
                logger.error("Failed to shut down reactor Environment", th2);
            }
        }
    }

    public void stop(Runnable runnable) {
        synchronized (this.lifecycleMonitor) {
            stop();
            runnable.run();
        }
    }

    @Override // org.springframework.messaging.MessageHandler
    public void handleMessage(Message<?> message) {
        StompHeaderAccessor wrap = StompHeaderAccessor.wrap(message);
        String sessionId = wrap.getSessionId();
        String destination = wrap.getDestination();
        StompCommand command = wrap.getCommand();
        SimpMessageType messageType = wrap.getMessageType();
        if (!this.running) {
            if (logger.isTraceEnabled()) {
                logger.trace("STOMP broker relay not running. Ignoring message id=" + wrap.getId());
                return;
            }
            return;
        }
        if (SimpMessageType.MESSAGE.equals(messageType)) {
            sessionId = sessionId == null ? STOMP_RELAY_SYSTEM_SESSION_ID : sessionId;
            wrap.setSessionId(sessionId);
            command = command == null ? StompCommand.SEND : command;
            wrap.setCommandIfNotSet(command);
            message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), wrap).build();
        }
        if (wrap.getCommand() == null) {
            logger.error("Ignoring message, no STOMP command: " + message);
            return;
        }
        if (sessionId == null) {
            logger.error("Ignoring message, no sessionId: " + message);
            return;
        }
        try {
            if (checkDestinationPrefix(command, destination)) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Processing message: " + message);
                }
                if (SimpMessageType.CONNECT.equals(messageType)) {
                    wrap.setHeartbeat(0L, 0L);
                    Message<?> build = MessageBuilder.withPayloadAndHeaders(message.getPayload(), wrap).build();
                    RelaySession relaySession = new RelaySession(sessionId);
                    this.relaySessions.put(sessionId, relaySession);
                    relaySession.open(build);
                } else if (SimpMessageType.DISCONNECT.equals(messageType)) {
                    RelaySession remove = this.relaySessions.remove(sessionId);
                    if (remove == null) {
                        if (logger.isTraceEnabled()) {
                            logger.trace("Session already removed, sessionId=" + sessionId);
                            return;
                        }
                        return;
                    }
                    remove.forward(message);
                } else {
                    RelaySession relaySession2 = this.relaySessions.get(sessionId);
                    if (relaySession2 == null) {
                        logger.warn("Session id=" + sessionId + " not found. Ignoring message: " + message);
                        return;
                    }
                    relaySession2.forward(message);
                }
            }
        } catch (Throwable th) {
            logger.error("Failed to handle message " + message, th);
        }
    }

    protected boolean checkDestinationPrefix(StompCommand stompCommand, String str) {
        if (!stompCommand.requiresDestination()) {
            return true;
        }
        if (str == null) {
            return false;
        }
        for (String str2 : this.destinationPrefixes) {
            if (str.startsWith(str2)) {
                return true;
            }
        }
        return false;
    }
}
