package org.springframework.cloud.stream.binder.servlet;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.reactivestreams.Processor;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.UnicastProcessor;

@RequestMapping({"/${spring.cloud.stream.binder.servlet.prefix:stream}"})
@RestController
/* loaded from: input_file:org/springframework/cloud/stream/binder/servlet/MessageController.class */
public class MessageController implements RouteRegistrar {
    /** Name of the message header that carries the route key of a request. */
    public static final String ROUTE_KEY = "stream_routekey";

    /** Source of the input and output binding names this controller consults. */
    private final EnabledBindings bindings;

    /** URL prefix of this controller; the constructor makes it start and end with "/". */
    private String prefix;

    /** Last value handed to the messaging template as its receive timeout. */
    private long receiveTimeoutMillis;

    /** Buffered outbound messages, keyed by route path; written by append, read by poll. */
    private final ConcurrentMap<String, Bridge<Message<?>>> queues = new ConcurrentHashMap<>();

    /** Open server-sent-event emitters, keyed by route path. */
    private final ConcurrentMap<String, Set<SseEmitter>> emitters = new ConcurrentHashMap<>();

    /** Input channels registered through bind, keyed by channel name. */
    private final Map<String, MessageChannel> inputs = new HashMap<>();

    /** Maps an input name (as resolved by the bindings) to the output name registered in subscribe. */
    private final Map<String, String> outputs = new HashMap<>();

    /** Template used to send request payloads to the input channels. */
    private final MessagingTemplate template = new MessagingTemplate();

    /** Number of seconds after which a subscription to a Bridge's buffered flux is cut off. */
    public long timeoutSeconds = 10;

    /** Route keys announced through registerRoutes. */
    private Set<String> routes = new LinkedHashSet<>();

    /**
     * Buffer for the items of one route: items pushed with {@link #send(Object)} go into a
     * processor whose output is exposed, via {@code replay().autoConnect()}, by
     * {@link #receive()}.
     *
     * @param <T> type of the buffered items
     */
    public class Bridge<T> {
        private Processor<T, T> emitter;
        private Flux<T> sink;

        /** Creates a bridge with a fresh, empty buffer. */
        public Bridge() {
            reset();
        }

        /**
         * Replaces the processor and the flux derived from it, so items sent before this call
         * are no longer reachable through {@link #receive()}.
         */
        public void reset() {
            // The explicit <T> witness is required: without it the chained call infers Object
            // and the result is not assignable to Processor<T, T>.
            this.emitter = UnicastProcessor.<T>create().serialize();
            this.sink = Flux.from(this.emitter)
                    .replay()
                    .autoConnect()
                    // Each subscription ends after the controller's buffer timeout.
                    .take(Duration.ofSeconds(MessageController.this.timeoutSeconds));
        }

        /**
         * Pushes one item into the current processor.
         *
         * @param t the item to buffer
         */
        public void send(T t) {
            this.emitter.onNext(t);
        }

        /**
         * @return the flux built by the most recent {@link #reset()}
         */
        public Flux<T> receive() {
            return this.sink;
        }
    }

    /**
     * A resolved request target: the channel name, an optional route key, and the path
     * ({@code key + "/" + channel}, or just the channel when there is no key) under which
     * buffers and emitters are stored.
     */
    public class Route {
        private String key;
        private String channel;
        private String path;

        /**
         * Resolves a route from a request path.
         *
         * @param str  the controller prefix; everything after it in {@code str2} is inspected
         * @param str2 the request path
         * @param str3 the channel to fall back to when the first segment names no known binding
         */
        private Route(String str, String str2, String str3) {
            final String remainder = str2.length() > str.length() ? str2.substring(str.length()) : "";
            final String[] segments = remainder.split("/");

            // First guess: the leading segment is the channel, the rest is the key.
            String candidate = remainder;
            String routeKey = null;
            if (segments.length > 1) {
                candidate = segments[0];
                routeKey = remainder.substring(candidate.length() + 1);
            }

            // Both lookups are always performed, as in the non-short-circuit original.
            final boolean knownInput = MessageController.this.bindings.getInputs().contains(candidate);
            final boolean knownOutput = MessageController.this.bindings.getOutputs().contains(candidate);
            if (!(knownInput || knownOutput)) {
                // Not a binding name: use the fallback channel and treat the whole remainder as key.
                candidate = str3;
                routeKey = remainder.isEmpty() ? null : remainder;
            }

            this.channel = candidate;
            this.key = routeKey;
            this.path = routeKey == null ? candidate : routeKey + "/" + candidate;
        }

