package org.springframework.pulsar.reactive.core;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageSenderSpec;
import org.apache.pulsar.reactive.client.api.MutableReactiveMessageSenderSpec;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.DefaultTopicResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactory.class */
/**
 * {@link ReactivePulsarSenderFactory} implementation that builds
 * {@link ReactiveMessageSender} instances from a {@link ReactivePulsarClient}.
 *
 * <p>Every sender is created from the factory-wide {@link ReactiveMessageSenderSpec},
 * the topic resolved through the configured {@link TopicResolver}, the optional
 * {@link ReactiveMessageSenderCache} and any caller-supplied customizers.
 *
 * @param <T> the payload type of the messages sent by the created senders
 */
public class DefaultReactivePulsarSenderFactory<T> implements ReactivePulsarSenderFactory<T> {
    private final LogAccessor logger = new LogAccessor(getClass());
    private final ReactivePulsarClient reactivePulsarClient;
    private final ReactiveMessageSenderSpec reactiveMessageSenderSpec;

    @Nullable
    private final ReactiveMessageSenderCache reactiveMessageSenderCache;
    private final TopicResolver topicResolver;

    /**
     * Creates a factory on top of a plain {@link PulsarClient}, adapting it with
     * {@link AdaptedReactivePulsarClientFactory} and resolving topics with a
     * {@link DefaultTopicResolver}.
     *
     * @param pulsarClient the client to adapt into a reactive client
     * @param reactiveMessageSenderSpec the spec applied to every created sender, or {@code null}
     * to start from an empty spec
     * @param reactiveMessageSenderCache the cache handed to every created sender, or {@code null}
     * to configure no cache
     */
    public DefaultReactivePulsarSenderFactory(PulsarClient pulsarClient, @Nullable ReactiveMessageSenderSpec reactiveMessageSenderSpec, @Nullable ReactiveMessageSenderCache reactiveMessageSenderCache) {
        this(AdaptedReactivePulsarClientFactory.create(pulsarClient), reactiveMessageSenderSpec, reactiveMessageSenderCache, new DefaultTopicResolver());
    }

    /**
     * Creates a factory with an explicit reactive client and topic resolver.
     *
     * @param reactivePulsarClient the reactive client used to obtain sender builders
     * @param reactiveMessageSenderSpec the spec applied to every created sender, or {@code null}
     * to start from an empty spec
     * @param reactiveMessageSenderCache the cache handed to every created sender, or {@code null}
     * to configure no cache
     * @param topicResolver the resolver that picks the destination topic for each sender
     */
    public DefaultReactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient, @Nullable ReactiveMessageSenderSpec reactiveMessageSenderSpec, @Nullable ReactiveMessageSenderCache reactiveMessageSenderCache, TopicResolver topicResolver) {
        this.reactivePulsarClient = reactivePulsarClient;
        // The stored spec is always an ImmutableReactiveMessageSenderSpec built from the
        // given spec, or from a fresh mutable spec when none was given.
        ReactiveMessageSenderSpec sourceSpec = (reactiveMessageSenderSpec != null) ? reactiveMessageSenderSpec : new MutableReactiveMessageSenderSpec();
        this.reactiveMessageSenderSpec = new ImmutableReactiveMessageSenderSpec(sourceSpec);
        this.reactiveMessageSenderCache = reactiveMessageSenderCache;
        this.topicResolver = topicResolver;
    }

    /**
     * Creates a sender without any customizer.
     *
     * @param schema the schema of the messages to send; must not be {@code null}
     * @param str the requested topic, or {@code null} to fall back to the topic of the factory spec
     * @return the built sender
     */
    @Override // org.springframework.pulsar.reactive.core.ReactivePulsarSenderFactory
    public ReactiveMessageSender<T> createSender(Schema<T> schema, @Nullable String str) {
        return doCreateReactiveMessageSender(schema, str, null);
    }

    /**
     * Creates a sender, applying at most one customizer to the builder.
     *
     * @param schema the schema of the messages to send; must not be {@code null}
     * @param str the requested topic, or {@code null} to fall back to the topic of the factory spec
     * @param reactiveMessageSenderBuilderCustomizer the customizer to apply, or {@code null} for none
     * @return the built sender
     */
    @Override // org.springframework.pulsar.reactive.core.ReactivePulsarSenderFactory
    public ReactiveMessageSender<T> createSender(Schema<T> schema, @Nullable String str, @Nullable ReactiveMessageSenderBuilderCustomizer<T> reactiveMessageSenderBuilderCustomizer) {
        List<ReactiveMessageSenderBuilderCustomizer<T>> customizers = (reactiveMessageSenderBuilderCustomizer != null)
                ? Collections.singletonList(reactiveMessageSenderBuilderCustomizer)
                : null;
        return doCreateReactiveMessageSender(schema, str, customizers);
    }

    /**
     * Creates a sender, applying the given customizers to the builder in list order.
     *
     * @param schema the schema of the messages to send; must not be {@code null}
     * @param str the requested topic, or {@code null} to fall back to the topic of the factory spec
     * @param list the customizers to apply; {@code null} or empty for none
     * @return the built sender
     */
    @Override // org.springframework.pulsar.reactive.core.ReactivePulsarSenderFactory
    public ReactiveMessageSender<T> createSender(Schema<T> schema, @Nullable String str, @Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> list) {
        return doCreateReactiveMessageSender(schema, str, list);
    }

    /**
     * Shared implementation behind all {@code createSender} variants.
     *
     * @param schema the schema of the messages to send
     * @param topic the requested topic, or {@code null} to fall back to the topic of the factory spec
     * @param customizers the customizers to apply; {@code null} or empty for none
     * @return the built sender
     * @throws NullPointerException if {@code schema} is {@code null}
     */
    private ReactiveMessageSender<T> doCreateReactiveMessageSender(Schema<T> schema, @Nullable String topic, @Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> customizers) {
        Objects.requireNonNull(schema, "Schema must be specified");
        // orElseThrow() propagates whatever the resolver reports when no topic could be resolved.
        String resolvedTopic = this.topicResolver
                .resolveTopic(topic, () -> getReactiveMessageSenderSpec().getTopicName())
                .orElseThrow();
        this.logger.trace(() -> "Creating reactive message sender for '%s' topic".formatted(resolvedTopic));

        ReactiveMessageSenderBuilder<T> senderBuilder = this.reactivePulsarClient.messageSender(schema);
        senderBuilder.applySpec(this.reactiveMessageSenderSpec);
        senderBuilder.topic(resolvedTopic);
        if (this.reactiveMessageSenderCache != null) {
            senderBuilder.cache(this.reactiveMessageSenderCache);
        }
        if (!CollectionUtils.isEmpty(customizers)) {
            customizers.forEach(customizer -> customizer.customize(senderBuilder));
        }
        // Set the resolved topic again after the customizers have run, so the builder
        // ends up with the resolved topic whatever a customizer set.
        senderBuilder.topic(resolvedTopic);
        return senderBuilder.build();
    }

    /**
     * Returns the spec that is applied to every sender created by this factory.
     *
     * @return the sender spec held by this factory
     */
    @Override // org.springframework.pulsar.reactive.core.ReactivePulsarSenderFactory
    public ReactiveMessageSenderSpec getReactiveMessageSenderSpec() {
        return this.reactiveMessageSenderSpec;
    }
}
