package org.springframework.xd.dirt.integration.test;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.xd.dirt.core.ModuleDescriptor;
import org.springframework.xd.dirt.core.Stream;
import org.springframework.xd.dirt.core.StreamsPath;
import org.springframework.xd.dirt.module.ModuleDefinitionRepository;
import org.springframework.xd.dirt.stream.StreamDefinitionRepository;
import org.springframework.xd.dirt.stream.StreamFactory;
import org.springframework.xd.dirt.util.MapBytesUtility;
import org.springframework.xd.dirt.zookeeper.Paths;
import org.springframework.xd.module.options.ModuleOptionsMetadataResolver;

/* loaded from: input_file:org/springframework/xd/dirt/integration/test/StreamCommandListener.class */
public class StreamCommandListener implements PathChildrenCacheListener {
    private static final Logger log = LoggerFactory.getLogger(StreamCommandListener.class);

    /** Upper bound, in milliseconds, applied by the wait methods of this class. */
    private static final int TIMEOUT = 5000;

    /** Client handed to the most recent {@code childEvent} call; {@code null} until the first event arrives. */
    private volatile CuratorFramework client;

    private final StreamFactory streamFactory;

    /** Futures keyed by stream name; each is completed with the stream's properties when the stream node is added. */
    private final ConcurrentMap<String, SettableFuture<Map<String, String>>> streamProperties = new ConcurrentHashMap<>();

    private final MapBytesUtility mapBytesUtility = new MapBytesUtility();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/xd/dirt/integration/test/StreamCommandListener$SettableFuture.class */
    /**
     * Minimal {@link Future} whose value is supplied explicitly through {@link #set(Object)}.
     *
     * <p>Completion is signalled through a single-count latch. Cancelling the future also
     * releases the latch, leaving the stored value {@code null}. Unlike most {@code Future}
     * implementations, neither {@code get} variant throws {@code CancellationException}:
     * a cancelled future simply yields {@code null}.
     *
     * <p>{@link #set(Object)} and {@link #cancel(boolean)} are serialized on an internal lock so
     * that exactly one of them can complete the future.
     *
     * @param <V> type of the value carried by the future
     */
    public static class SettableFuture<V> implements Future<V> {
        private final AtomicReference<V> result;
        private final CountDownLatch latch;
        private final AtomicBoolean cancelled;

        /** Guards the check-then-complete sequence shared by {@code set} and {@code cancel}. */
        private final Object completionLock = new Object();

        private SettableFuture() {
            this.result = new AtomicReference<>();
            this.latch = new CountDownLatch(1);
            this.cancelled = new AtomicBoolean();
        }

        /**
         * Cancels the future if it has not completed yet and releases any waiting threads.
         *
         * @param z ignored; there is no running task to interrupt
         * @return {@code true} if this call cancelled the future, {@code false} if it was already
         *         completed or cancelled
         */
        @Override // java.util.concurrent.Future
        public boolean cancel(boolean z) {
            synchronized (this.completionLock) {
                if (isDone()) {
                    return false;
                }
                this.cancelled.set(true);
                // No value was ever stored (we hold the lock and the future was not done), so the
                // result stays null; releasing the latch wakes up threads blocked in get().
                this.latch.countDown();
                return true;
            }
        }

        @Override // java.util.concurrent.Future
        public boolean isCancelled() {
            return this.cancelled.get();
        }

        @Override // java.util.concurrent.Future
        public boolean isDone() {
            return this.cancelled.get() || this.latch.getCount() == 0;
        }

        /**
         * Completes the future with the given value.
         *
         * @param v value to publish to waiting threads; may be {@code null}
         * @throws IllegalStateException if the future was already completed or cancelled
         */
        public void set(V v) {
            synchronized (this.completionLock) {
                if (this.latch.getCount() == 0) {
                    throw new IllegalStateException("result already set");
                }
                // Store the value before releasing the latch so readers woken by countDown()
                // always observe it.
                this.result.set(v);
                this.latch.countDown();
            }
        }

