package org.springframework.data.gemfire.listener;

import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.geode.cache.RegionService;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.cache.query.QueryService;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.gemfire.GemfireQueryException;
import org.springframework.data.gemfire.client.support.DefaultableDelegatingPoolAdapter;
import org.springframework.data.gemfire.client.support.DelegatingPoolAdapter;
import org.springframework.data.gemfire.config.xml.GemfireConstants;
import org.springframework.data.gemfire.util.CacheUtils;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ErrorHandler;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/data/gemfire/listener/ContinuousQueryListenerContainer.class */
public class ContinuousQueryListenerContainer implements BeanFactoryAware, BeanNameAware, InitializingBean, DisposableBean, SmartLifecycle {
    /**
     * Thread name prefix used by the default task executor when no bean name is set: the value of
     * {@code ClassUtils.getShortName(ContinuousQueryListenerContainer.class)} followed by a hyphen.
     */
    public static final String DEFAULT_THREAD_NAME_PREFIX = String.format("%s-", ClassUtils.getShortName(ContinuousQueryListenerContainer.class));

    // Collaborators and settings supplied through the corresponding setters.
    private BeanFactory beanFactory;
    private ErrorHandler errorHandler;
    private Executor taskExecutor;
    private QueryService queryService;
    private String beanName;
    private String poolName;

    // Value reported by isAutoStartup().
    private boolean autoStartup = true;

    // Set at the end of afterPropertiesSet() and cleared by destroy(); reported by isActive().
    private volatile boolean initialized = false;

    // True only when initExecutor() created the executor itself, so destroyExecutor() may dispose of it.
    private volatile boolean manageExecutor = false;

    // Toggled by start() and stop(); reported by isRunning().
    private volatile boolean running = false;

    // Value reported by getPhase().
    private int phase = Integer.MAX_VALUE;

    protected final Log logger = LogFactory.getLog(getClass());

    // CQs created by addContinuousQuery(); executed on start, stopped on stop, closed and cleared by closeQueries().
    private final Queue<CqQuery> continuousQueries = new ConcurrentLinkedQueue<CqQuery>();

    // Definitions configured via setQueryListeners(); turned into CQs by afterPropertiesSet().
    private final Set<ContinuousQueryDefinition> continuousQueryDefinitions = new LinkedHashSet<ContinuousQueryDefinition>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/data/gemfire/listener/ContinuousQueryListenerContainer$EventDispatcherAdapter.class */
    /**
     * {@link CqListener} that forwards every callback it receives to the enclosing container's
     * {@code dispatchEvent} method, together with the wrapped {@link ContinuousQueryListener}.
     */
    public class EventDispatcherAdapter implements CqListener {
        private final ContinuousQueryListener listener;

        private EventDispatcherAdapter(ContinuousQueryListener listener) {
            this.listener = listener;
        }

        /** Error events are dispatched exactly like regular events. */
        public void onError(CqEvent cqEvent) {
            dispatchEvent(this.listener, cqEvent);
        }

        /** Hands the event to the container for dispatch to the wrapped listener. */
        public void onEvent(CqEvent cqEvent) {
            dispatchEvent(this.listener, cqEvent);
        }

        /** No-op: the adapter holds nothing that needs releasing. */
        public void close() {
        }
    }

    /**
     * Initializes the container once its properties have been set.
     *
     * <p>Resolves the pool name, initializes the {@link QueryService} and the task executor, then
     * creates a continuous query for every configured definition and marks the container active.
     *
     * @throws IllegalStateException if no {@link QueryService} is available after initialization
     */
    public void afterPropertiesSet() {
        initQueryService(eagerlyInitializePool(resolvePoolName()));
        initExecutor();
        // Verify the QueryService BEFORE creating queries: addContinuousQuery() dereferences it, so
        // checking afterwards would surface a NullPointerException instead of this message.
        Assert.state(this.queryService != null, "QueryService was not properly initialized");
        initContinuousQueries(this.continuousQueryDefinitions);
        this.initialized = true;
    }

