package cn.callmee.springboot.pulsar.starter.client.config;

import cn.callmee.springboot.pulsar.starter.client.annotations.PulsarConsumer;
import cn.callmee.springboot.pulsar.starter.client.annotations.PulsarProducer;
import cn.callmee.springboot.pulsar.starter.client.exceptions.ClientInitException;
import cn.callmee.springboot.pulsar.starter.client.exceptions.ConsumerInitException;
import cn.callmee.springboot.pulsar.starter.client.exceptions.ProducerInitException;
import cn.callmee.springboot.pulsar.starter.client.holder.Holder;
import cn.callmee.springboot.pulsar.starter.client.message.FailedMessage;
import cn.callmee.springboot.pulsar.starter.client.utils.SchemaUtils;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerInterceptor;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
import org.springframework.util.StringUtils;

@DependsOn({"pulsarClient"})
/* loaded from: input_file:cn/callmee/springboot/pulsar/starter/client/config/PulsarClientInitialConfiguration.class */
public class PulsarClientInitialConfiguration extends PulsarClientInitial {
    private static final Logger log = LoggerFactory.getLogger(PulsarClientInitialConfiguration.class);

    /**
     * Creates the configuration instance and emits a debug-level trace so the
     * moment of construction is visible in the logs.
     */
    public PulsarClientInitialConfiguration() {
        super();
        log.debug("[CONFIG]初始化 Pulsar 消息队列");
    }

    /**
     * Pass-through post-processing step: the given object is handed back untouched.
     *
     * @param obj the object being post-processed; may be {@code null}
     * @param str a name accompanying the object; not used here
     * @return the very same reference that was passed in as {@code obj}
     */
    public Object postProcessAfterInitialization(Object obj, String str) {
        // Nothing to decorate or replace at this stage.
        return obj;
    }

    /**
     * Registers the consumer described by a {@link PulsarConsumer}-annotated method.
     *
     * <p>A {@code Holder.ConsumerHolder} is always created and stored in {@code CONSUMERS}
     * under the generated consumer name. The live Pulsar consumer is only built when the
     * annotation's {@code autoStart()} is true, and it is added to
     * {@code COLLECT_CONSUMERS} only when {@code buildConsumer} returned a non-null instance.
     *
     * @param method the annotated handler method
     * @param obj    the object owning {@code method}
     * @param cls    the class used for naming and for the registration log entry
     */
    @Override
    protected void postInitializationConsumer(Method method, Object obj, Class<?> cls) {
        final PulsarConsumer annotation = method.getAnnotation(PulsarConsumer.class);
        final String registryKey = this.pulsarUrlGenerator.buildConsumerName(cls, method);
        final Holder.ConsumerHolder holder = new Holder.ConsumerHolder(
                annotation,
                method,
                obj,
                SchemaUtils.getParameterType(method, annotation.paramName()),
                annotation.interceptorClass());

        if (annotation.autoStart()) {
            final Consumer<?> started = buildConsumer(registryKey, holder);
            // buildConsumer yields null when the subscription could not be taken (busy consumer).
            if (started != null) {
                COLLECT_CONSUMERS.add(started);
            }
        }

        // The holder is registered regardless of autoStart.
        CONSUMERS.put(registryKey, ImmutableTriple.of(registryKey, annotation, holder));
        addInLog(
                REG_CONSUMER_LOG_ARRAY,
                "- topic: {}\n  handler: {}.{}\n  interceptor: {}",
                new Object[]{annotation.topic(), cls.getName(), method.getName(), holder.getInterceptor()});
    }

