package org.springframework.integration.stomp.outbound;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.Lifecycle;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.mapping.HeaderMapper;
import org.springframework.integration.stomp.StompSessionManager;
import org.springframework.integration.stomp.event.StompExceptionEvent;
import org.springframework.integration.stomp.event.StompReceiptEvent;
import org.springframework.integration.stomp.support.StompHeaderMapper;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.simp.stomp.ConnectionLostException;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandler;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/integration/stomp/outbound/StompMessageHandler.class */
public class StompMessageHandler extends AbstractMessageHandler implements ApplicationEventPublisherAware, Lifecycle {
    private static final int DEFAULT_CONNECT_TIMEOUT = 3000;
    private final StompSessionManager stompSessionManager;
    private volatile StompSession stompSession;
    private volatile Throwable transportError;
    private volatile boolean running;
    private Expression destinationExpression;
    private EvaluationContext evaluationContext;
    private ApplicationEventPublisher applicationEventPublisher;
    private final StompSessionHandler sessionHandler = new IntegrationOutboundStompSessionHandler();
    private final Semaphore connectSemaphore = new Semaphore(0);
    private volatile HeaderMapper<StompHeaders> headerMapper = new StompHeaderMapper();
    private volatile long connectTimeout = 3000;

    /* loaded from: input_file:org/springframework/integration/stomp/outbound/StompMessageHandler$IntegrationOutboundStompSessionHandler.class */
    private class IntegrationOutboundStompSessionHandler extends StompSessionHandlerAdapter {
        private IntegrationOutboundStompSessionHandler() {
        }

        public void afterConnected(StompSession stompSession, StompHeaders stompHeaders) {
            StompMessageHandler.this.transportError = null;
            StompMessageHandler.this.stompSession = stompSession;
            StompMessageHandler.this.connectSemaphore.release();
        }

        public void handleFrame(StompHeaders stompHeaders, Object obj) {
            Object obj2 = obj;
            if (obj2 == null) {
                obj2 = stompHeaders.getFirst("message");
            }
            if (obj2 != null) {
                Throwable messageDeliveryException = new MessageDeliveryException(StompMessageHandler.this.getMessageBuilderFactory().withPayload(obj2).copyHeaders(StompMessageHandler.this.headerMapper.toHeaders(stompHeaders)).build(), "STOMP frame handling error.");
                StompMessageHandler.this.logger.error("STOMP frame handling error.", messageDeliveryException);
                if (StompMessageHandler.this.applicationEventPublisher != null) {
                    StompMessageHandler.this.applicationEventPublisher.publishEvent(new StompExceptionEvent(StompMessageHandler.this, messageDeliveryException));
                }
            }
        }

        public void handleException(StompSession stompSession, StompCommand stompCommand, StompHeaders stompHeaders, byte[] bArr, Throwable th) {
            StompMessageHandler.this.logger.error("The exception for session [" + stompSession + "] on message [" + MessageBuilder.createMessage(bArr, StompHeaderAccessor.create(stompCommand, stompHeaders).getMessageHeaders()) + "]", th);
        }

        public void handleTransportError(StompSession stompSession, Throwable th) {
            StompMessageHandler.this.transportError = th;
            StompMessageHandler.this.stompSession = null;
        }
    }

    public StompMessageHandler(StompSessionManager stompSessionManager) {
        Assert.notNull(stompSessionManager, "'stompSessionManager' is required.");
        this.stompSessionManager = stompSessionManager;
    }

    public void setDestination(String str) {
        Assert.hasText(str, "'destination' must not be empty.");
        this.destinationExpression = new ValueExpression(str);
    }

    public void setDestinationExpression(Expression expression) {
        Assert.notNull(expression, "'destinationExpression' must not be null.");
        this.destinationExpression = expression;
    }

    public void setHeaderMapper(HeaderMapper<StompHeaders> headerMapper) {
        Assert.notNull(headerMapper, "'headerMapper' must not be null.");
        this.headerMapper = headerMapper;
    }

    public void setConnectTimeout(long j) {
        this.connectTimeout = j;
    }

    public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    protected void onInit() throws Exception {
        super.onInit();
        if (this.evaluationContext == null) {
            this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
        }
    }

    protected void handleMessageInternal(final Message<?> message) throws Exception {
        try {
            connectIfNecessary();
            StompSession stompSession = this.stompSession;
            StompHeaders stompHeaders = new StompHeaders();
            this.headerMapper.fromHeaders(message.getHeaders(), stompHeaders);
            if (stompHeaders.getDestination() == null) {
                Assert.state(this.destinationExpression != null, "One of 'destination' or 'destinationExpression' must be provided, if message header doesn't supply 'destination' STOMP header.");
                stompHeaders.setDestination((String) this.destinationExpression.getValue(this.evaluationContext, message, String.class));
            }
            final StompSession.Receiptable send = stompSession.send(stompHeaders, message.getPayload());
            if (send.getReceiptId() != null) {
                final String destination = stompHeaders.getDestination();
                if (this.applicationEventPublisher != null) {
                    send.addReceiptTask(new Runnable() { // from class: org.springframework.integration.stomp.outbound.StompMessageHandler.1
                        @Override // java.lang.Runnable
                        public void run() {
                            StompReceiptEvent stompReceiptEvent = new StompReceiptEvent(StompMessageHandler.this, destination, send.getReceiptId(), StompCommand.SEND, false);
                            stompReceiptEvent.setMessage(message);
                            StompMessageHandler.this.applicationEventPublisher.publishEvent(stompReceiptEvent);
                        }
                    });
                }
                send.addReceiptLostTask(new Runnable() { // from class: org.springframework.integration.stomp.outbound.StompMessageHandler.2
                    @Override // java.lang.Runnable
                    public void run() {
                        if (StompMessageHandler.this.applicationEventPublisher == null) {
                            StompMessageHandler.this.logger.error("The receipt [" + send.getReceiptId() + "] is lost for [" + message + "] on destination [" + destination + "]");
                            return;
                        }
                        StompReceiptEvent stompReceiptEvent = new StompReceiptEvent(StompMessageHandler.this, destination, send.getReceiptId(), StompCommand.SEND, true);
                        stompReceiptEvent.setMessage(message);
                        StompMessageHandler.this.applicationEventPublisher.publishEvent(stompReceiptEvent);
                    }
                });
            }
        } catch (Exception e) {
            throw new MessageDeliveryException(message, "The [" + this + "] could not deliver message.", e);
        }
    }

    private StompSession connectIfNecessary() throws Exception {
        StompSession stompSession;
        synchronized (this.connectSemaphore) {
            if (this.stompSession == null || !this.stompSessionManager.isConnected()) {
                this.stompSessionManager.disconnect(this.sessionHandler);
                this.stompSessionManager.connect(this.sessionHandler);
                if (!this.connectSemaphore.tryAcquire(this.connectTimeout, TimeUnit.MILLISECONDS) || this.stompSession == null) {
                    if (this.transportError == null) {
                        throw new ConnectionLostException("Failed to obtain StompSession during timeout: " + this.connectTimeout);
                    }
                    if (this.transportError instanceof ConnectionLostException) {
                        throw this.transportError;
                    }
                    throw new ConnectionLostException(this.transportError.getMessage());
                }
            }
            stompSession = this.stompSession;
        }
        return stompSession;
    }

    public void start() {
        this.running = true;
    }

    public void stop() {
        this.running = false;
        this.stompSessionManager.disconnect(this.sessionHandler);
    }

    public boolean isRunning() {
        return this.running;
    }
}
