package io.axual.streams.proxy.switching;

import io.axual.client.proxy.switching.discovery.DiscoverySubscriber;
import io.axual.client.proxy.switching.generic.SwitchingProxy;
import io.axual.common.annotation.InterfaceStability;
import io.axual.common.concurrent.LockedObject;
import io.axual.common.tools.MapUtil;
import io.axual.common.tools.SleepUtil;
import io.axual.discovery.client.tools.DiscoveryConfigParserV2;
import io.axual.streams.proxy.generic.proxy.StreamsProxy;
import java.lang.Thread;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StreamsProxy} implementation that forwards every call to the currently proxied
 * {@link StreamsProxy} instance while holding the proxy's read lock.
 *
 * <p>Once streaming has been started, a daemon thread named {@code streamRestarter} polls
 * {@code maybeReplaceProxiedObject(false)} and, whenever that call reports that the proxied
 * object was replaced, starts streaming again on the new instance.
 */
@InterfaceStability.Evolving
public class SwitchingStreams extends SwitchingProxy<StreamsProxy, SwitchingStreamsConfig> implements StreamsProxy {
    private static final Logger LOG = LoggerFactory.getLogger(SwitchingStreams.class);

    /** Set by {@link #startStreaming()}, cleared by {@code stopStreaming()} and by the restarter. */
    private final AtomicBoolean running;

    /** Background poller; only read and written inside the synchronized start/stop methods. */
    private Thread restarterThread;