        /**
         * Builds a route from an already known key and channel.
         *
         * @param str  the route key, or {@code null} when there is none
         * @param str2 the channel name
         */
        public Route(String str, String str2) {
            this.key = str;
            this.channel = str2;
            if (str == null) {
                this.path = str2;
            } else {
                this.path = str + "/" + str2;
            }
        }

        /** @return the storage path of this route */
        public String getPath() {
            return this.path;
        }

        /** @return the route key, or {@code null} when the route has none */
        public String getKey() {
            return this.key;
        }

        /** @return the channel name */
        public String getChannel() {
            return this.channel;
        }
    }

    /**
     * Creates the controller.
     *
     * @param str             the URL prefix; a leading and a trailing "/" are added when missing
     * @param enabledBindings the bindings used to look up input and output names
     */
    public MessageController(String str, EnabledBindings enabledBindings) {
        String normalized = str;
        if (!normalized.startsWith("/")) {
            normalized = "/" + normalized;
        }
        if (!normalized.endsWith("/")) {
            normalized = normalized + "/";
        }
        this.prefix = normalized;
        this.bindings = enabledBindings;
        this.template.setReceiveTimeout(this.receiveTimeoutMillis);
    }

    /**
     * Sets how long the messaging template waits for a reply.
     *
     * <p>The argument is in seconds, as the method name states. It is converted to
     * milliseconds before being stored and passed to the template, whose receive timeout is
     * expressed in milliseconds. The sign of the value is preserved by the conversion.
     *
     * @param j the receive timeout in seconds
     * @throws ArithmeticException if the value does not fit into a {@code long} of milliseconds
     */
    public void setReceiveTimeoutSeconds(long j) {
        long millis = Duration.ofSeconds(j).toMillis();
        this.receiveTimeoutMillis = millis;
        this.template.setReceiveTimeout(millis);
    }

    /**
     * Sets the number of seconds used as the cut-off when a {@code Bridge} flux is next built.
     *
     * @param j the buffer timeout in seconds
     */
    public void setBufferTimeoutSeconds(long j) {
        timeoutSeconds = j;
    }

    /**
     * Opens a server-sent-event stream for an output route. Messages already buffered for the
     * route are sent first and are left in the buffer.
     *
     * @param str         the request path within the handler mapping
     * @param httpHeaders the request headers
     * @return 404 when the resolved channel is not an output binding, otherwise 200 with the emitter
     * @throws IOException if sending the buffered messages to the emitter fails
     */
    @GetMapping(path = {"/**"}, produces = {"text/event-stream"})
    public ResponseEntity<SseEmitter> sse(@RequestAttribute("org.springframework.web.servlet.HandlerMapping.pathWithinHandlerMapping") String str, @RequestHeader HttpHeaders httpHeaders) throws IOException {
        final Route route = output(str);
        final String channel = route.getChannel();
        if (this.bindings.getOutputs().contains(channel)) {
            final Message<Collection<Object>> backlog = poll(channel, route.getKey(), true);
            // Build the headers before registering the emitter, as the original expression did.
            final ResponseEntity.BodyBuilder response =
                    ResponseEntity.ok().headers(HeaderUtils.fromMessage(backlog.getHeaders(), httpHeaders));
            return response.body(emit(route, backlog));
        }
        return ResponseEntity.notFound().build();
    }

    /**
     * Handles a plain GET.
     *
     * <p>If the path resolves to an output binding, the buffered messages of that route are
     * returned when the route key is registered, the buffer is non-empty, or the route has no
     * key. Otherwise the path is resolved as an input route: its last segment is used as a
     * text payload and the request is forwarded to {@link #string} with that segment removed
     * from the path.
     *
     * @param str         the request path within the handler mapping
     * @param httpHeaders the request headers
     * @param z           when {@code true} the route's buffer is reset after it has been read.
     *                    NOTE(review): the annotation gives no explicit name, so the request
     *                    parameter name presumably follows this Java parameter name; confirm
     *                    before renaming it.
     * @return the buffered messages, the result of the forwarded request, or 404 when the path
     *         matches neither an output nor an input with a payload segment
     */
    @GetMapping({"/**"})
    public ResponseEntity<Object> supplier(@RequestAttribute("org.springframework.web.servlet.HandlerMapping.pathWithinHandlerMapping") String str, @RequestHeader HttpHeaders httpHeaders, @RequestParam(required = false) boolean z) {
        Route output = output(str);
        String channel = output.getChannel();
        if (this.bindings.getOutputs().contains(channel)) {
            Message<Collection<Object>> polled = poll(channel, output.getKey(), !z);
            if (this.routes.contains(output.getKey()) || !polled.getPayload().isEmpty() || output.getKey() == null) {
                return convert(polled, httpHeaders);
            }
        }
        Route input = input(str);
        if (!this.bindings.getInputs().contains(input.getChannel())) {
            return ResponseEntity.notFound().build();
        }
        String key = input.getKey();
        if (key == null) {
            // The path names an input but carries no segment to use as payload; previously
            // this dereferenced null and failed with a NullPointerException.
            return ResponseEntity.notFound().build();
        }
        // lastIndexOf returns -1 when there is no "/", which makes this the whole key.
        String body = key.substring(key.lastIndexOf('/') + 1);
        // Strip only the trailing "/<body>" literally. The segment comes from the request URL,
        // so it must not be interpreted as a regular expression, and earlier occurrences of
        // the same text in the path must stay intact.
        String suffix = "/" + body;
        String target = str.endsWith(suffix) ? str.substring(0, str.length() - suffix.length()) : str;
        return string(target, body, httpHeaders);
    }

