package net.openhft.chronicle.engine.server.internal;

import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import net.openhft.chronicle.engine.api.EngineReplication;
import net.openhft.chronicle.engine.api.pubsub.Replication;
import net.openhft.chronicle.engine.map.replication.Bootstrap;
import net.openhft.chronicle.engine.server.internal.MapWireHandler;
import net.openhft.chronicle.engine.tree.HostIdentifier;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.threads.HandlerPriority;
import net.openhft.chronicle.threads.api.EventHandler;
import net.openhft.chronicle.threads.api.EventLoop;
import net.openhft.chronicle.threads.api.InvalidEventHandlerException;
import net.openhft.chronicle.wire.ParameterizeWireKey;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:net/openhft/chronicle/engine/server/internal/ReplicationHandler.class */
public class ReplicationHandler<E> extends AbstractHandler {
    private Replication replication;
    private Queue<Consumer<Wire>> publisher;
    private HostIdentifier hostId;
    private long tid;
    private AtomicBoolean isClosed;
    private EventLoop eventLoop;
    private final StringBuilder eventName = new StringBuilder();

    @Nullable
    private final BiConsumer<WireIn, Long> dataConsumer = new BiConsumer<WireIn, Long>() { // from class: net.openhft.chronicle.engine.server.internal.ReplicationHandler.1
        @Override // java.util.function.BiConsumer
        public void accept(@NotNull WireIn wireIn, final Long l) {
            ReplicationHandler.this.eventName.setLength(0);
            ValueIn readEventName = wireIn.readEventName(ReplicationHandler.this.eventName);
            if (!EventId.replicationSubscribe.contentEquals(ReplicationHandler.this.eventName)) {
                if (EventId.replicationEvent.contentEquals(ReplicationHandler.this.eventName)) {
                    ReplicationHandler.this.replication.applyReplication(readEventName.typedMarshallable());
                    return;
                } else {
                    ReplicationHandler.this.outWire.writeDocument(true, wireOut -> {
                        ReplicationHandler.this.outWire.writeEventName(CoreFields.tid).int64(ReplicationHandler.this.tid);
                    });
                    ReplicationHandler.this.writeData(wireIn.bytes(), wireOut2 -> {
                        if (EventId.identifier.contentEquals(ReplicationHandler.this.eventName)) {
                            ReplicationHandler.this.outWire.write(EventId.identifierReply).int8(ReplicationHandler.this.hostId.hostId());
                            return;
                        }
                        if (MapWireHandler.EventId.bootstap.contentEquals(ReplicationHandler.this.eventName)) {
                            byte identifier = readEventName.typedMarshallable().identifier();
                            Bootstrap bootstrap = new Bootstrap();
                            bootstrap.identifier(ReplicationHandler.this.hostId.hostId());
                            bootstrap.lastUpdatedTime(ReplicationHandler.this.replication.lastModificationTime(identifier));
                            ReplicationHandler.this.outWire.write(EventId.bootstrapReply).typedMarshallable(bootstrap);
                        }
                    });
                    return;
                }
            }
            final byte int8 = readEventName.int8();
            final EngineReplication.ModificationIterator acquireModificationIterator = ReplicationHandler.this.replication.acquireModificationIterator(int8);
            acquireModificationIterator.setModificationNotifier(() -> {
                acquireModificationIterator.forEach(replicationEntry -> {
                    ReplicationHandler.this.publisher.add(wire -> {
                        System.out.println("publish from server response from replicationSubscribelocalIdentifier=" + ReplicationHandler.this.hostId + " ,remoteIdentifier=" + ((int) int8) + " event=" + replicationEntry);
                        wire.writeDocument(true, wireOut3 -> {
                            wireOut3.writeEventName(CoreFields.tid).int64(l.longValue());
                        });
                        wire.writeNotReadyDocument(false, wireOut4 -> {
                            wireOut4.write(EventId.replicationReply).typedMarshallable(replicationEntry);
                        });
                    });
                });
            });
            EventLoop eventLoop = ReplicationHandler.this.eventLoop;
            eventLoop.getClass();
            acquireModificationIterator.setModificationNotifier(eventLoop::unpause);
            ReplicationHandler.this.eventLoop.addHandler(new EventHandler() { // from class: net.openhft.chronicle.engine.server.internal.ReplicationHandler.1.1
                public boolean action() throws InvalidEventHandlerException {
                    if (ReplicationHandler.this.isClosed.get()) {
                        throw new InvalidEventHandlerException();
                    }
                    AtomicBoolean atomicBoolean = new AtomicBoolean();
                    EngineReplication.ModificationIterator modificationIterator = acquireModificationIterator;
                    byte b = int8;
                    Long l2 = l;
                    modificationIterator.forEach(replicationEntry -> {
                        ReplicationHandler.this.publisher.add(wire -> {
                            if (replicationEntry.identifier() != ReplicationHandler.this.hostId.hostId()) {
                                return;
                            }
                            atomicBoolean.set(true);
                            System.out.println("publish from server response from itterator localIdentifier=" + ReplicationHandler.this.hostId + " ,remoteIdentifier=" + ((int) b) + " event=" + replicationEntry);
                            wire.writeDocument(true, wireOut3 -> {
                                wireOut3.writeEventName(CoreFields.tid).int64(l2.longValue());
                            });
                            wire.writeNotReadyDocument(false, wireOut4 -> {
                                wireOut4.write(EventId.replicationReply).typedMarshallable(replicationEntry);
                            });
                        });
                    });
                    return atomicBoolean.get();
                }

                @NotNull
                public HandlerPriority priority() {
                    return HandlerPriority.MEDIUM;
                }
            });
            try {
                acquireModificationIterator.dirtyEntries(0L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };

    /* loaded from: input_file:net/openhft/chronicle/engine/server/internal/ReplicationHandler$EventId.class */
    public enum EventId implements ParameterizeWireKey {
        publish(new WireKey[0]),
        onEndOfSubscription(new WireKey[0]),
        apply(new WireKey[0]),
        replicationEvent(new WireKey[0]),
        replicationSubscribe(new WireKey[0]),
        replicationReply(new WireKey[0]),
        bootstrapReply(new WireKey[0]),
        identifierReply(new WireKey[0]),
        identifier(new WireKey[0]);

        private final WireKey[] params;

        EventId(WireKey... wireKeyArr) {
            this.params = wireKeyArr;
        }

        @NotNull
        public <P extends WireKey> P[] params() {
            return (P[]) this.params;
        }

        @Override // java.lang.Enum
        public /* bridge */ /* synthetic */ CharSequence name() {
            return super.name();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void process(@NotNull WireIn wireIn, Queue<Consumer<Wire>> queue, long j, Wire wire, HostIdentifier hostIdentifier, Replication replication, AtomicBoolean atomicBoolean, EventLoop eventLoop) {
        this.isClosed = atomicBoolean;
        this.eventLoop = eventLoop;
        setOutWire(wire);
        this.hostId = hostIdentifier;
        this.publisher = queue;
        this.replication = replication;
        this.tid = j;
        this.dataConsumer.accept(wireIn, Long.valueOf(j));
    }
}
