package com.rabbitmq.stream.impl;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.stream.Address;
import com.rabbitmq.stream.AddressResolver;
import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConsumerBuilder;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ProducerBuilder;
import com.rabbitmq.stream.StreamCreator;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.compression.CompressionCodecFactory;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.OffsetTrackingCoordinator;
import com.rabbitmq.stream.impl.StreamConsumerBuilder;
import com.rabbitmq.stream.impl.StreamEnvironmentBuilder;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.ssl.SslContextBuilder;
import java.io.IOException;
import java.net.URI;
import java.net.URLDecoder;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;
import javax.net.ssl.SSLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.xml.BeanDefinitionParserDelegate;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/stream-client-0.4.0.jar:com/rabbitmq/stream/impl/StreamEnvironment.class */
public class StreamEnvironment implements Environment {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) StreamEnvironment.class);
    private final Random random;
    private final EventLoopGroup eventLoopGroup;
    private final ScheduledExecutorService scheduledExecutorService;
    private final boolean privateScheduleExecutorService;
    private final Client.ClientParameters clientParametersPrototype;
    private final List<Address> addresses;
    private final List<StreamProducer> producers;
    private final List<StreamConsumer> consumers;
    private final Codec codec;
    private final BackOffDelayPolicy recoveryBackOffDelayPolicy;
    private final BackOffDelayPolicy topologyUpdateBackOffDelayPolicy;
    private final ConsumersCoordinator consumersCoordinator;
    private final ProducersCoordinator producersCoordinator;
    private final OffsetTrackingCoordinator offsetTrackingCoordinator;
    private final AtomicBoolean closed;
    private final AddressResolver addressResolver;
    private final Clock clock;
    private final ScheduledFuture<?> clockRefreshFuture;
    private final ByteBufAllocator byteBufAllocator;
    private final AtomicBoolean locatorInitialized;
    private final Runnable locatorInitializationSequence;
    private volatile Client locator;

    /* loaded from: input_file:BOOT-INF/lib/stream-client-0.4.0.jar:com/rabbitmq/stream/impl/StreamEnvironment$TrackingConsumerRegistration.class */
    static class TrackingConsumerRegistration {
        private final Runnable closingCallback;
        private final Consumer<MessageHandler.Context> postMessageProcessingCallback;
        private final LongConsumer trackingCallback;

        TrackingConsumerRegistration(Runnable runnable, Consumer<MessageHandler.Context> consumer, LongConsumer longConsumer) {
            this.closingCallback = runnable;
            this.postMessageProcessingCallback = consumer;
            this.trackingCallback = longConsumer;
        }

        public Runnable closingCallback() {
            return this.closingCallback;
        }

        public LongConsumer trackingCallback() {
            return this.trackingCallback;
        }

        public Consumer<MessageHandler.Context> postMessageProcessingCallback() {
            return this.postMessageProcessingCallback;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamEnvironment(ScheduledExecutorService scheduledExecutorService, Client.ClientParameters clientParameters, List<URI> list, BackOffDelayPolicy backOffDelayPolicy, BackOffDelayPolicy backOffDelayPolicy2, AddressResolver addressResolver, int i, int i2, int i3, StreamEnvironmentBuilder.DefaultTlsConfiguration defaultTlsConfiguration, ByteBufAllocator byteBufAllocator, boolean z) {
        this(scheduledExecutorService, clientParameters, list, backOffDelayPolicy, backOffDelayPolicy2, addressResolver, i, i2, i3, defaultTlsConfiguration, byteBufAllocator, z, clientParameters2 -> {
            return new Client(clientParameters2);
        });
    }

    StreamEnvironment(ScheduledExecutorService scheduledExecutorService, Client.ClientParameters clientParameters, List<URI> list, BackOffDelayPolicy backOffDelayPolicy, BackOffDelayPolicy backOffDelayPolicy2, AddressResolver addressResolver, int i, int i2, int i3, StreamEnvironmentBuilder.DefaultTlsConfiguration defaultTlsConfiguration, ByteBufAllocator byteBufAllocator, boolean z, Function<Client.ClientParameters, Client> function) {
        boolean z2;
        this.random = new Random();
        this.producers = new CopyOnWriteArrayList();
        this.consumers = new CopyOnWriteArrayList();
        this.closed = new AtomicBoolean(false);
        this.clock = new Clock();
        this.locatorInitialized = new AtomicBoolean(false);
        this.recoveryBackOffDelayPolicy = backOffDelayPolicy;
        this.topologyUpdateBackOffDelayPolicy = backOffDelayPolicy2;
        this.byteBufAllocator = byteBufAllocator;
        clientParameters.byteBufAllocator(byteBufAllocator);
        Client.ClientParameters maybeSetUpClientParametersFromUris = maybeSetUpClientParametersFromUris(list, clientParameters);
        this.addressResolver = addressResolver;
        if (defaultTlsConfiguration == null || !defaultTlsConfiguration.enabled()) {
            z2 = false;
        } else {
            z2 = true;
            try {
                maybeSetUpClientParametersFromUris.sslContext(defaultTlsConfiguration.sslContext() == null ? SslContextBuilder.forClient().build() : defaultTlsConfiguration.sslContext());
                maybeSetUpClientParametersFromUris.tlsHostnameVerification(defaultTlsConfiguration.hostnameVerificationEnabled());
            } catch (SSLException e) {
                throw new StreamException("Error while creating Netty SSL context", e);
            }
        }
        if (list.isEmpty()) {
            this.addresses = Collections.singletonList(new Address(maybeSetUpClientParametersFromUris.host, maybeSetUpClientParametersFromUris.port));
        } else {
            int i4 = z2 ? Client.DEFAULT_TLS_PORT : Client.DEFAULT_PORT;
            this.addresses = (List) list.stream().map(uri -> {
                return new Address(uri.getHost() == null ? ConnectionFactory.DEFAULT_HOST : uri.getHost(), uri.getPort() == -1 ? i4 : uri.getPort());
            }).collect(Collectors.toList());
        }
        if (maybeSetUpClientParametersFromUris.eventLoopGroup == null) {
            this.eventLoopGroup = new NioEventLoopGroup();
            this.clientParametersPrototype = maybeSetUpClientParametersFromUris.duplicate().eventLoopGroup(this.eventLoopGroup);
        } else {
            this.eventLoopGroup = null;
            this.clientParametersPrototype = maybeSetUpClientParametersFromUris.duplicate().eventLoopGroup(maybeSetUpClientParametersFromUris.eventLoopGroup);
        }
        if (scheduledExecutorService == null) {
            this.scheduledExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
            this.privateScheduleExecutorService = true;
        } else {
            this.scheduledExecutorService = scheduledExecutorService;
            this.privateScheduleExecutorService = false;
        }
        this.producersCoordinator = new ProducersCoordinator(this, i, i2, Utils.coordinatorClientFactory(this));
        this.consumersCoordinator = new ConsumersCoordinator(this, i3, Utils.coordinatorClientFactory(this));
        this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);
        AtomicReference atomicReference = new AtomicReference();
        atomicReference.set(shutdownContext -> {
            if (shutdownContext.isShutdownUnexpected()) {
                this.locator = null;
                LOGGER.debug("Unexpected locator disconnection, trying to reconnect");
                Client.ClientParameters shutdownListener = this.clientParametersPrototype.duplicate().shutdownListener((Client.ShutdownListener) atomicReference.get());
                AsyncRetry.asyncRetry(() -> {
                    Address resolve = addressResolver.resolve(this.addresses.size() == 1 ? this.addresses.get(0) : this.addresses.get(this.random.nextInt(this.addresses.size())));
                    LOGGER.debug("Trying to reconnect locator on {}", resolve);
                    Client client = (Client) function.apply(shutdownListener.host(resolve.host()).port(resolve.port()).clientProperty("connection_name", "rabbitmq-stream-locator"));
                    LOGGER.debug("Locator connected on {}", resolve);
                    return client;
                }).description("Locator recovery").scheduler(this.scheduledExecutorService).delayPolicy(backOffDelayPolicy).build().thenAccept(client -> {
                    this.locator = client;
                });
            }
        });
        Client.ClientParameters duplicate = maybeSetUpClientParametersFromUris.duplicate();
        Runnable runnable = () -> {
            RuntimeException runtimeException = null;
            Iterator<Address> it = this.addresses.iterator();
            while (it.hasNext()) {
                Address resolve = addressResolver.resolve(it.next());
                try {
                    this.locator = (Client) function.apply(duplicate.duplicate().host(resolve.host()).port(resolve.port()).clientProperty("connection_name", "rabbitmq-stream-locator").shutdownListener((Client.ShutdownListener) atomicReference.get()));
                    LOGGER.debug("Locator connected to {}", resolve);
                    break;
                } catch (RuntimeException e2) {
                    LOGGER.debug("Error while try to connect to {}: {}", resolve, e2.getMessage());
                    runtimeException = e2;
                }
            }
            if (this.locator == null) {
                throw runtimeException;
            }
        };
        if (z) {
            this.locatorInitializationSequence = runnable;
        } else {
            runnable.run();
            this.locatorInitialized.set(true);
            this.locatorInitializationSequence = () -> {
            };
        }
        this.codec = maybeSetUpClientParametersFromUris.codec == null ? Codecs.DEFAULT : maybeSetUpClientParametersFromUris.codec;
        this.clockRefreshFuture = this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            this.clock.refresh();
        }, 1L, 1L, TimeUnit.SECONDS);
    }

    private static String uriDecode(String str) {
        try {
            return URLDecoder.decode(str.replace("+", "%2B"), "US-ASCII");
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    Client.ClientParameters maybeSetUpClientParametersFromUris(List<URI> list, Client.ClientParameters clientParameters) {
        if (list.isEmpty()) {
            return clientParameters;
        }
        URI uri = list.get(0);
        Client.ClientParameters duplicate = clientParameters.duplicate();
        String host = uri.getHost();
        if (host != null) {
            duplicate.host(host);
        }
        int port = uri.getPort();
        if (port != -1) {
            duplicate.port(port);
        }
        String rawUserInfo = uri.getRawUserInfo();
        if (rawUserInfo != null) {
            String[] split = rawUserInfo.split(":");
            if (split.length > 2) {
                throw new IllegalArgumentException("Bad user info in URI " + rawUserInfo);
            }
            duplicate.username(uriDecode(split[0]));
            if (split.length == 2) {
                duplicate.password(uriDecode(split[1]));
            }
        }
        String rawPath = uri.getRawPath();
        if (rawPath != null && rawPath.length() > 0) {
            if (rawPath.indexOf(47, 1) != -1) {
                throw new IllegalArgumentException("Multiple segments in path of URI: " + rawPath);
            }
            duplicate.virtualHost(uriDecode(uri.getPath().substring(1)));
        }
        return duplicate;
    }

    public ByteBufAllocator byteBufAllocator() {
        return this.byteBufAllocator;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void maybeInitializeLocator() {
        if (this.locatorInitialized.compareAndSet(false, true)) {
            try {
                this.locatorInitializationSequence.run();
            } catch (RuntimeException e) {
                this.locatorInitialized.set(false);
                throw e;
            }
        }
    }

    @Override // com.rabbitmq.stream.Environment
    public StreamCreator streamCreator() {
        return new StreamStreamCreator(this);
    }

    @Override // com.rabbitmq.stream.Environment
    public void deleteStream(String str) {
        maybeInitializeLocator();
        Client.Response delete = locator().delete(str);
        if (!delete.isOk()) {
            throw new StreamException("Error while deleting stream " + str + " (" + Utils.formatConstant(delete.getResponseCode()) + ")", delete.getResponseCode());
        }
    }

    @Override // com.rabbitmq.stream.Environment
    public ProducerBuilder producerBuilder() {
        return new StreamProducerBuilder(this);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addProducer(StreamProducer streamProducer) {
        this.producers.add(streamProducer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeProducer(StreamProducer streamProducer) {
        this.producers.remove(streamProducer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addConsumer(StreamConsumer streamConsumer) {
        this.consumers.add(streamConsumer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeConsumer(StreamConsumer streamConsumer) {
        this.consumers.remove(streamConsumer);
    }

    @Override // com.rabbitmq.stream.Environment
    public ConsumerBuilder consumerBuilder() {
        return new StreamConsumerBuilder(this);
    }

    @Override // com.rabbitmq.stream.Environment, java.lang.AutoCloseable
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            Iterator<StreamProducer> it = this.producers.iterator();
            while (it.hasNext()) {
                try {
                    it.next().closeFromEnvironment();
                } catch (Exception e) {
                    LOGGER.warn("Error while closing producer, moving on to the next one", (Throwable) e);
                }
            }
            Iterator<StreamConsumer> it2 = this.consumers.iterator();
            while (it2.hasNext()) {
                try {
                    it2.next().closeFromEnvironment();
                } catch (Exception e2) {
                    LOGGER.warn("Error while closing consumer, moving on to the next one", (Throwable) e2);
                }
            }
            this.producersCoordinator.close();
            this.consumersCoordinator.close();
            this.offsetTrackingCoordinator.close();
            try {
                if (this.locator != null && this.locator.isOpen()) {
                    this.locator.close();
                    this.locator = null;
                }
            } catch (Exception e3) {
                LOGGER.warn("Error while closing locator client", (Throwable) e3);
            }
            this.clockRefreshFuture.cancel(false);
            if (this.privateScheduleExecutorService) {
                this.scheduledExecutorService.shutdownNow();
            }
            try {
                if (this.eventLoopGroup != null && (!this.eventLoopGroup.isShuttingDown() || !this.eventLoopGroup.isShutdown())) {
                    LOGGER.debug("Closing Netty event loop group");
                    this.eventLoopGroup.shutdownGracefully(1L, 10L, TimeUnit.SECONDS).get(10L, TimeUnit.SECONDS);
                }
            } catch (InterruptedException e4) {
                LOGGER.info("Event loop group closing has been interrupted");
                Thread.currentThread().interrupt();
            } catch (ExecutionException e5) {
                LOGGER.info("Event loop group closing failed", (Throwable) e5);
            } catch (TimeoutException e6) {
                LOGGER.info("Could not close event loop group in 10 seconds");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ScheduledExecutorService scheduledExecutorService() {
        return this.scheduledExecutorService;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BackOffDelayPolicy recoveryBackOffDelayPolicy() {
        return this.recoveryBackOffDelayPolicy;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BackOffDelayPolicy topologyUpdateBackOffDelayPolicy() {
        return this.topologyUpdateBackOffDelayPolicy;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompressionCodecFactory compressionCodecFactory() {
        return this.clientParametersPrototype.compressionCodecFactory;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Runnable registerConsumer(StreamConsumer streamConsumer, String str, OffsetSpecification offsetSpecification, String str2, MessageHandler messageHandler) {
        return this.consumersCoordinator.subscribe(streamConsumer, str, offsetSpecification, str2, messageHandler);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Runnable registerProducer(StreamProducer streamProducer, String str, String str2) {
        return this.producersCoordinator.registerProducer(streamProducer, str, str2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Client locator() {
        if (this.locator == null) {
            throw new StreamException("No connection available");
        }
        return this.locator;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Clock clock() {
        return this.clock;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AddressResolver addressResolver() {
        return this.addressResolver;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Codec codec() {
        return this.codec;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Client.ClientParameters clientParametersCopy() {
        return this.clientParametersPrototype.duplicate();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TrackingConsumerRegistration registerTrackingConsumer(StreamConsumer streamConsumer, StreamConsumerBuilder.TrackingConfiguration trackingConfiguration) {
        Runnable registerTrackingConsumer = this.producersCoordinator.registerTrackingConsumer(streamConsumer);
        OffsetTrackingCoordinator.Registration registration = null;
        if (this.offsetTrackingCoordinator.needTrackingRegistration(trackingConfiguration)) {
            registration = this.offsetTrackingCoordinator.registerTrackingConsumer(streamConsumer, trackingConfiguration);
        }
        return new TrackingConsumerRegistration(registerTrackingConsumer, registration == null ? null : registration.postMessageProcessingCallback(), registration == null ? Utils.NO_OP_LONG_CONSUMER : registration.trackingCallback());
    }

    public String toString() {
        Client client = this.locator;
        return "{ locator : " + (client == null ? BeanDefinitionParserDelegate.NULL_ELEMENT : "'" + client.getHost() + ":" + client.getPort() + "'") + ", 'producers' : " + this.producersCoordinator + ", 'consumers' : " + this.consumersCoordinator + "}";
    }
}
