package org.springframework.pulsar.reactive.core;

import java.util.Objects;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.MessageSendResult;
import org.apache.pulsar.reactive.client.api.MessageSpec;
import org.apache.pulsar.reactive.client.api.MessageSpecBuilder;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
import org.reactivestreams.Publisher;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.DefaultTopicResolver;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.reactive.core.ReactivePulsarOperations;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/springframework/pulsar/reactive/core/ReactivePulsarTemplate.class */
public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
    private final LogAccessor logger;
    private final ReactivePulsarSenderFactory<T> reactiveMessageSenderFactory;
    private final SchemaResolver schemaResolver;
    private final TopicResolver topicResolver;

    /* loaded from: input_file:org/springframework/pulsar/reactive/core/ReactivePulsarTemplate$SendManyMessageBuilderImpl.class */
    private static final class SendManyMessageBuilderImpl<T> extends SendMessageBuilderImpl<SendManyMessageBuilderImpl<T>, T> implements ReactivePulsarOperations.SendManyMessageBuilder<T> {
        private final Publisher<MessageSpec<T>> messages;

        SendManyMessageBuilderImpl(ReactivePulsarTemplate<T> reactivePulsarTemplate, Publisher<MessageSpec<T>> publisher) {
            super(reactivePulsarTemplate);
            this.messages = publisher;
        }

        @Override // org.springframework.pulsar.reactive.core.ReactivePulsarOperations.SendManyMessageBuilder
        public Flux<MessageSendResult<T>> send() {
            return this.template.doSendMany(this.topic, Flux.from(this.messages), this.schema, this.senderCustomizer);
        }
    }

    /* loaded from: input_file:org/springframework/pulsar/reactive/core/ReactivePulsarTemplate$SendMessageBuilderImpl.class */
    private static class SendMessageBuilderImpl<O, T> {
        protected final ReactivePulsarTemplate<T> template;

        @Nullable
        protected String topic;

        @Nullable
        protected Schema<T> schema;

        @Nullable
        protected ReactiveMessageSenderBuilderCustomizer<T> senderCustomizer;

        SendMessageBuilderImpl(ReactivePulsarTemplate<T> reactivePulsarTemplate) {
            this.template = reactivePulsarTemplate;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public O withTopic(String str) {
            this.topic = str;
            return this;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public O withSchema(Schema<T> schema) {
            this.schema = schema;
            return this;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public O withSenderCustomizer(ReactiveMessageSenderBuilderCustomizer<T> reactiveMessageSenderBuilderCustomizer) {
            this.senderCustomizer = reactiveMessageSenderBuilderCustomizer;
            return this;
        }
    }

    /* loaded from: input_file:org/springframework/pulsar/reactive/core/ReactivePulsarTemplate$SendOneMessageBuilderImpl.class */
    private static final class SendOneMessageBuilderImpl<T> extends SendMessageBuilderImpl<SendOneMessageBuilderImpl<T>, T> implements ReactivePulsarOperations.SendOneMessageBuilder<T> {

        @Nullable
        private final T message;

        @Nullable
        private MessageSpecBuilderCustomizer<T> messageCustomizer;

        SendOneMessageBuilderImpl(ReactivePulsarTemplate<T> reactivePulsarTemplate, @Nullable T t) {
            super(reactivePulsarTemplate);
            this.message = t;
        }

        @Override // org.springframework.pulsar.reactive.core.ReactivePulsarOperations.SendOneMessageBuilder
        public SendOneMessageBuilderImpl<T> withMessageCustomizer(MessageSpecBuilderCustomizer<T> messageSpecBuilderCustomizer) {
            this.messageCustomizer = messageSpecBuilderCustomizer;
            return this;
        }

        @Override // org.springframework.pulsar.reactive.core.ReactivePulsarOperations.SendOneMessageBuilder
        public Mono<MessageId> send() {
            return this.template.doSend(this.topic, this.message, this.schema, this.messageCustomizer, this.senderCustomizer);
        }
    }

    public ReactivePulsarTemplate(ReactivePulsarSenderFactory<T> reactivePulsarSenderFactory) {
        this(reactivePulsarSenderFactory, new DefaultSchemaResolver(), new DefaultTopicResolver());
    }

    public ReactivePulsarTemplate(ReactivePulsarSenderFactory<T> reactivePulsarSenderFactory, SchemaResolver schemaResolver, TopicResolver topicResolver) {
        this.logger = new LogAccessor(getClass());
        this.reactiveMessageSenderFactory = reactivePulsarSenderFactory;
        this.schemaResolver = schemaResolver;
        this.topicResolver = topicResolver;
    }

    @Override // org.springframework.pulsar.reactive.core.ReactivePulsarOperations
    public Mono<MessageId> send(@Nullable T t) {
        return send((String) null, (String) t);
    }

    @Override // org.springframework.pulsar.reactive.core.ReactivePulsarOperations
    public Mono<MessageId> send(@Nullable T t, @Nullable Schema<T> schema) {
        return doSend(null, t, schema, null, null);
    }

    @Override // org.springframework.pulsar.reactive.core.ReactivePulsarOperations
    public Mono<MessageId> send(@Nullable String str, @Nullable T t) {
        return doSend(str, t, null, null, null);
    }

    @Override // org.springframework.pulsar.reactive.core.ReactivePulsarOperations
    public Mono<MessageId> send(@Nullable String str, @Nullable T t, @Nullable Schema<T> schema) {
        return doSend(str, t, schema, null, null);
    }

    @Override // org.springframework.pulsar.reactive.core.ReactivePulsarOperations
    public Flux<MessageSendResult<T>> send(Publisher<MessageSpec<T>> publisher) {
        return send((String) null, (Publisher) publisher);
    }

    @Override // org.springframework.pulsar.reactive.core.ReactivePulsarOperations
    public Flux<MessageSendResult<T>> send(Publisher<MessageSpec<T>> publisher, @Nullable Schema<T> schema) {
        return doSendMany(null, Flux.from(publisher), schema, null);
    }

    @Override // org.springframework.pulsar.reactive.core.ReactivePulsarOperations
    public Flux<MessageSendResult<T>> send(@Nullable String str, Publisher<MessageSpec<T>> publisher) {
        return doSendMany(str, Flux.from(publisher), null, null);
    }

    @Override // org.springframework.pulsar.reactive.core.ReactivePulsarOperations
    public Flux<MessageSendResult<T>> send(@Nullable String str, Publisher<MessageSpec<T>> publisher, @Nullable Schema<T> schema) {
        return doSendMany(str, Flux.from(publisher), schema, null);
    }

    @Override // org.springframework.pulsar.reactive.core.ReactivePulsarOperations
    public ReactivePulsarOperations.SendOneMessageBuilder<T> newMessage(@Nullable T t) {
        return new SendOneMessageBuilderImpl(this, t);
    }

    @Override // org.springframework.pulsar.reactive.core.ReactivePulsarOperations
    public ReactivePulsarOperations.SendManyMessageBuilder<T> newMessages(Publisher<MessageSpec<T>> publisher) {
        return new SendManyMessageBuilderImpl(this, publisher);
    }

    private Mono<MessageId> doSend(@Nullable String str, @Nullable T t, @Nullable Schema<T> schema, @Nullable MessageSpecBuilderCustomizer<T> messageSpecBuilderCustomizer, @Nullable ReactiveMessageSenderBuilderCustomizer<T> reactiveMessageSenderBuilderCustomizer) {
        String resolveTopic = resolveTopic(str, t);
        this.logger.trace(() -> {
            return "Sending reactive msg to '%s' topic".formatted(resolveTopic);
        });
        return createMessageSender(resolveTopic, t, schema, reactiveMessageSenderBuilderCustomizer).sendOne(getMessageSpec(messageSpecBuilderCustomizer, t)).doOnError(th -> {
            this.logger.error(th, () -> {
                return "Failed to send message to '%s' topic".formatted(resolveTopic);
            });
        }).doOnSuccess(messageId -> {
            this.logger.trace(() -> {
                return "Sent message to '%s' topic".formatted(resolveTopic);
            });
        });
    }

    private Flux<MessageSendResult<T>> doSendMany(@Nullable String str, Flux<MessageSpec<T>> flux, @Nullable Schema<T> schema, @Nullable ReactiveMessageSenderBuilderCustomizer<T> reactiveMessageSenderBuilderCustomizer) {
        return flux.switchOnFirst((signal, flux2) -> {
            MessageSpec messageSpec = (MessageSpec) signal.get();
            if (messageSpec == null || !signal.isOnNext()) {
                return flux2.thenMany(Flux.empty());
            }
            String resolveTopic = resolveTopic(str, messageSpec.getValue());
            ReactiveMessageSender createMessageSender = createMessageSender(resolveTopic, messageSpec.getValue(), schema, reactiveMessageSenderBuilderCustomizer);
            Objects.requireNonNull(createMessageSender);
            return ((Flux) flux2.as((v1) -> {
                return r1.sendMany(v1);
            })).doOnError(th -> {
                this.logger.error(th, () -> {
                    return "Failed to send messages to '%s' topic".formatted(resolveTopic);
                });
            }).doOnNext(messageSendResult -> {
                this.logger.trace(() -> {
                    return "Sent messages to '%s' topic".formatted(resolveTopic);
                });
            });
        });
    }

    private String resolveTopic(@Nullable String str, @Nullable Object obj) {
        String defaultTopic = this.reactiveMessageSenderFactory.getDefaultTopic();
        return (String) this.topicResolver.resolveTopic(str, obj, () -> {
            return defaultTopic;
        }).orElseThrow();
    }

    private static <T> MessageSpec<T> getMessageSpec(@Nullable MessageSpecBuilderCustomizer<T> messageSpecBuilderCustomizer, @Nullable T t) {
        MessageSpecBuilder<T> builder = MessageSpec.builder(t);
        if (messageSpecBuilderCustomizer != null) {
            messageSpecBuilderCustomizer.customize(builder);
        }
        return builder.build();
    }

    private ReactiveMessageSender<T> createMessageSender(@Nullable String str, @Nullable T t, @Nullable Schema<T> schema, @Nullable ReactiveMessageSenderBuilderCustomizer<T> reactiveMessageSenderBuilderCustomizer) {
        return this.reactiveMessageSenderFactory.createSender(schema == null ? (Schema) this.schemaResolver.resolveSchema(t).orElseThrow() : schema, str, reactiveMessageSenderBuilderCustomizer);
    }
}