    /**
     * Handles a POST with a {@code text/plain} body by delegating to {@link #function}.
     *
     * @param str         the request path within the handler mapping
     * @param str2        the request body
     * @param httpHeaders the request headers
     * @return whatever {@link #function} returns for the body
     */
    @PostMapping(path = {"/**"}, consumes = {"text/plain"})
    public ResponseEntity<Object> string(@RequestAttribute("org.springframework.web.servlet.HandlerMapping.pathWithinHandlerMapping") String str, @RequestBody String str2, @RequestHeader HttpHeaders httpHeaders) {
        final Object payload = str2;
        return function(str, payload, httpHeaders);
    }

    /**
     * Handles a POST with an {@code application/json} body: the body is passed through
     * {@link #extract(String)} and the result is handed to {@link #function}.
     *
     * @param str         the request path within the handler mapping
     * @param str2        the raw JSON request body
     * @param httpHeaders the request headers
     * @return whatever {@link #function} returns for the extracted payload
     */
    @PostMapping(path = {"/**"}, consumes = {"application/json"})
    public ResponseEntity<Object> json(@RequestAttribute("org.springframework.web.servlet.HandlerMapping.pathWithinHandlerMapping") String str, @RequestBody String str2, @RequestHeader HttpHeaders httpHeaders) {
        final Object payload = extract(str2);
        return function(str, payload, httpHeaders);
    }

    /**
     * Trims the text and, when it starts with "[", hands it to {@code JsonUtils.split}.
     *
     * @param str the raw text
     * @return the result of {@code JsonUtils.split} for text starting with "[", otherwise the
     *         trimmed text itself
     */
    private Object extract(String str) {
        final String trimmed = str.trim();
        if (trimmed.startsWith("[")) {
            return JsonUtils.split(trimmed);
        }
        return trimmed;
    }

    /**
     * Sends the request body to the input channel the path resolves to.
     *
     * <p>A collection or array body is sent as one message per element; anything else is sent
     * as a single message. When an output is registered for the channel, each message is sent
     * with {@code sendAndReceive} and the reply payloads form the response (status 200). If no
     * reply arrives, or no output is registered, the request payloads are echoed back with
     * status 202.
     *
     * @param str         the request path within the handler mapping
     * @param obj         the request body
     * @param httpHeaders the request headers
     * @return 404 when no channel is bound under the resolved name, otherwise the response
     *         described above
     */
    @PostMapping({"/**"})
    public ResponseEntity<Object> function(@RequestAttribute("org.springframework.web.servlet.HandlerMapping.pathWithinHandlerMapping") String str, @RequestBody Object obj, @RequestHeader HttpHeaders httpHeaders) {
        Route input = input(str);
        String channel = input.getChannel();
        if (!this.inputs.containsKey(channel)) {
            return ResponseEntity.notFound().build();
        }

        // A textual body may itself be a JSON array; extract() expands it in that case.
        Object body = obj instanceof String ? extract((String) obj) : obj;

        boolean single = false;
        Collection<?> payloads;
        if (body instanceof Collection) {
            payloads = (Collection<?>) body;
        } else if (ObjectUtils.isArray(body)) {
            payloads = Arrays.asList(ObjectUtils.toObjectArray(body));
        } else {
            single = true;
            payloads = Arrays.asList(body);
        }

        Map<String, Object> requestHeaders = new HashMap<>(HeaderUtils.fromHttp(httpHeaders));
        if (input.getKey() != null) {
            requestHeaders.put(ROUTE_KEY, input.getKey());
        }

        MessageChannel destination = this.inputs.get(channel);
        Map<String, Object> responseHeaders = null;
        List<Object> results = new ArrayList<>();
        HttpStatus status = HttpStatus.ACCEPTED;

        if (this.outputs.containsKey(channel)) {
            String replyTo = this.outputs.get(channel);
            for (Object payload : payloads) {
                Message<?> reply = this.template.sendAndReceive(destination, MessageBuilder.withPayload(payload).copyHeadersIfAbsent(requestHeaders).setHeader("replyChannel", replyTo).build());
                if (reply != null) {
                    if (responseHeaders == null) {
                        // The headers of the first reply become the response headers.
                        responseHeaders = new LinkedHashMap<>(reply.getHeaders());
                    }
                    results.add(reply.getPayload());
                }
            }
            status = HttpStatus.OK;
            if (results.isEmpty()) {
                // No reply at all: fall back to echoing the input as "accepted".
                status = HttpStatus.ACCEPTED;
                results.addAll(payloads);
            }
        } else {
            for (Object payload : payloads) {
                this.template.send(destination, MessageBuilder.withPayload(payload).copyHeadersIfAbsent(requestHeaders).build());
            }
            responseHeaders = requestHeaders;
            results.addAll(payloads);
        }

        if (responseHeaders == null) {
            responseHeaders = new LinkedHashMap<>();
        }
        responseHeaders.put(ROUTE_KEY, input.getKey());

        // A single non-collection request gets a single value back rather than a list.
        Object result = (single && results.size() == 1) ? results.get(0) : results;
        MediaType contentType = httpHeaders.getContentType();
        if (contentType != null && contentType.includes(MediaType.APPLICATION_JSON) && result.toString().contains("\"")) {
            // For JSON requests whose result text contains a double quote, the string form
            // of the result is returned instead of the object.
            result = result.toString();
        }
        return convert(status, MessageBuilder.withPayload(result).copyHeadersIfAbsent(responseHeaders).build(), httpHeaders);
    }

