package org.springframework.data.gemfire.listener;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
import org.apache.geode.cache.RegionService;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.cache.query.QueryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.data.gemfire.GemfireQueryException;
import org.springframework.data.gemfire.client.PoolResolver;
import org.springframework.data.gemfire.client.support.DefaultableDelegatingPoolAdapter;
import org.springframework.data.gemfire.client.support.DelegatingPoolAdapter;
import org.springframework.data.gemfire.client.support.PoolManagerPoolResolver;
import org.springframework.data.gemfire.config.annotation.ContinuousQueryListenerContainerConfigurer;
import org.springframework.data.gemfire.util.ArrayUtils;
import org.springframework.data.gemfire.util.CollectionUtils;
import org.springframework.data.gemfire.util.SpringExtensions;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/data/gemfire/listener/ContinuousQueryListenerContainer.class */
/**
 * Container that turns registered {@link ContinuousQueryDefinition ContinuousQueryDefinitions} into
 * {@link CqQuery continuous queries} using a {@link QueryService}, executes them when the container is started,
 * stops them when the container is stopped and closes them when the container is destroyed.
 *
 * <p>Every {@link CqEvent} received by a query is handed to the corresponding {@link ContinuousQueryListener}
 * on the configured task {@link Executor}; exceptions thrown by a listener are routed to the optional
 * {@link ErrorHandler}.
 */
public class ContinuousQueryListenerContainer implements BeanFactoryAware, BeanNameAware, InitializingBean, DisposableBean, SmartLifecycle {
    /** Thread name prefix used by the default task executor when no bean name is available. */
    public static final String DEFAULT_THREAD_NAME_PREFIX = String.format("%s-", ContinuousQueryListenerContainer.class.getSimpleName());

    /** {@link PoolResolver} used when none (or {@code null}) has been configured. */
    protected static final PoolResolver DEFAULT_POOL_RESOLVER = new PoolManagerPoolResolver();

    /** Bean name probed in the {@link BeanFactory} when no pool name has been configured explicitly. */
    private static final String GEMFIRE_POOL_BEAN_NAME = "gemfirePool";

    /** Pool name used when neither a configured pool name nor a matching pool bean is available. */
    private static final String FALLBACK_POOL_NAME = "DEFAULT";

    private BeanFactory beanFactory;
    private ErrorHandler errorHandler;
    private Executor taskExecutor;
    private QueryService queryService;
    private String beanName;
    private String poolName;
    private boolean autoStartup = true;
    // Set at the end of afterPropertiesSet() and cleared at the end of destroy().
    private volatile boolean initialized = false;
    // True only when this container created the task executor itself (see initExecutor()).
    private volatile boolean manageExecutor = false;
    private volatile boolean running = false;
    private int phase = Integer.MAX_VALUE;
    private List<ContinuousQueryListenerContainerConfigurer> cqListenerContainerConfigurers = Collections.emptyList();

    // Composite that delegates to whatever configurers are registered at the time it is invoked.
    private ContinuousQueryListenerContainerConfigurer compositeCqListenerContainerConfigurer = (name, container) ->
        CollectionUtils.nullSafeList(this.cqListenerContainerConfigurers)
            .forEach(configurer -> configurer.configure(name, container));

    protected final Logger logger = LoggerFactory.getLogger(getClass());
    private PoolResolver poolResolver = DEFAULT_POOL_RESOLVER;
    private Queue<CqQuery> continuousQueries = new ConcurrentLinkedQueue<>();
    private Set<ContinuousQueryDefinition> continuousQueryDefinitions = new LinkedHashSet<>();

    /**
     * {@link CqListener} that forwards every CQ callback to the enclosing container, which in turn
     * notifies the wrapped {@link ContinuousQueryListener} on the task executor.
     */
    protected class EventDispatcherAdapter implements CqListener {
        private final ContinuousQueryListener listener;

        /**
         * Creates an adapter for the given listener.
         *
         * @param continuousQueryListener listener to notify; must not be {@code null}.
         * @throws IllegalArgumentException if the listener is {@code null}.
         */
        protected EventDispatcherAdapter(ContinuousQueryListener continuousQueryListener) {
            Assert.notNull(continuousQueryListener, "ContinuousQueryListener is required");
            this.listener = continuousQueryListener;
        }

        /**
         * Returns the listener notified by this adapter.
         *
         * @return the wrapped {@link ContinuousQueryListener}.
         */
        protected ContinuousQueryListener getListener() {
            return this.listener;
        }