        /**
         * Waits until the future is completed or cancelled.
         *
         * @return the stored value, or {@code null} if the future was cancelled
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        @Override // java.util.concurrent.Future
        public V get() throws InterruptedException, ExecutionException {
            if (!this.cancelled.get()) {
                this.latch.await();
            }
            return this.result.get();
        }

        /**
         * Waits at most the given time for the future to be completed or cancelled.
         *
         * @param j maximum time to wait
         * @param timeUnit unit of {@code j}
         * @return the stored value, or {@code null} if the future was cancelled
         * @throws InterruptedException if the calling thread is interrupted while waiting
         * @throws TimeoutException if the future is still pending when the wait elapses
         */
        @Override // java.util.concurrent.Future
        public V get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
            if (this.cancelled.get()) {
                return null;
            }
            if (this.latch.await(j, timeUnit)) {
                return this.result.get();
            }
            throw new TimeoutException();
        }
    }

    /**
     * Creates a listener whose {@link StreamFactory} is built from the given collaborators.
     *
     * @param streamDefinitionRepository repository passed to the stream factory
     * @param moduleDefinitionRepository repository passed to the stream factory
     * @param moduleOptionsMetadataResolver resolver passed to the stream factory
     */
    public StreamCommandListener(StreamDefinitionRepository streamDefinitionRepository, ModuleDefinitionRepository moduleDefinitionRepository, ModuleOptionsMetadataResolver moduleOptionsMetadataResolver) {
        this.streamFactory = new StreamFactory(streamDefinitionRepository, moduleDefinitionRepository, moduleOptionsMetadataResolver);
    }

    /**
     * Records stream creation and removal reported by the path cache.
     *
     * <p>The supplied client is remembered for later use by the wait methods. For
     * {@code CHILD_ADDED} the future registered for the stream (created here if absent) is
     * completed with the properties decoded from the node data. For {@code CHILD_REMOVED} the
     * stream's future, if any, is removed and cancelled. All other event types are ignored.
     *
     * @param curatorFramework client that delivered the event
     * @param pathChildrenCacheEvent event describing the change
     * @throws Exception if the event cannot be processed, e.g. {@code IllegalStateException}
     *         when the stream's future has already been completed
     */
    public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
        this.client = curatorFramework;
        PathChildrenCacheEvent.Type eventType = pathChildrenCacheEvent.getType();
        if (pathChildrenCacheEvent.getData() == null) {
            // Events that do not concern a child node (connection state changes, cache
            // initialization) carry no data, so there is no stream path to inspect.
            log.debug("ignoring event without node data: {}", eventType);
            return;
        }
        String streamName = new StreamsPath(pathChildrenCacheEvent.getData().getPath()).getStreamName();
        log.info("event: {} stream: {}", eventType, streamName);
        if (eventType == PathChildrenCacheEvent.Type.CHILD_ADDED) {
            // Use the value returned by putIfAbsent instead of a second lookup: the entry could
            // be removed between the two calls, which would make get() return null.
            SettableFuture<Map<String, String>> candidate = new SettableFuture<>();
            SettableFuture<Map<String, String>> existing = this.streamProperties.putIfAbsent(streamName, candidate);
            SettableFuture<Map<String, String>> target = existing != null ? existing : candidate;
            target.set(this.mapBytesUtility.toMap(pathChildrenCacheEvent.getData().getData()));
        } else if (eventType == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
            SettableFuture<Map<String, String>> removed = this.streamProperties.remove(streamName);
            if (removed != null) {
                removed.cancel(true);
            }
        }
    }

    /**
     * Waits for the properties of the named stream to be published by {@code childEvent}.
     *
     * @param str name of the stream
     * @param j maximum time to wait, in milliseconds
     * @return the stream properties, or {@code null} if the calling thread was interrupted
     *         (the interrupt flag is restored) or the stream's future was cancelled
     * @throws IllegalStateException if the wait times out or fails for any other reason
     */
    private Map<String, String> getStreamProperties(String str, long j) {
        // Keep a direct reference to the future we register or find. Looking it up again after
        // putIfAbsent could return null if the listener thread removes the entry in between.
        SettableFuture<Map<String, String>> candidate = new SettableFuture<>();
        SettableFuture<Map<String, String>> existing = this.streamProperties.putIfAbsent(str, candidate);
        SettableFuture<Map<String, String>> future = existing != null ? existing : candidate;
        try {
            return future.get(j, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (Exception e2) {
            throw new IllegalStateException("failed while waiting for stream properties", e2);
        }
    }

    /**
     * Waits for the ZooKeeper path of the named stream to exist.
     *
     * <p>Delegates to {@code waitForCreateOrDestroyEvent(str, true)}.
     *
     * @param str name of the stream
     */
    public void waitForCreate(String str) {
        waitForCreateOrDestroyEvent(str, true);
    }

    /**
     * Waits for the ZooKeeper path of the named stream to no longer exist.
     *
     * <p>Delegates to {@code waitForCreateOrDestroyEvent(str, false)}.
     *
     * @param str name of the stream
     */
    public void waitForDestroy(String str) {
        waitForCreateOrDestroyEvent(str, false);
    }

    /**
     * Polls ZooKeeper until the stream's path is in the requested state.
     *
     * <p>The path is checked up to {@code TIMEOUT / 100} times with a 100 ms pause between
     * checks. The method returns normally both when the requested state is observed and when
     * the attempts run out. If the thread is interrupted, the interrupt flag is restored and
     * the method returns.
     *
     * @param str name of the stream
     * @param z {@code true} to wait for the path to exist, {@code false} to wait for it to be gone
     * @throws RuntimeException wrapping any other failure raised while checking the path
     */
    private void waitForCreateOrDestroyEvent(String str, boolean z) {
        final String streamPath = Paths.build(Paths.STREAMS, str);
        try {
            int attempts = 0;
            boolean exists = this.client.checkExists().forPath(streamPath) != null;
            // Stop as soon as the observed state matches the requested one, or once the
            // attempt budget is used up.
            while (exists != z && ++attempts < TIMEOUT / 100) {
                Thread.sleep(100L);
                exists = this.client.checkExists().forPath(streamPath) != null;
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        } catch (Exception failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Waits until every module of the named stream has been deployed.
     *
     * <p>A module counts as deployed once its deployment path exists and has at least one
     * child. The paths are polled every 100 ms until all of them qualify or {@code TIMEOUT}
     * milliseconds have elapsed.
     *
     * @param str name of the stream
     * @throws IllegalStateException if the deployment does not complete in time, if checking a
     *         path fails, or if the calling thread is interrupted while waiting (the interrupt
     *         flag is restored in that case)
     */
    public void waitForDeploy(String str) {
        List<String> pendingPaths = getModuleDeploymentPaths(str);
        long deadline = System.currentTimeMillis() + TIMEOUT;
        while (true) {
            try {
                ListIterator<String> pending = pendingPaths.listIterator();
                while (pending.hasNext()) {
                    Stat stat = this.client.checkExists().forPath(pending.next());
                    if (stat != null && stat.getNumChildren() > 0) {
                        pending.remove();
                    }
                }
                if (pendingPaths.isEmpty()) {
                    return;
                }
                if (System.currentTimeMillis() >= deadline) {
                    break;
                }
                // Pause between polls instead of spinning on ZooKeeper requests.
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(String.format("Failed while waiting for deployment of stream %s.", str), e);
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e2) {
                throw new IllegalStateException(String.format("Failed while waiting for deployment of stream %s.", str), e2);
            }
        }
        throw new IllegalStateException(String.format("Deployment of stream %s timed out.", str));
    }

    /**
     * Waits until the named stream has been undeployed.
     *
     * <p>The stream counts as undeployed when its path does not exist or has no children. The
     * path is polled every 100 ms until that holds or {@code TIMEOUT} milliseconds have
     * elapsed. If the thread is interrupted, the interrupt flag is restored and the method
     * returns.
     *
     * @param str name of the stream
     * @throws IllegalStateException if the undeployment does not complete in time or checking
     *         the path fails with a checked exception
     */
    public void waitForUndeploy(String str) {
        final String streamPath = Paths.build(Paths.STREAMS, str);
        final long deadline = System.currentTimeMillis() + TIMEOUT;
        boolean timeRemaining = true;
        while (timeRemaining) {
            try {
                Stat stat = this.client.checkExists().forPath(streamPath);
                boolean undeployed = stat == null || stat.getNumChildren() == 0;
                if (undeployed) {
                    return;
                }
                Thread.sleep(100L);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException unchecked) {
                throw unchecked;
            } catch (Exception failure) {
                throw new IllegalStateException(String.format("Failed while waiting for undeployment of stream %s.", str), failure);
            }
            // The deadline is evaluated after each poll, so at least one check always runs.
            timeRemaining = System.currentTimeMillis() < deadline;
        }
        throw new IllegalStateException(String.format("Undeployment of stream %s timed out.", str));
    }

    /**
     * Builds the deployment path of every module in the named stream.
     *
     * <p>The stream is created by the stream factory from the properties obtained via
     * {@code getStreamProperties}, waiting up to {@code TIMEOUT} milliseconds for them. Paths
     * are returned in the order given by the stream's deployment-order iterator.
     *
     * @param str name of the stream
     * @return a new mutable list of module deployment paths
     * @throws IllegalStateException if the stream or its paths cannot be determined
     */
    private List<String> getModuleDeploymentPaths(String str) {
        List<String> paths = new ArrayList<>();
        try {
            Stream stream = this.streamFactory.createStream(str, getStreamProperties(str, TIMEOUT));
            for (Iterator<ModuleDescriptor> modules = stream.getDeploymentOrderIterator(); modules.hasNext();) {
                ModuleDescriptor module = modules.next();
                paths.add(new StreamsPath()
                        .setStreamName(stream.getName())
                        .setModuleType(module.getModuleDefinition().getType().toString())
                        .setModuleLabel(module.getLabel())
                        .build());
            }
            return paths;
        } catch (Exception e) {
            throw new IllegalStateException(String.format("Failed to determine module deployment paths for stream %s", str), e);
        }
    }
}
