package org.springframework.pulsar.core;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.PulsarOperations;
import org.springframework.pulsar.observation.DefaultPulsarTemplateObservationConvention;
import org.springframework.pulsar.observation.PulsarMessageSenderContext;
import org.springframework.pulsar.observation.PulsarTemplateObservation;
import org.springframework.pulsar.observation.PulsarTemplateObservationConvention;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:org/springframework/pulsar/core/PulsarTemplate.class */
/**
 * Template for sending messages to Pulsar topics.
 *
 * <p>Every send resolves the target topic and schema, obtains a producer from the
 * configured {@link PulsarProducerFactory}, sends the message asynchronously and hands the
 * producer to {@code ProducerUtils.closeProducerAsync} once the send has completed (or
 * failed). When observations are enabled and an {@link ObservationRegistry} is available,
 * each send is wrapped in a Micrometer {@link Observation}.
 *
 * @param <T> the message payload type
 */
public class PulsarTemplate<T> implements PulsarOperations<T>, ApplicationContextAware, BeanNameAware, SmartInitializingSingleton {
    private final LogAccessor logger = new LogAccessor(getClass());
    private final PulsarProducerFactory<T> producerFactory;
    private final List<ProducerInterceptor> interceptors;
    private final SchemaResolver schemaResolver;
    private final TopicResolver topicResolver;
    private final boolean observationEnabled;

    /** Registry used to create observations; stays {@code null} (no-op observations) unless resolved from the context. */
    @Nullable
    private ObservationRegistry observationRegistry;

    /** Optional custom convention; the default convention instance is always supplied alongside it. */
    @Nullable
    private PulsarTemplateObservationConvention observationConvention;

    @Nullable
    private ApplicationContext applicationContext;

    /** Bean name passed to the sender context; empty until {@link #setBeanName(String)} is called. */
    private String beanName = "";

    /**
     * Fluent builder that collects the optional send settings and then delegates to the
     * owning template.
     *
     * @param <T> the message payload type
     */
    public static class SendMessageBuilderImpl<T> implements PulsarOperations.SendMessageBuilder<T> {
        private final PulsarTemplate<T> template;

        @Nullable
        private final T message;

        @Nullable
        private String topic;

        @Nullable
        private Schema<T> schema;

        @Nullable
        private Collection<String> encryptionKeys;

        @Nullable
        private TypedMessageBuilderCustomizer<T> messageCustomizer;

        @Nullable
        private ProducerBuilderCustomizer<T> producerCustomizer;

        /**
         * @param template the template that performs the actual send
         * @param message the payload to send, may be {@code null}
         */
        SendMessageBuilderImpl(PulsarTemplate<T> template, @Nullable T message) {
            this.template = template;
            this.message = message;
        }

        @Override
        public PulsarOperations.SendMessageBuilder<T> withTopic(String topic) {
            this.topic = topic;
            return this;
        }

        @Override
        public PulsarOperations.SendMessageBuilder<T> withSchema(Schema<T> schema) {
            this.schema = schema;
            return this;
        }

        @Override
        public PulsarOperations.SendMessageBuilder<T> withEncryptionKeys(Collection<String> encryptionKeys) {
            this.encryptionKeys = encryptionKeys;
            return this;
        }

        @Override
        public PulsarOperations.SendMessageBuilder<T> withMessageCustomizer(TypedMessageBuilderCustomizer<T> messageCustomizer) {
            this.messageCustomizer = messageCustomizer;
            return this;
        }

        @Override
        public PulsarOperations.SendMessageBuilder<T> withProducerCustomizer(ProducerBuilderCustomizer<T> producerCustomizer) {
            this.producerCustomizer = producerCustomizer;
            return this;
        }

        /** Sends the message with the collected settings and blocks until the send completes. */
        @Override
        public MessageId send() throws PulsarClientException {
            return this.template.doSend(this.topic, this.message, this.schema, this.encryptionKeys, this.messageCustomizer, this.producerCustomizer);
        }

        /** Sends the message with the collected settings without waiting for completion. */
        @Override
        public CompletableFuture<MessageId> sendAsync() throws PulsarClientException {
            return this.template.doSendAsync(this.topic, this.message, this.schema, this.encryptionKeys, this.messageCustomizer, this.producerCustomizer);
        }
    }

    /**
     * Creates a template without interceptors, using the default schema and topic
     * resolvers and with observations enabled.
     *
     * @param producerFactory the factory used to create producers
     */
    public PulsarTemplate(PulsarProducerFactory<T> producerFactory) {
        this(producerFactory, Collections.emptyList());
    }