        /**
         * Dispatches the error event to the wrapped listener exactly like a regular event.
         *
         * @param cqEvent the event describing the error.
         */
        public void onError(CqEvent cqEvent) {
            ContinuousQueryListenerContainer.this.dispatchEvent(getListener(), cqEvent);
        }

        /**
         * Dispatches the event to the wrapped listener.
         *
         * @param cqEvent the continuous query event.
         */
        public void onEvent(CqEvent cqEvent) {
            ContinuousQueryListenerContainer.this.dispatchEvent(getListener(), cqEvent);
        }

        /** No-op; the adapter holds no resources of its own. */
        public void close() {
        }
    }

    /**
     * Initializes the container: applies the registered configurers, resolves the pool and the
     * {@link QueryService}, sets up the task executor and creates the continuous queries for all
     * registered definitions. The container reports itself {@link #isActive() active} afterwards.
     *
     * @throws IllegalStateException if no {@link QueryService} could be determined.
     */
    public void afterPropertiesSet() {
        applyContinuousQueryListenerContainerConfigurers();
        validateQueryService(initQueryService(eagerlyInitializePool(resolvePoolName())));
        initExecutor();
        initContinuousQueries();
        this.initialized = true;
    }

    /** Applies the composite configurer (i.e. all registered configurers) to this container. */
    private void applyContinuousQueryListenerContainerConfigurers() {
        applyContinuousQueryListenerContainerConfigurers(getCompositeContinuousQueryListenerContainerConfigurer());
    }

    /**
     * Applies the given configurers to this container.
     *
     * @param continuousQueryListenerContainerConfigurerArr configurers to apply; a {@code null} array is
     * passed through {@code ArrayUtils.nullSafeArray} before use.
     */
    protected void applyContinuousQueryListenerContainerConfigurers(ContinuousQueryListenerContainerConfigurer... continuousQueryListenerContainerConfigurerArr) {
        applyContinuousQueryListenerContainerConfigurers(Arrays.asList(
            ArrayUtils.nullSafeArray(continuousQueryListenerContainerConfigurerArr, ContinuousQueryListenerContainerConfigurer.class)));
    }

    /**
     * Applies the given configurers to this container, passing the container's bean name and the
     * container itself to each of them in iteration order.
     *
     * @param iterable configurers to apply; a {@code null} value is passed through
     * {@code CollectionUtils.nullSafeIterable} before use.
     */
    protected void applyContinuousQueryListenerContainerConfigurers(Iterable<ContinuousQueryListenerContainerConfigurer> iterable) {
        StreamSupport.stream(CollectionUtils.nullSafeIterable(iterable).spliterator(), false)
            .forEach(configurer -> configurer.configure(getBeanName(), this));
    }

    /**
     * Resolves the {@link Pool} with the given name using the configured {@link PoolResolver}.
     *
     * @param str name of the pool.
     * @return the result of the {@link PoolResolver}; declared {@code @Nullable}.
     */
    @Nullable
    Pool resolvePool(String str) {
        return getPoolResolver().resolve(str);
    }

    /**
     * Determines the name of the pool to use: the configured pool name if it has text, otherwise
     * {@code "gemfirePool"} if the {@link BeanFactory} holds a matching {@link Pool} bean of that name,
     * otherwise {@code "DEFAULT"}.
     *
     * @return the pool name; never {@code null}.
     */
    String resolvePoolName() {
        return Optional.ofNullable(getPoolName())
            .filter(StringUtils::hasText)
            .orElseGet(() -> Optional.ofNullable(getBeanFactory())
                .filter(factory -> SpringExtensions.isMatchingBean(factory, GEMFIRE_POOL_BEAN_NAME, Pool.class))
                .map(factory -> GEMFIRE_POOL_BEAN_NAME)
                .orElse(FALLBACK_POOL_NAME));
    }

    /**
     * Makes sure the named pool is available before it is used: if the {@link BeanFactory} has a matching
     * {@link Pool} bean, the bean is fetched; in every other case (no bean factory, no matching bean, or the
     * bean lookup failed) the pool must be resolvable through the {@link PoolResolver}.
     *
     * @param str name of the pool.
     * @return the given pool name.
     * @throws IllegalArgumentException if the pool has to be resolved through the {@link PoolResolver}
     * and no pool with the given name is found.
     */
    String eagerlyInitializePool(String str) {
        Supplier<String> resolvedPoolName = () -> {
            Assert.notNull(resolvePool(str), String.format("No Pool with name [%s] was found", str));
            return str;
        };

        return Optional.ofNullable(getBeanFactory())
            .filter(factory -> SpringExtensions.isMatchingBean(factory, str, Pool.class))
            .map(factory -> {
                try {
                    factory.getBean(str, Pool.class);
                    return str;
                } catch (BeansException ignored) {
                    // The bean could not be obtained; fall back to the PoolResolver lookup.
                    return resolvedPoolName.get();
                }
            })
            .orElseGet(resolvedPoolName);
    }