    /**
     * Polling loop executed on the {@code streamRestarter} thread. It ends when the thread is
     * interrupted or when {@code running} is observed to be {@code false}.
     */
    public class Restarter implements Runnable {
        private Restarter() {
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted() && running.get()) {
                if (maybeReplaceProxiedObject(false)) {
                    // The replacement instance has not been started yet, so release the flag
                    // to allow startStreaming() to claim it again.
                    if (!running.compareAndSet(true, false)) {
                        LOG.warn("Replaced a Streams instance which was not running, should not happen");
                    }
                    startStreaming();
                }
                if (running.get()) {
                    // Pause between polls; skipped once streaming has been stopped.
                    SleepUtil.sleep(Duration.ofSeconds(1L));
                }
            }
        }
    }

    /**
     * Creates a switching streams proxy from a configuration map. The same map is used to build
     * the {@link SwitchingStreamsConfig} and, via {@link DiscoveryConfigParserV2}, the discovery
     * configuration of the {@link DiscoverySubscriber}.
     *
     * @param map the configuration entries
     */
    public SwitchingStreams(Map<String, Object> map) {
        super(new SwitchingStreamsConfig(map), new DiscoverySubscriber(new DiscoveryConfigParserV2().parse(map), SwitchingStreams.class.getSimpleName(), new StreamsSwitcher(), false));
        this.running = new AtomicBoolean(false);
    }

    /**
     * Creates a switching streams proxy from {@link Properties}, converted to a map with
     * {@link MapUtil#objectToStringMap}.
     *
     * @param properties the configuration entries
     */
    public SwitchingStreams(Properties properties) {
        this((Map<String, Object>) MapUtil.objectToStringMap(properties));
    }

    /**
     * Forwards the state listener to the currently proxied instance.
     *
     * @param stateListener the listener to register
     */
    @Override
    public void setStateListener(KafkaStreams.StateListener stateListener) {
        try (LockedObject.ReadLock readLock = getReadLock()) {
            ((StreamsProxy) readLock.object).setStateListener(stateListener);
        }
    }

    /**
     * @return the state reported by the currently proxied instance
     */
    @Override
    public KafkaStreams.State state() {
        try (LockedObject.ReadLock readLock = getReadLock()) {
            return ((StreamsProxy) readLock.object).state();
        }
    }

    /**
     * @return the metrics reported by the currently proxied instance
     */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        try (LockedObject.ReadLock readLock = getReadLock()) {
            return ((StreamsProxy) readLock.object).metrics();
        }
    }

    /**
     * Starts the proxied instance and, if needed, the restarter thread.
     * Logs a warning and does nothing when streaming is already marked as running.
     */
    @Override
    public synchronized void start() {
        startStreaming();
    }

    /** Stops streaming by delegating to the no-argument {@code close()}. */
    @Override
    public void stop() {
        close();
    }

    /**
     * Stops streaming (closing and cleaning up the proxied instance) and then closes the
     * switching proxy itself.
     *
     * @param duration the timeout handed to {@code super.close(Duration)}
     */
    @Override
    public void close(Duration duration) {
        stopStreaming();
        super.close(duration);
    }

    /**
     * No-op on the switching proxy. {@code cleanUp()} of the proxied instance is invoked as
     * part of stopping, right after that instance has been closed.
     */
    @Override
    public void cleanUp() {
    }

    /**
     * Forwards the uncaught-exception handler to the currently proxied instance.
     *
     * @param streamsUncaughtExceptionHandler the handler to register
     */
    @Override
    public void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
        try (LockedObject.ReadLock readLock = getReadLock()) {
            ((StreamsProxy) readLock.object).setUncaughtExceptionHandler(streamsUncaughtExceptionHandler);
        }
    }

    /**
     * Forwards the thread-level uncaught-exception handler to the currently proxied instance.
     *
     * @param uncaughtExceptionHandler the handler to register
     * @deprecated use {@link #setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)}
     */
    @Override
    @Deprecated
    public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
        try (LockedObject.ReadLock readLock = getReadLock()) {
            ((StreamsProxy) readLock.object).setUncaughtExceptionHandler(uncaughtExceptionHandler);
        }
    }

    /**
     * @return the result of {@code allMetadata()} on the currently proxied instance
     * @deprecated use {@link #metadataForAllStreamsClients()}
     */
    @Override
    @Deprecated
    public Collection<StreamsMetadata> allMetadata() {
        try (LockedObject.ReadLock readLock = getReadLock()) {
            return ((StreamsProxy) readLock.object).allMetadata();
        }
    }

    /**
     * @param str the state store name
     * @return the result of {@code allMetadataForStore(str)} on the currently proxied instance
     * @deprecated use {@link #streamsMetadataForStore(String)}
     */
    @Override
    @Deprecated
    public Collection<StreamsMetadata> allMetadataForStore(String str) {
        try (LockedObject.ReadLock readLock = getReadLock()) {
            return ((StreamsProxy) readLock.object).allMetadataForStore(str);
        }
    }

    /**
     * @return the result of {@code localThreadsMetadata()} on the currently proxied instance
     * @deprecated use {@link #metadataForLocalThreads()}
     */
    @Override
    @Deprecated
    public Set<ThreadMetadata> localThreadsMetadata() {
        try (LockedObject.ReadLock readLock = getReadLock()) {
            return ((StreamsProxy) readLock.object).localThreadsMetadata();
        }
    }

    /**
     * Looks up key metadata on the currently proxied instance.
     *
     * <p>The key and serializer are passed through with their own type {@code K}; they are not
     * narrowed to {@code String}, so keys of any type are accepted.
     *
     * @param str        the state store name
     * @param k          the key to look up
     * @param serializer the serializer for the key
     * @param <K>        the key type
     * @return the metadata returned by the proxied instance
     */
    @Override
    public <K> KeyQueryMetadata queryMetadataForKey(String str, K k, Serializer<K> serializer) {
        try (LockedObject.ReadLock readLock = getReadLock()) {
            return ((StreamsProxy) readLock.object).queryMetadataForKey(str, k, serializer);
        }
    }

    /**
     * Looks up key metadata on the currently proxied instance using a custom partitioner.
     *
     * <p>The key and partitioner are passed through with their own type {@code K}; they are not
     * narrowed to {@code String}, so keys of any type are accepted.
     *
     * @param str               the state store name
     * @param k                 the key to look up
     * @param streamPartitioner the partitioner used to locate the key
     * @param <K>               the key type
     * @return the metadata returned by the proxied instance
     */
    @Override
    public <K> KeyQueryMetadata queryMetadataForKey(String str, K k, StreamPartitioner<? super K, ?> streamPartitioner) {
        try (LockedObject.ReadLock readLock = getReadLock()) {
            return ((StreamsProxy) readLock.object).queryMetadataForKey(str, k, streamPartitioner);
        }
    }

    /**
     * @param storeQueryParameters the parameters describing the store to fetch
     * @param <T>                  the store type
     * @return the store returned by the currently proxied instance
     */
    @Override
    public <T> T store(StoreQueryParameters<T> storeQueryParameters) {
        try (LockedObject.ReadLock readLock = getReadLock()) {
            return ((StreamsProxy) readLock.object).store(storeQueryParameters);
        }
    }

    /**
     * @return the result of {@code addStreamThread()} on the currently proxied instance
     */
    @Override
    public Optional<String> addStreamThread() {
        try (LockedObject.ReadLock readLock = getReadLock()) {
            return ((StreamsProxy) readLock.object).addStreamThread();
        }
    }

    /**
     * @return the result of {@code removeStreamThread()} on the currently proxied instance
     */
    @Override
    public Optional<String> removeStreamThread() {
        try (LockedObject.ReadLock readLock = getReadLock()) {
            return ((StreamsProxy) readLock.object).removeStreamThread();
        }
    }

    /**
     * @param duration the timeout forwarded to the proxied instance
     * @return the result of {@code removeStreamThread(duration)} on the currently proxied instance
     */
    @Override
    public Optional<String> removeStreamThread(Duration duration) {
        try (LockedObject.ReadLock readLock = getReadLock()) {
            return ((StreamsProxy) readLock.object).removeStreamThread(duration);
        }
    }

    /**
     * @return the result of {@code metadataForAllStreamsClients()} on the currently proxied instance
     */
    @Override
    public Collection<org.apache.kafka.streams.StreamsMetadata> metadataForAllStreamsClients() {
        try (LockedObject.ReadLock readLock = getReadLock()) {
            return ((StreamsProxy) readLock.object).metadataForAllStreamsClients();
        }
    }

    /**
     * @param str the state store name
     * @return the result of {@code streamsMetadataForStore(str)} on the currently proxied instance
     */
    @Override
    public Collection<org.apache.kafka.streams.StreamsMetadata> streamsMetadataForStore(String str) {
        try (LockedObject.ReadLock readLock = getReadLock()) {
            return ((StreamsProxy) readLock.object).streamsMetadataForStore(str);
        }
    }

    /**
     * @return the result of {@code metadataForLocalThreads()} on the currently proxied instance
     */
    @Override
    public Set<org.apache.kafka.streams.ThreadMetadata> metadataForLocalThreads() {
        try (LockedObject.ReadLock readLock = getReadLock()) {
            return ((StreamsProxy) readLock.object).metadataForLocalThreads();
        }
    }

    /**
     * Marks streaming as running, installs the configured uncaught-exception handler (when the
     * factory yields one), starts the proxied instance and makes sure a live restarter thread
     * exists. Logs a warning and returns when streaming is already marked as running.
     */
    public synchronized void startStreaming() {
        if (!this.running.compareAndSet(false, true)) {
            LOG.warn("Could not start streaming cause it appears it is already/still running");
            return;
        }
        StreamsUncaughtExceptionHandler handler = this.config.getUncaughtExceptionHandlerFactory().create(this);
        try (LockedObject.ReadLock readLock = getReadLock()) {
            StreamsProxy proxied = (StreamsProxy) readLock.object;
            if (handler != null) {
                proxied.setUncaughtExceptionHandler(handler);
            }
            proxied.start();
        }
        // Runs after the read lock has been released. When called from the restarter thread
        // itself, that thread is still alive and is therefore reused.
        if (this.restarterThread == null || !this.restarterThread.isAlive()) {
            this.restarterThread = new Thread(new Restarter(), "streamRestarter");
            this.restarterThread.setDaemon(true);
            this.restarterThread.start();
        }
    }

    /**
     * Interrupts the restarter thread, closes and cleans up the proxied instance under the
     * write lock and finally clears the running flag. Logs and returns when streaming is not
     * marked as running.
     */
    private synchronized void stopStreaming() {
        if (!this.running.get()) {
            LOG.info("Close was called on StreamRunner but was already stopped.");
            return;
        }
        if (this.restarterThread != null) {
            this.restarterThread.interrupt();
            this.restarterThread = null;
        }
        try (LockedObject.WriteLock writeLock = getWriteLock()) {
            ((StreamsProxy) writeLock.getObject()).close();
            ((StreamsProxy) writeLock.getObject()).cleanUp();
        }
        // Only reached when close and cleanUp succeeded, so a failed stop stays "running".
        this.running.set(false);
    }
}