    /**
     * Creates a template using the default schema and topic resolvers and with
     * observations enabled.
     *
     * @param producerFactory the factory used to create producers
     * @param interceptors the interceptors registered on every producer
     */
    public PulsarTemplate(PulsarProducerFactory<T> producerFactory, List<ProducerInterceptor> interceptors) {
        this(producerFactory, interceptors, new DefaultSchemaResolver(), new DefaultTopicResolver(), true);
    }

    /**
     * Creates a fully configured template.
     *
     * @param producerFactory the factory used to create producers
     * @param interceptors the interceptors registered on every producer
     * @param schemaResolver resolves the schema when none is given for a send
     * @param topicResolver resolves the destination topic for a send
     * @param observationEnabled whether sends should be recorded as observations
     */
    public PulsarTemplate(PulsarProducerFactory<T> producerFactory, List<ProducerInterceptor> interceptors, SchemaResolver schemaResolver, TopicResolver topicResolver, boolean observationEnabled) {
        this.producerFactory = producerFactory;
        this.interceptors = interceptors;
        this.schemaResolver = schemaResolver;
        this.topicResolver = topicResolver;
        this.observationEnabled = observationEnabled;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    /**
     * Looks up the observation registry and the optional custom convention from the
     * application context. Nothing is looked up when observations are disabled or no
     * application context has been set; in both cases sends use a no-op observation.
     */
    @Override
    public void afterSingletonsInstantiated() {
        if (!this.observationEnabled) {
            this.logger.debug(() -> "Observations are not enabled - not recording");
            return;
        }
        if (this.applicationContext == null) {
            this.logger.warn(() -> "Observations enabled but application context null - not recording");
            return;
        }
        // The current field value is supplied as the default for each lookup.
        this.observationRegistry = this.applicationContext.getBeanProvider(ObservationRegistry.class)
                .getIfUnique(() -> this.observationRegistry);
        this.observationConvention = this.applicationContext.getBeanProvider(PulsarTemplateObservationConvention.class)
                .getIfUnique(() -> this.observationConvention);
    }

    @Override
    public MessageId send(@Nullable T message) throws PulsarClientException {
        return doSend(null, message, null, null, null, null);
    }

    @Override
    public MessageId send(@Nullable T message, @Nullable Schema<T> schema) throws PulsarClientException {
        return doSend(null, message, schema, null, null, null);
    }

    @Override
    public MessageId send(@Nullable String topic, @Nullable T message) throws PulsarClientException {
        return doSend(topic, message, null, null, null, null);
    }

    @Override
    public MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema) throws PulsarClientException {
        return doSend(topic, message, schema, null, null, null);
    }

    @Override
    public CompletableFuture<MessageId> sendAsync(@Nullable T message) throws PulsarClientException {
        return doSendAsync(null, message, null, null, null, null);
    }

    @Override
    public CompletableFuture<MessageId> sendAsync(@Nullable T message, @Nullable Schema<T> schema) throws PulsarClientException {
        return doSendAsync(null, message, schema, null, null, null);
    }

    @Override
    public CompletableFuture<MessageId> sendAsync(@Nullable String topic, @Nullable T message) throws PulsarClientException {
        return doSendAsync(topic, message, null, null, null, null);
    }

    @Override
    public CompletableFuture<MessageId> sendAsync(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema) throws PulsarClientException {
        return doSendAsync(topic, message, schema, null, null, null);
    }

    /**
     * Starts a fluent send for the given message.
     *
     * @param message the payload to send, may be {@code null}
     * @return a builder bound to this template
     */
    @Override
    public PulsarOperations.SendMessageBuilder<T> newMessage(@Nullable T message) {
        return new SendMessageBuilderImpl<>(this, message);
    }

    @Override
    public void setBeanName(String beanName) {
        this.beanName = beanName;
    }

