package com.gemstone.gemfire.internal.cache.wan;

import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.client.ServerConnectivityException;
import com.gemstone.gemfire.cache.client.ServerOperationException;
import com.gemstone.gemfire.cache.client.internal.Connection;
import com.gemstone.gemfire.cache.client.internal.ServerProxy;
import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
import com.gemstone.gemfire.cache.wan.GatewaySender;
import com.gemstone.gemfire.distributed.internal.ServerLocation;
import com.gemstone.gemfire.i18n.LogWriterI18n;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderImpl;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.security.GemFireSecurityException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/* loaded from: input_file:WEB-INF/lib/gemfire-7.0.jar:com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.class */
public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDispatcher {
    /** Event processor whose batches this dispatcher sends and whose queue is updated on acks. */
    private final AbstractGatewaySenderEventProcessor processor;
    /**
     * Connection currently used for dispatching and ack reading, or {@code null} when none is
     * held. Volatile because it is read and written by both the dispatching code and the
     * {@link AckReaderThread}.
     */
    private volatile Connection connection;
    /** Logger obtained from the processor. */
    private final LogWriterI18n logger;
    /** Gateway sender that owns the processor. */
    private final AbstractGatewaySender sender;
    /** Ack reader created by {@link #getConnection()}; {@code null} until the first one is started. */
    private AckReaderThread ackReaderThread;
    /** Full paths of regions already reported as destroyed, used to avoid repeating the warning. */
    private final Set<String> notFoundRegions = new HashSet<String>();
    /** Monitor guarding {@link #notFoundRegions}. */
    private final Object notFoundRegionsSync = new Object();
    /** Write-locked while the connection is created or destroyed, read-locked while it is used. */
    private final ReentrantReadWriteLock connectionLifeCycleLock = new ReentrantReadWriteLock();
    /** Number of consecutive failed connect attempts; reset to 0 after a successful connect. */
    private int failedConnectCount = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/gemfire-7.0.jar:com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher$AckReaderThread.class */
    public class AckReaderThread extends Thread {
        /**
         * Monitor used to signal that {@link #run()} has started. Final because it is used with
         * {@code synchronized}/{@code wait}/{@code notifyAll} and must never be swapped out.
         */
        private final Object runningStateLock;
        /** Set by {@link #shutdown()} to ask the read loop to stop. */
        private volatile boolean shutdown;
        /** Cache whose cancel criterion is consulted by the read loop. */
        private final GemFireCacheImpl cache;
        /** {@code true} from the moment {@link #run()} signals start-up until it exits. */
        private volatile boolean ackReaderThreadRunning;

        public AckReaderThread(GatewaySender gatewaySender) {
            super("AckReaderThread for : " + gatewaySender.getId());
            this.runningStateLock = new Object();
            this.shutdown = false;
            this.ackReaderThreadRunning = false;
            setDaemon(true);
            this.cache = (GemFireCacheImpl) ((AbstractGatewaySender) gatewaySender).getCache();
        }

        /**
         * Blocks the caller until {@link #run()} has signalled that the reader has started.
         *
         * <p>The running flag is tested while holding {@code runningStateLock}, the same monitor
         * under which {@code run()} sets the flag and calls {@code notifyAll()}. Testing it outside
         * the monitor would let the notification slip in between the test and the
         * {@code wait()}, leaving the caller blocked forever.
         *
         * <p>If the waiting thread is interrupted, its interrupt status is restored and the method
         * returns without waiting any further.
         */
        public void waitForRunningAckReaderThreadRunningState() {
            synchronized (this.runningStateLock) {
                // Loop guards against spurious wake-ups.
                while (!this.ackReaderThreadRunning) {
                    try {
                        this.runningStateLock.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }

        /**
         * Tells whether the read loop should stop.
         *
         * @return {@code true} if {@link #shutdown()} was requested or the cache's cancel
         *         criterion reports a cancellation in progress
         */
        private boolean checkCancelled() {
            if (this.shutdown) {
                return true;
            }
            return this.cache.getCancelCriterion().cancelInProgress() != null;
        }

        /**
         * Read loop: signals start-up, then reads acknowledgements until cancelled.
         *
         * <ul>
         *   <li>A {@code null} ack makes the processor handle an exception.</li>
         *   <li>A successful ack is forwarded to {@code handleSuccessBatchAck}.</li>
         *   <li>An ack carrying a batch exception is logged; depending on
         *       {@code isRemoveFromQueueOnException()} the batch is either acknowledged as a
         *       whole, or the events preceding the first failing index are removed from the
         *       queue and the processor handles an exception.</li>
         * </ul>
         *
         * <p>Any {@link Exception} escaping the loop is logged at severe level, the processor is
         * stopped under the sender's lifecycle write lock, and the thread ends. The write lock is
         * released in a {@code finally} block so it cannot leak if {@code stopProcessing()}
         * throws. The running flag is cleared on every exit path.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            final GatewaySenderEventRemoteDispatcher dispatcher = GatewaySenderEventRemoteDispatcher.this;
            this.cache.getDistributedSystem();
            int lastBatchIdRead = -1;
            if (dispatcher.logger.fineEnabled()) {
                dispatcher.logger.fine("AckReaderThread started.. ");
            }
            // Publish the running state under the monitor the waiter blocks on.
            synchronized (this.runningStateLock) {
                this.ackReaderThreadRunning = true;
                this.runningStateLock.notifyAll();
            }
            try {
                while (!checkCancelled()) {
                    GatewayAck ack = dispatcher.readAcknowledgement(lastBatchIdRead);
                    if (ack == null) {
                        if (dispatcher.logger.fineEnabled()) {
                            dispatcher.logger.fine(dispatcher.processor.getSender() + ": Received null ack from remote site.");
                        }
                        dispatcher.processor.handleException();
                        continue;
                    }

                    BatchException70 batchException = ack.getBatchException();
                    int batchId = ack.getBatchId();
                    lastBatchIdRead = batchId;
                    int numEvents = ack.getNumEvents();

                    if (batchException == null) {
                        dispatcher.logger.info(LocalizedStrings.GatewaySenderEventRemoteDispatcher_GATEWAY_SENDER_0_RECEIVED_ACK_FOR_BATCH_ID_1_OF_2_EVENTS, new Object[]{dispatcher.processor.getSender(), Integer.valueOf(batchId), Integer.valueOf(numEvents)});
                        dispatcher.processor.handleSuccessBatchAck(batchId, numEvents);
                        continue;
                    }

                    dispatcher.logger.info(LocalizedStrings.GatewaySenderEventRemoteDispatcher_GATEWAY_SENDER_0_RECEIVED_ACK_FOR_BATCH_ID_1_WITH_EXCEPTION, new Object[]{dispatcher.processor.getSender(), Integer.valueOf(batchId)}, (Throwable) batchException);
                    dispatcher.sender.getStatistics().incBatchesRedistributed();
                    boolean removeOnException = dispatcher.sender.isRemoveFromQueueOnException();
                    logBatchExceptions(batchException);
                    if (removeOnException) {
                        // Treat the whole batch as done, using the size recorded for the failed batch id.
                        List<GatewaySenderEventImpl> events = dispatcher.processor.getBatchIdToEventsMap().get(Integer.valueOf(batchException.getBatchId()));
                        if (events != null) {
                            dispatcher.processor.handleSuccessBatchAck(batchId, events.size());
                        }
                    } else {
                        // Drop the events that precede the first failing index, then let the processor recover.
                        List<BatchException70> exceptions = batchException.getExceptions();
                        for (int removed = 0; removed < exceptions.get(0).getIndex(); removed++) {
                            dispatcher.processor.eventQueueRemove();
                        }
                        dispatcher.processor.handleException();
                    }
                }
            } catch (Exception e) {
                dispatcher.logger.severe(LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH, (Throwable) e);
                dispatcher.sender.lifeCycleLock.writeLock().lock();
                try {
                    dispatcher.processor.stopProcessing();
                } finally {
                    dispatcher.sender.lifeCycleLock.writeLock().unlock();
                }
            } finally {
                if (dispatcher.logger.fineEnabled()) {
                    dispatcher.logger.fine("AckReaderThread exiting. ");
                }
                this.ackReaderThreadRunning = false;
            }
        }

        /**
         * Logs every exception contained in the given batch exception.
         *
         * <p>A warning is written for each entry, except that a
         * {@link RegionDestroyedException} is only reported the first time its region path is
         * seen (tracked in {@code notFoundRegions}). When the events of the failed batch are still
         * known to the processor, the batch content is logged at fine level and the event at the
         * failing index is logged as a warning.
         *
         * <p>The fine-level message is only built when fine logging is enabled, and the failing
         * event is only looked up when its warning will actually be written.
         *
         * @param batchException70 batch exception whose {@code getExceptions()} entries are logged
         */
        private void logBatchExceptions(BatchException70 batchException70) {
            final GatewaySenderEventRemoteDispatcher dispatcher = GatewaySenderEventRemoteDispatcher.this;
            for (BatchException70 failure : batchException70.getExceptions()) {
                boolean logWarning = true;
                if (failure.getCause() instanceof RegionDestroyedException) {
                    RegionDestroyedException regionDestroyed = (RegionDestroyedException) failure.getCause();
                    synchronized (dispatcher.notFoundRegionsSync) {
                        if (dispatcher.notFoundRegions.contains(regionDestroyed.getRegionFullPath())) {
                            // Already reported for this region: stay quiet.
                            logWarning = false;
                        } else {
                            dispatcher.notFoundRegions.add(regionDestroyed.getRegionFullPath());
                        }
                    }
                }
                if (logWarning) {
                    dispatcher.logger.warning(LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_EVENT__0, Integer.valueOf(failure.getIndex()), failure);
                }
                List<GatewaySenderEventImpl> events = dispatcher.processor.getBatchIdToEventsMap().get(Integer.valueOf(failure.getBatchId()));
                if (events == null) {
                    continue;
                }
                if (dispatcher.logger.fineEnabled()) {
                    dispatcher.logger.fine("The events in the batch : " + failure.getBatchId() + " are " + events);
                }
                if (logWarning) {
                    GatewaySenderEventImpl failedEvent = events.get(failure.getIndex());
                    dispatcher.logger.warning(LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0, failedEvent);
                }
            }
        }

        /**
         * @return {@code true} while the read loop is active, i.e. after {@link #run()} has
         *         signalled start-up and before it has exited
         */
        boolean isRunning() {
            final boolean running = this.ackReaderThreadRunning;
            return running;
        }

        /**
         * Stops this reader: destroys the dispatcher's current connection (if it is still alive),
         * raises the shutdown flag, interrupts the thread and waits up to 15 seconds for it to
         * end. A warning is logged if the thread is still alive afterwards.
         *
         * <p>The volatile connection field is read once into a local so that a concurrent
         * {@code destroyConnection()} cannot null it between the check and the calls that follow.
         *
         * <p>The caller's interrupt status is preserved: it is cleared before the join and
         * restored afterwards, and it is also set if the join itself is interrupted.
         */
        public void shutdown() {
            final GatewaySenderEventRemoteDispatcher dispatcher = GatewaySenderEventRemoteDispatcher.this;
            final Connection current = dispatcher.connection;
            // NOTE(review): connectionLifeCycleLock is not taken here. readAcknowledgement holds its
            // read lock while waiting for an ack, so taking the write lock would presumably block
            // until that read returns - confirm before changing.
            if (current != null && !current.isDestroyed()) {
                current.destroy();
                dispatcher.sender.getProxy().returnConnection(current);
            }
            this.shutdown = true;
            interrupt();
            boolean interrupted = Thread.interrupted();
            try {
                join(15000L);
            } catch (InterruptedException e) {
                interrupted = true;
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            if (isAlive()) {
                dispatcher.logger.warning(LocalizedStrings.GatewaySender_ACKREADERTHREAD_IGNORED_CANCELLATION);
            }
        }
    }

    /* loaded from: input_file:WEB-INF/lib/gemfire-7.0.jar:com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher$GatewayAck.class */
    /**
     * Immutable acknowledgement for one batch: either a success ack carrying the number of
     * events, or a failure ack carrying the batch exception.
     */
    public static class GatewayAck {
        private final int batchId;
        private final int numEvents;
        private final BatchException70 be;

        /**
         * Creates a failure acknowledgement. The event count is 0.
         *
         * @param batchException the exception reported for the batch
         * @param batchId        id of the acknowledged batch
         */
        public GatewayAck(BatchException70 batchException, int batchId) {
            this.be = batchException;
            this.batchId = batchId;
            this.numEvents = 0;
        }

        /**
         * Creates a success acknowledgement. The batch exception is {@code null}.
         *
         * @param batchId   id of the acknowledged batch
         * @param numEvents number of events in the acknowledged batch
         */
        public GatewayAck(int batchId, int numEvents) {
            this.be = null;
            this.batchId = batchId;
            this.numEvents = numEvents;
        }

        /** @return the number of events, or 0 for a failure acknowledgement */
        public int getNumEvents() {
            return this.numEvents;
        }

        /** @return the id of the acknowledged batch */
        public int getBatchId() {
            return this.batchId;
        }

        /** @return the batch exception, or {@code null} for a success acknowledgement */
        public BatchException70 getBatchException() {
            return this.be;
        }
    }

    public GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor abstractGatewaySenderEventProcessor) {
        this.processor = abstractGatewaySenderEventProcessor;
        this.sender = abstractGatewaySenderEventProcessor.getSender();
        this.logger = abstractGatewaySenderEventProcessor.getLogger();
        try {
            initializeConnection();
        } catch (GatewaySenderException e) {
            if (e.getCause() instanceof GemFireSecurityException) {
                throw e;
            }
        }
    }

    /**
     * Reads one acknowledgement from the receiver over the current connection.
     *
     * <p>The read happens under the connection read lock and is skipped when no connection is
     * held. If it fails, the connection is destroyed unless the failure's cause is a
     * {@code BatchException70}. While the sender's proxy is still usable, I/O and connectivity
     * failures are only logged at fine level; every other failure stops the processor and is
     * logged at severe level, except a {@link CancelException}, which is not logged.
     *
     * @param i id of the last batch whose ack was read; {@code i + 1} is used to look up the
     *          batch for the fine-level failure message
     * @return the acknowledgement, or {@code null} if none could be read
     */
    protected GatewayAck readAcknowledgement(int i) {
        final ServerProxy ackProxy = new ServerProxy(this.processor.getSender().getProxy());
        this.connection = getConnection();
        GatewayAck ack = null;
        try {
            if (this.logger.fineEnabled()) {
                this.logger.fine(" Receiving ack on the thread " + this.connection);
            }
            this.connectionLifeCycleLock.readLock().lock();
            try {
                if (this.connection != null) {
                    ack = (GatewayAck) ackProxy.receiveAckFromReceiver(this.connection);
                }
            } finally {
                this.connectionLifeCycleLock.readLock().unlock();
            }
        } catch (Exception e) {
            final Throwable cause = e.getCause();
            final Exception failure;
            if (cause instanceof BatchException70) {
                failure = (BatchException70) cause;
            } else {
                failure = e;
                destroyConnection();
            }
            final boolean connectivityProblem = failure instanceof IOException
                    || failure instanceof ServerConnectivityException
                    || failure instanceof ConnectionDestroyedException;
            if (this.sender.getProxy() != null && !this.sender.getProxy().isDestroyed()) {
                if (connectivityProblem) {
                    if (this.processor.getLogger().fineEnabled()) {
                        final int expectedBatchId = i + 1;
                        this.processor.logBatchFineIOException("Because of IOException, failed to get Ack for the following ", this.processor.getBatchIdToEventsMap().get(Integer.valueOf(expectedBatchId)), expectedBatchId);
                    }
                } else {
                    if (!(failure instanceof CancelException)) {
                        this.logger.severe(LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH, (Throwable) failure);
                    }
                    this.processor.setIsStopped(true);
                }
            }
        }
        return ack;
    }

    /**
     * Dispatches a batch of events and records batch statistics.
     *
     * <p>Failure handling:
     * <ul>
     *   <li>{@link CancelException}: the processor is marked stopped and the exception is
     *       rethrown.</li>
     *   <li>{@code GatewaySenderException} while the sender's proxy is usable: I/O and
     *       connectivity causes are passed to the processor's exception handling; any other
     *       cause is logged at severe level and stops the processor.</li>
     *   <li>Any other {@link Exception}: the processor is stopped and the error is logged at
     *       severe level.</li>
     * </ul>
     *
     * @param list events to dispatch
     * @param z    not used by this implementation
     * @return {@code true} if the batch was handed to the remote site
     */
    @Override // com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventDispatcher
    public boolean dispatchBatch(List list, boolean z) {
        final GatewaySenderStats stats = this.sender.getStatistics();
        boolean dispatched = false;
        try {
            final long batchStart = stats.startTime();
            dispatched = _dispatchBatch(list);
            stats.endBatch(batchStart, list.size());
        } catch (CancelException cancelled) {
            if (this.logger.fineEnabled()) {
                this.logger.fine("Stopping the processor because cancellation occurred while processing a batch");
            }
            this.processor.setIsStopped(true);
            throw cancelled;
        } catch (GatewaySenderException failure) {
            final Throwable cause = failure.getCause();
            final boolean proxyUsable = this.sender.getProxy() != null && !this.sender.getProxy().isDestroyed();
            if (proxyUsable) {
                final boolean connectivityProblem = cause instanceof IOException
                        || cause instanceof ServerConnectivityException
                        || cause instanceof ConnectionDestroyedException;
                if (!connectivityProblem) {
                    this.logger.severe(LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH, (Throwable) failure);
                    this.processor.setIsStopped(true);
                } else {
                    this.processor.handleException();
                    if (this.processor.getLogger().fineEnabled()) {
                        this.processor.logBatchFine("Because of IOException, failed to dispatch the following ", list);
                    }
                }
            }
        } catch (Exception unexpected) {
            this.processor.setIsStopped(true);
            this.logger.severe(LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH, (Throwable) unexpected);
        }
        return dispatched;
    }

    /**
     * Sends the batch over the current connection while holding the connection read lock.
     *
     * <p>Nothing is sent when the processor's batch id changed while the connection was being
     * obtained, or when the processor reports a connection reset.
     *
     * @param list events to send
     * @return {@code true} if the batch was sent, {@code false} if sending was skipped
     * @throws GatewaySenderException wrapping the failure. For a
     *         {@link ServerOperationException} caused by a {@code BatchException70} the cause is
     *         that batch exception and the connection is kept; otherwise the connection is
     *         destroyed first and the cause is the original exception, or its cause when that
     *         cause is an {@link IOException}.
     */
    private boolean _dispatchBatch(List list) {
        final int currentBatchId = this.processor.getBatchId();
        this.connection = getConnection();
        if (currentBatchId != this.processor.getBatchId() || this.processor.isConnectionReset()) {
            return false;
        }
        try {
            final ServerProxy batchProxy = new ServerProxy(this.sender.getProxy());
            this.connectionLifeCycleLock.readLock().lock();
            try {
                if (this.connection == null) {
                    throw new ConnectionDestroyedException();
                }
                batchProxy.dispatchBatch_NewWAN(this.connection, list, currentBatchId, this.sender.isRemoveFromQueueOnException());
                if (this.logger.fineEnabled()) {
                    this.logger.fine(this.processor.getSender() + ": Dispatched batch (id=" + currentBatchId + ") of " + list.size() + " events, queue size: " + this.processor.getQueue().size() + " on connection " + this.connection);
                }
                return true;
            } finally {
                this.connectionLifeCycleLock.readLock().unlock();
            }
        } catch (ServerOperationException serverFailure) {
            final Throwable cause = serverFailure.getCause();
            final Throwable reported;
            if (cause instanceof BatchException70) {
                reported = (BatchException70) cause;
            } else {
                reported = serverFailure;
                destroyConnection();
            }
            throw new GatewaySenderException(LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(this, Integer.valueOf(currentBatchId), this.connection), reported);
        } catch (Exception otherFailure) {
            final Throwable cause = otherFailure.getCause();
            final Exception reported;
            if (cause instanceof IOException) {
                reported = (IOException) cause;
            } else {
                reported = otherFailure;
            }
            destroyConnection();
            throw new GatewaySenderException(LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(this, Integer.valueOf(currentBatchId), this.connection), reported);
        }
    }

    /**
     * Returns the connection to use, creating a new one when needed, and makes sure an ack
     * reader thread is running.
     *
     * <p>A parallel sender reconnects only when no connection is held. A serial sender also
     * reconnects when the held connection points at a server other than the sender's current
     * server location.
     *
     * <p>The volatile connection field is read once into a local for these checks, so a
     * concurrent {@link #destroyConnection()} cannot null it between the null test and the
     * dereference.
     *
     * <p>An {@link AckReaderThread} is started, and this method waits for it to report that it is
     * running, when the cache is open, the sender is primary, a connection is held, and no
     * reader is currently running.
     *
     * @return the current connection; may be {@code null} if it was destroyed concurrently
     * @throws GatewaySenderException if a new connection could not be established
     */
    public Connection getConnection() throws GatewaySenderException {
        final Connection current = this.connection;
        if (this.sender.isParallel()) {
            if (current == null) {
                initializeConnection();
            }
        } else if (current == null || !current.getServer().equals(this.sender.getServerLocation())) {
            if (this.logger.fineEnabled()) {
                this.logger.fine("Initializing new connection as serverlocation of old connection is : " + (current == null ? "null" : current.getServer()) + " and the serverlocation to connect is" + this.sender.getServerLocation());
            }
            initializeConnection();
        }
        Cache cache = this.sender.getCache();
        if (cache != null && !cache.isClosed() && this.sender.isPrimary() && this.connection != null && (this.ackReaderThread == null || !this.ackReaderThread.isRunning())) {
            this.ackReaderThread = new AckReaderThread(this.sender);
            this.ackReaderThread.start();
            this.ackReaderThread.waitForRunningAckReaderThreadRunningState();
        }
        return this.connection;
    }

    /**
     * @return {@code true} if a connection is currently held; the connection's own state is not
     *         checked
     */
    public boolean isConnected() {
        final Connection current = this.connection;
        return current != null;
    }

    /**
     * Drops the current connection under the connection write lock.
     *
     * <p>If a connection is held, it is destroyed and returned to the sender's proxy unless it is
     * already destroyed; the connection field and the sender's server location are then cleared.
     * Nothing happens when no connection is held.
     */
    public void destroyConnection() {
        this.connectionLifeCycleLock.writeLock().lock();
        try {
            final Connection doomed = this.connection;
            if (doomed == null) {
                return;
            }
            if (!doomed.isDestroyed()) {
                doomed.destroy();
                this.sender.getProxy().returnConnection(doomed);
            }
            this.connection = null;
            this.sender.setServerLocation(null);
        } finally {
            this.connectionLifeCycleLock.writeLock().unlock();
        }
    }

    /**
     * Acquires a new connection from the sender's proxy and stores it, all under the connection
     * write lock.
     *
     * <p>The proxy is initialised first when it is missing or destroyed; otherwise the
     * processor's batch id is reset. A parallel sender takes any connection and records its
     * server as the sender's server location. A serial sender, under the concurrent-dispatcher
     * lock, connects to the already known server location if there is one; otherwise it takes
     * any connection and, when primary, records the server location (if still unset) and
     * distributes the updated attributes.
     *
     * @throws GatewaySenderException if no connection could be acquired. The cause is the
     *         {@link GemFireSecurityException} for a security failure, otherwise an
     *         {@link IOException} describing the active servers. A warning is logged only on the
     *         first consecutive failure.
     */
    private void initializeConnection() throws GatewaySenderException, GemFireSecurityException {
        this.connectionLifeCycleLock.writeLock().lock();
        try {
            if (this.sender.getProxy() != null && !this.sender.getProxy().isDestroyed()) {
                this.processor.resetBatchId();
            } else {
                this.sender.initProxy();
            }
            try {
                final Connection fresh;
                if (this.sender.isParallel()) {
                    fresh = this.sender.getProxy().acquireConnection();
                    this.sender.setServerLocation(fresh.getServer());
                } else {
                    synchronized (((SerialGatewaySenderImpl) this.sender).getLockForConcurrentDispatcher()) {
                        if (this.sender.getServerLocation() == null) {
                            if (this.logger.fineEnabled()) {
                                this.logger.fine("ServerLocation is null. Creating new connection. ");
                            }
                            fresh = this.sender.getProxy().acquireConnection();
                            if (this.sender.isPrimary()) {
                                if (this.sender.getServerLocation() == null) {
                                    this.sender.setServerLocation(fresh.getServer());
                                }
                                new UpdateAttributesProcessor(this.sender).distribute(false);
                            }
                        } else {
                            if (this.logger.fineEnabled()) {
                                this.logger.fine("ServerLocation is: " + this.sender.getServerLocation() + ". Connecting to this serverLocation...");
                            }
                            fresh = this.sender.getProxy().acquireConnection(this.sender.getServerLocation());
                        }
                    }
                }
                if (this.failedConnectCount > 0) {
                    this.logger.info(LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1_AFTER_2_FAILED_CONNECT_ATTEMPTS, new Object[]{this.processor.getSender().getId(), fresh, Integer.valueOf(this.failedConnectCount)});
                    this.failedConnectCount = 0;
                } else {
                    this.logger.info(LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1, new Object[]{this.processor.getSender().getId(), fresh});
                }
                this.connection = fresh;
            } catch (ServerConnectivityException connectFailure) {
                this.failedConnectCount++;
                if (connectFailure.getCause() instanceof GemFireSecurityException) {
                    final Throwable securityCause = connectFailure.getCause();
                    if (this.failedConnectCount == 1) {
                        this.logger.warning(LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1, new Object[]{this.processor.getSender().getId(), securityCause.getMessage()});
                    }
                    throw new GatewaySenderException(securityCause);
                }
                final List<ServerLocation> activeServers = this.sender.getProxy().getCurrentServers();
                final String reason;
                if (activeServers.isEmpty()) {
                    reason = LocalizedStrings.GatewayEventRemoteDispatcher_THERE_ARE_NO_ACTIVE_SERVERS.toLocalizedString();
                } else {
                    // Comma-separated list of the servers that are known to be active.
                    final StringBuilder serverList = new StringBuilder();
                    for (ServerLocation server : activeServers) {
                        final String text = String.valueOf(server);
                        if (serverList.length() > 0) {
                            serverList.append(", ");
                        }
                        serverList.append(text);
                    }
                    reason = LocalizedStrings.GatewayEventRemoteDispatcher_NO_AVAILABLE_CONNECTION_WAS_FOUND_BUT_THE_FOLLOWING_ACTIVE_SERVERS_EXIST_0.toLocalizedString(serverList.toString());
                }
                final IOException noConnection = new IOException(reason);
                this.sender.setServerLocation(null);
                if (this.failedConnectCount == 1) {
                    this.logger.warning(LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT, this.processor.getSender().getId());
                }
                throw new GatewaySenderException(LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT.toLocalizedString(this.processor.getSender().getId()), noConnection);
            }
        } finally {
            this.connectionLifeCycleLock.writeLock().unlock();
        }
    }

    /**
     * Shuts down the ack reader thread if one has been created; does nothing otherwise.
     */
    public void stopAckReaderThread() {
        final AckReaderThread reader = this.ackReaderThread;
        if (reader == null) {
            return;
        }
        reader.shutdown();
    }
}
