package net.openhft.chronicle.engine.map;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import net.openhft.chronicle.bytes.IORuntimeException;
import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.engine.api.map.MapEvent;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.pubsub.Subscription;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.server.internal.MapWireHandler;
import net.openhft.chronicle.engine.server.internal.PublisherHandler;
import net.openhft.chronicle.engine.server.internal.SubscriptionHandler;
import net.openhft.chronicle.engine.tree.TopologicalEvent;
import net.openhft.chronicle.network.connection.AbstractAsyncSubscription;
import net.openhft.chronicle.network.connection.AbstractStatelessClient;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.Wires;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/openhft/chronicle/engine/map/AbstractRemoteSubscription.class */
abstract class AbstractRemoteSubscription<E> extends AbstractStatelessClient implements Subscription<E> {
    private static final Logger LOG = LoggerFactory.getLogger(MapWireHandler.class);
    protected final Map<Object, Long> subscribersToTid;

    public AbstractRemoteSubscription(@NotNull TcpChannelHub tcpChannelHub, long j, @NotNull String str) {
        super(tcpChannelHub, j, str);
        this.subscribersToTid = new ConcurrentHashMap();
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.Subscription
    public void registerSubscriber(@NotNull RequestContext requestContext, @NotNull Subscriber<E> subscriber) {
        registerSubscriber0(requestContext, subscriber);
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.Subscription
    public void unregisterSubscriber(Subscriber<E> subscriber) {
        unregisterSubscriber0(subscriber);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerSubscriber0(@NotNull final RequestContext requestContext, @NotNull final Subscriber subscriber) {
        if (this.hub.outBytesLock().isHeldByCurrentThread()) {
            throw new IllegalStateException("Cannot view map while debugging");
        }
        Boolean bootstrap = requestContext.bootstrap();
        String str = this.csp;
        if (bootstrap != null) {
            str = str + "&bootstrap=" + bootstrap;
        }
        this.hub.subscribe(new AbstractAsyncSubscription(this.hub, str) { // from class: net.openhft.chronicle.engine.map.AbstractRemoteSubscription.1
            {
                AbstractRemoteSubscription.this.subscribersToTid.put(subscriber, Long.valueOf(tid()));
            }

            @Override // net.openhft.chronicle.network.connection.AbstractAsyncSubscription
            public void onSubscribe(@NotNull WireOut wireOut) {
                wireOut.writeEventName(SubscriptionHandler.SubscriptionEventID.registerSubscriber).typeLiteral(ClassAliasPool.CLASS_ALIASES.nameFor(requestContext.elementType()));
            }

            @Override // net.openhft.chronicle.network.connection.AsyncSubscription
            public void onConsumer(@NotNull WireIn wireIn) {
                Subscriber subscriber2 = subscriber;
                RequestContext requestContext2 = requestContext;
                wireIn.readDocument((Consumer) null, wireIn2 -> {
                    StringBuilder acquireStringBuilder = Wires.acquireStringBuilder();
                    ValueIn readEventName = wireIn2.readEventName(acquireStringBuilder);
                    if (PublisherHandler.EventId.onEndOfSubscription.contentEquals(acquireStringBuilder)) {
                        subscriber2.onEndOfSubscription();
                        AbstractRemoteSubscription.this.hub.unsubscribe(tid());
                    } else if (CoreFields.reply.contentEquals(acquireStringBuilder)) {
                        Class elementType = requestContext2.elementType();
                        AbstractRemoteSubscription.this.onEvent((MapEvent.class.isAssignableFrom(elementType) || TopologicalEvent.class.isAssignableFrom(elementType)) ? readEventName.typedMarshallable() : readEventName.object(requestContext2.elementType()), subscriber2);
                    }
                });
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onEvent(@Nullable Object obj, @NotNull Subscriber subscriber) {
        if (obj != null) {
            try {
                subscriber.onMessage(obj);
            } catch (InvalidSubscriberException e) {
                unregisterSubscriber(subscriber);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void unregisterSubscriber0(Subscriber subscriber) {
        Long l = this.subscribersToTid.get(subscriber);
        if (l == null) {
            LOG.warn("There is subscription to unsubscribe");
            return;
        }
        this.hub.checkConnection();
        this.hub.outBytesLock().lock();
        try {
            writeMetaDataForKnownTID(l.longValue());
            this.hub.outWire().writeDocument(false, wireOut -> {
                wireOut.writeEventName(SubscriptionHandler.SubscriptionEventID.unRegisterSubscriber).text("");
            });
            try {
                this.hub.writeSocket(this.hub.outWire());
            } catch (IORuntimeException e) {
            }
        } finally {
            this.hub.outBytesLock().unlock();
        }
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.Subscription
    public int topicSubscriberCount() {
        return proxyReturnInt(SubscriptionHandler.SubscriptionEventID.topicSubscriberCount);
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.Subscription
    public int keySubscriberCount() {
        return proxyReturnInt(SubscriptionHandler.SubscriptionEventID.keySubscriberCount);
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.Subscription
    public int entrySubscriberCount() {
        return proxyReturnInt(SubscriptionHandler.SubscriptionEventID.entrySubscriberCount);
    }
}
