package org.springframework.yarn.integration;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.MessageDeliveryException;
import org.springframework.integration.MessagingException;
import org.springframework.integration.core.MessageHandler;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.core.SubscribableChannel;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.yarn.am.AppmasterService;
import org.springframework.yarn.am.GenericRpcMessage;
import org.springframework.yarn.am.RpcMessage;
import org.springframework.yarn.integration.support.IntegrationObjectSupport;
import org.springframework.yarn.integration.support.PortExposingTcpSocketSupport;

/* loaded from: input_file:org/springframework/yarn/integration/IntegrationAppmasterService.class */
public abstract class IntegrationAppmasterService<T> extends IntegrationObjectSupport implements AppmasterService {
    private static final Log log = LogFactory.getLog(IntegrationAppmasterService.class);
    private PortExposingTcpSocketSupport socketSupport;
    private SubscribableChannel messageChannel;
    private final MessagingTemplate messagingTemplate = new MessagingTemplate();
    private EventDrivenConsumer consumer;

    /* loaded from: input_file:org/springframework/yarn/integration/IntegrationAppmasterService$ReplyProducingHandler.class */
    private class ReplyProducingHandler implements MessageHandler {
        private ReplyProducingHandler() {
        }

        public void handleMessage(Message<?> message) throws MessagingException {
            IntegrationAppmasterService.this.sendMessage(MessageBuilder.withPayload(IntegrationAppmasterService.this.handleMessageInternal(new GenericRpcMessage<>(message.getPayload())).getBody()).build(), message.getHeaders().getReplyChannel());
        }
    }

    protected void doStart() {
        this.consumer = new EventDrivenConsumer(this.messageChannel, new ReplyProducingHandler());
        this.consumer.start();
    }

    protected void doStop() {
        if (this.consumer != null) {
            this.consumer.stop();
        }
    }

    public int getPort() {
        if (this.socketSupport != null) {
            return this.socketSupport.getServerSocketPort();
        }
        return -1;
    }

    public String getHost() {
        if (this.socketSupport != null) {
            return this.socketSupport.getServerSocketAddress();
        }
        return null;
    }

    public boolean hasPort() {
        return true;
    }

    public abstract RpcMessage<T> handleMessageInternal(RpcMessage<T> rpcMessage);

    public void setMessageChannel(SubscribableChannel subscribableChannel) {
        Assert.notNull(subscribableChannel, "messageChannel must not be null");
        this.messageChannel = subscribableChannel;
    }

    public void setSocketSupport(PortExposingTcpSocketSupport portExposingTcpSocketSupport) {
        Assert.notNull(portExposingTcpSocketSupport, "socketSupport must not be null");
        this.socketSupport = portExposingTcpSocketSupport;
        if (log.isDebugEnabled()) {
            log.debug("Setting socket support: " + portExposingTcpSocketSupport);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendMessage(Message<?> message, Object obj) {
        if (obj instanceof MessageChannel) {
            this.messagingTemplate.send((MessageChannel) obj, message);
        } else {
            if (!(obj instanceof String)) {
                throw new MessageDeliveryException(message, "a non-null reply channel value of type MessageChannel or String is required");
            }
            this.messagingTemplate.send((String) obj, message);
        }
    }
}