    /**
     * Converts a message into a response with status 200.
     *
     * @param message     the message supplying payload and headers
     * @param httpHeaders the request headers
     * @return the response entity
     */
    private ResponseEntity<Object> convert(Message<?> message, HttpHeaders httpHeaders) {
        final HttpStatus ok = HttpStatus.OK;
        return convert(ok, message, httpHeaders);
    }

    /**
     * Converts a message into a response: the payload becomes the body and the headers come
     * from {@code HeaderUtils.fromMessage}.
     *
     * @param httpStatus  the response status
     * @param message     the message supplying payload and headers
     * @param httpHeaders the request headers, passed on to {@code HeaderUtils.fromMessage}
     * @return the response entity
     */
    private ResponseEntity<Object> convert(HttpStatus httpStatus, Message<?> message, HttpHeaders httpHeaders) {
        final ResponseEntity.BodyBuilder builder = ResponseEntity.status(httpStatus)
                .headers(HeaderUtils.fromMessage(message.getHeaders(), httpHeaders));
        final Object payload = message.getPayload();
        return builder.body(payload);
    }

    /**
     * Creates an emitter for the route, registers it so that later messages reach it, and
     * sends the payloads of the given message to it.
     *
     * @param route   the route the emitter listens on
     * @param message a message whose payload is the collection of items to send first
     * @return the registered emitter
     * @throws IOException if sending one of the items fails
     */
    private SseEmitter emit(Route route, Message<Collection<Object>> message) throws IOException {
        SseEmitter sseEmitter = new SseEmitter(Long.MAX_VALUE);
        // Create the per-route set atomically, and make it a concurrent set: it is modified by
        // request threads and emitter callbacks while append() copies it on another thread.
        Set<SseEmitter> subscribers = this.emitters.computeIfAbsent(route.getPath(), path -> ConcurrentHashMap.<SseEmitter>newKeySet());
        subscribers.add(sseEmitter);
        Runnable unregister = () -> subscribers.remove(sseEmitter);
        sseEmitter.onCompletion(unregister);
        sseEmitter.onTimeout(unregister);
        for (Object payload : message.getPayload()) {
            sseEmitter.send(payload);
        }
        return sseEmitter;
    }

    /** Drops every buffered route by clearing the queue map. */
    public void reset() {
        queues.clear();
    }

