package io.vertx.core.eventbus.impl;

import com.sun.mail.imap.IMAPStore;
import io.vertx.core.AsyncResult;
import io.vertx.core.Closeable;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.eventbus.SendContext;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.spi.metrics.VertxMetrics;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:BOOT-INF/lib/vertx-core-3.5.0.jar:io/vertx/core/eventbus/impl/EventBusImpl.class */
public class EventBusImpl implements EventBus, MetricsProvider {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) EventBusImpl.class);
    protected final VertxInternal vertx;
    protected final EventBusMetrics metrics;
    protected volatile boolean started;
    private final List<Handler<SendContext>> interceptors = new CopyOnWriteArrayList();
    private final AtomicLong replySequence = new AtomicLong(0);
    protected final ConcurrentMap<String, Handlers> handlerMap = new ConcurrentHashMap();
    protected final CodecManager codecManager = new CodecManager();

    /* loaded from: input_file:BOOT-INF/lib/vertx-core-3.5.0.jar:io/vertx/core/eventbus/impl/EventBusImpl$HandlerEntry.class */
    public class HandlerEntry<T> implements Closeable {
        final String address;
        final HandlerRegistration<T> handler;

        public HandlerEntry(String str, HandlerRegistration<T> handlerRegistration) {
            this.address = str;
            this.handler = handlerRegistration;
        }

        public boolean equals(Object obj) {
            if (obj == null) {
                return false;
            }
            if (this == obj) {
                return true;
            }
            if (getClass() != obj.getClass()) {
                return false;
            }
            HandlerEntry handlerEntry = (HandlerEntry) obj;
            return this.address.equals(handlerEntry.address) && this.handler.equals(handlerEntry.handler);
        }

        public int hashCode() {
            return (31 * (this.address != null ? this.address.hashCode() : 0)) + (this.handler != null ? this.handler.hashCode() : 0);
        }

        @Override // io.vertx.core.Closeable
        public void close(Handler<AsyncResult<Void>> handler) {
            this.handler.unregister(handler);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:BOOT-INF/lib/vertx-core-3.5.0.jar:io/vertx/core/eventbus/impl/EventBusImpl$ReplySendContextImpl.class */
    public class ReplySendContextImpl<T> extends SendContextImpl<T> {
        private final MessageImpl replierMessage;

        public ReplySendContextImpl(MessageImpl messageImpl, DeliveryOptions deliveryOptions, HandlerRegistration<T> handlerRegistration, MessageImpl messageImpl2) {
            super(messageImpl, deliveryOptions, handlerRegistration);
            this.replierMessage = messageImpl2;
        }

        @Override // io.vertx.core.eventbus.impl.EventBusImpl.SendContextImpl, io.vertx.core.eventbus.SendContext
        public void next() {
            if (this.iter.hasNext()) {
                this.iter.next().handle(this);
            } else {
                EventBusImpl.this.sendReply(this, this.replierMessage);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:BOOT-INF/lib/vertx-core-3.5.0.jar:io/vertx/core/eventbus/impl/EventBusImpl$SendContextImpl.class */
    public class SendContextImpl<T> implements SendContext<T> {
        public final MessageImpl message;
        public final DeliveryOptions options;
        public final HandlerRegistration<T> handlerRegistration;
        public final Iterator<Handler<SendContext>> iter;

        public SendContextImpl(MessageImpl messageImpl, DeliveryOptions deliveryOptions, HandlerRegistration<T> handlerRegistration) {
            this.message = messageImpl;
            this.options = deliveryOptions;
            this.handlerRegistration = handlerRegistration;
            this.iter = EventBusImpl.this.interceptors.iterator();
        }

        @Override // io.vertx.core.eventbus.SendContext
        public Message<T> message() {
            return this.message;
        }

        @Override // io.vertx.core.eventbus.SendContext
        public void next() {
            if (!this.iter.hasNext()) {
                EventBusImpl.this.sendOrPub(this);
                return;
            }
            try {
                this.iter.next().handle(this);
            } catch (Throwable th) {
                EventBusImpl.log.error("Failure in interceptor", th);
            }
        }

        @Override // io.vertx.core.eventbus.SendContext
        public boolean send() {
            return this.message.isSend();
        }

        @Override // io.vertx.core.eventbus.SendContext
        public Object sentBody() {
            return this.message.sentBody;
        }
    }

    public EventBusImpl(VertxInternal vertxInternal) {
        VertxMetrics metricsSPI = vertxInternal.metricsSPI();
        this.vertx = vertxInternal;
        this.metrics = metricsSPI != null ? metricsSPI.createMetrics(this) : null;
    }

    @Override // io.vertx.core.eventbus.EventBus
    public EventBus addInterceptor(Handler<SendContext> handler) {
        this.interceptors.add(handler);
        return this;
    }

    @Override // io.vertx.core.eventbus.EventBus
    public EventBus removeInterceptor(Handler<SendContext> handler) {
        this.interceptors.remove(handler);
        return this;
    }

    @Override // io.vertx.core.eventbus.EventBus
    public synchronized void start(Handler<AsyncResult<Void>> handler) {
        if (this.started) {
            throw new IllegalStateException("Already started");
        }
        this.started = true;
        handler.handle(Future.succeededFuture());
    }

    @Override // io.vertx.core.eventbus.EventBus
    public EventBus send(String str, Object obj) {
        return send(str, obj, new DeliveryOptions(), null);
    }

    @Override // io.vertx.core.eventbus.EventBus
    public <T> EventBus send(String str, Object obj, Handler<AsyncResult<Message<T>>> handler) {
        return send(str, obj, new DeliveryOptions(), handler);
    }

    @Override // io.vertx.core.eventbus.EventBus
    public EventBus send(String str, Object obj, DeliveryOptions deliveryOptions) {
        return send(str, obj, deliveryOptions, null);
    }

    @Override // io.vertx.core.eventbus.EventBus
    public <T> EventBus send(String str, Object obj, DeliveryOptions deliveryOptions, Handler<AsyncResult<Message<T>>> handler) {
        sendOrPubInternal(createMessage(true, str, deliveryOptions.getHeaders(), obj, deliveryOptions.getCodecName()), deliveryOptions, handler);
        return this;
    }

    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageProducer<T> sender(String str) {
        Objects.requireNonNull(str, IMAPStore.ID_ADDRESS);
        return new MessageProducerImpl(this.vertx, str, true, new DeliveryOptions());
    }

    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageProducer<T> sender(String str, DeliveryOptions deliveryOptions) {
        Objects.requireNonNull(str, IMAPStore.ID_ADDRESS);
        Objects.requireNonNull(deliveryOptions, "options");
        return new MessageProducerImpl(this.vertx, str, true, deliveryOptions);
    }

    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageProducer<T> publisher(String str) {
        Objects.requireNonNull(str, IMAPStore.ID_ADDRESS);
        return new MessageProducerImpl(this.vertx, str, false, new DeliveryOptions());
    }

    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageProducer<T> publisher(String str, DeliveryOptions deliveryOptions) {
        Objects.requireNonNull(str, IMAPStore.ID_ADDRESS);
        Objects.requireNonNull(deliveryOptions, "options");
        return new MessageProducerImpl(this.vertx, str, false, deliveryOptions);
    }

    @Override // io.vertx.core.eventbus.EventBus
    public EventBus publish(String str, Object obj) {
        return publish(str, obj, new DeliveryOptions());
    }

    @Override // io.vertx.core.eventbus.EventBus
    public EventBus publish(String str, Object obj, DeliveryOptions deliveryOptions) {
        sendOrPubInternal(createMessage(false, str, deliveryOptions.getHeaders(), obj, deliveryOptions.getCodecName()), deliveryOptions, null);
        return this;
    }

    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageConsumer<T> consumer(String str) {
        checkStarted();
        Objects.requireNonNull(str, IMAPStore.ID_ADDRESS);
        return new HandlerRegistration(this.vertx, this.metrics, this, str, null, false, null, -1L);
    }

    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageConsumer<T> consumer(String str, Handler<Message<T>> handler) {
        Objects.requireNonNull(handler, "handler");
        MessageConsumer<T> consumer = consumer(str);
        consumer.handler2((Handler) handler);
        return consumer;
    }

    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageConsumer<T> localConsumer(String str) {
        checkStarted();
        Objects.requireNonNull(str, IMAPStore.ID_ADDRESS);
        return new HandlerRegistration(this.vertx, this.metrics, this, str, null, true, null, -1L);
    }

    @Override // io.vertx.core.eventbus.EventBus
    public <T> MessageConsumer<T> localConsumer(String str, Handler<Message<T>> handler) {
        Objects.requireNonNull(handler, "handler");
        MessageConsumer<T> localConsumer = localConsumer(str);
        localConsumer.handler2((Handler) handler);
        return localConsumer;
    }

    @Override // io.vertx.core.eventbus.EventBus
    public EventBus registerCodec(MessageCodec messageCodec) {
        this.codecManager.registerCodec(messageCodec);
        return this;
    }

    @Override // io.vertx.core.eventbus.EventBus
    public EventBus unregisterCodec(String str) {
        this.codecManager.unregisterCodec(str);
        return this;
    }

    @Override // io.vertx.core.eventbus.EventBus
    public <T> EventBus registerDefaultCodec(Class<T> cls, MessageCodec<T, ?> messageCodec) {
        this.codecManager.registerDefaultCodec(cls, messageCodec);
        return this;
    }

    @Override // io.vertx.core.eventbus.EventBus
    public EventBus unregisterDefaultCodec(Class cls) {
        this.codecManager.unregisterDefaultCodec(cls);
        return this;
    }

    @Override // io.vertx.core.eventbus.EventBus
    public void close(Handler<AsyncResult<Void>> handler) {
        checkStarted();
        unregisterAll();
        if (this.metrics != null) {
            this.metrics.close();
        }
        if (handler != null) {
            this.vertx.runOnContext(r4 -> {
                handler.handle(Future.succeededFuture());
            });
        }
    }

    @Override // io.vertx.core.metrics.Measured
    public boolean isMetricsEnabled() {
        return this.metrics != null;
    }

    @Override // io.vertx.core.spi.metrics.MetricsProvider
    public EventBusMetrics<?> getMetrics() {
        return this.metrics;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public MessageImpl createMessage(boolean z, String str, MultiMap multiMap, Object obj, String str2) {
        Objects.requireNonNull(str, "no null address accepted");
        return new MessageImpl(str, null, multiMap, obj, this.codecManager.lookupCodec(obj, str2), z, this);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <T> void addRegistration(String str, HandlerRegistration<T> handlerRegistration, boolean z, boolean z2) {
        Objects.requireNonNull(handlerRegistration.getHandler(), "handler");
        boolean addLocalRegistration = addLocalRegistration(str, handlerRegistration, z, z2);
        handlerRegistration.getClass();
        addRegistration(addLocalRegistration, str, z, z2, handlerRegistration::setResult);
    }

    protected <T> void addRegistration(boolean z, String str, boolean z2, boolean z3, Handler<AsyncResult<Void>> handler) {
        handler.handle(Future.succeededFuture());
    }

    protected <T> boolean addLocalRegistration(String str, HandlerRegistration<T> handlerRegistration, boolean z, boolean z2) {
        Objects.requireNonNull(str, IMAPStore.ID_ADDRESS);
        Context currentContext = Vertx.currentContext();
        boolean z3 = currentContext != null;
        if (!z3) {
            currentContext = this.vertx.getOrCreateContext();
        }
        handlerRegistration.setHandlerContext(currentContext);
        boolean z4 = false;
        HandlerHolder handlerHolder = new HandlerHolder(this.metrics, handlerRegistration, z, z2, currentContext);
        Handlers handlers = this.handlerMap.get(str);
        if (handlers == null) {
            handlers = new Handlers();
            Handlers putIfAbsent = this.handlerMap.putIfAbsent(str, handlers);
            if (putIfAbsent != null) {
                handlers = putIfAbsent;
            }
            z4 = true;
        }
        handlers.list.add(handlerHolder);
        if (z3) {
            currentContext.addCloseHook(new HandlerEntry(str, handlerRegistration));
        }
        return z4;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <T> void removeRegistration(String str, HandlerRegistration<T> handlerRegistration, Handler<AsyncResult<Void>> handler) {
        removeRegistration(removeLocalRegistration(str, handlerRegistration), str, handler);
    }

    protected <T> void removeRegistration(HandlerHolder handlerHolder, String str, Handler<AsyncResult<Void>> handler) {
        callCompletionHandlerAsync(handler);
    }

    protected <T> HandlerHolder removeLocalRegistration(String str, HandlerRegistration<T> handlerRegistration) {
        Handlers handlers = this.handlerMap.get(str);
        HandlerHolder handlerHolder = null;
        if (handlers != null) {
            synchronized (handlers) {
                int size = handlers.list.size();
                int i = 0;
                while (true) {
                    if (i >= size) {
                        break;
                    }
                    HandlerHolder handlerHolder2 = handlers.list.get(i);
                    if (handlerHolder2.getHandler() == handlerRegistration) {
                        handlers.list.remove(i);
                        handlerHolder2.setRemoved();
                        if (handlers.list.isEmpty()) {
                            this.handlerMap.remove(str);
                            handlerHolder = handlerHolder2;
                        }
                        handlerHolder2.getContext().removeCloseHook(new HandlerEntry(str, handlerHolder2.getHandler()));
                    } else {
                        i++;
                    }
                }
            }
        }
        return handlerHolder;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <T> void sendReply(MessageImpl messageImpl, MessageImpl messageImpl2, DeliveryOptions deliveryOptions, Handler<AsyncResult<Message<T>>> handler) {
        if (messageImpl.address() == null) {
            throw new IllegalStateException("address not specified");
        }
        new ReplySendContextImpl(messageImpl, deliveryOptions, createReplyHandlerRegistration(messageImpl, deliveryOptions, handler), messageImpl2).next();
    }

    protected <T> void sendReply(SendContextImpl<T> sendContextImpl, MessageImpl messageImpl) {
        sendOrPub(sendContextImpl);
    }

    protected <T> void sendOrPub(SendContextImpl<T> sendContextImpl) {
        MessageImpl messageImpl = sendContextImpl.message;
        if (this.metrics != null) {
            this.metrics.messageSent(messageImpl.address(), !messageImpl.isSend(), true, false);
        }
        deliverMessageLocally(sendContextImpl);
    }

    protected <T> Handler<Message<T>> convertHandler(Handler<AsyncResult<Message<T>>> handler) {
        return message -> {
            Future succeededFuture;
            if (message.body() instanceof ReplyException) {
                ReplyException replyException = (ReplyException) message.body();
                if (this.metrics != null) {
                    this.metrics.replyFailure(message.address(), replyException.failureType());
                }
                succeededFuture = Future.failedFuture(replyException);
            } else {
                succeededFuture = Future.succeededFuture(message);
            }
            handler.handle(succeededFuture);
        };
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void callCompletionHandlerAsync(Handler<AsyncResult<Void>> handler) {
        if (handler != null) {
            this.vertx.runOnContext(r4 -> {
                handler.handle(Future.succeededFuture());
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <T> void deliverMessageLocally(SendContextImpl<T> sendContextImpl) {
        if (deliverMessageLocally(sendContextImpl.message)) {
            return;
        }
        if (this.metrics != null) {
            this.metrics.replyFailure(sendContextImpl.message.address, ReplyFailure.NO_HANDLERS);
        }
        if (sendContextImpl.handlerRegistration != null) {
            sendContextImpl.handlerRegistration.sendAsyncResultFailure(ReplyFailure.NO_HANDLERS, "No handlers for address " + sendContextImpl.message.address);
        }
    }

    protected boolean isMessageLocal(MessageImpl messageImpl) {
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <T> boolean deliverMessageLocally(MessageImpl messageImpl) {
        messageImpl.setBus(this);
        Handlers handlers = this.handlerMap.get(messageImpl.address());
        if (handlers == null) {
            if (this.metrics == null) {
                return false;
            }
            this.metrics.messageReceived(messageImpl.address(), !messageImpl.isSend(), isMessageLocal(messageImpl), 0);
            return false;
        }
        if (!messageImpl.isSend()) {
            if (this.metrics != null) {
                this.metrics.messageReceived(messageImpl.address(), !messageImpl.isSend(), isMessageLocal(messageImpl), handlers.list.size());
            }
            Iterator<HandlerHolder> it = handlers.list.iterator();
            while (it.hasNext()) {
                deliverToHandler(messageImpl, it.next());
            }
            return true;
        }
        HandlerHolder choose = handlers.choose();
        if (this.metrics != null) {
            this.metrics.messageReceived(messageImpl.address(), !messageImpl.isSend(), isMessageLocal(messageImpl), choose != null ? 1 : 0);
        }
        if (choose == null) {
            return true;
        }
        deliverToHandler(messageImpl, choose);
        return true;
    }

    protected void checkStarted() {
        if (!this.started) {
            throw new IllegalStateException("Event Bus is not started");
        }
    }

    protected String generateReplyAddress() {
        return Long.toString(this.replySequence.incrementAndGet());
    }

    private <T> HandlerRegistration<T> createReplyHandlerRegistration(MessageImpl messageImpl, DeliveryOptions deliveryOptions, Handler<AsyncResult<Message<T>>> handler) {
        if (handler == null) {
            return null;
        }
        long sendTimeout = deliveryOptions.getSendTimeout();
        String generateReplyAddress = generateReplyAddress();
        messageImpl.setReplyAddress(generateReplyAddress);
        Handler<Message<T>> convertHandler = convertHandler(handler);
        HandlerRegistration<T> handlerRegistration = new HandlerRegistration<>(this.vertx, this.metrics, this, generateReplyAddress, messageImpl.address, true, handler, sendTimeout);
        handlerRegistration.handler2((Handler) convertHandler);
        return handlerRegistration;
    }

    private <T> void sendOrPubInternal(MessageImpl messageImpl, DeliveryOptions deliveryOptions, Handler<AsyncResult<Message<T>>> handler) {
        checkStarted();
        new SendContextImpl(messageImpl, deliveryOptions, createReplyHandlerRegistration(messageImpl, deliveryOptions, handler)).next();
    }

    private void unregisterAll() {
        Iterator<Handlers> it = this.handlerMap.values().iterator();
        while (it.hasNext()) {
            Iterator<HandlerHolder> it2 = it.next().list.iterator();
            while (it2.hasNext()) {
                it2.next().getHandler().unregister();
            }
        }
    }

    private <T> void deliverToHandler(MessageImpl messageImpl, HandlerHolder<T> handlerHolder) {
        MessageImpl copyBeforeReceive = messageImpl.copyBeforeReceive();
        if (this.metrics != null) {
            this.metrics.scheduleMessage(handlerHolder.getHandler().getMetric(), messageImpl.isLocal());
        }
        handlerHolder.getContext().runOnContext(r5 -> {
            try {
                if (!handlerHolder.isRemoved()) {
                    handlerHolder.getHandler().handle(copyBeforeReceive);
                }
            } finally {
                if (handlerHolder.isReplyHandler()) {
                    handlerHolder.getHandler().unregister();
                }
            }
        });
    }

    protected void finalize() throws Throwable {
        close(asyncResult -> {
        });
        super.finalize();
    }
}
