package org.springframework.cloud.netflix.turbine.stream;

import com.netflix.turbine.aggregator.InstanceKey;
import com.netflix.turbine.aggregator.StreamAggregator;
import com.netflix.turbine.internal.JsonUtility;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import rx.RxReactiveStreams;
import rx.subjects.PublishSubject;

/**
 * REST endpoint that exposes the aggregated Turbine event stream as
 * {@code text/event-stream}.
 *
 * <p>The stream is assembled once, in the constructor, and the same
 * {@link Flux} instance is handed to every caller of {@link #stream()}.
 */
@RestController
public class TurbineController {
    private static final Log log = LogFactory.getLog(TurbineController.class);

    /** Delay before the first {@code ping} event is emitted. */
    private static final Duration PING_INITIAL_DELAY = Duration.ofSeconds(5L);

    /** Interval between subsequent {@code ping} events. */
    private static final Duration PING_PERIOD = Duration.ofSeconds(10L);

    /** JSON-serialized merge of the aggregated events and the periodic pings. */
    private final Flux<String> flux;

    /**
     * Builds the outgoing stream from the given subject.
     *
     * @param publishSubject source of raw event maps; each map is grouped by
     *     the value stored under its {@code "instanceId"} key
     */
    public TurbineController(PublishSubject<Map<String, Object>> publishSubject) {
        // Group the raw events per instance, let Turbine aggregate the groups,
        // then flatten the aggregated groups back into a single stream and
        // bridge it from RxJava to Reactive Streams.
        Publisher<Map<String, Object>> aggregated = RxReactiveStreams.toPublisher(
                StreamAggregator.aggregateGroupedStreams(
                        publishSubject.groupBy(
                                event -> InstanceKey.create((String) event.get("instanceId"))))
                        .doOnUnsubscribe(() -> log.info("Unsubscribing aggregation."))
                        .doOnSubscribe(() -> log.info("Starting aggregation"))
                        .flatMap(group -> group));

        // Periodic {"type": "ping"} events. The explicit type witness keeps the
        // element type aligned with the aggregated events so that the merge
        // below is fully typed instead of relying on a raw Publisher[] array.
        Flux<Map<String, Object>> pings = Flux.interval(PING_INITIAL_DELAY, PING_PERIOD)
                .map(tick -> Collections.<String, Object>singletonMap("type", "ping"))
                .share();

        this.flux = Flux.merge(aggregated, pings)
                .share()
                .map(event -> JsonUtility.mapToJson(event));
    }

    /**
     * Returns the event stream built in the constructor.
     *
     * @return the JSON-encoded events, served as {@code text/event-stream}
     */
    @GetMapping(produces = {"text/event-stream"})
    public Flux<String> stream() {
        return this.flux;
    }
}
