package org.springframework.pulsar.reactive.core;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.DefaultTopicResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.reactive.core.RestartableComponentSupport;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;

/* loaded from: input_file:org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactory.class */
public final class DefaultReactivePulsarSenderFactory<T> implements ReactivePulsarSenderFactory<T>, RestartableComponentSupport {
    private static final int LIFECYCLE_PHASE = -1073741924;
    private final LogAccessor logger = new LogAccessor(getClass());
    private final AtomicReference<RestartableComponentSupport.State> currentState = RestartableComponentSupport.initialState();
    private final ReactivePulsarClient reactivePulsarClient;
    private final TopicResolver topicResolver;

    @Nullable
    private final ReactiveMessageSenderCache reactiveMessageSenderCache;

    @Nullable
    private String defaultTopic;

    @Nullable
    private final List<ReactiveMessageSenderBuilderCustomizer<T>> defaultConfigCustomizers;

    /* loaded from: input_file:org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactory$Builder.class */
    public static final class Builder<T> {
        private final ReactivePulsarClient reactivePulsarClient;
        private TopicResolver topicResolver = new DefaultTopicResolver();

        @Nullable
        private ReactiveMessageSenderCache messageSenderCache;

        @Nullable
        private String defaultTopic;

        @Nullable
        private List<ReactiveMessageSenderBuilderCustomizer<T>> defaultConfigCustomizers;

        private Builder(ReactivePulsarClient reactivePulsarClient) {
            Assert.notNull(reactivePulsarClient, "Reactive client is required");
            this.reactivePulsarClient = reactivePulsarClient;
        }

        public Builder<T> withTopicResolver(TopicResolver topicResolver) {
            this.topicResolver = topicResolver;
            return this;
        }

        public Builder<T> withMessageSenderCache(ReactiveMessageSenderCache reactiveMessageSenderCache) {
            this.messageSenderCache = reactiveMessageSenderCache;
            return this;
        }

        public Builder<T> withDefaultTopic(String str) {
            this.defaultTopic = str;
            return this;
        }

        public Builder<T> withDefaultConfigCustomizer(ReactiveMessageSenderBuilderCustomizer<T> reactiveMessageSenderBuilderCustomizer) {
            this.defaultConfigCustomizers = List.of(reactiveMessageSenderBuilderCustomizer);
            return this;
        }

        public Builder<T> withDefaultConfigCustomizers(List<ReactiveMessageSenderBuilderCustomizer<T>> list) {
            this.defaultConfigCustomizers = list;
            return this;
        }

        public DefaultReactivePulsarSenderFactory<T> build() {
            Assert.notNull(this.topicResolver, "Topic resolver is required");
            return new DefaultReactivePulsarSenderFactory<>(this.reactivePulsarClient, this.topicResolver, this.messageSenderCache, this.defaultTopic, this.defaultConfigCustomizers);
        }
    }

    private DefaultReactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient, TopicResolver topicResolver, @Nullable ReactiveMessageSenderCache reactiveMessageSenderCache, @Nullable String str, @Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> list) {
        this.reactivePulsarClient = reactivePulsarClient;
        this.topicResolver = topicResolver;
        this.reactiveMessageSenderCache = reactiveMessageSenderCache;
        this.defaultTopic = str;
        this.defaultConfigCustomizers = list;
    }

    public static <T> Builder<T> builderFor(ReactivePulsarClient reactivePulsarClient) {
        return new Builder<>(reactivePulsarClient);
    }

    public static <T> Builder<T> builderFor(PulsarClient pulsarClient) {
        return new Builder<>(AdaptedReactivePulsarClientFactory.create(pulsarClient));
    }

    @Override // org.springframework.pulsar.reactive.core.ReactivePulsarSenderFactory
    public ReactiveMessageSender<T> createSender(Schema<T> schema, @Nullable String str) {
        return doCreateReactiveMessageSender(schema, str, null);
    }

    @Override // org.springframework.pulsar.reactive.core.ReactivePulsarSenderFactory
    public ReactiveMessageSender<T> createSender(Schema<T> schema, @Nullable String str, @Nullable ReactiveMessageSenderBuilderCustomizer<T> reactiveMessageSenderBuilderCustomizer) {
        return doCreateReactiveMessageSender(schema, str, reactiveMessageSenderBuilderCustomizer != null ? Collections.singletonList(reactiveMessageSenderBuilderCustomizer) : null);
    }

    @Override // org.springframework.pulsar.reactive.core.ReactivePulsarSenderFactory
    public ReactiveMessageSender<T> createSender(Schema<T> schema, @Nullable String str, @Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> list) {
        return doCreateReactiveMessageSender(schema, str, list);
    }

    private ReactiveMessageSender<T> doCreateReactiveMessageSender(Schema<T> schema, @Nullable String str, @Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> list) {
        Objects.requireNonNull(schema, "Schema must be specified");
        String str2 = (String) this.topicResolver.resolveTopic(str, () -> {
            return getDefaultTopic();
        }).orElseThrow();
        this.logger.trace(() -> {
            return "Creating reactive message sender for '%s' topic".formatted(str2);
        });
        ReactiveMessageSenderBuilder messageSender = this.reactivePulsarClient.messageSender(schema);
        if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) {
            this.defaultConfigCustomizers.forEach(reactiveMessageSenderBuilderCustomizer -> {
                reactiveMessageSenderBuilderCustomizer.customize(messageSender);
            });
        }
        messageSender.topic(str2);
        if (this.reactiveMessageSenderCache != null) {
            messageSender.cache(this.reactiveMessageSenderCache);
        }
        if (!CollectionUtils.isEmpty(list)) {
            list.forEach(reactiveMessageSenderBuilderCustomizer2 -> {
                reactiveMessageSenderBuilderCustomizer2.customize(messageSender);
            });
        }
        messageSender.topic(str2);
        return messageSender.build();
    }

    @Override // org.springframework.pulsar.reactive.core.ReactivePulsarSenderFactory
    public String getDefaultTopic() {
        return this.defaultTopic;
    }

    public int getPhase() {
        return LIFECYCLE_PHASE;
    }

    @Override // org.springframework.pulsar.reactive.core.RestartableComponentSupport
    public AtomicReference<RestartableComponentSupport.State> currentState() {
        return this.currentState;
    }

    @Override // org.springframework.pulsar.reactive.core.RestartableComponentSupport
    public LogAccessor logger() {
        return this.logger;
    }

    @Override // org.springframework.pulsar.reactive.core.RestartableComponentSupport
    public void doStop() {
        try {
            reflectivelyClearCache();
            this.reactiveMessageSenderCache.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void reflectivelyClearCache() {
        Field findField = ReflectionUtils.findField(this.reactiveMessageSenderCache.getClass(), "cacheProvider");
        ReflectionUtils.makeAccessible(findField);
        Object field = ReflectionUtils.getField(findField, this.reactiveMessageSenderCache);
        Field findField2 = ReflectionUtils.findField(field.getClass(), "cache");
        ReflectionUtils.makeAccessible(findField2);
        Object field2 = ReflectionUtils.getField(findField2, field);
        Field findField3 = ReflectionUtils.findField(field2.getClass(), "cache");
        ReflectionUtils.makeAccessible(findField3);
        Object field3 = ReflectionUtils.getField(findField3, field2);
        Method findMethod = ReflectionUtils.findMethod(field3.getClass(), "clear");
        ReflectionUtils.makeAccessible(findMethod);
        ReflectionUtils.invokeMethod(findMethod, field3);
    }
}
