package org.springframework.data.gemfire.listener;

import com.gemstone.gemfire.cache.RegionService;
import com.gemstone.gemfire.cache.client.Pool;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.query.CqAttributes;
import com.gemstone.gemfire.cache.query.CqAttributesFactory;
import com.gemstone.gemfire.cache.query.CqEvent;
import com.gemstone.gemfire.cache.query.CqListener;
import com.gemstone.gemfire.cache.query.CqQuery;
import com.gemstone.gemfire.cache.query.QueryException;
import com.gemstone.gemfire.cache.query.QueryService;
import com.gemstone.gemfire.internal.concurrent.ConcurrentHashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.gemfire.GemfireQueryException;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ErrorHandler;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/data/gemfire/listener/ContinuousQueryListenerContainer.class */
/**
 * Lifecycle-managed container for GemFire continuous queries (CQs).
 *
 * <p>For every registered {@link ContinuousQueryDefinition} the container creates a
 * {@link CqQuery} through the configured {@link QueryService}, attaches an
 * {@link EventDispatcherAdapter} to it and, while running, executes the query. Events
 * received from GemFire are handed to the definition's {@link ContinuousQueryListener}
 * on the task executor; exceptions thrown by a listener are routed to the optional
 * {@link ErrorHandler}.
 *
 * <p>The container starts itself at the end of {@link #afterPropertiesSet()} and closes
 * all of its queries in {@link #destroy()}.
 */
public class ContinuousQueryListenerContainer implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle {

    /** Thread name prefix used for the default executor when no bean name is set. */
    public static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(ContinuousQueryListenerContainer.class) + "-";

    /** Falls back to {@link #taskExecutor} during initialization; not read anywhere else in this class. */
    private Executor subscriptionExecutor;

    /** Executor on which listener callbacks are run. */
    private Executor taskExecutor;

    private String beanName;

    /** Optional handler for exceptions thrown by listeners; may be {@code null}. */
    private ErrorHandler errorHandler;

    /** Service used to create the continuous queries. */
    private QueryService queryService;

    /** Optional pool name; when it has text, the pool's query service replaces {@link #queryService}. */
    private String poolName;

    protected final Log logger = LogFactory.getLog(getClass());

    private volatile boolean running = false;

    /** Set once {@link #afterPropertiesSet()} has built the queries; cleared by {@link #destroy()}. */
    private volatile boolean initialized = false;

    /** {@code true} when the task executor was created by this container rather than injected. */
    private volatile boolean manageExecutor = false;

    /** Definitions supplied through {@link #setQueryListeners(Set)}, in insertion order. */
    private final Set<ContinuousQueryDefinition> defs = new LinkedHashSet<ContinuousQueryDefinition>();

    /** Queries created by this container, whether or not they are currently executing. */
    private final Set<CqQuery> queries = new ConcurrentHashSet<CqQuery>();

    /**
     * GemFire {@link CqListener} that forwards every callback to a
     * {@link ContinuousQueryListener} through the container's task executor.
     */
    public class EventDispatcherAdapter implements CqListener {
        private final ContinuousQueryListener delegate;

        EventDispatcherAdapter(ContinuousQueryListener continuousQueryListener) {
            this.delegate = continuousQueryListener;
        }

        /** Error events are dispatched to the delegate exactly like regular events. */
        public void onError(CqEvent cqEvent) {
            ContinuousQueryListenerContainer.this.dispatchEvent(this.delegate, cqEvent);
        }

        public void onEvent(CqEvent cqEvent) {
            ContinuousQueryListenerContainer.this.dispatchEvent(this.delegate, cqEvent);
        }

        /** Nothing to release. */
        public void close() {
        }
    }

    /**
     * Applies defaults, resolves the query service from the named pool (if any), creates
     * the queries for all configured definitions and starts the container.
     *
     * @throws IllegalArgumentException if a pool name is set but no such pool exists
     * @throws GemfireQueryException if a query cannot be created or executed
     */
    public void afterPropertiesSet() {
        if (this.taskExecutor == null) {
            // Remember that we own the executor so destroy() knows to shut it down.
            this.manageExecutor = true;
            this.taskExecutor = createDefaultTaskExecutor();
        }
        if (this.subscriptionExecutor == null) {
            this.subscriptionExecutor = this.taskExecutor;
        }
        if (StringUtils.hasText(this.poolName)) {
            Pool find = PoolManager.find(this.poolName);
            Assert.notNull(find, "No pool named [" + this.poolName + "] found");
            // A pool name takes precedence over any query service set directly or via setCache.
            this.queryService = find.getQueryService();
        }
        // initialized is still false here, so initMapping does not start the container itself.
        initMapping(this.defs);
        this.initialized = true;
        start();
    }