    /**
     * Registers one producer per topic declared by a {@link PulsarProducer}-annotated method.
     *
     * <p>The method's return type is used as the message type. Methods returning
     * {@code void} are skipped with a warning. A topic entry without text falls back to the
     * method name; the chosen value is resolved through {@code STRING_VALUE_RESOLVER} and
     * used as the key in {@code PRODUCERS}.
     *
     * @param method the annotated producer method
     * @param cls    the class declaring {@code method}, used for logging
     */
    @Override
    protected void postInitializationProducer(Method method, Class<?> cls) {
        final String methodName = method.getName();
        final PulsarProducer annotation = method.getAnnotation(PulsarProducer.class);
        final String[] declaredTopics = SchemaUtils.generateProducerTopicList(annotation);
        final Class<?> messageType = method.getReturnType();

        // Without a return value there is nothing to publish.
        if (Void.TYPE.equals(messageType)) {
            log.warn("{}.{} must define return to active producer, skip init it", cls.getName(), methodName);
            return;
        }

        for (String declaredTopic : declaredTopics) {
            final String rawTopic = StringUtils.hasText(declaredTopic) ? declaredTopic : methodName;
            final String resolvedTopic = STRING_VALUE_RESOLVER.resolveStringValue(rawTopic);
            final String registrationId = UUID.randomUUID().toString();
            final Producer<?> producer =
                    buildProducer(new Holder.ProducerHolder(resolvedTopic, messageType, annotation));
            PRODUCERS.put(resolvedTopic, ImmutableTriple.of(registrationId, annotation, producer));
        }

        addInLog(
                REG_PRODUCER_LOG_ARRAY,
                "- topic: {}\n  handler: {}.{}\n  msgType: {}\n  interceptor: {}",
                new Object[]{
                    Arrays.toString(declaredTopics),
                    cls.getName(),
                    methodName,
                    messageType,
                    annotation.interceptorClass()
                });
    }

    /**
     * Creates the Pulsar producer described by {@code producerHolder}.
     *
     * <p>The topic URL is built from the holder's topic, together with its namespace when
     * one is present. The interceptor class carried by the holder is instantiated through
     * its public no-argument constructor and attached exactly once, and only when
     * {@code pulsarProperties.isAllowInterceptor()} is true. Previously it was attached
     * unconditionally and then a second time when the flag was set.
     *
     * @param producerHolder topic, message type, annotation and interceptor of the producer
     * @return the created producer
     * @throws ProducerInitException if the interceptor cannot be instantiated or the Pulsar
     *                               client fails to create the producer
     */
    private Producer<?> buildProducer(Holder.ProducerHolder producerHolder) {
        try {
            ProducerBuilder<?> producerBuilder = this.pulsarClient.newProducer(getSchema(producerHolder)).blockIfQueueFull(producerHolder.getAnnotation().blockIfQueueFull()).topic((String) producerHolder.getNamespace().map(str -> {
                return this.pulsarUrlGenerator.buildTopicUrl(producerHolder.getTopic(), str);
            }).orElseGet(() -> {
                return this.pulsarUrlGenerator.buildTopicUrl(producerHolder.getTopic());
            }));
            // Single registration point: the configuration flag decides whether the interceptor is used at all.
            if (this.pulsarProperties.isAllowInterceptor()) {
                producerBuilder.intercept(new ProducerInterceptor[]{producerHolder.getInterceptor().getConstructor(new Class[0]).newInstance(new Object[0])});
            }
            return producerBuilder.create();
        } catch (ReflectiveOperationException e) {
            // Missing/inaccessible no-arg constructor, or the constructor itself failed.
            throw new ProducerInitException("Failed to instantiate producer interceptor.", e);
        } catch (PulsarClientException e) {
            throw new ProducerInitException("Failed to init producer.", e);
        }
    }

