package net.intelie.liverig.client;

import java.io.Closeable;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import net.intelie.liverig.client.Storage;
import net.intelie.liverig.protocol.AccessDenied;
import net.intelie.liverig.protocol.BlockingQueueProcessingTask;
import net.intelie.liverig.protocol.CloseNotifier;
import net.intelie.liverig.protocol.CommandReceiver;
import net.intelie.liverig.protocol.Components;
import net.intelie.liverig.protocol.ConnectionTimeouts;
import net.intelie.liverig.protocol.Counters;
import net.intelie.liverig.protocol.Parameters;
import net.intelie.liverig.protocol.RemoteControlData;
import net.intelie.liverig.protocol.RemoteControlReceiver;
import net.intelie.liverig.protocol.SequenceNumber;
import net.intelie.liverig.protocol.SequenceNumberRange;
import net.intelie.liverig.protocol.Source;
import net.intelie.liverig.protocol.Timestamp;
import net.intelie.liverig.util.SafeConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:net/intelie/liverig/client/OpenConnection.class */
public class OpenConnection implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(OpenConnection.class);
    /** Marker attached to log statements about control and remote control messages. */
    private static final Marker CONTROL_MARKER = MarkerFactory.getMarker("CONTROL");
    /** Zero-length payload passed to the keepalive echo request. */
    private static final byte[] EMPTY = new byte[0];
    /** Event store queried for resend requests and informed when an ACK arrives. */
    private final Storage storage;
    /** Executes remote control requests; {@code null} means remote control is disabled. */
    private final RemoteControlHandler remoteControlHandler;
    /** Notified when this connection starts closing with notification enabled. */
    private final CloseNotifier<OpenConnection> closeNotifier;
    /** Callback run once the sink HELLO has been received. */
    private final Runnable afterHandshake;
    /** Address of the peer, used in log messages. */
    private final SocketAddress remoteAddress;
    /** Protocol endpoint through which every outgoing message is sent. */
    private final Source protocol;
    /** Set once closing has started; the {@code trySend*} methods become no-ops afterwards. */
    private volatile boolean closed;
    /** Serves RESEND requests and tracks the sequence numbers currently in flight. */
    private final ResendTask resendTask;
    /** Keepalive/timeout tracker for this connection. */
    private final ConnectionTimeouts timeouts;
    /** Remote control requests being processed, keyed by request tag. */
    private final ConcurrentMap<Short, RemoteControlRequestHandle> remoteControlRequests = new ConcurrentHashMap<>();
    /** Executor on which remote control requests are run. */
    private final ExecutorService remoteControlExecutor;

    /* loaded from: input_file:net/intelie/liverig/client/OpenConnection$Control.class */
    private class Control implements CommandReceiver {
        private final long idleTimeout;
        private final long keepaliveTimeout;

        private Control(long j, long j2) {
            this.idleTimeout = j;
            this.keepaliveTimeout = j2;
        }

        @Override // net.intelie.liverig.protocol.CommandReceiver
        public void authenticate(String str, String str2) throws AccessDenied {
            OpenConnection.LOGGER.error(OpenConnection.CONTROL_MARKER, "received unexpected AUTHENTICATE from {} ({}:{})", new Object[]{OpenConnection.this.remoteAddress, str, str2});
        }

        @Override // net.intelie.liverig.protocol.CommandReceiver
        public void sourceHelloReceived(Map<String, String> map) {
            OpenConnection.LOGGER.error(OpenConnection.CONTROL_MARKER, "received unexpected HELLO from {} ({})", OpenConnection.this.remoteAddress, map);
        }

        @Override // net.intelie.liverig.protocol.CommandReceiver
        public void sinkHelloReceived(Map<String, String> map) {
            OpenConnection.LOGGER.info(OpenConnection.CONTROL_MARKER, "received HELLO from {} ({})", OpenConnection.this.remoteAddress, map);
            OpenConnection.this.timeouts.helloReceived(this.idleTimeout, this.keepaliveTimeout, TimeUnit.MILLISECONDS);
            OpenConnection.this.afterHandshake.run();
        }

        @Override // net.intelie.liverig.protocol.CommandReceiver
        public void ackReceived(SequenceNumber sequenceNumber) {
            OpenConnection.LOGGER.info(OpenConnection.CONTROL_MARKER, "received ACK from {} ({})", OpenConnection.this.remoteAddress, sequenceNumber);
            OpenConnection.this.timeouts.received();
            OpenConnection.this.storage.mayForget(sequenceNumber);
            OpenConnection.this.resendTask.mayForget(sequenceNumber);
        }

        @Override // net.intelie.liverig.protocol.CommandReceiver
        public void resendRequestReceived(SequenceNumberRange sequenceNumberRange) {
            OpenConnection.LOGGER.info(OpenConnection.CONTROL_MARKER, "received RESEND from {} ({})", OpenConnection.this.remoteAddress, sequenceNumberRange);
            OpenConnection.this.timeouts.received();
            try {
                OpenConnection.this.resendTask.put(sequenceNumberRange);
            } catch (InterruptedException e) {
                OpenConnection.LOGGER.warn("resend request interrupted", e);
                Thread.currentThread().interrupt();
            }
        }

        @Override // net.intelie.liverig.protocol.CommandReceiver
        public void echoReceived(Timestamp timestamp, Timestamp timestamp2, ByteBuffer byteBuffer) {
            OpenConnection.LOGGER.info(OpenConnection.CONTROL_MARKER, "received ECHO from {} ({}-{}, {} bytes)", new Object[]{OpenConnection.this.remoteAddress, timestamp, timestamp2, Integer.valueOf(byteBuffer.remaining())});
            OpenConnection.this.timeouts.received();
        }

        @Override // net.intelie.liverig.protocol.CommandReceiver
        public void closeReceived(String str) {
            if (str.isEmpty()) {
                OpenConnection.LOGGER.info(OpenConnection.CONTROL_MARKER, "received CLOSE from {}", OpenConnection.this.remoteAddress);
            } else {
                OpenConnection.LOGGER.warn(OpenConnection.CONTROL_MARKER, "received CLOSE from {} ({})", OpenConnection.this.remoteAddress, str);
            }
        }

        @Override // net.intelie.liverig.protocol.CommandReceiver
        public void resentLastReceived(SequenceNumberRange sequenceNumberRange, SequenceNumber sequenceNumber) {
            OpenConnection.LOGGER.warn(OpenConnection.CONTROL_MARKER, "received unexpected RESENT_LAST from {} ({} {})", new Object[]{OpenConnection.this.remoteAddress, sequenceNumberRange, sequenceNumber});
        }

        @Override // net.intelie.liverig.protocol.CommandReceiver
        public void closed(Exception exc) {
            if (exc != null) {
                OpenConnection.LOGGER.warn("connection to {} closed by exception", OpenConnection.this.remoteAddress, exc);
            } else {
                OpenConnection.LOGGER.info("connection to {} closed", OpenConnection.this.remoteAddress);
            }
            OpenConnection.this.closing();
        }
    }

    /* loaded from: input_file:net/intelie/liverig/client/OpenConnection$RemoteControl.class */
    /**
     * Receives remote control messages from the peer: requests are submitted to the
     * remote control executor, cancels are applied to the matching pending request,
     * and success/failure messages (unexpected on this side) are only logged.
     */
    private class RemoteControl implements RemoteControlReceiver {
        private RemoteControl() {
        }

        /**
         * Registers the request under its tag and submits it for execution.
         *
         * <p>A failure is sent back when remote control is disabled or the executor
         * rejects the task; a request whose tag is already registered is ignored.
         */
        @Override
        public void requestReceived(Timestamp timestamp, RemoteControlData remoteControlData) {
            LOGGER.info(CONTROL_MARKER, "received remote control request from {} ({}: {} bytes)", remoteAddress, remoteControlData.getTag(), remoteControlData.size());
            short tag = remoteControlData.getTag();
            if (remoteControlHandler == null) {
                LOGGER.warn("Remote control disabled");
                trySendRemoteControlFailure(tag, new RemoteControlDisabled("Remote control disabled"));
                return;
            }
            RemoteControlRequestContext requestContext = new RemoteControlRequestContext(tag);
            RemoteControlRequestHandle handle = new RemoteControlRequestHandle(requestContext);
            RemoteControlRequestHandle existing = remoteControlRequests.putIfAbsent(tag, handle);
            if (existing != null) {
                LOGGER.warn("ignoring duplicate remote control tag {}", tag);
                return;
            }
            try {
                RemoteControlTask task = new RemoteControlTask(remoteControlData.getArgs(), requestContext);
                handle.setFuture(remoteControlExecutor.submit(task));
            } catch (Exception rejected) {
                LOGGER.warn("Rejected execution of remote control request {}", tag, rejected);
                trySendRemoteControlFailure(tag, rejected);
                remoteControlRequests.remove(tag);
            }
        }

        /** Logs the unexpected success message; nothing else is done. */
        @Override
        public void successReceived(Timestamp timestamp, RemoteControlData remoteControlData) {
            LOGGER.warn(CONTROL_MARKER, "received unexpected remote control success from {} ({}: {} bytes)", remoteAddress, remoteControlData.getTag(), remoteControlData.size());
        }

        /** Logs the unexpected failure message; nothing else is done. */
        @Override
        public void failureReceived(Timestamp timestamp, RemoteControlData remoteControlData) {
            LOGGER.warn(CONTROL_MARKER, "received unexpected remote control failure from {} ({}: {} bytes)", remoteAddress, remoteControlData.getTag(), remoteControlData.size());
        }

        /** Cancels the pending request with the given tag, if one is registered. */
        @Override
        public void cancelReceived(Timestamp timestamp, RemoteControlData remoteControlData) {
            LOGGER.info(CONTROL_MARKER, "received remote control cancel from {} ({}: {} bytes)", remoteAddress, remoteControlData.getTag(), remoteControlData.size());
            RemoteControlRequestHandle handle = remoteControlRequests.get(remoteControlData.getTag());
            if (handle != null) {
                handle.cancel();
                canceled(handle.getContext());
            }
        }

        /**
         * Sends the "canceled" reply and unregisters the request, but only when
         * {@code start()} on the context returns {@code true}.
         */
        private void canceled(RemoteControlRequestContext requestContext) {
            if (!requestContext.start()) {
                return;
            }
            short tag = requestContext.getTag();
            try {
                trySendRemoteControlCanceled(tag);
            } finally {
                remoteControlRequests.remove(tag);
            }
        }
    }

    /* loaded from: input_file:net/intelie/liverig/client/OpenConnection$RemoteControlDisabled.class */
    /**
     * Failure reported to the peer when a remote control request arrives while no
     * remote control handler is configured.
     */
    private static class RemoteControlDisabled extends RuntimeException {
        RemoteControlDisabled(String message) {
            super(message);
        }

        /** Returns only the simple class name, without the message. */
        @Override
        public String toString() {
            Class<?> type = getClass();
            return type.getSimpleName();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/intelie/liverig/client/OpenConnection$RemoteControlRequestHandle.class */
    /**
     * Pairs the context of a pending remote control request with the {@link Future}
     * of the task executing it, so the request can be cancelled later.
     */
    public static final class RemoteControlRequestHandle {
        private final RemoteControlRequestContext context;
        /** Assigned after the task has been submitted; may still be {@code null} before that. */
        private volatile Future<?> future;

        private RemoteControlRequestHandle(RemoteControlRequestContext requestContext) {
            context = requestContext;
        }

        public RemoteControlRequestContext getContext() {
            return context;
        }

        public Future<?> getFuture() {
            return future;
        }

        /**
         * Stores the task's future; if the context is already canceled, the
         * cancellation is applied to the new future right away.
         */
        public void setFuture(Future<?> future) {
            this.future = future;
            if (!context.isCanceled()) {
                return;
            }
            context.cancel(this.future);
        }

        /** Cancels the request through its context, handing over the current future. */
        public void cancel() {
            context.cancel(future);
        }
    }

    /* loaded from: input_file:net/intelie/liverig/client/OpenConnection$RemoteControlTask.class */
    /**
     * Runs one remote control request through the configured handler and sends the
     * outcome (success, failure or canceled) back to the peer.
     */
    private class RemoteControlTask implements Runnable {
        private final byte[][] args;
        private final RemoteControlRequestContext context;

        private RemoteControlTask(byte[][] bArr, RemoteControlRequestContext remoteControlRequestContext) {
            this.args = bArr;
            this.context = remoteControlRequestContext;
        }

        /**
         * Executes the request if {@code start()} on the context returns {@code true}.
         *
         * <p>The request is always removed from the pending map afterwards, whatever
         * the outcome.
         */
        @Override
        public void run() {
            if (!this.context.start()) {
                return;
            }
            short tag = this.context.getTag();
            try {
                RemoteControlData result = new RemoteControlData(tag, OpenConnection.this.remoteControlHandler.handleRequest(this.args, this.context));
                if (this.context.isCanceled()) {
                    OpenConnection.this.trySendRemoteControlCanceled(tag);
                } else {
                    OpenConnection.this.trySendRemoteControlSuccess(result);
                }
            } catch (InterruptedException e) {
                // Must precede the Exception handler: InterruptedException is a subclass
                // of Exception, so in the reverse order this branch cannot compile.
                if (this.context.isCanceled()) {
                    OpenConnection.this.trySendRemoteControlCanceled(tag);
                } else {
                    OpenConnection.this.trySendRemoteControlFailure(tag, e);
                }
            } catch (Exception e) {
                OpenConnection.this.trySendRemoteControlFailure(tag, e);
            } catch (Throwable th) {
                OpenConnection.LOGGER.error("Unexpected error", th);
                OpenConnection.this.trySendRemoteControlFailure(tag, th);
            } finally {
                OpenConnection.this.remoteControlRequests.remove(Short.valueOf(tag));
            }
        }

        @Override
        public String toString() {
            return "RemoteControlTask{tag=" + ((int) this.context.getTag()) + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/intelie/liverig/client/OpenConnection$ResendTask.class */
    public class ResendTask extends BlockingQueueProcessingTask<SequenceNumberRangeAndInFlightSnapshot> {
        private final Set<SequenceNumber> inFlight;
        private Event next;

        public ResendTask(int i, ExecutorService executorService) {
            super(new ArrayBlockingQueue(i), executorService);
            this.inFlight = ConcurrentHashMap.newKeySet();
        }

        public void stop() {
            cancel(true);
        }

        public void addInFlight(SequenceNumber sequenceNumber) {
            this.inFlight.add(sequenceNumber);
        }

        public void mayForget(SequenceNumber sequenceNumber) {
            this.inFlight.remove(sequenceNumber);
        }

        public void put(SequenceNumberRange sequenceNumberRange) throws InterruptedException {
            super.put((ResendTask) new SequenceNumberRangeAndInFlightSnapshot(sequenceNumberRange, new HashSet(this.inFlight)));
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // net.intelie.liverig.protocol.BlockingQueueProcessingTask
        public void process(SequenceNumberRangeAndInFlightSnapshot sequenceNumberRangeAndInFlightSnapshot) {
            if (OpenConnection.this.closed) {
                return;
            }
            try {
                SequenceNumberRange sequenceNumberRange = sequenceNumberRangeAndInFlightSnapshot.sequenceNumberRange;
                Set set = sequenceNumberRangeAndInFlightSnapshot.inFlightSnapshot;
                OpenConnection.this.storage.retrieveRange(sequenceNumberRange, event -> {
                    if (OpenConnection.this.closed) {
                        throw new Storage.CancelRetrieve();
                    }
                    if (set.add(event.sequenceNumber())) {
                        if (this.next != null) {
                            OpenConnection.this.tryResend(this.next, false);
                        }
                        this.next = event;
                    }
                });
                if (this.next != null) {
                    OpenConnection.this.trySendResentLast(sequenceNumberRange, this.next.sequenceNumber());
                    OpenConnection.this.tryResend(this.next, true);
                } else {
                    OpenConnection.this.trySendResentLast(sequenceNumberRange, null);
                }
            } catch (Storage.CancelRetrieve e) {
            } catch (Exception e2) {
                OpenConnection.LOGGER.error("Exception in resend task", e2);
            }
            this.next = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/intelie/liverig/client/OpenConnection$SequenceNumberRangeAndInFlightSnapshot.class */
    /**
     * Queue item for the resend task: the requested range plus the set of in-flight
     * sequence numbers supplied when the item was created.
     */
    public static final class SequenceNumberRangeAndInFlightSnapshot {
        private final SequenceNumberRange sequenceNumberRange;
        private final Set<SequenceNumber> inFlightSnapshot;

        private SequenceNumberRangeAndInFlightSnapshot(SequenceNumberRange requestedRange, Set<SequenceNumber> snapshot) {
            sequenceNumberRange = requestedRange;
            inFlightSnapshot = snapshot;
        }
    }

    /* loaded from: input_file:net/intelie/liverig/client/OpenConnection$Timeouts.class */
    /**
     * Connection timeouts bound to the enclosing connection: a keepalive sends an empty
     * echo request, a timeout closes the connection.
     */
    private class Timeouts extends ConnectionTimeouts {
        /**
         * @param scheduledExecutorService scheduler handed to the superclass
         * @param j                        initial timeout in milliseconds
         */
        public Timeouts(ScheduledExecutorService scheduledExecutorService, long j) {
            super(scheduledExecutorService, j, TimeUnit.MILLISECONDS);
        }

        /**
         * Sends an empty echo request. If that fails the connection is closed and the
         * failure is logged, with any close failure attached as suppressed.
         */
        @Override
        protected void keepalive() {
            try {
                OpenConnection.this.protocol.sendEchoRequest(OpenConnection.EMPTY);
            } catch (Exception sendFailure) {
                try {
                    OpenConnection.this.close();
                } catch (Exception closeFailure) {
                    sendFailure.addSuppressed(closeFailure);
                }
                OpenConnection.LOGGER.warn("Exception sending keepalive ping to {}", OpenConnection.this.remoteAddress, sendFailure);
            }
        }

        /** Logs the timeout and closes the connection; a close failure is only logged. */
        @Override
        protected void timeout() {
            try {
                OpenConnection.LOGGER.warn("Timeout from {}", OpenConnection.this.remoteAddress);
                OpenConnection.this.close();
            } catch (Exception closeFailure) {
                OpenConnection.LOGGER.warn("Exception closing connection to {}", OpenConnection.this.remoteAddress, closeFailure);
            }
        }
    }

    /**
     * Wires up the resend task, the protocol endpoint and the timeouts for an
     * already-connected channel, then starts the protocol.
     *
     * @param configuration        source of the queue capacity, timeouts and protocol parameters
     * @param storage              event storage used for ACK and RESEND handling
     * @param components           provider of the executors used by this connection
     * @param counters             passed through to the protocol endpoint
     * @param socketChannel        channel to the remote peer
     * @param i                    value assigned to the protocol parameters' {@code epoch}
     * @param z                    value assigned to the protocol parameters' {@code primary}
     * @param remoteControlHandler handler for remote control requests; {@code null} disables remote control
     * @param closeNotifier        notified when the connection closes
     * @param runnable             callback run after the sink HELLO is received
     * @throws IOException declared by the constructor; NOTE(review): presumably raised by
     *                     {@code getRemoteAddress()} or by starting the protocol, confirm
     */
    public OpenConnection(Configuration configuration, Storage storage, Components components, Counters counters, SocketChannel socketChannel, int i, boolean z, RemoteControlHandler remoteControlHandler, CloseNotifier<OpenConnection> closeNotifier, Runnable runnable) throws IOException {
        this.storage = storage;
        this.remoteControlHandler = remoteControlHandler;
        this.closeNotifier = closeNotifier;
        this.afterHandshake = runnable;
        this.resendTask = new ResendTask(configuration.resendRequestQueueCapacity(), components.getExecutorServiceFor(Components.ExecutorServiceRole.RESEND));
        this.remoteControlExecutor = components.getExecutorServiceFor(Components.ExecutorServiceRole.REMOTE_CONTROL);
        this.remoteAddress = socketChannel.getRemoteAddress();
        this.protocol = new Source(socketChannel, newParameters(configuration, i, z), counters, components, new Control(configuration.idleTimeout(), configuration.keepaliveTimeout()), new RemoteControl());
        // The timeouts start out with the HELLO timeout; Control replaces it once HELLO arrives.
        this.timeouts = new Timeouts(components.getScheduledExecutorServiceFor(Components.ScheduledExecutorServiceRole.TIMEOUT), configuration.helloTimeout());
        try {
            this.protocol.start();
        } catch (Exception e) {
            // Clean up without notifying closeNotifier, then propagate the failure.
            close(false);
            throw e;
        }
    }

    /**
     * Builds the protocol parameters from the configuration plus the per-connection
     * epoch and primary flag.
     *
     * @param configuration source of version, credentials, compression, resend rate and SSL context
     * @param epoch         value stored in {@code Parameters.epoch}
     * @param primary       value stored in {@code Parameters.primary}
     * @return a freshly populated {@code Parameters} instance
     */
    private static Parameters newParameters(Configuration configuration, int epoch, boolean primary) {
        final Parameters result = new Parameters();
        result.version = configuration.program_name_version();
        result.username = configuration.instance();
        result.password = configuration.password();
        result.epoch = epoch;
        result.zlib = configuration.compression();
        result.primary = primary;
        result.resendThrottleRate = configuration.resendRate();
        result.sslContext = configuration.sslContext();
        return result;
    }

    /**
     * Closes this connection with close notification enabled.
     *
     * @throws IOException if closing the protocol endpoint fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        close(true);
    }

    /**
     * Runs the closing steps and then closes the protocol endpoint.
     *
     * @param notifyListener whether the close notifier should be told about the close
     * @throws IOException if closing the protocol endpoint fails
     */
    private void close(boolean notifyListener) throws IOException {
        closing(notifyListener);
        protocol.close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs the closing steps with close notification enabled, without closing the
     * protocol endpoint itself.
     */
    public void closing() {
        closing(true);
    }

    /**
     * Marks the connection closed, optionally notifies the close notifier, cancels every
     * pending remote control request, and stops the resend task and the timeouts.
     *
     * @param notifyListener whether the close notifier should be told about the close
     */
    private void closing(boolean notifyListener) {
        closed = true;
        if (notifyListener) {
            closeNotifier.closed(this);
        }
        remoteControlRequests.values().forEach(SafeConsumer.safeConsumer(RemoteControlRequestHandle::cancel));
        resendTask.stop();
        timeouts.close();
    }

    /**
     * Marks the connection closed and sends a CLOSE to the peer; if the channel turns
     * out to be closed already, the connection is closed locally instead.
     *
     * @throws IOException if sending CLOSE or the fallback close fails
     */
    public void sendClose() throws IOException {
        LOGGER.info("sending close {}", remoteAddress);
        closed = true;
        try {
            protocol.sendClose();
        } catch (ClosedChannelException channelAlreadyClosed) {
            close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends the event as realtime data unless the connection is closed, recording its
     * sequence number as in flight first. A failure is logged, never thrown.
     */
    public void trySendRealtime(Event event) {
        if (!closed) {
            try {
                LOGGER.info("sending event {}", event.sequenceNumber());
                resendTask.addInFlight(event.sequenceNumber());
                protocol.sendRealtimeData(event.sequenceNumber(), event.metadata(), event.data());
            } catch (Exception sendFailure) {
                LOGGER.warn("failed send event {}", event.sequenceNumber(), sendFailure);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends the event as resent data (with the boolean flag {@code true}) unless the
     * connection is closed, recording its sequence number as in flight first. A failure
     * is logged, never thrown.
     */
    public void trySendBacklog(Event event) {
        if (!closed) {
            try {
                LOGGER.info("sending backlog event {}", event.sequenceNumber());
                resendTask.addInFlight(event.sequenceNumber());
                protocol.sendResentData(event.sequenceNumber(), event.metadata(), event.data(), true);
            } catch (Exception sendFailure) {
                LOGGER.warn("failed send backlog event {}", event.sequenceNumber(), sendFailure);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resends the event unless the connection is closed; {@code z} is passed straight
     * through to the protocol's resent-data call. A failure is logged, never thrown.
     */
    public void tryResend(Event event, boolean z) {
        if (!closed) {
            try {
                LOGGER.info("resending event {}", event.sequenceNumber());
                protocol.sendResentData(event.sequenceNumber(), event.metadata(), event.data(), z);
            } catch (Exception sendFailure) {
                LOGGER.warn("failed resend event {}", event.sequenceNumber(), sendFailure);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tells the peer which sequence number was the last one resent for the given range.
     * Does nothing once the connection is closed; a failure is logged, never thrown.
     *
     * @param sequenceNumberRange the range the peer asked to have resent
     * @param sequenceNumber      last resent sequence number, or {@code null} when nothing was resent
     */
    public void trySendResentLast(SequenceNumberRange sequenceNumberRange, SequenceNumber sequenceNumber) {
        if (this.closed) {
            return;
        }
        try {
            LOGGER.info("last resent event for {} is {}", sequenceNumberRange, sequenceNumber);
            this.protocol.sendResentLast(sequenceNumberRange, sequenceNumber);
        } catch (Exception e) {
            // Pass the exception as the last argument so its stack trace is logged,
            // as the other trySend* methods do.
            LOGGER.warn("failed send last resent event {}", sequenceNumber, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends a remote control success reply unless the connection is closed. If sending
     * fails, the failure is logged and a "canceled" reply is attempted for the same tag.
     */
    public void trySendRemoteControlSuccess(RemoteControlData remoteControlData) {
        if (!closed) {
            try {
                LOGGER.info("sending remote control success {} ({} bytes)", remoteControlData.getTag(), remoteControlData.size());
                protocol.sendRemoteControlSuccess(remoteControlData, true);
            } catch (Exception sendFailure) {
                LOGGER.warn("failed send remote control success {}", remoteControlData.getTag(), sendFailure);
                trySendRemoteControlCanceled(remoteControlData.getTag());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends a remote control failure reply carrying {@code th.toString()} encoded as
     * UTF-8, unless the connection is closed. If that fails, the failure is logged and
     * a "canceled" reply is attempted for the same tag.
     */
    public void trySendRemoteControlFailure(short s, Throwable th) {
        if (!closed) {
            try {
                String description = th.toString();
                LOGGER.info("sending remote control failure {} ({})", s, description);
                byte[] payload = description.getBytes(StandardCharsets.UTF_8);
                protocol.sendRemoteControlFailure(s, payload, true);
            } catch (Exception sendFailure) {
                LOGGER.warn("failed send remote control failure {}", s, sendFailure);
                trySendRemoteControlCanceled(s);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends a remote control "canceled" reply for the given tag unless the connection
     * is closed. A failure is logged, never thrown.
     */
    public void trySendRemoteControlCanceled(short s) {
        if (!closed) {
            try {
                LOGGER.info("sending remote control canceled {}", s);
                protocol.sendRemoteControlCanceled(s, true);
            } catch (Exception sendFailure) {
                LOGGER.warn("failed send remote control canceled {}", s, sendFailure);
            }
        }
    }
}