    /**
     * Determines the name of the pool to use.
     *
     * @return the configured pool name when it has text; otherwise
     *     {@code GemfireConstants.DEFAULT_GEMFIRE_POOL_NAME} if the bean factory contains a bean of that
     *     name, or {@code CacheUtils.DEFAULT_POOL_NAME} as the last resort
     */
    String resolvePoolName() {
        final String configuredName = this.poolName;
        if (StringUtils.hasText(configuredName)) {
            return configuredName;
        }
        if (this.beanFactory != null && this.beanFactory.containsBean(GemfireConstants.DEFAULT_GEMFIRE_POOL_NAME)) {
            return GemfireConstants.DEFAULT_GEMFIRE_POOL_NAME;
        }
        return CacheUtils.DEFAULT_POOL_NAME;
    }

    /**
     * Forces the named {@link Pool} bean, if the bean factory declares one, to be instantiated now.
     *
     * <p>When the bean lookup fails with a {@link BeansException}, the pool must at least be known to
     * {@link PoolManager}; otherwise the assertion below rejects the name.
     *
     * @param str name of the pool
     * @return the same pool name, unchanged
     */
    String eagerlyInitializePool(String str) {
        final BeanFactory factory = this.beanFactory;
        if (factory == null) {
            return str;
        }
        try {
            if (factory.isTypeMatch(str, Pool.class)) {
                // The returned bean is not needed here; the lookup itself triggers creation.
                factory.getBean(str, Pool.class);
            }
        } catch (BeansException cause) {
            Assert.notNull(PoolManager.find(str), String.format("No GemFire Pool with name [%s] was found", str));
        }
        return str;
    }

    /**
     * Resolves the {@link QueryService} when none is set yet or when a pool name with text is given.
     *
     * <p>The pool found by {@link PoolManager#find(String)} is wrapped in the delegating adapters and
     * the currently configured QueryService is passed to {@code getQueryService} as its argument.
     * NOTE(review): presumably {@code preferPool()} makes the pool's QueryService win over that
     * argument - confirm against DefaultableDelegatingPoolAdapter.
     *
     * @param str name of the pool to look up
     * @return the QueryService held by this container after resolution (may be unchanged)
     */
    QueryService initQueryService(String str) {
        final boolean resolutionRequired = this.queryService == null || StringUtils.hasText(str);
        if (resolutionRequired) {
            Pool pool = PoolManager.find(str);
            this.queryService = DefaultableDelegatingPoolAdapter.from(DelegatingPoolAdapter.from(pool))
                .preferPool()
                .getQueryService(this.queryService);
        }
        return this.queryService;
    }

    /**
     * Ensures a task executor is available, creating the default one when none was configured.
     *
     * <p>An executor created here is flagged as container-managed so that it can be disposed of on
     * destroy; an executor supplied by the user is returned untouched.
     *
     * @return the executor used by this container
     */
    Executor initExecutor() {
        if (this.taskExecutor != null) {
            return this.taskExecutor;
        }
        this.taskExecutor = createDefaultTaskExecutor();
        this.manageExecutor = true;
        return this.taskExecutor;
    }

    /**
     * Creates the executor used when none has been configured.
     *
     * @return a {@link SimpleAsyncTaskExecutor} whose thread name prefix is the bean name followed by a
     *     hyphen, or {@link #DEFAULT_THREAD_NAME_PREFIX} when no bean name is set
     */
    protected TaskExecutor createDefaultTaskExecutor() {
        final String threadNamePrefix;
        if (this.beanName != null) {
            threadNamePrefix = String.format("%s-", this.beanName);
        } else {
            threadNamePrefix = DEFAULT_THREAD_NAME_PREFIX;
        }
        return new SimpleAsyncTaskExecutor(threadNamePrefix);
    }

