package net.openhft.chronicle.engine.map;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.threads.EventHandler;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.core.threads.HandlerPriority;
import net.openhft.chronicle.core.threads.InvalidEventHandlerException;
import net.openhft.chronicle.engine.api.EngineReplication;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.map.CMap2EngineReplicator;
import net.openhft.chronicle.engine.map.replication.Bootstrap;
import net.openhft.chronicle.engine.server.internal.MapWireHandler;
import net.openhft.chronicle.engine.server.internal.ReplicationHandler;
import net.openhft.chronicle.network.connection.AbstractAsyncSubscription;
import net.openhft.chronicle.network.connection.AbstractAsyncTemporarySubscription;
import net.openhft.chronicle.network.connection.AbstractStatelessClient;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.wire.ReadMarshallable;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.Wires;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:net/openhft/chronicle/engine/map/ReplicationHub.class */
/**
 * Client side of map replication over a {@link TcpChannelHub}.
 *
 * <p>The flow implemented by this class is:
 * <ol>
 *   <li>{@link #bootstrap} subscribes on the hub, sends an {@code identifier} event and waits
 *       for the {@code identifierReply} that carries the remote node's identifier;</li>
 *   <li>{@link #onConnected} then sends a {@link Bootstrap} message and handles the documents
 *       that come back: a {@code bootstrap} reply starts publishing local changes, a
 *       {@code replicationEvent} is applied to the local {@link EngineReplication}, and a
 *       {@code lastUpdateTime} event updates the stored last-modification time;</li>
 *   <li>{@link #publish} registers a {@link RepEventHandler} on the event loop, which pushes
 *       local modifications to the remote node.</li>
 * </ol>
 */