    /**
     * (Re)initializes the {@link QueryService} from the named pool when no query service has been set yet
     * or when the given pool name has text; the currently configured query service is passed along to the
     * pool adapter's {@code getQueryService} call.
     *
     * @param str name of the pool.
     * @return the query service configured after initialization; may be {@code null}.
     */
    QueryService initQueryService(String str) {
        QueryService currentQueryService = getQueryService();

        if (currentQueryService == null || StringUtils.hasText(str)) {
            setQueryService(DefaultableDelegatingPoolAdapter.from(DelegatingPoolAdapter.from(resolvePool(str)))
                .preferPool()
                .getQueryService(currentQueryService));
        }

        return getQueryService();
    }

    /**
     * Asserts that a {@link QueryService} is present.
     *
     * @param queryService the query service to check.
     * @return the given query service.
     * @throws IllegalStateException if the query service is {@code null}.
     */
    private QueryService validateQueryService(QueryService queryService) {
        Assert.state(queryService != null, "QueryService is required");
        return queryService;
    }

    /**
     * Creates a default task executor if none has been configured and remembers that this container
     * created it.
     *
     * @return the task executor in use.
     */
    Executor initExecutor() {
        if (getTaskExecutor() == null) {
            setTaskExecutor(createDefaultTaskExecutor());
            this.manageExecutor = true;
        }

        return getTaskExecutor();
    }

    /**
     * Creates the default task executor: a {@link SimpleAsyncTaskExecutor} whose thread name prefix is
     * {@code "<beanName>-"} when the bean name has text, or {@link #DEFAULT_THREAD_NAME_PREFIX} otherwise.
     *
     * @return a new {@link Executor}.
     */
    protected Executor createDefaultTaskExecutor() {
        String threadNamePrefix = Optional.ofNullable(getBeanName())
            .filter(StringUtils::hasText)
            .map(name -> String.format("%s-", name))
            .orElse(DEFAULT_THREAD_NAME_PREFIX);

        return new SimpleAsyncTaskExecutor(threadNamePrefix);
    }

    /** Creates the continuous queries for all registered definitions. */
    private void initContinuousQueries() {
        initContinuousQueries(getContinuousQueryDefinitions());
    }

    /**
     * Stops the container, closes and discards all currently managed queries and then creates a
     * continuous query for each of the given definitions.
     *
     * @param set definitions to create queries for.
     */
    private void initContinuousQueries(Set<ContinuousQueryDefinition> set) {
        stop();
        closeQueries();
        set.forEach(this::addContinuousQuery);
    }

    /**
     * Reports whether the container has been initialized and not yet destroyed.
     *
     * @return {@code true} between the completion of {@link #afterPropertiesSet()} and {@link #destroy()}.
     */
    public boolean isActive() {
        return this.initialized;
    }

    /**
     * Sets the value reported by {@link #isAutoStartup()}.
     *
     * @param z the auto-startup flag; defaults to {@code true}.
     */
    public void setAutoStartup(boolean z) {
        this.autoStartup = z;
    }

    /**
     * Returns the configured auto-startup flag.
     *
     * @return the auto-startup flag.
     */
    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    /**
     * Reports whether the container has been started and not stopped since.
     *
     * @return {@code true} if the container is running.
     */
    public synchronized boolean isRunning() {
        return this.running;
    }

    /**
     * Stores the {@link BeanFactory} used to look up {@link Pool} beans.
     *
     * @param beanFactory the bean factory.
     */
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    /**
     * Returns the configured {@link BeanFactory}.
     *
     * @return the bean factory; may be {@code null}.
     */
    protected BeanFactory getBeanFactory() {
        return this.beanFactory;
    }

    /**
     * Stores the bean name, which is passed to configurers and used for the default thread name prefix.
     *
     * @param str the bean name.
     */
    public void setBeanName(String str) {
        this.beanName = str;
    }