    /**
     * Rebuilds the container's continuous queries from the given definitions.
     *
     * <p>A running container is stopped first and all existing queries are closed and discarded
     * before a new query is created for each definition.
     *
     * @param set definitions to create continuous queries for
     */
    private void initContinuousQueries(Set<ContinuousQueryDefinition> set) {
        if (isRunning()) {
            stop();
        }
        closeQueries();
        for (ContinuousQueryDefinition definition : set) {
            addContinuousQuery(definition);
        }
    }

    /**
     * Starts the container by executing all registered continuous queries.
     *
     * <p>Does nothing when the container is already running.
     */
    public synchronized void start() {
        if (!isRunning()) {
            doStart();
            this.running = true;
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Started ContinuousQueryListenerContainer");
            }
        }
    }

    /** Executes every continuous query currently registered with this container. */
    private void doStart() {
        for (CqQuery query : this.continuousQueries) {
            executeQuery(query);
        }
    }

    /**
     * Executes the given continuous query.
     *
     * @param cqQuery query to execute
     * @throws GemfireQueryException wrapping any {@link QueryException} raised by the execution; the
     *     message carries the query's name and state
     */
    private void executeQuery(CqQuery cqQuery) {
        try {
            cqQuery.execute();
        } catch (QueryException cause) {
            String message = String.format("Could not execute query [%1$s]; state is [%2$s].", cqQuery.getName(), cqQuery.getState());
            throw new GemfireQueryException(message, cause);
        }
    }

    /**
     * Stops the container and then runs the given callback.
     *
     * @param runnable callback invoked after {@link #stop()} has returned
     */
    public void stop(Runnable runnable) {
        stop();
        runnable.run();
    }

    /**
     * Stops all continuous queries if the container is running.
     *
     * <p>The debug message is logged on every call, whether or not the container was running.
     */
    public synchronized void stop() {
        final boolean wasRunning = isRunning();
        if (wasRunning) {
            doStop();
            this.running = false;
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Stopped ContinuousQueryListenerContainer");
        }
    }

    /**
     * Stops every registered continuous query on a best-effort basis: a failure to stop one query is
     * logged as a warning and does not prevent the remaining queries from being stopped.
     */
    private void doStop() {
        for (CqQuery query : this.continuousQueries) {
            try {
                query.stop();
            } catch (Exception cause) {
                String message = String.format("Cannot stop query '%1$s'; state is '%2$s.", query.getName(), query.getState());
                this.logger.warn(message, cause);
            }
        }
    }

    /**
     * Shuts the container down: stops it, closes all continuous queries and disposes of the task
     * executor if it is managed by this container.
     *
     * <p>The container is marked inactive even when one of the shutdown steps throws, so that
     * {@link #isActive()} never reports a half-destroyed container as active.
     *
     * @throws Exception if disposing of the internally-managed executor fails
     */
    public void destroy() throws Exception {
        try {
            stop();
            closeQueries();
            destroyExecutor();
        } finally {
            this.initialized = false;
        }
    }

    /**
     * Closes every registered continuous query that is not already closed, then forgets all of them.
     *
     * <p>Failures are logged as warnings and do not stop the remaining queries from being closed.
     */
    private void closeQueries() {
        for (CqQuery query : this.continuousQueries) {
            try {
                if (query.isClosed()) {
                    continue;
                }
                query.close();
            } catch (Exception cause) {
                String message = String.format("Cannot close query '%1$s'; state is '%2$s.", query.getName(), query.getState());
                this.logger.warn(message, cause);
            }
        }
        this.continuousQueries.clear();
    }

    /**
     * Disposes of the task executor, but only when this container created it and it implements
     * {@link DisposableBean}. Executors supplied from outside are left alone.
     *
     * @throws Exception if the executor's {@code destroy()} fails
     */
    private void destroyExecutor() throws Exception {
        if (this.manageExecutor && (this.taskExecutor instanceof DisposableBean)) {
            // The field is typed as Executor, which has no destroy(); the cast is safe after the instanceof check.
            ((DisposableBean) this.taskExecutor).destroy();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Stopped internally-managed Task Executor.");
            }
        }
    }

    /**
     * Reports whether the container is initialized.
     *
     * @return {@code true} after {@code afterPropertiesSet()} has completed and until {@code destroy()} resets the flag
     */
    public final boolean isActive() {
        return initialized;
    }

    /**
     * Sets the value reported by {@link #isAutoStartup()}.
     *
     * @param z the new auto-startup flag
     */
    public void setAutoStartup(boolean z) {
        autoStartup = z;
    }

    /**
     * Returns the auto-startup flag.
     *
     * @return the configured flag; {@code true} unless changed through {@link #setAutoStartup(boolean)}
     */
    public boolean isAutoStartup() {
        return autoStartup;
    }

    /**
     * Reports whether the container has been started and not yet stopped.
     *
     * <p>Synchronized on this container, i.e. on the same monitor as {@link #start()} and {@link #stop()}.
     *
     * @return the current running flag
     */
    public synchronized boolean isRunning() {
        return running;
    }

    /**
     * Stores the bean factory, which is consulted when resolving the pool name and when eagerly
     * initializing the pool bean.
     *
     * @param beanFactory the bean factory to use
     */
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    /**
     * Stores the bean name, which is used as the thread name prefix of the default task executor.
     *
     * @param str the bean name
     */
    public void setBeanName(String str) {
        beanName = str;
    }

    /**
     * Configures the {@link QueryService} by obtaining it from the given cache.
     *
     * @param regionService cache (region service) whose QueryService is to be used; must not be {@code null}
     */
    public void setCache(RegionService regionService) {
        QueryService cacheQueryService = regionService.getQueryService();
        setQueryService(cacheQueryService);
    }

    /**
     * Sets the handler that receives exceptions thrown by listeners while the container is active.
     * Without a handler such exceptions are only logged at warn level.
     *
     * @param errorHandler the handler to invoke, or {@code null} for logging only
     */
    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    /**
     * Sets the lifecycle phase reported by {@link #getPhase()}.
     *
     * @param i the phase value
     */
    public void setPhase(int i) {
        phase = i;
    }

    /**
     * Returns the lifecycle phase of this container.
     *
     * @return the configured phase; {@link Integer#MAX_VALUE} unless changed through {@link #setPhase(int)}
     */
    public int getPhase() {
        return phase;
    }

    /**
     * Sets the name of the pool to use; when it has no text, a default name is resolved instead.
     *
     * @param str the pool name
     */
    public void setPoolName(String str) {
        poolName = str;
    }

    /**
     * Replaces the configured continuous query definitions with the given ones.
     *
     * <p>The definitions are copied into the container's own set; they are turned into continuous
     * queries by {@code afterPropertiesSet()}.
     *
     * @param set the definitions to use; must not be {@code null}
     */
    public void setQueryListeners(Set<ContinuousQueryDefinition> set) {
        final Set<ContinuousQueryDefinition> definitions = this.continuousQueryDefinitions;
        definitions.clear();
        definitions.addAll(set);
    }

    /**
     * Sets the {@link QueryService} used to create continuous queries.
     *
     * <p>Note that {@code initQueryService} may replace this value during initialization.
     *
     * @param queryService the query service to use
     */
    public void setQueryService(QueryService queryService) {
        this.queryService = queryService;
    }

    /**
     * Sets the executor on which listener invocations are run. When none is set, a default executor is
     * created during initialization.
     *
     * @param executor the executor to use
     */
    public void setTaskExecutor(Executor executor) {
        taskExecutor = executor;
    }

    /**
     * Registers a continuous query for the given definition; when the container is already running,
     * the new query is executed immediately.
     *
     * @param continuousQueryDefinition definition of the query and its listener
     */
    public void addListener(ContinuousQueryDefinition continuousQueryDefinition) {
        doAddListener(continuousQueryDefinition);
    }

    /**
     * Creates the continuous query for the definition and executes it right away if the container is
     * currently running.
     *
     * @param continuousQueryDefinition definition of the query and its listener
     */
    private void doAddListener(ContinuousQueryDefinition continuousQueryDefinition) {
        final CqQuery registeredQuery = addContinuousQuery(continuousQueryDefinition);
        if (!isRunning()) {
            return;
        }
        executeQuery(registeredQuery);
    }

    /**
     * Creates a continuous query from the definition and adds it to the container's queue.
     *
     * <p>The definition's listener is wrapped in an {@link EventDispatcherAdapter}. A named query is
     * created when the definition's name has text; otherwise the unnamed {@code newCq} variant is used.
     * The query is only created here, not executed.
     *
     * @param continuousQueryDefinition definition of the query and its listener
     * @return the newly created query
     * @throws GemfireQueryException wrapping any {@link QueryException} raised during creation
     */
    private CqQuery addContinuousQuery(ContinuousQueryDefinition continuousQueryDefinition) {
        try {
            CqAttributesFactory attributesFactory = new CqAttributesFactory();
            attributesFactory.addCqListener(new EventDispatcherAdapter(continuousQueryDefinition.getListener()));
            CqAttributes attributes = attributesFactory.create();

            final CqQuery createdQuery;
            if (StringUtils.hasText(continuousQueryDefinition.getName())) {
                createdQuery = this.queryService.newCq(continuousQueryDefinition.getName(), continuousQueryDefinition.getQuery(), attributes, continuousQueryDefinition.isDurable());
            } else {
                createdQuery = this.queryService.newCq(continuousQueryDefinition.getQuery(), attributes, continuousQueryDefinition.isDurable());
            }

            this.continuousQueries.add(createdQuery);
            return createdQuery;
        } catch (QueryException cause) {
            throw new GemfireQueryException("Cannot create query ", cause);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Submits a task to the task executor that delivers the event to the listener via
     * {@link #executeListener(ContinuousQueryListener, CqEvent)}.
     *
     * @param continuousQueryListener listener that should receive the event
     * @param cqEvent the event to deliver
     */
    public void dispatchEvent(final ContinuousQueryListener continuousQueryListener, final CqEvent cqEvent) {
        final Runnable deliveryTask = new Runnable() {
            @Override
            public void run() {
                executeListener(continuousQueryListener, cqEvent);
            }
        };
        this.taskExecutor.execute(deliveryTask);
    }

    /**
     * Invokes the listener with the event; anything the listener throws is passed to
     * {@link #handleListenerException(Throwable)} rather than propagated to the caller.
     *
     * @param continuousQueryListener listener to invoke
     * @param cqEvent event to pass to the listener
     */
    protected void executeListener(ContinuousQueryListener continuousQueryListener, CqEvent cqEvent) {
        try {
            continuousQueryListener.onEvent(cqEvent);
        } catch (Throwable listenerFailure) {
            handleListenerException(listenerFailure);
        }
    }

    /**
     * Handles a failure raised by a listener.
     *
     * <p>While the container is active the failure is passed to
     * {@link #invokeErrorHandler(Throwable)}; once it is no longer active the failure is only logged
     * at debug level.
     *
     * @param th the failure raised by the listener
     */
    protected void handleListenerException(Throwable th) {
        if (!isActive()) {
            this.logger.debug("Listener exception after container shutdown", th);
            return;
        }
        invokeErrorHandler(th);
    }

    /**
     * Passes the failure to the configured {@link ErrorHandler}; when none is configured, logs it at
     * warn level instead (if warn logging is enabled).
     *
     * @param th the failure to report
     */
    protected void invokeErrorHandler(Throwable th) {
        if (this.errorHandler != null) {
            this.errorHandler.handleError(th);
            return;
        }
        if (this.logger.isWarnEnabled()) {
            this.logger.warn("Execution of the CQ event listener failed, and no ErrorHandler has been set.", th);
        }
    }
}
