package org.springframework.cloud.appbroker.logging.streaming;

import java.util.HashMap;
import java.util.Map;
import org.cloudfoundry.doppler.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.appbroker.logging.LoggingUtils;
import org.springframework.cloud.appbroker.logging.streaming.events.ServiceInstanceLogEvent;
import org.springframework.cloud.appbroker.logging.streaming.events.ServiceInstanceLoggingEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import reactor.core.Disposable;

/* loaded from: input_file:org/springframework/cloud/appbroker/logging/streaming/ApplicationLogStreamPublisher.class */
public class ApplicationLogStreamPublisher implements ApplicationListener<ServiceInstanceLoggingEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(ApplicationLogStreamPublisher.class);
    private final Map<String, Registration> registry = new HashMap();
    private final LogStreamPublisher<Envelope> logStreamPublisher;
    private final ApplicationEventPublisher publisher;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/appbroker/logging/streaming/ApplicationLogStreamPublisher$Registration.class */
    public static final class Registration {
        private final Disposable subscription;
        private int count;

        private Registration(Disposable disposable) {
            this.count = 1;
            this.subscription = disposable;
        }

        public void increment() {
            if (ApplicationLogStreamPublisher.LOG.isDebugEnabled()) {
                ApplicationLogStreamPublisher.LOG.debug("Incrementing subscription count from {} to {}", Integer.valueOf(this.count), Integer.valueOf(this.count + 1));
            }
            this.count++;
        }

        public int decrement() {
            if (ApplicationLogStreamPublisher.LOG.isDebugEnabled()) {
                ApplicationLogStreamPublisher.LOG.debug("Decrementing subscription count from {} to {}", Integer.valueOf(this.count), Integer.valueOf(this.count - 1));
            }
            int i = this.count - 1;
            this.count = i;
            return i;
        }

        public Disposable getSubscription() {
            return this.subscription;
        }
    }

    public ApplicationLogStreamPublisher(LogStreamPublisher<Envelope> logStreamPublisher, ApplicationEventPublisher applicationEventPublisher) {
        this.logStreamPublisher = logStreamPublisher;
        this.publisher = applicationEventPublisher;
    }

    public void onApplicationEvent(ServiceInstanceLoggingEvent serviceInstanceLoggingEvent) {
        String serviceInstanceId = serviceInstanceLoggingEvent.getServiceInstanceId();
        switch (serviceInstanceLoggingEvent.getOperation()) {
            case START:
                LOG.debug("Received event to begin listening to logs for {}", serviceInstanceId);
                startPublishing(serviceInstanceId);
                return;
            case STOP:
                LOG.debug("Received event to stop listening to logs for {}", serviceInstanceId);
                stopPublishing(serviceInstanceId);
                return;
            default:
                throw new IllegalArgumentException("Unknown operation: " + serviceInstanceLoggingEvent.getOperation());
        }
    }

    private void startPublishing(String str) {
        synchronized (this.registry) {
            Registration registration = this.registry.get(str);
            if (registration != null) {
                LOG.debug("Incrementing registration subscription count for {}", str);
                registration.increment();
            } else {
                Disposable subscribe = this.logStreamPublisher.getLogStream(str).map(LoggingUtils::convertDopplerEnvelopeToDropsonde).doOnNext(envelope -> {
                    this.publisher.publishEvent(new ServiceInstanceLogEvent(this, str, envelope));
                }).subscribe();
                LOG.debug("Creating new registration for {}", str);
                this.registry.put(str, new Registration(subscribe));
            }
        }
    }

    private void stopPublishing(String str) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received event to stop listening to logs for {}", str);
        }
        synchronized (this.registry) {
            Registration registration = this.registry.get(str);
            if (registration == null) {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Received deregister event for service instance {} but there no event handler registered", str);
                }
            } else if (registration.decrement() == 0) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Disposing of registration since there are no more subscriptions");
                }
                registration.getSubscription().dispose();
                this.registry.remove(str);
            }
        }
    }
}