    /**
     * Returns the configured bean name.
     *
     * @return the bean name; may be {@code null}.
     */
    protected String getBeanName() {
        return this.beanName;
    }

    /**
     * Configures the {@link QueryService} with the one obtained from the given {@link RegionService}.
     *
     * @param regionService the cache / region service to take the query service from.
     */
    public void setCache(RegionService regionService) {
        setQueryService(regionService.getQueryService());
    }

    /**
     * Returns the live queue of continuous queries managed by this container.
     *
     * @return the managed queries; never {@code null}.
     */
    protected Queue<CqQuery> getContinuousQueries() {
        return this.continuousQueries;
    }

    /**
     * Returns the live set of registered continuous query definitions.
     *
     * @return the registered definitions; never {@code null}.
     */
    protected Set<ContinuousQueryDefinition> getContinuousQueryDefinitions() {
        return this.continuousQueryDefinitions;
    }

    /**
     * Registers the configurers applied during {@link #afterPropertiesSet()}.
     *
     * @param continuousQueryListenerContainerConfigurerArr configurers to register; a {@code null} array is
     * passed through {@code ArrayUtils.nullSafeArray} before use.
     */
    public void setContinuousQueryListenerContainerConfigurers(ContinuousQueryListenerContainerConfigurer... continuousQueryListenerContainerConfigurerArr) {
        setContinuousQueryListenerContainerConfigurers(Arrays.asList(
            ArrayUtils.nullSafeArray(continuousQueryListenerContainerConfigurerArr, ContinuousQueryListenerContainerConfigurer.class)));
    }

    /**
     * Registers the configurers applied during {@link #afterPropertiesSet()}.
     *
     * @param list configurers to register; a {@code null} list is passed through
     * {@code CollectionUtils.nullSafeList} before it is stored.
     */
    public void setContinuousQueryListenerContainerConfigurers(List<ContinuousQueryListenerContainerConfigurer> list) {
        this.cqListenerContainerConfigurers = CollectionUtils.nullSafeList(list);
    }

    /**
     * Returns the configurer that delegates to all registered configurers.
     *
     * @return the composite configurer.
     */
    protected ContinuousQueryListenerContainerConfigurer getCompositeContinuousQueryListenerContainerConfigurer() {
        return this.compositeCqListenerContainerConfigurer;
    }

    /**
     * Sets the handler invoked with exceptions thrown by listeners.
     *
     * @param errorHandler the error handler; may be {@code null}.
     */
    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    /**
     * Returns the configured {@link ErrorHandler}, if any.
     *
     * @return an {@link Optional} holding the error handler, or empty if none was configured.
     */
    public Optional<ErrorHandler> getErrorHandler() {
        return Optional.ofNullable(this.errorHandler);
    }

    /**
     * Sets the lifecycle phase reported by {@link #getPhase()}.
     *
     * @param i the phase; defaults to {@link Integer#MAX_VALUE}.
     */
    public void setPhase(int i) {
        this.phase = i;
    }

    /**
     * Returns the configured lifecycle phase.
     *
     * @return the phase.
     */
    public int getPhase() {
        return this.phase;
    }

    /**
     * Sets the name of the pool used to obtain the {@link QueryService}.
     *
     * @param str the pool name.
     */
    public void setPoolName(String str) {
        this.poolName = str;
    }

    /**
     * Returns the configured pool name.
     *
     * @return the pool name; may be {@code null}.
     */
    public String getPoolName() {
        return this.poolName;
    }

    /**
     * Sets the {@link PoolResolver} used to resolve pools by name.
     *
     * @param poolResolver the resolver; {@code null} causes {@link #DEFAULT_POOL_RESOLVER} to be used.
     */
    public void setPoolResolver(PoolResolver poolResolver) {
        this.poolResolver = poolResolver;
    }

    /**
     * Returns the {@link PoolResolver} in use.
     *
     * @return the configured resolver, or {@link #DEFAULT_POOL_RESOLVER} if none is configured.
     */
    public PoolResolver getPoolResolver() {
        return this.poolResolver != null ? this.poolResolver : DEFAULT_POOL_RESOLVER;
    }

    /**
     * Replaces the registered continuous query definitions with the given ones.
     *
     * @param set the new definitions; a {@code null} set is passed through
     * {@code CollectionUtils.nullSafeSet} before use.
     */
    public void setQueryListeners(Set<ContinuousQueryDefinition> set) {
        getContinuousQueryDefinitions().clear();
        getContinuousQueryDefinitions().addAll(CollectionUtils.nullSafeSet(set));
    }