public class ReplicationHub extends AbstractStatelessClient {
    // Bound to this class; the logger was previously created for ChronicleMapKeyValueStore,
    // which filed this class's output under the wrong category.
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationHub.class);

    /** Per-thread scratch entry that incoming replication events are unmarshalled into. */
    final ThreadLocal<CMap2EngineReplicator.VanillaReplicatedEntry> vre =
            ThreadLocal.withInitial(CMap2EngineReplicator.VanillaReplicatedEntry::new);
    private final EventLoop eventLoop;
    private final AtomicBoolean isClosed;
    private final Function<Bytes, Wire> wireType;

    /**
     * Event-loop handler that drains a {@link EngineReplication.ModificationIterator} and sends
     * each entry to the remote node. It also acts as the iterator's callback: every entry handed
     * to {@link #accept} is serialised into {@link #bytes} through {@link #wire}.
     */
    public class RepEventHandler implements EventHandler, Consumer<EngineReplication.ReplicationEntry> {
        final Wire wire;
        private final TcpChannelHub hub;
        private final EngineReplication.ModificationIterator mi;
        private final byte remoteIdentifier;
        // Set the first time the last-update time is sent; no log statement is attached to it
        // in this class.
        boolean hasLogged;
        /** Staging buffer; cleared at the start of every {@link #action()} pass. */
        final Bytes bytes = Bytes.elasticByteBuffer();
        /** {@code true} once the current {@link #lastUpdateTime} has been sent to the remote node. */
        boolean hasSentLastUpdateTime = false;
        /** Highest entry timestamp seen by {@link #accept}; {@code 0} until an entry is seen. */
        long lastUpdateTime = 0;

        /**
         * @param tcpChannelHub        hub whose outbound buffer state gates each pass
         * @param modificationIterator source of the entries to replicate
         * @param b                    identifier of the remote node, sent with the last-update time
         */
        public RepEventHandler(TcpChannelHub tcpChannelHub, EngineReplication.ModificationIterator modificationIterator, byte b) {
            this.hub = tcpChannelHub;
            this.mi = modificationIterator;
            this.remoteIdentifier = b;
            this.wire = ReplicationHub.this.wireType.apply(this.bytes);
        }

        /**
         * Runs one replication pass.
         *
         * @return {@code true} if something was sent, {@code false} if the pass was skipped or
         *         produced no bytes
         * @throws InvalidEventHandlerException once the owning hub has been flagged as closed
         */
        public boolean action() throws InvalidEventHandlerException {
            // Skip this pass while the hub reports its out bytes as locked or not yet empty.
            if (this.hub.isOutBytesLocked() || !this.hub.isOutBytesEmpty()) {
                return false;
            }
            if (ReplicationHub.this.isClosed.get()) {
                throw new InvalidEventHandlerException();
            }

            this.bytes.clear();
            if (this.mi.hasNext() || this.hasSentLastUpdateTime || this.lastUpdateTime <= 0) {
                // The iterator calls back into accept(...), which fills the staging buffer.
                this.mi.nextEntry(this);
            } else {
                // Nothing left to iterate and the newest timestamp has not been sent yet.
                writeLastUpdateTime();
            }
            return sendPending();
        }

        /** Writes the {@code lastUpdateTime} event plus the remote id into the staging buffer. */
        private void writeLastUpdateTime() {
            this.wire.writeNotReadyDocument(false, wireOut -> {
                wireOut.writeEventName(CoreFields.lastUpdateTime).int64(this.lastUpdateTime);
                wireOut.write(() -> "id").int8(this.remoteIdentifier);
            });
            this.hasSentLastUpdateTime = true;
            this.hasLogged = true;
        }

        /** Sends the staged bytes if there are any; returns whether anything was sent. */
        private boolean sendPending() {
            if (this.bytes.readRemaining() <= 0) {
                return false;
            }
            ReplicationHub.this.sendBytes(this.bytes, false);
            return true;
        }

        /**
         * Serialises one replication entry into the staging buffer and tracks the newest
         * timestamp seen so far.
         *
         * @param replicationEntry the entry supplied by the modification iterator
         */
        @Override // java.util.function.Consumer
        public void accept(EngineReplication.ReplicationEntry replicationEntry) {
            final long timestamp = replicationEntry.timestamp();
            if (timestamp > this.lastUpdateTime) {
                // A newer timestamp invalidates whatever last-update time was sent before.
                this.hasSentLastUpdateTime = false;
                this.lastUpdateTime = timestamp;
            }
            if (Jvm.isDebug() && ReplicationHub.LOG.isDebugEnabled()) {
                ReplicationHub.LOG.debug("*****\t\t\t\tSENT : CLIENT :replicatedEntry latency={}ms",
                        System.currentTimeMillis() - timestamp);
            }
            this.wire.writeNotReadyDocument(false, wireOut ->
                    wireOut.writeEventName(ReplicationHandler.EventId.replicationEvent).typedMarshallable(replicationEntry));
        }

        /** @return {@link HandlerPriority#REPLICATION} */
        public HandlerPriority priority() {
            return HandlerPriority.REPLICATION;
        }
    }

    /**
     * @param requestContext supplies the name and key/value types used to build the request URI
     * @param tcpChannelHub  connection to the remote node
     * @param eventLoop      loop that will run the publishing handler
     * @param atomicBoolean  shared closed flag; once {@code true} the publishing handler stops
     * @param function       factory that wraps a byte buffer in the wire format to use
     */
    public ReplicationHub(@NotNull RequestContext requestContext, @NotNull TcpChannelHub tcpChannelHub, @NotNull EventLoop eventLoop, @NotNull AtomicBoolean atomicBoolean, @NotNull Function<Bytes, Wire> function) {
        super(tcpChannelHub, 0L, toUri(requestContext));
        this.eventLoop = eventLoop;
        this.isClosed = atomicBoolean;
        this.wireType = function;
    }

    /**
     * Builds {@code <fullName>?view=Replication}, appending {@code keyType} / {@code valueType}
     * query parameters only when the respective type is not {@link String}.
     */
    private static String toUri(@NotNull RequestContext requestContext) {
        StringBuilder uri = new StringBuilder(requestContext.fullName() + "?view=Replication");
        if (requestContext.keyType() != String.class) {
            uri.append("&keyType=").append(requestContext.keyType().getName());
        }
        if (requestContext.valueType() != String.class) {
            uri.append("&valueType=").append(requestContext.valueType().getName());
        }
        return uri.toString();
    }

    /**
     * Starts the handshake: subscribes on the hub, sends the {@code identifier} event and, when
     * the {@code identifierReply} arrives, continues with {@link #onConnected}.
     *
     * @param engineReplication local replication state to synchronise
     * @param b                 local node identifier
     * @param b2                remote node identifier; used here only in log and comment text,
     *                          the identifier passed on is the one read from the reply
     */
    public void bootstrap(@NotNull final EngineReplication engineReplication, final byte b, final byte b2) {
        final byte localIdentifier = b;
        final byte remoteIdentifier = b2;
        this.hub.subscribe(new AbstractAsyncSubscription(this.hub, this.csp, localIdentifier, "ReplicationHub bootstrap") { // from class: net.openhft.chronicle.engine.map.ReplicationHub.1
            public void onSubscribe(@NotNull WireOut wireOut) {
                ReplicationHub.LOG.debug("onSubscribe - localIdentifier={},remoteIdentifier={}", localIdentifier, remoteIdentifier);
                wireOut.writeEventName(ReplicationHandler.EventId.identifier)
                        .marshallable(WriteMarshallable.EMPTY)
                        .writeComment(toString() + ", tcpChannelHub={" + ReplicationHub.this.hub.toString() + "}");
            }

            public void onConsumer(@NotNull WireIn wireIn) {
                if (Jvm.isDebug()) {
                    System.out.println("client : bootstrap");
                }
                wireIn.readDocument((ReadMarshallable) null, data -> {
                    byte repliedIdentifier = data.read(ReplicationHandler.EventId.identifierReply).int8();
                    ReplicationHub.this.onConnected(localIdentifier, repliedIdentifier, engineReplication);
                });
            }

            @NotNull
            public String toString() {
                return "bootstrap {localIdentifier=" + ((int) localIdentifier) + " ,remoteIdentifier=" + ((int) remoteIdentifier) + "}";
            }
        });
    }

    /**
     * Second handshake step: sends a {@link Bootstrap} carrying the local identifier and the last
     * modification time recorded for the remote node, then dispatches every document received on
     * the subscription by event name.
     *
     * @param b                 local node identifier
     * @param b2                remote node identifier
     * @param engineReplication local replication state that incoming entries are applied to
     */
    public void onConnected(byte b, final byte b2, @NotNull final EngineReplication engineReplication) {
        final byte localIdentifier = b;
        final byte remoteIdentifier = b2;
        final EngineReplication.ModificationIterator modificationIterator =
                engineReplication.acquireModificationIterator(remoteIdentifier);
        assert modificationIterator != null;

        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.lastUpdatedTime(engineReplication.lastModificationTime(remoteIdentifier));
        bootstrap.identifier(localIdentifier);

        this.hub.subscribe(new AbstractAsyncTemporarySubscription(this.hub, this.csp, localIdentifier, "replication onConnected") { // from class: net.openhft.chronicle.engine.map.ReplicationHub.2
            // Number of slow (> 100 ms) entries logged so far.
            int count = 0;

            public void onSubscribe(@NotNull WireOut wireOut) {
                wireOut.writeEventName(MapWireHandler.EventId.bootstrap).typedMarshallable(bootstrap);
            }

            public void onConsumer(@NotNull WireIn wireIn) {
                if (Jvm.isDebug()) {
                    System.out.println("client : onConsumer - publishing updates");
                }
                wireIn.readDocument((ReadMarshallable) null, data -> {
                    StringBuilder eventName = Wires.acquireStringBuilder();
                    ValueIn valueIn = data.readEventName(eventName);

                    if (ReplicationHandler.EventId.bootstrap.contentEquals(eventName)) {
                        // The remote side answered our bootstrap: start publishing local changes.
                        try {
                            ReplicationHub.this.publish(modificationIterator, (Bootstrap) valueIn.typedMarshallable(), remoteIdentifier);
                        } catch (Exception e) {
                            ReplicationHub.LOG.error("Failed to start publishing replication updates, remoteIdentifier=" + remoteIdentifier, e);
                        }
                    } else if (ReplicationHandler.EventId.replicationEvent.contentEquals(eventName)) {
                        CMap2EngineReplicator.VanillaReplicatedEntry entry = ReplicationHub.this.vre.get();
                        valueIn.marshallable(entry);
                        if (ReplicationHub.LOG.isInfoEnabled()) {
                            long latencyMs = System.currentTimeMillis() - entry.timestamp();
                            if (latencyMs > 100) {
                                ReplicationHub.LOG.info("Rcv Clt latency={}ms\t", latencyMs);
                                // Every tenth slow entry (starting with the first) also emits an empty line.
                                if (this.count++ % 10 == 0) {
                                    ReplicationHub.LOG.info("");
                                }
                            }
                        }
                        engineReplication.applyReplication(entry);
                    } else if (CoreFields.lastUpdateTime.contentEquals(eventName)) {
                        if (Jvm.isDebug()) {
                            System.out.println("server : received lastUpdateTime");
                        }
                        // NOTE(review): "id" is read before the event's own int64 value is consumed,
                        // while RepEventHandler writes the value first and the id second - confirm
                        // the wire format tolerates this read order. Order kept as-is.
                        byte id = data.read(() -> "id").int8();
                        long time = valueIn.int64();
                        engineReplication.setLastModificationTime(id, time);
                    }
                    // Any other event name is ignored.
                });
            }
        });
    }

    /**
     * Registers a {@link RepEventHandler} for the iterator on the event loop and marks entries as
     * dirty from the bootstrap's last-updated time.
     *
     * @param modificationIterator iterator over local changes destined for the remote node
     * @param bootstrap            reply from the remote node; supplies the time passed to
     *                             {@code dirtyEntries}
     * @param b                    remote node identifier
     */
    void publish(@NotNull EngineReplication.ModificationIterator modificationIterator, @NotNull Bootstrap bootstrap, byte b) {
        // Unpause the event loop whenever the iterator signals a modification.
        modificationIterator.setModificationNotifier(this.eventLoop::unpause);
        this.eventLoop.addHandler(true, new RepEventHandler(this.hub, modificationIterator, b));
        modificationIterator.dirtyEntries(bootstrap.lastUpdatedTime());
    }
}
