/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.sleuth.instrument.messaging;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.IterableAssert;
import org.assertj.core.api.OptionalAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.autoconfig.instrument.messaging.SleuthMessagingProperties;
import org.springframework.cloud.sleuth.exporter.FinishedSpan;
import org.springframework.cloud.sleuth.instrument.messaging.B3Context;
import org.springframework.cloud.sleuth.instrument.messaging.DefaultMessageSpanCustomizer;
import org.springframework.cloud.sleuth.instrument.messaging.MessageHeaderPropagatorGetter;
import org.springframework.cloud.sleuth.instrument.messaging.MessageHeaderPropagatorSetter;
import org.springframework.cloud.sleuth.instrument.messaging.MessageSpanCustomizer;
import org.springframework.cloud.sleuth.instrument.messaging.MyMessageSpanCustomizer;
import org.springframework.cloud.sleuth.instrument.messaging.TracingChannelInterceptor;
import org.springframework.cloud.sleuth.propagation.Propagator;
import org.springframework.cloud.sleuth.test.TestSpanHandler;
import org.springframework.cloud.sleuth.test.TestTracingAwareSupplier;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.ExecutorChannelInterceptor;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StringUtils;

public abstract class TracingChannelInterceptorTest
implements TestTracingAwareSupplier {
    protected ChannelInterceptor interceptor = new TracingChannelInterceptor(this.tracerTest().tracing().tracer(), this.tracerTest().tracing().propagator(), (Propagator.Setter)new MessageHeaderPropagatorSetter(), (Propagator.Getter)new MessageHeaderPropagatorGetter(), TracingChannelInterceptorTest.remoteServiceNameMapper(new SleuthMessagingProperties()), (MessageSpanCustomizer)new DefaultMessageSpanCustomizer());
    protected TestSpanHandler spans = this.tracerTest().handler();
    protected QueueChannel channel = new QueueChannel();
    protected DirectChannel directChannel = new DirectChannel();
    protected Message message;
    protected MessageHandler handler = new MessageHandler(){

        public void handleMessage(Message<?> msg) throws MessagingException {
            TracingChannelInterceptorTest.this.message = msg;
        }
    };

    static Function<String, String> remoteServiceNameMapper(SleuthMessagingProperties properties) {
        return s -> {
            if (!StringUtils.hasText((String)s)) {
                return null;
            }
            if (s.startsWith("amqp") || s.startsWith("rabbit")) {
                return properties.getMessaging().getRabbit().getRemoteServiceName();
            }
            if (s.startsWith("kafka")) {
                return properties.getMessaging().getKafka().getRemoteServiceName();
            }
            return null;
        };
    }

    @AfterEach
    public void close() {
        this.tracerTest().close();
    }

    @Test
    public void pollingReceive_emptyQueue() {
        this.channel.addInterceptor(this.consumerSideOnly(this.interceptor));
        Assertions.assertThat((Object)this.channel.receive(0L)).isNull();
        Assertions.assertThat((Iterable)this.spans).hasSize(0);
    }

    @Test
    public void injectsProducerSpan() {
        this.channel.addInterceptor(this.producerSideOnly(this.interceptor));
        this.channel.send(MessageBuilder.withPayload((Object)"foo").build());
        Assertions.assertThat((Map)this.channel.receive().getHeaders()).containsKey((Object)"b3");
        ((IterableAssert)Assertions.assertThat((Iterable)this.spans).hasSize(1)).extracting(FinishedSpan::getKind).containsExactly((Object[])new Span.Kind[]{Span.Kind.PRODUCER});
    }

    @Test
    public void injectsProducerAndConsumerSpan() {
        this.directChannel.addInterceptor(this.interceptor);
        this.directChannel.subscribe(this.handler);
        this.directChannel.send(MessageBuilder.withPayload((Object)"foo").build());
        Assertions.assertThat((Object)this.message).isNotNull();
        Assertions.assertThat((Map)this.message.getHeaders()).containsKeys((Object[])new String[]{"b3", "nativeHeaders"});
        Assertions.assertThat((Iterable)this.spans).extracting(FinishedSpan::getKind).contains((Object[])new Span.Kind[]{Span.Kind.CONSUMER, Span.Kind.PRODUCER});
    }

    @Test
    public void allowsSpanCustomization() {
        this.interceptor = new TracingChannelInterceptor(this.tracerTest().tracing().tracer(), this.tracerTest().tracing().propagator(), (Propagator.Setter)new MessageHeaderPropagatorSetter(), (Propagator.Getter)new MessageHeaderPropagatorGetter(), TracingChannelInterceptorTest.remoteServiceNameMapper(new SleuthMessagingProperties()), (MessageSpanCustomizer)new MyMessageSpanCustomizer());
        this.directChannel.addInterceptor(this.interceptor);
        this.directChannel.subscribe(this.handler);
        this.directChannel.send(MessageBuilder.withPayload((Object)"foo").build());
        ((OptionalAssert)Assertions.assertThat(this.spans.reportedSpans().stream().filter(s -> "changedHandle".equals(s.getName())).findFirst().map(s -> (String)s.getTags().get("handleKey"))).isPresent()).get().isEqualTo((Object)"handleValue");
        ((OptionalAssert)Assertions.assertThat(this.spans.reportedSpans().stream().filter(s -> "changedSend".equals(s.getName())).findFirst().map(s -> (String)s.getTags().get("sendKey"))).isPresent()).get().isEqualTo((Object)"sendValue");
    }

    @Test
    public void injectsProducerSpan_nativeHeaders() {
        this.channel.addInterceptor(this.producerSideOnly(this.interceptor));
        this.channel.send(MessageBuilder.withPayload((Object)"foo").build());
        Assertions.assertThat((Map)((Map)this.channel.receive().getHeaders().get((Object)"nativeHeaders"))).containsOnlyKeys(new Object[]{"b3"});
    }

    @Test
    public void pollingReceive_injectsConsumerSpan() {
        this.channel.addInterceptor(this.consumerSideOnly(this.interceptor));
        this.channel.send(MessageBuilder.withPayload((Object)"foo").build());
        Assertions.assertThat((Map)this.channel.receive().getHeaders()).containsKeys((Object[])new String[]{"b3", "nativeHeaders"});
        ((IterableAssert)Assertions.assertThat((Iterable)this.spans).hasSize(1)).extracting(FinishedSpan::getKind).containsExactly((Object[])new Span.Kind[]{Span.Kind.CONSUMER});
    }

    @Test
    public void pollingReceive_injectsConsumerSpan_nativeHeaders() {
        this.channel.addInterceptor(this.consumerSideOnly(this.interceptor));
        this.channel.send(MessageBuilder.withPayload((Object)"foo").build());
        Assertions.assertThat((Map)((Map)this.channel.receive().getHeaders().get((Object)"nativeHeaders"))).containsOnlyKeys(new Object[]{"b3"});
    }

    @Test
    public void subscriber_startsAndStopsConsumerAndProcessingSpan() {
        ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel();
        channel.addInterceptor((ChannelInterceptor)this.executorSideOnly(this.interceptor));
        ArrayList messages = new ArrayList();
        channel.subscribe(messages::add);
        channel.send(MessageBuilder.withPayload((Object)"foo").build());
        Assertions.assertThat((Map)((Message)messages.get(0)).getHeaders()).doesNotContainKeys((Object[])new String[]{"b3", "nativeHeaders"});
        Assertions.assertThat((Iterable)this.spans).extracting(FinishedSpan::getKind).containsExactly((Object[])new Span.Kind[]{Span.Kind.CONSUMER, null});
    }

    @Test
    public void subscriber_removesTraceIdsFromMessage() {
        ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel();
        channel.addInterceptor(this.interceptor);
        ArrayList messages = new ArrayList();
        channel.subscribe(messages::add);
        channel.send(MessageBuilder.withPayload((Object)"foo").build());
        Assertions.assertThat((Map)((Message)messages.get(0)).getHeaders()).doesNotContainKeys((Object[])new String[]{"b3"});
    }

    @Test
    public void subscriber_removesTraceIdsFromMessage_nativeHeaders() {
        ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel();
        channel.addInterceptor(this.interceptor);
        ArrayList messages = new ArrayList();
        channel.subscribe(messages::add);
        channel.send(MessageBuilder.withPayload((Object)"foo").build());
        Assertions.assertThat((Map)((Map)((Message)messages.get(0)).getHeaders().get((Object)"nativeHeaders"))).doesNotContainKeys(new Object[]{"b3"});
    }

    @Test
    public void integrated_sendAndPoll() {
        this.channel.addInterceptor(this.interceptor);
        this.channel.send(MessageBuilder.withPayload((Object)"foo").build());
        this.channel.receive();
        Assertions.assertThat((Iterable)this.spans).extracting(FinishedSpan::getKind).containsExactlyInAnyOrder((Object[])new Span.Kind[]{Span.Kind.CONSUMER, Span.Kind.PRODUCER});
    }

    @Test
    public void integrated_sendAndSubscriber() {
        ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel();
        channel.addInterceptor(this.interceptor);
        ArrayList messages = new ArrayList();
        channel.subscribe(messages::add);
        channel.send(MessageBuilder.withPayload((Object)"foo").build());
        Assertions.assertThat((Iterable)this.spans).extracting(FinishedSpan::getKind).containsExactly((Object[])new Span.Kind[]{Span.Kind.CONSUMER, null, Span.Kind.PRODUCER});
    }

    @Test
    public void errorMessageHeadersRetained() {
        this.channel.addInterceptor(this.interceptor);
        QueueChannel deadReplyChannel = new QueueChannel();
        QueueChannel errorsReplyChannel = new QueueChannel();
        HashMap<String, QueueChannel> errorChannelHeaders = new HashMap<String, QueueChannel>();
        errorChannelHeaders.put("replyChannel", errorsReplyChannel);
        errorChannelHeaders.put("errorChannel", errorsReplyChannel);
        this.channel.send((Message)new ErrorMessage((Throwable)new MessagingException(MessageBuilder.withPayload((Object)"hi").setHeader("b3", (Object)"000000000000000a-000000000000000a").setReplyChannel((MessageChannel)deadReplyChannel).setErrorChannel((MessageChannel)deadReplyChannel).build()), errorChannelHeaders));
        this.message = this.channel.receive();
        Assertions.assertThat((Object)this.message).isNotNull();
        String b3 = (String)this.message.getHeaders().get((Object)"b3", String.class);
        B3Context b3Context = new B3Context(b3);
        Assertions.assertThat((String)b3Context.traceId).endsWith((CharSequence)"000000000000000a");
        Assertions.assertThat((String)b3Context.spanId).doesNotEndWith((CharSequence)"000000000000000a");
        Assertions.assertThat((Iterable)this.spans).hasSize(2);
        Assertions.assertThat((Object)this.message.getHeaders().getReplyChannel()).isSameAs((Object)errorsReplyChannel);
        Assertions.assertThat((Object)this.message.getHeaders().getErrorChannel()).isSameAs((Object)errorsReplyChannel);
    }

    @Test
    public void errorMessageOriginalMessageRetained() {
        this.channel.addInterceptor(this.interceptor);
        Message originalMessage = MessageBuilder.withPayload((Object)"Hello").setHeader("header", (Object)"value").build();
        Message failedMessage = MessageBuilder.fromMessage((Message)originalMessage).removeHeader("header").build();
        this.channel.send((Message)new ErrorMessage((Throwable)new MessagingException(failedMessage), originalMessage.getHeaders(), originalMessage));
        this.message = this.channel.receive();
        Assertions.assertThat((Object)this.message).isNotNull();
        Assertions.assertThat((Object)this.message).isInstanceOfSatisfying(ErrorMessage.class, errorMessage -> {
            Assertions.assertThat((Object)errorMessage.getOriginalMessage()).isSameAs((Object)originalMessage);
            Assertions.assertThat((Object)errorMessage.getHeaders().get((Object)"header")).isEqualTo((Object)"value");
        });
    }

    @Test
    public void errorMessageHeadersWithNullPayloadRetained() {
        this.channel.addInterceptor(this.interceptor);
        HashMap<String, String> errorChannelHeaders = new HashMap<String, String>();
        errorChannelHeaders.put("b3", "000000000000000a-000000000000000a");
        this.channel.send((Message)new ErrorMessage((Throwable)new MessagingException("exception"), errorChannelHeaders));
        this.message = this.channel.receive();
        String b3 = (String)this.message.getHeaders().get((Object)"b3", String.class);
        B3Context b3Context = new B3Context(b3);
        Assertions.assertThat((String)b3Context.traceId).endsWith((CharSequence)"000000000000000a");
        Assertions.assertThat((String)b3Context.spanId).doesNotEndWith((CharSequence)"000000000000000a");
        Assertions.assertThat((Iterable)this.spans).hasSize(2);
    }

    @Test
    public void should_store_kafka_as_remote_service_name_when_kafka_header_is_present() {
        ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel();
        channel.addInterceptor(this.interceptor);
        ArrayList messages = new ArrayList();
        channel.subscribe(messages::add);
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("kafka_messageKey", "hello");
        channel.send(MessageBuilder.createMessage((Object)"foo", (MessageHeaders)new MessageHeaders(headers)));
        Assertions.assertThat((Iterable)this.spans).extracting(FinishedSpan::getRemoteServiceName).contains((Object[])new String[]{"kafka"});
    }

    @Test
    public void should_store_rabbitmq_as_remote_service_name_when_rabbit_header_is_present() {
        ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel();
        channel.addInterceptor(this.interceptor);
        ArrayList messages = new ArrayList();
        channel.subscribe(messages::add);
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("amqp_receivedRoutingKey", "hello");
        channel.send(MessageBuilder.createMessage((Object)"foo", (MessageHeaders)new MessageHeaders(headers)));
        Assertions.assertThat((Iterable)this.spans).extracting(FinishedSpan::getRemoteServiceName).contains((Object[])new String[]{"rabbitmq"});
    }

    @Test
    public void should_store_broker_as_remote_service_name_when_no_special_headers_were_found() {
        ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel();
        channel.addInterceptor(this.interceptor);
        ArrayList messages = new ArrayList();
        channel.subscribe(messages::add);
        HashMap headers = new HashMap();
        channel.send(MessageBuilder.createMessage((Object)"foo", (MessageHeaders)new MessageHeaders(headers)));
        Assertions.assertThat((Iterable)this.spans).extracting(FinishedSpan::getRemoteServiceName).containsOnly((Object[])new String[]{"broker", null});
    }

    public ChannelInterceptor producerSideOnly(final ChannelInterceptor delegate) {
        return new ChannelInterceptorAdapter(){

            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                return delegate.preSend(message, channel);
            }

            public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
                delegate.afterSendCompletion(message, channel, sent, ex);
            }
        };
    }

    ChannelInterceptor consumerSideOnly(final ChannelInterceptor delegate) {
        return new ChannelInterceptorAdapter(){

            public Message<?> postReceive(Message<?> message, MessageChannel channel) {
                return delegate.postReceive(message, channel);
            }

            public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) {
                delegate.afterReceiveCompletion(message, channel, ex);
            }
        };
    }

    ExecutorChannelInterceptor executorSideOnly(final ChannelInterceptor delegate) {
        class ExecutorSideOnly
        extends ChannelInterceptorAdapter
        implements ExecutorChannelInterceptor {
            ExecutorSideOnly() {
            }

            public Message<?> beforeHandle(Message<?> message, MessageChannel channel, MessageHandler handler) {
                return ((ExecutorChannelInterceptor)delegate).beforeHandle(message, channel, handler);
            }

            public void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler, Exception ex) {
                ((ExecutorChannelInterceptor)delegate).afterMessageHandled(message, channel, handler, ex);
            }
        }
        return new ExecutorSideOnly();
    }
}