    /**
     * Collects the messages currently buffered for a route.
     *
     * @param str  the channel name
     * @param str2 the route key, or {@code null}
     * @param z    {@code true} to leave the buffer untouched, {@code false} to reset it after
     *             reading
     * @return a message whose payload is the list of buffered payloads (empty when nothing is
     *         buffered) and whose headers are copied from the first buffered message, if any
     */
    private Message<Collection<Object>> poll(String str, String str2, boolean z) {
        List<Object> payloads = new ArrayList<>();
        List<Message<?>> messages = new ArrayList<>();
        Bridge<Message<?>> bridge = this.queues.get(new Route(str2, str).getPath());
        if (bridge != null) {
            // Dispose right after subscribing. Otherwise the subscription stays active until
            // the bridge's timeout and keeps appending to the list that has already been
            // handed out as the response payload.
            // NOTE(review): relies on the backlog being delivered during subscribe(); confirm
            // against the Reactor version in use.
            bridge.receive().subscribe(message -> {
                messages.add(message);
                payloads.add(message.getPayload());
            }).dispose();
            if (!z) {
                bridge.reset();
            }
        }
        MessageBuilder<Collection<Object>> builder = MessageBuilder.withPayload(payloads);
        if (!messages.isEmpty()) {
            builder.copyHeadersIfAbsent(messages.get(0).getHeaders());
        }
        return builder.build();
    }

    /**
     * Records {@code str} as the output for the input name that the bindings return for it,
     * and passes every message delivered by the channel to {@code append}.
     *
     * @param str               the output name
     * @param subscribableChannel the channel to listen on
     */
    public void subscribe(String str, SubscribableChannel subscribableChannel) {
        final String inputName = this.bindings.getInput(str);
        this.outputs.put(inputName, str);
        subscribableChannel.subscribe(message -> append(str, message));
    }

    /**
     * Handles a message arriving on an output channel.
     *
     * <p>If the message carries a reply channel that is a {@link MessageChannel}, it is
     * forwarded there and nothing else happens. Otherwise it is buffered under the route
     * formed by its route-key header and the output name, and its payload is pushed to every
     * emitter registered for that route.
     *
     * @param str     the output name
     * @param message the message to deliver
     */
    private void append(String str, Message<?> message) {
        String routeKey = (String) message.getHeaders().get(ROUTE_KEY);
        Object replyChannel = message.getHeaders().getReplyChannel();
        if (replyChannel instanceof MessageChannel) {
            ((MessageChannel) replyChannel).send(message);
            return;
        }
        String path = new Route(routeKey, str).getPath();
        // Look up or create the bridge in one atomic step; a separate containsKey/get pair
        // could hit null if reset() or unregisterRoutes() removed the entry in between.
        this.queues.computeIfAbsent(path, key -> new Bridge<Message<?>>()).send(message);

        Set<SseEmitter> subscribers = this.emitters.get(path);
        if (subscribers == null) {
            return;
        }
        // Iterate over a copy so that failed emitters can be removed while looping.
        for (SseEmitter sseEmitter : new HashSet<>(subscribers)) {
            try {
                sseEmitter.send(message.getPayload());
            } catch (IOException | IllegalStateException ex) {
                // The emitter can no longer be written to (I/O failure or already completed):
                // drop it and keep delivering to the remaining emitters.
                subscribers.remove(sseEmitter);
            }
        }
    }

    /**
     * Registers an input channel under the given name.
     *
     * @param str            the name the channel is stored under
     * @param str2           not used by this method
     * @param messageChannel the channel that receives request payloads
     */
    public void bind(String str, String str2, MessageChannel messageChannel) {
        inputs.put(str, messageChannel);
    }

    /**
     * Resolves a request path against the output bindings.
     *
     * @param str the request path
     * @return the route; its fallback channel is the only output binding when exactly one
     *         exists, otherwise "output"
     */
    public Route output(String str) {
        String fallback = "output";
        if (this.bindings.getOutputs().size() == 1) {
            fallback = this.bindings.getOutputs().iterator().next();
        }
        return new Route(this.prefix, str, fallback);
    }

    /**
     * Resolves a request path against the input bindings.
     *
     * @param str the request path
     * @return the route; its fallback channel is the only input binding when exactly one
     *         exists, otherwise "input"
     */
    public Route input(String str) {
        String fallback = "input";
        if (this.bindings.getInputs().size() == 1) {
            fallback = this.bindings.getInputs().iterator().next();
        }
        return new Route(this.prefix, str, fallback);
    }

    /**
     * Adds the given route keys to the set of registered routes.
     *
     * @param set the route keys to register
     */
    @Override
    public void registerRoutes(Set<String> set) {
        routes.addAll(set);
    }

    /**
     * Removes the given route keys from the registered routes and drops the buffer stored for
     * each of them.
     *
     * @param set the route keys to unregister
     */
    @Override
    public void unregisterRoutes(Set<String> set) {
        this.routes.removeAll(set);
        for (String route : set) {
            // Resolve the key the same way a request under the prefix would be resolved.
            Route resolved = output(this.prefix + route);
            this.queues.remove(resolved.getPath());
        }
    }
}