    /**
     * Builds and subscribes the Pulsar consumer described by {@code consumerHolder}.
     *
     * <p>Consumer name, subscription name, topic and namespace are taken from the holder's
     * annotation and resolved through {@code STRING_VALUE_RESOLVER}. The registered message
     * listener invokes the holder's handler method reflectively, acknowledges the message
     * when the handler returns normally, and otherwise negatively acknowledges it and emits
     * a {@link FailedMessage} on {@code SINK}.
     *
     * <p>The interceptor class carried by the holder is instantiated through its public
     * no-argument constructor and attached exactly once, and only when
     * {@code pulsarProperties.isAllowInterceptor()} is true. Previously it was attached
     * unconditionally and then a second time when the flag was set.
     *
     * @param str            generated consumer key, used when deriving the Pulsar consumer and
     *                       subscription names
     * @param consumerHolder handler method, owning object, annotation and interceptor of the consumer
     * @return the subscribed consumer, or {@code null} when subscribing failed with a
     *         {@code ConsumerBusyException} (logged as a warning)
     * @throws ConsumerInitException if the interceptor cannot be instantiated or subscribing
     *                               fails for any other reason
     */
    private Consumer<?> buildConsumer(String str, Holder.ConsumerHolder consumerHolder) {
        try {
            PulsarConsumer annotation = consumerHolder.getAnnotation();
            String consumerName = STRING_VALUE_RESOLVER.resolveStringValue(annotation.consumerName());
            String subscriptionName = STRING_VALUE_RESOLVER.resolveStringValue(annotation.subscriptionName());
            String topic = STRING_VALUE_RESOLVER.resolveStringValue(annotation.topic());
            String namespace = STRING_VALUE_RESOLVER.resolveStringValue(annotation.namespace());
            SubscriptionType subscriptionType = this.pulsarUrlGenerator.getSubscriptionType(consumerHolder);
            List<Parameter> invokeParameterStructure = SchemaUtils.getInvokeParameterStructure(consumerHolder);
            ConsumerBuilder<?> consumerBuilder = this.pulsarClient
                    .newConsumer(SchemaUtils.getSchema(annotation.serialization(), null != consumerHolder.getType() ? consumerHolder.getType() : annotation.clazz()))
                    .consumerName(this.pulsarUrlGenerator.buildPulsarConsumerName(consumerName, str))
                    .subscriptionName(this.pulsarUrlGenerator.buildPulsarSubscriptionName(subscriptionName, str))
                    .topic(new String[]{this.pulsarUrlGenerator.buildTopicUrl(topic, namespace)})
                    .subscriptionType(subscriptionType)
                    .subscriptionInitialPosition(annotation.initialPosition())
                    .messageListener((consumer, message) -> {
                        log.debug("handle consumer: {}", consumer.getConsumerName());
                        try {
                            Method handler = consumerHolder.getHandler();
                            handler.setAccessible(true);
                            Object payload = consumerHolder.isWrapped() ? wrapMessage(message) : message.getValue();
                            // Fill each handler parameter slot by its declared type:
                            // the consumer, the message id, or the (optionally wrapped) payload.
                            handler.invoke(consumerHolder.getBean(), invokeParameterStructure.stream().map(parameter -> {
                                if (null == parameter) {
                                    return null;
                                }
                                return parameter.getType().isAssignableFrom(Consumer.class) ? consumer : parameter.getType().isAssignableFrom(MessageId.class) ? message.getMessageId() : payload;
                            }).toArray());
                            try {
                                consumer.acknowledge(message);
                            } catch (PulsarClientException e) {
                                // Best effort: a failed acknowledge is reported but does not fail the message.
                                log.warn("message id {} has been acknowledge", message.getMessageId(), e);
                            }
                        } catch (Exception e2) {
                            // Method.invoke wraps the handler's own exception, whose message would
                            // otherwise be lost (the wrapper's message is normally null).
                            Throwable cause = e2.getCause();
                            String reason = (e2 instanceof java.lang.reflect.InvocationTargetException && null != cause) ? cause.getLocalizedMessage() : e2.getLocalizedMessage();
                            log.warn("consume failed: {}", reason, e2);
                            consumer.negativeAcknowledge(message);
                            this.SINK.tryEmitNext(new FailedMessage(e2, consumer, message));
                        }
                    });
            // Single registration point: the configuration flag decides whether the interceptor is used at all.
            if (this.pulsarProperties.isAllowInterceptor()) {
                consumerBuilder.intercept(new ConsumerInterceptor[]{consumerHolder.getInterceptor().getConstructor(new Class[0]).newInstance(new Object[0])});
            }
            if (this.pulsarProperties.getConsumer().getAckTimeoutMs() > 0) {
                consumerBuilder.ackTimeout(this.pulsarProperties.getConsumer().getAckTimeoutMs(), TimeUnit.MILLISECONDS);
            }
            this.pulsarUrlGenerator.buildDeadLetterPolicy(annotation.maxRedeliverCount(), annotation.deadLetterTopic(), consumerBuilder);
            return consumerBuilder.subscribe();
        } catch (ReflectiveOperationException e) {
            // Missing/inaccessible no-arg constructor, or the constructor itself failed.
            throw new ConsumerInitException("Failed to instantiate consumer interceptor.", e);
        } catch (PulsarClientException | ClientInitException e) {
            if (!(e instanceof PulsarClientException.ConsumerBusyException)) {
                throw new ConsumerInitException("Failed to init consumer.", e);
            }
            // A busy subscription is tolerated: report it and let the caller skip this consumer.
            log.warn(e.getLocalizedMessage());
            return null;
        }
    }

