package org.springframework.cloud.appbroker.logging.streaming.endpoint;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.cloudfoundry.dropsonde.events.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.appbroker.logging.streaming.events.ServiceInstanceLogEvent;
import org.springframework.cloud.appbroker.logging.streaming.events.StartServiceInstanceLoggingEvent;
import org.springframework.cloud.appbroker.logging.streaming.events.StopServiceInstanceLoggingEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.util.UriTemplate;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/springframework/cloud/appbroker/logging/streaming/endpoint/StreamingLogWebSocketHandler.class */
public class StreamingLogWebSocketHandler implements WebSocketHandler, ApplicationListener<ServiceInstanceLogEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingLogWebSocketHandler.class);
    private static final UriTemplate LOGGING_URI_TEMPLATE = new UriTemplate("/logs/{serviceInstanceId}/stream");
    private final ApplicationEventPublisher eventPublisher;
    private final ConcurrentHashMap<String, EmitterProcessor<Envelope>> processors = new ConcurrentHashMap<>();

    public StreamingLogWebSocketHandler(ApplicationEventPublisher applicationEventPublisher) {
        this.eventPublisher = applicationEventPublisher;
    }

    public Mono<Void> handle(WebSocketSession webSocketSession) {
        String serviceInstanceId = getServiceInstanceId(webSocketSession);
        LOG.info("Connection established [{}}], service instance {}", webSocketSession.getHandshakeInfo().getRemoteAddress(), serviceInstanceId);
        EmitterProcessor<Envelope> computeIfAbsent = this.processors.computeIfAbsent(serviceInstanceId, str -> {
            return EmitterProcessor.create();
        });
        this.eventPublisher.publishEvent(new StartServiceInstanceLoggingEvent(this, serviceInstanceId));
        LOG.info("Published event to start streaming logs for service instance with ID {}", serviceInstanceId);
        return webSocketSession.send(computeIfAbsent.map(envelope -> {
            return webSocketSession.binaryMessage(dataBufferFactory -> {
                return dataBufferFactory.wrap(Envelope.ADAPTER.encode(envelope));
            });
        })).then().doFinally(signalType -> {
            afterConnectionClosed(webSocketSession);
        }).doOnError(th -> {
            LOG.error("Error handling logging stream for service instance " + serviceInstanceId, th);
        });
    }

    public void onApplicationEvent(ServiceInstanceLogEvent serviceInstanceLogEvent) {
        broadcastLogMessage(serviceInstanceLogEvent);
    }

    public void broadcastLogMessage(ServiceInstanceLogEvent serviceInstanceLogEvent) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received event to broadcast log message for " + serviceInstanceLogEvent.getServiceInstanceId());
        }
        EmitterProcessor<Envelope> emitterProcessor = this.processors.get(serviceInstanceLogEvent.getServiceInstanceId());
        if (emitterProcessor == null) {
            if (LOG.isWarnEnabled()) {
                LOG.warn("No processor found for {}, stopping log streaming", serviceInstanceLogEvent.getServiceInstanceId());
            }
            this.eventPublisher.publishEvent(new StopServiceInstanceLoggingEvent(this, serviceInstanceLogEvent.getServiceInstanceId()));
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending message to client for {}", serviceInstanceLogEvent.getServiceInstanceId());
            }
            emitterProcessor.onNext(serviceInstanceLogEvent.getEnvelope());
        }
    }

    private void afterConnectionClosed(WebSocketSession webSocketSession) {
        if (LOG.isInfoEnabled()) {
            LOG.info("Connection closed [" + webSocketSession.getHandshakeInfo().getRemoteAddress() + "]");
        }
        String serviceInstanceId = getServiceInstanceId(webSocketSession);
        this.eventPublisher.publishEvent(new StopServiceInstanceLoggingEvent(this, serviceInstanceId));
        this.processors.computeIfPresent(serviceInstanceId, (str, emitterProcessor) -> {
            return null;
        });
    }

    private String getServiceInstanceId(WebSocketSession webSocketSession) {
        Map match = LOGGING_URI_TEMPLATE.match(webSocketSession.getHandshakeInfo().getUri().getPath());
        if (match.isEmpty()) {
            throw new ServiceInstanceNotFoundException();
        }
        return (String) match.get("serviceInstanceId");
    }
}
