package net.kyori.bunny;

import com.google.common.base.MoreObjects;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import javax.inject.Inject;
import net.kyori.blizzard.NonNull;
import net.kyori.blizzard.Nullable;
import net.kyori.bunny.message.Consume;
import net.kyori.bunny.message.Message;
import net.kyori.bunny.message.MessageConsumer;
import net.kyori.bunny.message.MessageRegistry;
import net.kyori.bunny.message.TargetedMessageConsumer;
import net.kyori.membrane.facet.Connectable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:net/kyori/bunny/QueueImpl.class */
/**
 * Skeleton {@link Queue} implementation that declares the queue on the shared
 * {@link Bunny} channel, starts a consumer on it, and dispatches every incoming
 * delivery to the subscriptions registered for the message's type.
 *
 * <p>The subscription registry is a plain {@link HashMultimap}; this class adds no
 * synchronization of its own around it.</p>
 */
public abstract class QueueImpl implements Connectable, Queue {
    private static final Logger LOGGER = LoggerFactory.getLogger(Queue.class);

    @Inject
    private Bunny bunny;

    @Inject
    private Gson gson;

    @Inject
    private MessageRegistry mr;

    @NonNull
    private final String name;
    private final boolean durable;
    private final boolean exclusive;
    private final boolean autoDelete;

    @Nullable
    private final Map<String, Object> arguments;

    /** Tag returned by {@code basicConsume}; {@code null} while no consume is active. */
    @Nullable
    private String consumerTag;

    /** Registered subscriptions, keyed by the message type they were subscribed to. */
    private final Multimap<TypeToken<? extends Message>, SubscriptionImpl<? extends Message>> consumers = HashMultimap.create();

    /**
     * The RabbitMQ {@link Consumer} registered by {@link QueueImpl#connect()}.
     *
     * <p>Only {@link #handleDelivery} does any work; every other callback is a no-op.</p>
     */
    public final class ConsumerImpl implements Consumer {
        private ConsumerImpl() {
        }

        /** No action is taken when the consume is confirmed. */
        public void handleConsumeOk(String consumerTag) {
        }

        /** No action is taken when a cancel is confirmed. */
        public void handleCancelOk(String consumerTag) {
        }

        /** No action is taken when the consume is cancelled. */
        public void handleCancel(String consumerTag) throws IOException {
        }

        /** No action is taken on a shutdown signal. */
        public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        }

        /** No action is taken when a recover is confirmed. */
        public void handleRecoverOk(String consumerTag) {
        }