    /**
     * Sends the message and blocks until the asynchronous send has finished.
     *
     * @return the id of the sent message
     * @throws PulsarClientException the failure of the send, as produced by
     * {@link PulsarClientException#unwrap(Throwable)}
     */
    private MessageId doSend(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema, @Nullable Collection<String> encryptionKeys, @Nullable TypedMessageBuilderCustomizer<T> typedMessageBuilderCustomizer, @Nullable ProducerBuilderCustomizer<T> producerCustomizer) throws PulsarClientException {
        try {
            return doSendAsync(topic, message, schema, encryptionKeys, typedMessageBuilderCustomizer, producerCustomizer).get();
        } catch (InterruptedException ex) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw PulsarClientException.unwrap(ex);
        } catch (Exception ex) {
            throw PulsarClientException.unwrap(ex);
        }
    }

    /**
     * Resolves the topic, prepares a producer and sends the message asynchronously,
     * recording the send as an observation.
     *
     * <p>The producer is handed to {@code ProducerUtils.closeProducerAsync} when the send
     * completes (successfully or not), or immediately if building or dispatching the
     * message throws.
     *
     * @return a future completed with the message id, or exceptionally if the send fails
     * @throws PulsarClientException propagated from producer preparation
     */
    private CompletableFuture<MessageId> doSendAsync(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema, @Nullable Collection<String> encryptionKeys, @Nullable TypedMessageBuilderCustomizer<T> typedMessageBuilderCustomizer, @Nullable ProducerBuilderCustomizer<T> producerCustomizer) throws PulsarClientException {
        String defaultTopic = Objects.toString(this.producerFactory.getDefaultTopic(), null);
        // The message is passed as-is: casting it to String would fail for any other payload type.
        String topicName = this.topicResolver.resolveTopic(topic, message, () -> defaultTopic).orElseThrow();
        this.logger.trace(() -> "Sending msg to '%s' topic".formatted(topicName));

        PulsarMessageSenderContext senderContext = PulsarMessageSenderContext.newContext(topicName, this.beanName);
        Observation observation = newObservation(senderContext);
        try {
            observation.start();
            Producer<T> producer = prepareProducerForSend(topicName, message, schema, encryptionKeys, producerCustomizer);
            try {
                TypedMessageBuilder<T> messageBuilder = producer.newMessage().value(message);
                if (typedMessageBuilderCustomizer != null) {
                    typedMessageBuilderCustomizer.customize(messageBuilder);
                }
                // Copy the properties held by the sender context onto the outgoing message.
                senderContext.properties().forEach(messageBuilder::property);
                return messageBuilder.sendAsync().whenComplete((messageId, ex) -> {
                    if (ex == null) {
                        this.logger.trace(() -> "Sent msg to '%s' topic".formatted(topicName));
                    } else {
                        this.logger.error(ex, () -> "Failed to send msg to '%s' topic".formatted(topicName));
                        observation.error(ex);
                    }
                    observation.stop();
                    ProducerUtils.closeProducerAsync(producer, this.logger);
                });
            } catch (Exception ex) {
                ProducerUtils.closeProducerAsync(producer, this.logger);
                throw ex;
            }
        } catch (Exception ex) {
            // Also covers the checked PulsarClientException from producer preparation, so a
            // started observation is always stopped when the send cannot be dispatched.
            observation.error(ex);
            observation.stop();
            throw ex;
        }
    }

    /**
     * Creates the observation for a send, or {@link Observation#NOOP} when no registry is
     * available.
     */
    private Observation newObservation(PulsarMessageSenderContext senderContext) {
        if (this.observationRegistry == null) {
            return Observation.NOOP;
        }
        return PulsarTemplateObservation.TEMPLATE_OBSERVATION.observation(this.observationConvention,
                DefaultPulsarTemplateObservationConvention.INSTANCE, () -> senderContext, this.observationRegistry);
    }

    /**
     * Creates the producer for a single send.
     *
     * <p>Uses the given schema, or resolves one from the message when none is given. The
     * template's interceptors (if any) are applied before the caller's producer customizer.
     *
     * @throws PulsarClientException if the factory fails to create the producer
     */
    private Producer<T> prepareProducerForSend(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema, @Nullable Collection<String> encryptionKeys, @Nullable ProducerBuilderCustomizer<T> producerCustomizer) throws PulsarClientException {
        // The message is passed as-is: casting it to SchemaResolver would fail for any payload.
        Schema<T> resolvedSchema = (schema != null) ? schema : this.schemaResolver.<T>resolveSchema(message).orElseThrow();
        List<ProducerBuilderCustomizer<T>> customizers = new ArrayList<>();
        if (!CollectionUtils.isEmpty(this.interceptors)) {
            customizers.add(builder -> this.interceptors.forEach(interceptor -> builder.intercept(interceptor)));
        }
        if (producerCustomizer != null) {
            customizers.add(producerCustomizer);
        }
        return this.producerFactory.createProducer(resolvedSchema, topic, encryptionKeys, customizers);
    }
}