    /**
     * Sets the {@link QueryService} used to create continuous queries.
     *
     * @param queryService the query service.
     */
    public void setQueryService(QueryService queryService) {
        this.queryService = queryService;
    }

    /**
     * Returns the configured {@link QueryService}.
     *
     * @return the query service; may be {@code null}.
     */
    public QueryService getQueryService() {
        return this.queryService;
    }

    /**
     * Sets the {@link Executor} on which listeners are notified.
     *
     * @param executor the task executor.
     */
    public void setTaskExecutor(Executor executor) {
        this.taskExecutor = executor;
    }

    /**
     * Returns the configured task {@link Executor}.
     *
     * @return the task executor; may be {@code null} before initialization.
     */
    public Executor getTaskExecutor() {
        return this.taskExecutor;
    }

    /**
     * Creates a continuous query for the given definition and, if the container is already running,
     * executes it immediately.
     *
     * @param continuousQueryDefinition definition of the query and its listener.
     * @throws GemfireQueryException if the query cannot be created or executed.
     */
    public void addListener(ContinuousQueryDefinition continuousQueryDefinition) {
        CqQuery query = addContinuousQuery(continuousQueryDefinition);

        if (isRunning()) {
            execute(query);
        }
    }

    /**
     * Registers the given definition without creating a query for it.
     *
     * @param continuousQueryDefinition definition to register; may be {@code null}.
     * @return {@code true} if the definition was non-{@code null} and was added to the set of registered
     * definitions; {@code false} otherwise.
     */
    public boolean addContinuousQueryDefinition(ContinuousQueryDefinition continuousQueryDefinition) {
        return Optional.ofNullable(continuousQueryDefinition)
            .map(getContinuousQueryDefinitions()::add)
            .orElse(false);
    }

    /**
     * Creates a (named or unnamed) continuous query for the given definition and adds it to the managed queries.
     *
     * @param continuousQueryDefinition definition of the query.
     * @return the newly created, managed query.
     * @throws GemfireQueryException if the query cannot be created.
     */
    CqQuery addContinuousQuery(ContinuousQueryDefinition continuousQueryDefinition) {
        try {
            CqAttributes attributes = continuousQueryDefinition.toCqAttributes(this::newCqListener);

            CqQuery query = continuousQueryDefinition.isNamed()
                ? newNamedContinuousQuery(continuousQueryDefinition, attributes)
                : newUnnamedContinuousQuery(continuousQueryDefinition, attributes);

            return manage(query);
        } catch (QueryException e) {
            throw new GemfireQueryException(String.format("Unable to create query [%s]", continuousQueryDefinition.getQuery()), e);
        }
    }

    /**
     * Wraps the given listener in a {@link CqListener} that dispatches events through this container.
     *
     * @param continuousQueryListener listener to wrap.
     * @return a new {@link EventDispatcherAdapter}.
     */
    protected CqListener newCqListener(ContinuousQueryListener continuousQueryListener) {
        return new EventDispatcherAdapter(continuousQueryListener);
    }

    /** Creates a continuous query using the definition's name, query, and durable flag. */
    private CqQuery newNamedContinuousQuery(ContinuousQueryDefinition continuousQueryDefinition, CqAttributes cqAttributes) throws QueryException {
        return getQueryService().newCq(continuousQueryDefinition.getName(), continuousQueryDefinition.getQuery(),
            cqAttributes, continuousQueryDefinition.isDurable());
    }

    /** Creates a continuous query using only the definition's query and durable flag (no name). */
    private CqQuery newUnnamedContinuousQuery(ContinuousQueryDefinition continuousQueryDefinition, CqAttributes cqAttributes) throws CqException {
        return getQueryService().newCq(continuousQueryDefinition.getQuery(), cqAttributes,
            continuousQueryDefinition.isDurable());
    }

    /** Adds the query to the managed queries and returns it. */
    private CqQuery manage(CqQuery cqQuery) {
        getContinuousQueries().add(cqQuery);
        return cqQuery;
    }