    /**
     * Creates the executor used when none has been injected.
     *
     * @return a {@link SimpleAsyncTaskExecutor} whose thread name prefix is the bean name
     *         plus a dash, or {@link #DEFAULT_THREAD_NAME_PREFIX} when no bean name is set
     */
    protected TaskExecutor createDefaultTaskExecutor() {
        String threadNamePrefix = this.beanName != null ? this.beanName + "-" : DEFAULT_THREAD_NAME_PREFIX;
        return new SimpleAsyncTaskExecutor(threadNamePrefix);
    }

    /**
     * Marks the container inactive, stops and closes all queries, and destroys the task
     * executor if it was created by this container and is a {@link DisposableBean}.
     *
     * @throws Exception if destroying the internally managed executor fails
     */
    public void destroy() throws Exception {
        this.initialized = false;
        stop();
        closeQueries();
        if (this.manageExecutor && (this.taskExecutor instanceof DisposableBean)) {
            // The field is typed as Executor, so the cast is required to reach destroy().
            ((DisposableBean) this.taskExecutor).destroy();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Stopped internally-managed task executor");
            }
        }
    }

    /** @return always {@code true} */
    public boolean isAutoStartup() {
        return true;
    }

    /**
     * Stops the container and then invokes the given callback.
     *
     * <p>The callback is run from a {@code finally} block so that it is invoked even if
     * stopping fails unexpectedly.
     *
     * @param runnable callback to invoke once stopping has been attempted
     */
    public void stop(Runnable runnable) {
        try {
            stop();
        } finally {
            runnable.run();
        }
    }

    /** @return always {@link Integer#MAX_VALUE} */
    public int getPhase() {
        return Integer.MAX_VALUE;
    }

    public boolean isRunning() {
        return this.running;
    }

    /**
     * Executes all registered queries unless the container is already running.
     *
     * <p>NOTE: the running flag is set before the queries are executed, so it stays
     * {@code true} if executing a query throws.
     *
     * @throws GemfireQueryException if a query cannot be executed
     */
    public void start() {
        if (this.running) {
            return;
        }
        this.running = true;
        doStart();
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Started ContinousQueryListenerContainer");
        }
    }

    /**
     * Stops all registered queries if the container is running.
     *
     * <p>The debug message is logged on every call, including when the container was
     * not running.
     */
    public void stop() {
        if (this.running) {
            this.running = false;
            doStop();
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Stopped ContinousQueryListenerContainer");
        }
    }

    /** Executes every registered query; the first failure propagates to the caller. */
    private void doStart() {
        for (CqQuery query : this.queries) {
            executeQuery(query);
        }
    }

    /** Stops every registered query; failures are logged and the remaining queries are still stopped. */
    private void doStop() {
        for (CqQuery query : this.queries) {
            try {
                query.stop();
            } catch (QueryException e) {
                this.logger.warn("Cannot stop query", e);
            } catch (RuntimeException e2) {
                this.logger.warn("Cannot stop query", e2);
            }
        }
    }

    /** Closes every query that is not already closed (logging failures) and forgets all of them. */
    private void closeQueries() {
        for (CqQuery query : this.queries) {
            try {
                if (!query.isClosed()) {
                    query.close();
                }
            } catch (RuntimeException e) {
                this.logger.warn("Cannot close query", e);
            } catch (QueryException e2) {
                this.logger.warn("Cannot close query", e2);
            }
        }
        this.queries.clear();
    }

    /**
     * Invokes the listener with the event and passes anything it throws to
     * {@link #handleListenerException(Throwable)}.
     *
     * @param continuousQueryListener listener to invoke
     * @param cqEvent event to deliver
     */
    protected void executeListener(ContinuousQueryListener continuousQueryListener, CqEvent cqEvent) {
        try {
            continuousQueryListener.onEvent(cqEvent);
        } catch (Throwable th) {
            // Deliberately broad: a failing listener must not escape into the executor thread.
            handleListenerException(th);
        }
    }

    /**
     * @return {@code true} from the point {@link #afterPropertiesSet()} has created the
     *         queries until {@link #destroy()} is called
     */
    public final boolean isActive() {
        return this.initialized;
    }

    /**
     * Routes a listener failure to the error handler while the container is active;
     * otherwise only logs it at debug level.
     *
     * @param th the exception thrown by the listener
     */
    protected void handleListenerException(Throwable th) {
        if (isActive()) {
            invokeErrorHandler(th);
        } else {
            this.logger.debug("Listener exception after container shutdown", th);
        }
    }

    /**
     * Passes the exception to the configured {@link ErrorHandler}, or logs a warning
     * when none is set.
     *
     * @param th the exception to report
     */
    protected void invokeErrorHandler(Throwable th) {
        if (this.errorHandler != null) {
            this.errorHandler.handleError(th);
        } else if (this.logger.isWarnEnabled()) {
            this.logger.warn("Execution of the CQ event listener failed, and no ErrorHandler has been set.", th);
        }
    }

    public void setBeanName(String str) {
        this.beanName = str;
    }

    /**
     * Sets the executor used to run listener callbacks. When left unset, a default
     * executor is created during initialization and managed by this container.
     *
     * @param executor the executor to use
     */
    public void setTaskExecutor(Executor executor) {
        this.taskExecutor = executor;
    }

    /**
     * @param errorHandler handler invoked for exceptions thrown by listeners
     */
    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    /**
     * Uses the query service of the given cache (region service).
     *
     * @param regionService source of the {@link QueryService}; must not be {@code null}
     */
    public void setCache(RegionService regionService) {
        this.queryService = regionService.getQueryService();
    }

    /**
     * @param queryService the service used to create continuous queries
     */
    public void setQueryService(QueryService queryService) {
        this.queryService = queryService;
    }

    /**
     * @param str name of the pool whose query service should be used; looked up during
     *        initialization
     */
    public void setPoolName(String str) {
        this.poolName = str;
    }

    /**
     * Replaces the configured definitions. Queries are created from them in
     * {@link #afterPropertiesSet()}; calling this method alone does not create queries.
     *
     * @param set the definitions to register; must not be {@code null}
     */
    public void setQueryListeners(Set<ContinuousQueryDefinition> set) {
        this.defs.clear();
        this.defs.addAll(set);
    }

    /**
     * Creates a query for the definition and executes it immediately if the container
     * is running.
     *
     * @param continuousQueryDefinition the definition to add
     * @throws GemfireQueryException if the query cannot be created or executed
     */
    public void addListener(ContinuousQueryDefinition continuousQueryDefinition) {
        doAddListener(continuousQueryDefinition);
    }

    /**
     * Rebuilds the query set from the given definitions: stops the container if it is
     * running, closes the existing queries, creates one query per definition and starts
     * again if the container has already been initialized.
     */
    private void initMapping(Set<ContinuousQueryDefinition> definitions) {
        if (isRunning()) {
            stop();
        }
        closeQueries();
        for (ContinuousQueryDefinition definition : definitions) {
            addCQuery(definition);
        }
        if (this.initialized) {
            start();
        }
    }

    /** Creates the query for a definition and executes it when the container is running. */
    private void doAddListener(ContinuousQueryDefinition definition) {
        CqQuery query = addCQuery(definition);
        if (isRunning()) {
            executeQuery(query);
        }
    }

    /**
     * Creates a (not yet executed) continuous query for the definition and registers it.
     * A named query is created when the definition's name has text; otherwise the
     * unnamed {@code newCq} variant is used.
     *
     * @throws GemfireQueryException if anything fails while building the query
     */
    private CqQuery addCQuery(ContinuousQueryDefinition definition) {
        try {
            CqAttributesFactory attributesFactory = new CqAttributesFactory();
            attributesFactory.addCqListener(new EventDispatcherAdapter(definition.getListener()));
            CqAttributes attributes = attributesFactory.create();

            CqQuery query;
            if (StringUtils.hasText(definition.getName())) {
                query = this.queryService.newCq(definition.getName(), definition.getQuery(), attributes, definition.isDurable());
            } else {
                query = this.queryService.newCq(definition.getQuery(), attributes, definition.isDurable());
            }
            this.queries.add(query);
            return query;
        } catch (QueryException e) {
            // Kept as separate catch clauses so each exception type selects its own constructor overload.
            throw new GemfireQueryException("Cannot create query ", e);
        } catch (RuntimeException e2) {
            throw new GemfireQueryException("Cannot create query ", e2);
        }
    }

    /**
     * Executes the query, translating any failure into a {@link GemfireQueryException}.
     */
    private void executeQuery(CqQuery cqQuery) {
        try {
            cqQuery.execute();
        } catch (RuntimeException e) {
            throw new GemfireQueryException("Cannot execute query", e);
        } catch (QueryException e2) {
            throw new GemfireQueryException("Cannot execute query", e2);
        }
    }

    /**
     * Schedules delivery of the event to the listener on the task executor.
     *
     * @param continuousQueryListener listener that should receive the event
     * @param cqEvent event to deliver
     */
    public void dispatchEvent(final ContinuousQueryListener continuousQueryListener, final CqEvent cqEvent) {
        this.taskExecutor.execute(new Runnable() {
            @Override
            public void run() {
                ContinuousQueryListenerContainer.this.executeListener(continuousQueryListener, cqEvent);
            }
        });
    }
}
