package net.openhft.chronicle.engine.map;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import net.openhft.chronicle.engine.api.EngineReplication;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.api.tree.View;
import net.openhft.chronicle.engine.map.replication.Bootstrap;
import net.openhft.chronicle.engine.server.internal.MapWireHandler;
import net.openhft.chronicle.engine.server.internal.ReplicationHandler;
import net.openhft.chronicle.network.connection.AbstractAsyncSubscription;
import net.openhft.chronicle.network.connection.AbstractAsyncTemporarySubscription;
import net.openhft.chronicle.network.connection.AbstractStatelessClient;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.threads.HandlerPriority;
import net.openhft.chronicle.threads.api.EventHandler;
import net.openhft.chronicle.threads.api.EventLoop;
import net.openhft.chronicle.threads.api.InvalidEventHandlerException;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/openhft/chronicle/engine/map/ReplicationHub.class */
public class ReplicationHub extends AbstractStatelessClient implements View {
    private static final Logger LOG = LoggerFactory.getLogger(ChronicleMapKeyValueStore.class);
    private final EventLoop eventLoop;
    private final AtomicBoolean isClosed;

    public ReplicationHub(@NotNull RequestContext requestContext, @NotNull TcpChannelHub tcpChannelHub, EventLoop eventLoop, AtomicBoolean atomicBoolean) {
        super(tcpChannelHub, 0L, toUri(requestContext));
        this.eventLoop = eventLoop;
        this.isClosed = atomicBoolean;
    }

    private static String toUri(@NotNull RequestContext requestContext) {
        StringBuilder sb = new StringBuilder(requestContext.fullName() + "?view=Replication");
        if (requestContext.keyType() != String.class) {
            sb.append("&keyType=").append(requestContext.keyType().getName());
        }
        if (requestContext.valueType() != String.class) {
            sb.append("&valueType=").append(requestContext.valueType().getName());
        }
        return sb.toString();
    }

    public void bootstrap(@NotNull final EngineReplication engineReplication, final byte b) throws InterruptedException {
        this.hub.subscribe(new AbstractAsyncSubscription(this.hub, this.csp, b) { // from class: net.openhft.chronicle.engine.map.ReplicationHub.1
            @Override // net.openhft.chronicle.network.connection.AbstractAsyncSubscription
            public void onSubscribe(WireOut wireOut) {
                wireOut.writeEventName(ReplicationHandler.EventId.identifier).marshallable(WriteMarshallable.EMPTY);
            }

            @Override // net.openhft.chronicle.network.connection.AsyncSubscription
            public void onConsumer(WireIn wireIn) {
                byte b2 = b;
                EngineReplication engineReplication2 = engineReplication;
                wireIn.readDocument((Consumer) null, wireIn2 -> {
                    ReplicationHub.this.onConnected(b2, wireIn2.read(ReplicationHandler.EventId.identifierReply).int8(), engineReplication2);
                });
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onConnected(byte b, byte b2, EngineReplication engineReplication) {
        final EngineReplication.ModificationIterator acquireModificationIterator = engineReplication.acquireModificationIterator(b2);
        long lastModificationTime = engineReplication.lastModificationTime(b2);
        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.lastUpdatedTime(lastModificationTime);
        bootstrap.identifier(b);
        (v0) -> {
            return v0.typedMarshallable();
        };
        subscribe(engineReplication, b);
        this.hub.subscribe(new AbstractAsyncSubscription(this.hub, this.csp, b) { // from class: net.openhft.chronicle.engine.map.ReplicationHub.2
            @Override // net.openhft.chronicle.network.connection.AbstractAsyncSubscription
            public void onSubscribe(WireOut wireOut) {
                wireOut.writeEventName(MapWireHandler.EventId.bootstap).typedMarshallable(bootstrap);
            }

            @Override // net.openhft.chronicle.network.connection.AsyncSubscription
            public void onConsumer(WireIn wireIn) {
                EngineReplication.ModificationIterator modificationIterator = acquireModificationIterator;
                wireIn.readDocument((Consumer) null, wireIn2 -> {
                    try {
                        ReplicationHub.this.publish(modificationIterator, wireIn2.read(ReplicationHandler.EventId.bootstrapReply).typedMarshallable());
                    } catch (Exception e) {
                        ReplicationHub.LOG.error("", e);
                    }
                });
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void publish(@NotNull final EngineReplication.ModificationIterator modificationIterator, @NotNull Bootstrap bootstrap) throws InterruptedException {
        final TcpChannelHub tcpChannelHub = this.hub;
        EventLoop eventLoop = this.eventLoop;
        eventLoop.getClass();
        modificationIterator.setModificationNotifier(eventLoop::unpause);
        this.eventLoop.addHandler(new EventHandler() { // from class: net.openhft.chronicle.engine.map.ReplicationHub.3
            public boolean action() throws InvalidEventHandlerException {
                if (!modificationIterator.hasNext()) {
                    return false;
                }
                if (ReplicationHub.this.isClosed.get()) {
                    throw new InvalidEventHandlerException();
                }
                TcpChannelHub tcpChannelHub2 = tcpChannelHub;
                EngineReplication.ModificationIterator modificationIterator2 = modificationIterator;
                tcpChannelHub2.lock(() -> {
                    modificationIterator2.forEach(replicationEntry -> {
                        ReplicationHub.this.sendEventAsyncWithoutLock(ReplicationHandler.EventId.replicationEvent, valueOut -> {
                            valueOut.typedMarshallable(replicationEntry);
                        });
                    });
                });
                return true;
            }

            @NotNull
            public HandlerPriority priority() {
                return HandlerPriority.MEDIUM;
            }
        });
        modificationIterator.dirtyEntries(bootstrap.lastUpdatedTime());
    }

    private void subscribe(@NotNull final EngineReplication engineReplication, final byte b) {
        this.hub.subscribe(new AbstractAsyncTemporarySubscription(this.hub, this.csp, b) { // from class: net.openhft.chronicle.engine.map.ReplicationHub.4
            @Override // net.openhft.chronicle.network.connection.AbstractAsyncSubscription
            public void onSubscribe(@NotNull WireOut wireOut) {
                wireOut.writeEventName(ReplicationHandler.EventId.replicationSubscribe).int8(b);
            }

            @Override // net.openhft.chronicle.network.connection.AsyncSubscription
            public void onConsumer(@NotNull WireIn wireIn) {
                EngineReplication engineReplication2 = engineReplication;
                wireIn.readDocument((Consumer) null, wireIn2 -> {
                    engineReplication2.applyReplication((EngineReplication.ReplicationEntry) wireIn2.read(ReplicationHandler.EventId.replicactionReply).typedMarshallable());
                });
            }
        });
    }
}