    /**
     * Executes all managed queries and marks the container as running. Does nothing if the container
     * is already running.
     *
     * @throws GemfireQueryException if a query cannot be executed.
     */
    public synchronized void start() {
        if (isRunning()) {
            return;
        }

        doStart();
        this.running = true;

        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Started ContinuousQueryListenerContainer");
        }
    }

    /** Executes every managed query. */
    void doStart() {
        getContinuousQueries().forEach(this::execute);
    }

    /**
     * Executes the given query.
     *
     * @throws GemfireQueryException wrapping any {@link QueryException} raised by the query.
     */
    private void execute(CqQuery cqQuery) {
        try {
            cqQuery.execute();
        } catch (QueryException e) {
            throw new GemfireQueryException(String.format("Could not execute query [%1$s]; state is [%2$s]",
                cqQuery.getName(), cqQuery.getState()), e);
        }
    }

    /**
     * Submits the notification of the given listener to the task executor.
     *
     * @param continuousQueryListener listener to notify.
     * @param cqEvent event to pass to the listener.
     */
    protected void dispatchEvent(ContinuousQueryListener continuousQueryListener, CqEvent cqEvent) {
        getTaskExecutor().execute(() -> notify(continuousQueryListener, cqEvent));
    }

    /** Invokes the listener and routes anything it throws to {@link #handleListenerError(Throwable)}. */
    private void notify(ContinuousQueryListener continuousQueryListener, CqEvent cqEvent) {
        try {
            continuousQueryListener.onEvent(cqEvent);
        } catch (Throwable th) {
            handleListenerError(th);
        }
    }

    /**
     * Handles an exception thrown by a listener: the configured {@link ErrorHandler} is invoked only while
     * the container is active; if no error handler is configured at all, the failure is logged at WARN level.
     */
    private void handleListenerError(Throwable th) {
        getErrorHandler()
            .filter(handler -> {
                boolean active = isActive();
                if (!active && this.logger.isDebugEnabled()) {
                    this.logger.debug("A CQ listener exception occurred after container shutdown; ErrorHandler will not be invoked", th);
                }
                return active;
            })
            .ifPresent(handler -> handler.handleError(th));

        if (!getErrorHandler().isPresent() && this.logger.isWarnEnabled()) {
            this.logger.warn("Execution of CQ listener failed; No ErrorHandler was configured", th);
        }
    }

    /**
     * Stops the container and then runs the given callback.
     *
     * @param runnable callback to run after the container has been stopped.
     */
    public void stop(Runnable runnable) {
        stop();
        runnable.run();
    }

    /**
     * Stops all managed queries if the container is running and marks it as not running.
     * Failures to stop individual queries are logged and do not abort the shutdown.
     */
    public synchronized void stop() {
        if (isRunning()) {
            doStop();
            this.running = false;
        }

        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Stopped ContinuousQueryListenerContainer");
        }
    }

    /** Stops every managed query, logging (not propagating) any exception. */
    void doStop() {
        getContinuousQueries().forEach(query -> {
            try {
                query.stop();
            } catch (Exception e) {
                if (this.logger.isWarnEnabled()) {
                    this.logger.warn(String.format("Cannot stop query [%1$s]; state is [%2$s]",
                        query.getName(), query.getState()), e);
                }
            }
        });
    }

    /**
     * Shuts the container down: stops it, closes and discards all managed queries, destroys the task
     * executor if this container created it, and marks the container as no longer active.
     */
    public void destroy() {
        stop();
        closeQueries();
        destroyExecutor();
        this.initialized = false;
    }

    /** Closes every managed query that is not yet closed (logging failures) and empties the queue. */
    private void closeQueries() {
        getContinuousQueries().stream()
            .filter(query -> !query.isClosed())
            .forEach(query -> {
                try {
                    query.close();
                } catch (Exception e) {
                    if (this.logger.isWarnEnabled()) {
                        this.logger.warn(String.format("Cannot close query [%1$s]; state is [%2$s]",
                            query.getName(), query.getState()), e);
                    }
                }
            });

        getContinuousQueries().clear();
    }

    /**
     * Destroys the task executor, but only if this container created it and it is a {@link DisposableBean}.
     * A failure is logged and not propagated.
     */
    private void destroyExecutor() {
        Optional.ofNullable(getTaskExecutor())
            .filter(executor -> this.manageExecutor)
            .filter(DisposableBean.class::isInstance)
            .ifPresent(executor -> {
                try {
                    ((DisposableBean) executor).destroy();
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Stopped internally-managed TaskExecutor {}", executor);
                    }
                } catch (Exception e) {
                    // Trailing Throwable is logged with its stack trace in addition to the formatted message.
                    this.logger.warn("Failed to properly destroy the managed TaskExecutor {}: {}", executor, e.getMessage(), e);
                }
            });
    }
}
