package org.springframework.pulsar.reactive.core;

import java.util.Collections;
import java.util.Objects;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.MessageSpec;
import org.apache.pulsar.reactive.client.api.MessageSpecBuilder;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
import org.reactivestreams.Publisher;
import org.springframework.core.log.LogAccessor;
import org.springframework.pulsar.core.SchemaUtils;
import org.springframework.pulsar.reactive.core.ReactivePulsarOperations;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/springframework/pulsar/reactive/core/ReactivePulsarTemplate.class */
/**
 * Template for sending messages reactively. Each send obtains a
 * {@link ReactiveMessageSender} from the configured {@link ReactivePulsarSenderFactory}
 * and publishes through it.
 *
 * <p>If a schema has been set via {@link #setSchema(Schema)} it is used for every sender
 * this template creates; otherwise the schema is looked up per message through
 * {@code SchemaUtils.getSchema(message)}.
 *
 * @param <T> the message payload type
 */
public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
    private final LogAccessor logger = new LogAccessor(getClass());
    private final ReactivePulsarSenderFactory<T> reactiveMessageSenderFactory;
    // Optional; when null the schema is derived from each message (see createMessageSender).
    private Schema<T> schema;

    /**
     * Creates a template that obtains its senders from the given factory.
     *
     * @param reactiveMessageSenderFactory the factory used to create message senders
     */
    public ReactivePulsarTemplate(ReactivePulsarSenderFactory<T> reactiveMessageSenderFactory) {
        this.reactiveMessageSenderFactory = reactiveMessageSenderFactory;
    }

    /**
     * Sends a single message without naming a topic; delegates to
     * {@link #send(String, Object)} with a {@code null} topic.
     *
     * @param message the payload to send
     * @return a {@link Mono} emitting the id of the sent message
     */
    @Override
    public Mono<MessageId> send(T message) {
        // The (String) cast only types the null literal; the payload must NOT be cast.
        return send((String) null, message);
    }

    /**
     * Sends a single message to the given topic with no message or sender customization.
     *
     * @param topic the topic to send to; may be {@code null} (the topic is then left to
     *     {@code ReactiveMessageSenderUtils.resolveTopicName} and the sender factory)
     * @param message the payload to send
     * @return a {@link Mono} emitting the id of the sent message
     */
    @Override
    public Mono<MessageId> send(String topic, T message) {
        return doSend(topic, message, null, null);
    }

    /**
     * Sends a stream of messages without naming a topic; delegates to
     * {@link #send(String, Publisher)} with a {@code null} topic.
     *
     * @param messages the payloads to send
     * @return a {@link Flux} emitting the ids of the sent messages
     */
    @Override
    public Flux<MessageId> send(Publisher<T> messages) {
        return send((String) null, messages);
    }

    /**
     * Sends a stream of messages to the given topic.
     *
     * @param topic the topic to send to; may be {@code null}
     * @param messages the payloads to send
     * @return a {@link Flux} emitting the ids of the sent messages
     */
    @Override
    public Flux<MessageId> send(String topic, Publisher<T> messages) {
        return doSendMany(topic, Flux.from(messages));
    }

    /**
     * Starts a fluent send operation for the given message.
     *
     * @param message the payload to send
     * @return a builder bound to this template and the message
     */
    @Override
    public SendMessageBuilderImpl<T> newMessage(T message) {
        return new SendMessageBuilderImpl<>(this, message);
    }

    /**
     * Sets the schema passed to the sender factory for every sender created afterwards.
     *
     * @param schema the schema to use, or {@code null} to derive it from each message
     */
    public void setSchema(Schema<T> schema) {
        this.schema = schema;
    }

    /**
     * Sends one message, logging the attempt and its outcome.
     *
     * <p>The "Sending" trace entry and the sender creation both happen when this method is
     * called, not when the returned {@link Mono} is subscribed.
     *
     * @param topic the requested topic; may be {@code null}
     * @param message the payload to send
     * @param messageCustomizer optional customizer applied to the message spec builder
     * @param senderCustomizer optional customizer handed to the sender factory
     * @return a {@link Mono} emitting the id of the sent message
     */
    private Mono<MessageId> doSend(String topic, T message, MessageSpecBuilderCustomizer<T> messageCustomizer,
            ReactiveMessageSenderBuilderCustomizer<T> senderCustomizer) {
        // The resolved name is used for log output only; the factory receives the topic as given.
        String topicName = ReactiveMessageSenderUtils.resolveTopicName(topic, this.reactiveMessageSenderFactory);
        this.logger.trace(() -> String.format("Sending reactive message to '%s' topic", topicName));
        ReactiveMessageSender<T> sender = createMessageSender(topic, message, senderCustomizer);
        return sender.sendOne(getMessageSpec(messageCustomizer, message))
                .doOnError(ex -> this.logger.error(ex,
                        () -> String.format("Failed to send message to '%s' topic", topicName)))
                .doOnSuccess(messageId -> this.logger.trace(
                        () -> String.format("Sent message to '%s' topic", topicName)));
    }

    /**
     * Sends a stream of messages.
     *
     * <p>Without a configured schema, each element goes through
     * {@link #doSend(String, Object, MessageSpecBuilderCustomizer, ReactiveMessageSenderBuilderCustomizer)}
     * via {@code flatMapSequential}, so the schema is looked up per message. With a
     * configured schema, a single sender is created and the whole stream is handed to its
     * {@code sendMany}.
     *
     * @param topic the requested topic; may be {@code null}
     * @param messages the payloads to send
     * @return a {@link Flux} emitting the ids of the sent messages
     */
    private Flux<MessageId> doSendMany(String topic, Flux<T> messages) {
        String topicName = ReactiveMessageSenderUtils.resolveTopicName(topic, this.reactiveMessageSenderFactory);
        this.logger.trace(() -> String.format("Sending reactive messages to '%s' topic", topicName));
        if (this.schema == null) {
            return messages.flatMapSequential(message -> doSend(topic, message, null, null));
        }
        // The schema is known up front, so no sample message is needed to create the sender.
        ReactiveMessageSender<T> sender = createMessageSender(topic, null, null);
        return messages.map(MessageSpec::of)
                .as(sender::sendMany)
                .doOnError(ex -> this.logger.error(ex,
                        () -> String.format("Failed to send messages to '%s' topic", topicName)))
                .doOnNext(messageId -> this.logger.trace(
                        () -> String.format("Sent messages to '%s' topic", topicName)));
    }

    /**
     * Builds the message spec for a payload, applying the customizer when one is given.
     *
     * @param messageCustomizer optional customizer for the spec builder; may be {@code null}
     * @param message the payload
     * @param <T> the payload type
     * @return the built message spec
     */
    private static <T> MessageSpec<T> getMessageSpec(MessageSpecBuilderCustomizer<T> messageCustomizer, T message) {
        MessageSpecBuilder<T> specBuilder = MessageSpec.builder(message);
        if (messageCustomizer != null) {
            messageCustomizer.customize(specBuilder);
        }
        return specBuilder.build();
    }

    /**
     * Asks the factory for a sender, using the configured schema when present and the
     * schema derived from {@code message} otherwise.
     *
     * @param topic the requested topic, passed to the factory unchanged; may be {@code null}
     * @param message the payload used for schema lookup; only consulted when no schema is set
     * @param senderCustomizer optional customizer; passed as a singleton list, or an empty
     *     list when {@code null}
     * @return the sender created by the factory
     */
    private ReactiveMessageSender<T> createMessageSender(String topic, T message,
            ReactiveMessageSenderBuilderCustomizer<T> senderCustomizer) {
        Schema<T> effectiveSchema = (this.schema != null) ? this.schema : SchemaUtils.getSchema(message);
        return this.reactiveMessageSenderFactory.createSender(topic, effectiveSchema,
                (senderCustomizer == null) ? Collections.emptyList() : Collections.singletonList(senderCustomizer));
    }

    /**
     * Fluent builder that collects an optional topic and optional customizers for a single
     * message and then sends it through the owning template.
     *
     * @param <T> the message payload type
     */
    public static class SendMessageBuilderImpl<T> implements ReactivePulsarOperations.SendMessageBuilder<T> {
        private final ReactivePulsarTemplate<T> template;
        private final T message;
        private String topic;
        private MessageSpecBuilderCustomizer<T> messageCustomizer;
        private ReactiveMessageSenderBuilderCustomizer<T> senderCustomizer;

        SendMessageBuilderImpl(ReactivePulsarTemplate<T> template, T message) {
            this.template = template;
            this.message = message;
        }

        /**
         * Sets the topic to send to.
         *
         * @param topic the topic name
         * @return this builder
         */
        @Override
        public SendMessageBuilderImpl<T> withTopic(String topic) {
            this.topic = topic;
            return this;
        }

        /**
         * Sets the customizer applied to the message spec builder before the spec is built.
         *
         * @param messageCustomizer the message spec customizer
         * @return this builder
         */
        @Override
        public SendMessageBuilderImpl<T> withMessageCustomizer(MessageSpecBuilderCustomizer<T> messageCustomizer) {
            this.messageCustomizer = messageCustomizer;
            return this;
        }

        /**
         * Sets the customizer handed to the sender factory when the sender is created.
         *
         * @param senderCustomizer the sender builder customizer
         * @return this builder
         */
        @Override
        public SendMessageBuilderImpl<T> withSenderCustomizer(ReactiveMessageSenderBuilderCustomizer<T> senderCustomizer) {
            this.senderCustomizer = senderCustomizer;
            return this;
        }

        /**
         * Sends the message using the values collected so far; anything not set is passed
         * to the template as {@code null}.
         *
         * @return a {@link Mono} emitting the id of the sent message
         */
        @Override
        public Mono<MessageId> send() {
            return this.template.doSend(this.topic, this.message, this.messageCustomizer, this.senderCustomizer);
        }
    }
}