        /**
         * Acknowledges the delivery and then dispatches it to the matching subscriptions.
         *
         * <p>The acknowledgement is sent <em>before</em> dispatching, so a message whose
         * processing fails is not redelivered. Anything thrown while dispatching is logged
         * together with the message properties and is not rethrown.</p>
         *
         * @param consumerTag the consumer tag of this consumer
         * @param envelope the delivery envelope, used for its delivery tag
         * @param properties the message properties
         * @param body the raw message body
         * @throws IOException if the acknowledgement fails
         */
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            QueueImpl.this.bunny.channel().basicAck(envelope.getDeliveryTag(), false);
            try {
                this.delivery(properties, body);
            } catch (Throwable t) {
                QueueImpl.LOGGER.error(String.format("Exception delivering message: %s", QueueImpl.describe(properties)), t);
            }
        }

        /**
         * Decodes the body as UTF-8 JSON and hands the resulting message to every
         * subscription registered for the type named by {@code properties.getType()}.
         *
         * <p>The message is silently dropped when the registry does not know the type or
         * when nobody is subscribed to it; in both cases the body is never parsed.</p>
         *
         * @param properties the message properties
         * @param body the raw message body
         * @throws IOException declared for compatibility with the original signature
         */
        private void delivery(AMQP.BasicProperties properties, byte[] body) throws IOException {
            final TypeToken<? extends Message> type = QueueImpl.this.mr.type(properties.getType());
            if (type == null) {
                return;
            }
            final Collection<SubscriptionImpl<? extends Message>> registered = QueueImpl.this.consumers.get(type);
            if (registered.isEmpty()) {
                return;
            }
            final Message message = QueueImpl.this.gson.fromJson(new String(body, StandardCharsets.UTF_8), type.getType());

            // Iterate over a snapshot: subscribers are allowed to cancel (themselves or
            // others) or to subscribe from inside their callback, and doing so against the
            // live multimap view would break the iteration.
            final SubscriptionImpl<?>[] snapshot = registered.toArray(new SubscriptionImpl<?>[0]);
            for (final SubscriptionImpl<?> subscription : snapshot) {
                // Skip subscriptions that an earlier subscriber cancelled during this delivery.
                if (!QueueImpl.this.consumers.containsEntry(type, subscription)) {
                    continue;
                }
                // The subscription was registered under this message type, so the decoded
                // message is the type its consumer expects.
                @SuppressWarnings("unchecked")
                final TargetedMessageConsumer<Message> target = (TargetedMessageConsumer<Message>) subscription.consumer;
                try {
                    // Hand over the subscription itself rather than a handle bound to the
                    // loop iterator: its cancel() stays valid after this call returns and
                    // may safely be invoked more than once.
                    target.accept(message, subscription, properties);
                } catch (Exception e) {
                    // One failing subscriber must not prevent the others from receiving the message.
                    QueueImpl.LOGGER.error(String.format("Exception delivering message: %s", QueueImpl.describe(properties)), e);
                }
            }
        }
    }

    /**
     * A registration of a single {@link TargetedMessageConsumer} with the enclosing queue.
     *
     * @param <M> the message type the consumer accepts
     */
    public class SubscriptionImpl<M extends Message> implements Subscription {
        final TargetedMessageConsumer<M> consumer;

        private SubscriptionImpl(TargetedMessageConsumer<M> targetedMessageConsumer) {
            this.consumer = targetedMessageConsumer;
        }

        /**
         * Removes this subscription from the enclosing queue's registry.
         * Calling it again after the subscription has been removed has no effect.
         */
        @Override // net.kyori.bunny.Subscription
        public void cancel() {
            QueueImpl.this.consumers.values().remove(this);
        }
    }

    /**
     * Creates a queue description; nothing is declared on the broker until {@link #connect()}.
     *
     * @param name the queue name
     * @param durable the {@code durable} flag passed to {@code queueDeclare}
     * @param exclusive the {@code exclusive} flag passed to {@code queueDeclare}
     * @param autoDelete the {@code autoDelete} flag passed to {@code queueDeclare}
     * @param arguments the extra arguments passed to {@code queueDeclare}, may be {@code null}
     */
    public QueueImpl(@NonNull String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments) {
        this.name = name;
        this.durable = durable;
        this.exclusive = exclusive;
        this.autoDelete = autoDelete;
        this.arguments = arguments;
    }

    /** @return the queue name */
    @NonNull
    public String name() {
        return this.name;
    }

    /** @return the {@code durable} flag this queue is declared with */
    public boolean durable() {
        return this.durable;
    }

    /** @return the {@code exclusive} flag this queue is declared with */
    public boolean exclusive() {
        return this.exclusive;
    }

    /** @return the {@code autoDelete} flag this queue is declared with */
    public boolean autoDelete() {
        return this.autoDelete;
    }

    /**
     * @return an unmodifiable view of the declaration arguments, or {@code null} if none were given
     */
    @Nullable
    public Map<String, Object> arguments() {
        return this.arguments == null ? null : Collections.unmodifiableMap(this.arguments);
    }

    /**
     * Declares the queue and starts consuming from it, remembering the returned consumer tag.
     *
     * @throws IOException if declaring or consuming fails
     * @throws TimeoutException declared for the {@link Connectable} contract
     */
    public void connect() throws IOException, TimeoutException {
        LOGGER.info("Declaring queue '{}'", this);
        this.bunny.channel().queueDeclare(this.name, this.durable, this.exclusive, this.autoDelete, this.arguments);
        // Arguments: autoAck=false, server-generated tag (""), noLocal=false, exclusive=true.
        this.consumerTag = this.bunny.channel().basicConsume(this.name, false, "", false, true, (Map<String, Object>) null, new ConsumerImpl());
        LOGGER.info("Starting consume on '{}' with tag '{}'", this, this.consumerTag);
    }

    /**
     * Cancels the active consume, if there is one and the connection is still active.
     *
     * <p>The remembered consumer tag is cleared once the cancel succeeds, so calling this
     * method again does not try to cancel the same tag a second time.</p>
     *
     * @throws IOException if the cancel fails
     * @throws TimeoutException declared for the {@link Connectable} contract
     */
    public void disconnect() throws IOException, TimeoutException {
        final String tag = this.consumerTag;
        if (tag == null || !this.bunny.active()) {
            return;
        }
        LOGGER.info("Cancelling consume on '{}' with tag '{}'", this, tag);
        this.bunny.channel().basicCancel(tag);
        this.consumerTag = null;
    }

    /**
     * Binds this queue to an exchange. An {@link IOException} is logged, not propagated.
     *
     * @param exchange the exchange to bind to
     * @param routingKey the routing key of the binding
     */
    public void bind(@NonNull Exchange exchange, @NonNull String routingKey) {
        try {
            LOGGER.info("Binding queue '{}' to exchange '{}' with routing key '{}'", this, exchange, routingKey);
            this.bunny.channel().queueBind(this.name, exchange.name(), routingKey, (Map<String, Object>) null);
        } catch (IOException e) {
            LOGGER.error("Exception binding queue", e);
        }
    }

    /**
     * Removes a binding of this queue to an exchange. An {@link IOException} is logged,
     * not propagated.
     *
     * @param exchange the exchange to unbind from
     * @param routingKey the routing key of the binding
     */
    public void unbind(@NonNull Exchange exchange, @NonNull String routingKey) {
        try {
            LOGGER.info("Unbinding queue '{}' from exchange '{}' with routing key '{}'", this, exchange, routingKey);
            this.bunny.channel().queueUnbind(this.name, exchange.name(), routingKey, (Map<String, Object>) null);
        } catch (IOException e) {
            LOGGER.error("Exception unbinding queue", e);
        }
    }

    /**
     * Registers a consumer for messages of the given type.
     *
     * @param typeToken the message type to subscribe to
     * @param targetedMessageConsumer the consumer to invoke for each such message
     * @param <M> the message type
     * @return a handle that removes the registration when cancelled
     */
    @NonNull
    public <M extends Message> Subscription subscribe(@NonNull TypeToken<M> typeToken, @NonNull TargetedMessageConsumer<M> targetedMessageConsumer) {
        final SubscriptionImpl<M> subscription = new SubscriptionImpl<>(targetedMessageConsumer);
        this.consumers.put(typeToken, subscription);
        return subscription;
    }

    /**
     * Registers every suitable {@link Consume}-annotated method declared directly on the
     * consumer's class.
     *
     * <p>A method is suitable when it takes exactly three parameters: a {@link Message}
     * subtype, a supertype of {@link Subscription}, and a supertype of
     * {@link AMQP.BasicProperties}. Annotated methods with any other shape are ignored.
     * Reflection failures during a later invocation are logged, not propagated.</p>
     *
     * @param messageConsumer the object whose annotated methods should receive messages
     */
    public void subscribe(@NonNull MessageConsumer messageConsumer) {
        final TypeToken<?> consumerType = TypeToken.of(messageConsumer.getClass());
        Arrays.stream(messageConsumer.getClass().getDeclaredMethods())
            .filter(method -> method.isAnnotationPresent(Consume.class))
            .filter(method -> {
                final Type[] parameters = method.getGenericParameterTypes();
                return parameters.length == 3
                    && TypeToken.of(parameters[0]).isSubtypeOf(Message.class)
                    && TypeToken.of(parameters[1]).isSupertypeOf(Subscription.class)
                    && TypeToken.of(parameters[2]).isSupertypeOf(AMQP.BasicProperties.class);
            })
            .forEach(method -> {
                // The filter above established that the first parameter is a Message subtype.
                @SuppressWarnings("unchecked")
                final TypeToken<Message> messageType = (TypeToken<Message>) consumerType.resolveType(method.getGenericParameterTypes()[0]);
                this.subscribe(messageType, (message, subscription, properties) -> {
                    try {
                        method.invoke(messageConsumer, message, subscription, properties);
                    } catch (IllegalAccessException | InvocationTargetException e) {
                        LOGGER.error("Exception delivering message to consumer", e);
                    }
                });
            });
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this).addValue(this.name).toString();
    }

    /**
     * Renders message properties as a debug string for log output.
     *
     * @param properties the properties to describe
     * @return the text produced by {@code appendPropertyDebugStringTo}
     */
    public static String describe(AMQP.BasicProperties properties) {
        final StringBuilder sb = new StringBuilder();
        properties.appendPropertyDebugStringTo(sb);
        return sb.toString();
    }
}