    /**
     * Lambda deserialization hook (marked synthetic): rebuilds the message-listener lambda
     * of {@code buildConsumer} from a {@link SerializedLambda}.
     *
     * <p>The implementation method name is matched first; then the functional interface
     * ({@code MessageListener.received}), the implementing class and the implementation
     * signature are verified before the captured arguments are read and the listener is
     * recreated. The recreated listener body duplicates the one in {@code buildConsumer}.
     *
     * <p>NOTE(review): this looks like decompiler output of a compiler-generated method
     * rather than hand-written code. As written it is not valid Java source
     * ({@code boolean z = -1}, a {@code switch} on a {@code boolean}, and {@code this} /
     * an instance method used inside a {@code static} method). Presumably it should be
     * dropped from maintained source and left to the compiler - confirm before editing.
     *
     * @param serializedLambda serialized form of the lambda to recreate
     * @return the recreated message listener
     * @throws IllegalArgumentException if the serialized lambda does not match the expected
     *                                  listener
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Decompiler rendering of a "no match" selector; the name check below sets it on a match.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1227625322:
                if (implMethodName.equals("lambda$buildConsumer$eacbc091$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                // Verify every descriptor of the serialized lambda before trusting its captured arguments.
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("cn/callmee/springboot/pulsar/starter/client/config/PulsarClientInitialConfiguration") && serializedLambda.getImplMethodSignature().equals("(Lcn/callmee/springboot/pulsar/starter/client/holder/Holder$ConsumerHolder;Ljava/util/List;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    // Captured arguments, in order: enclosing instance, consumer holder, parameter list.
                    PulsarClientInitialConfiguration pulsarClientInitialConfiguration = (PulsarClientInitialConfiguration) serializedLambda.getCapturedArg(0);
                    Holder.ConsumerHolder consumerHolder = (Holder.ConsumerHolder) serializedLambda.getCapturedArg(1);
                    List list = (List) serializedLambda.getCapturedArg(2);
                    // Same listener body as in buildConsumer: invoke the handler, then ack; on failure nack and emit to SINK.
                    return (consumer, message) -> {
                        log.debug("handle consumer: {}", consumer.getConsumerName());
                        try {
                            Method handler = consumerHolder.getHandler();
                            handler.setAccessible(true);
                            Object wrapMessage = consumerHolder.isWrapped() ? wrapMessage(message) : message.getValue();
                            handler.invoke(consumerHolder.getBean(), list.stream().map(parameter -> {
                                if (null == parameter) {
                                    return null;
                                }
                                return parameter.getType().isAssignableFrom(Consumer.class) ? consumer : parameter.getType().isAssignableFrom(MessageId.class) ? message.getMessageId() : wrapMessage;
                            }).toArray());
                            try {
                                consumer.acknowledge(message);
                            } catch (PulsarClientException e) {
                                log.warn("message id {} has been acknowledge", message.getMessageId());
                            }
                        } catch (Exception e2) {
                            log.warn("consume failed: {}", e2.getLocalizedMessage());
                            consumer.negativeAcknowledge(message);
                            this.SINK.tryEmitNext(new FailedMessage(e2, consumer, message));
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
