package net.openhft.chronicle.engine.map;

import java.time.LocalTime;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.engine.api.map.KeyValueStore;
import net.openhft.chronicle.engine.api.map.MapEvent;
import net.openhft.chronicle.engine.api.map.MapView;
import net.openhft.chronicle.engine.api.pubsub.ISubscriber;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.pubsub.TopicSubscriber;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.cfg.SubscriptionStat;
import net.openhft.chronicle.engine.query.Filter;
import net.openhft.chronicle.engine.tree.QueueView;
import net.openhft.chronicle.network.api.session.SessionDetails;
import net.openhft.chronicle.network.api.session.SessionProvider;
import net.openhft.chronicle.queue.Excerpt;
import net.openhft.chronicle.threads.api.EventLoop;
import net.openhft.chronicle.threads.api.InvalidEventHandlerException;
import net.openhft.chronicle.wire.WireKey;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/openhft/chronicle/engine/map/QueueObjectSubscription.class */
/**
 * Subscription collection for a queue-backed asset.
 *
 * <p>Topic subscribers are tracked in a set and served by a handler installed on the
 * {@link EventLoop} obtained from the root of the owning {@link Asset}; the handler polls the
 * asset's {@link QueueView} and forwards what it reads to the subscriber. When a
 * {@link SessionProvider} and a {@code proc/subscriptions} map view are available, per-user
 * subscription statistics are maintained under the key {@code userId~kind}.
 *
 * <p>Key subscriptions and {@link #notifyEvent(MapEvent)} are not supported by this
 * implementation and throw {@link UnsupportedOperationException}.
 *
 * @param <T> topic type; only {@link CharSequence}, {@link String} and {@link WireKey} can be
 *            produced from queue topic names (see {@code toT})
 * @param <M> message type
 */
public class QueueObjectSubscription<T, M> implements ObjectSubscription<T, M> {
    private static final Logger LOG = LoggerFactory.getLogger(QueueObjectSubscription.class);

    /** Statistic kind used for topic subscriptions. */
    private static final String TOPIC_SUBSCRIPTION = "topicSubscription";
    /** Statistic kind used for plain subscriptions. */
    private static final String SUBSCRIPTION = "subscription";
    /** Path, relative to the root asset, of the subscription monitoring map. */
    private static final String SUBSCRIPTIONS_ASSET = "proc/subscriptions";

    private final Set<TopicSubscriber<T, M>> topicSubscribers;
    private final Set<Subscriber<Excerpt>> subscribers;
    private final Set<EventConsumer<T, M>> downstream;
    private final SessionProvider sessionProvider;

    @Nullable
    private final Asset asset;
    private final Map<Subscriber, Subscriber> subscriptionDelegate;
    private final Class<T> topicType;
    // Lazily resolved by getSubscriptionMap(); stays null while the monitoring view is absent.
    private Map<String, SubscriptionStat> subscriptionMonitoringMap;
    private EventLoop eventLoop;
    private KeyValueStore<T, M> kvStore;

    /**
     * Creates a subscription using the topic and view types carried by the request context.
     *
     * @param requestContext supplies {@code topicType()} and {@code viewType()}
     * @param asset          the asset this subscription belongs to
     */
    public QueueObjectSubscription(@NotNull RequestContext requestContext, @NotNull Asset asset) {
        this(requestContext.topicType(), requestContext.viewType(), asset);
    }

    /**
     * Creates a subscription for the given topic type.
     *
     * <p>When both {@code cls2} and {@code asset} are non-null, this instance is registered on
     * the asset as the view for {@code cls2}. With a {@code null} asset no session provider or
     * event loop is resolved, so the register methods that need them fail with
     * {@link IllegalStateException}.
     *
     * @param cls   the topic type
     * @param cls2  the view type to register this instance under, or {@code null} to skip
     * @param asset the owning asset, may be {@code null}
     */
    @SuppressWarnings("unchecked")
    public QueueObjectSubscription(Class cls, @Nullable Class cls2, @Nullable Asset asset) {
        this.topicSubscribers = new CopyOnWriteArraySet<>();
        this.subscribers = new CopyOnWriteArraySet<>();
        this.downstream = new CopyOnWriteArraySet<>();
        this.subscriptionDelegate = new IdentityHashMap<>();
        this.subscriptionMonitoringMap = null;
        this.topicType = cls;
        this.asset = asset;
        if (asset == null) {
            // The parameter is declared @Nullable: do not dereference it for the event loop.
            this.sessionProvider = null;
            this.eventLoop = null;
        } else {
            if (cls2 != null) {
                asset.addView(cls2, this);
            }
            this.sessionProvider = (SessionProvider) asset.findView(SessionProvider.class);
            this.eventLoop = (EventLoop) asset.root().acquireView(EventLoop.class);
        }
    }

    /**
     * Notifies every topic subscriber, subscriber and downstream consumer that the subscription
     * has ended, and empties the three collections.
     */
    public void close() {
        notifyEndOfSubscription(this.topicSubscribers);
        notifyEndOfSubscription(this.subscribers);
        notifyEndOfSubscription(this.downstream);
    }

    /**
     * Not implemented.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // net.openhft.chronicle.engine.api.pubsub.ISubscriber
    public void onEndOfSubscription() {
        throw new UnsupportedOperationException("todo");
    }

    /** Notifies each element of {@code set} of the end of the subscription, then clears it. */
    private void notifyEndOfSubscription(@NotNull Set<? extends ISubscriber> set) {
        for (ISubscriber each : set) {
            notifyEndOfSubscription(each);
        }
        set.clear();
    }

    /** Notifies one subscriber; failures are logged so the remaining ones are still notified. */
    private void notifyEndOfSubscription(@NotNull ISubscriber iSubscriber) {
        try {
            iSubscriber.onEndOfSubscription();
        } catch (Exception e) {
            LOG.error("", e);
        }
    }

    /** Stores the key-value store reference; it is not read anywhere else in this class. */
    @Override // net.openhft.chronicle.engine.map.KVSSubscription
    public void setKvStore(KeyValueStore<T, M> keyValueStore) {
        this.kvStore = keyValueStore;
    }

    /**
     * Not implemented.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // net.openhft.chronicle.engine.map.KVSSubscription, net.openhft.chronicle.engine.map.EventConsumer
    public void notifyEvent(MapEvent<T, M> mapEvent) {
        throw new UnsupportedOperationException("todo");
    }

    /** @return always {@code 0}; entry subscribers are not tracked by this class */
    @Override // net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public int entrySubscriberCount() {
        return 0;
    }

    /** @return the number of currently registered topic subscribers */
    @Override // net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public int topicSubscriberCount() {
        return this.topicSubscribers.size();
    }

    /**
     * @return {@code true} if any topic subscriber, subscriber or downstream consumer is
     *         registered, or if the owning asset (when present) has children
     */
    @Override // net.openhft.chronicle.engine.map.KVSSubscription
    public boolean hasSubscribers() {
        return !this.topicSubscribers.isEmpty()
                || !this.subscribers.isEmpty()
                || !this.downstream.isEmpty()
                || (this.asset != null && this.asset.hasChildren());
    }

    /** @return {@code true} if any subscriber or downstream consumer is registered */
    @Override // net.openhft.chronicle.engine.map.KVSSubscription
    public boolean needsPrevious() {
        return !this.subscribers.isEmpty() || !this.downstream.isEmpty();
    }

    /**
     * Installs an event-loop handler that reads the queue entry named by
     * {@code requestContext.name()} and passes every non-null result to {@code subscriber}.
     *
     * <p>NOTE(review): {@code filter} is not applied and the subscriber is not added to the
     * internal subscriber set, so {@link #close()} will not notify it - confirm this is intended.
     *
     * @param requestContext supplies the name to read from the queue view
     * @param subscriber     receives each non-null item
     * @param filter         currently unused
     */
    @Override // net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public void registerSubscriber(@NotNull RequestContext requestContext, @NotNull Subscriber subscriber, @NotNull Filter filter) {
        try {
            QueueView queueView = acquireQueueView();
            this.eventLoop.addHandler(() -> {
                Object item = queueView.get(requestContext.name());
                if (item != null) {
                    subscriber.accept(item);
                }
                return true;
            });
        } catch (Exception e) {
            throw Jvm.rethrow(e);
        }
    }

    /**
     * Registers a topic subscriber and installs an event-loop handler that forwards queue
     * entries to it. Once the subscriber throws {@link InvalidSubscriberException}, the next
     * invocation of the handler throws {@link InvalidEventHandlerException}.
     *
     * <p>Statistics are only incremented when the subscriber was not already registered, so
     * they stay in step with {@link #unregisterTopicSubscriber(TopicSubscriber)}.
     *
     * @param requestContext unused by this implementation
     * @param topicSubscriber the subscriber to feed
     */
    @Override // net.openhft.chronicle.engine.map.KVSSubscription
    public void registerTopicSubscriber(@NotNull RequestContext requestContext, @NotNull TopicSubscriber<T, M> topicSubscriber) {
        if (this.topicSubscribers.add(topicSubscriber)) {
            addToStats(TOPIC_SUBSCRIPTION);
        }
        // Flipped by the forwarding lambda when the subscriber reports itself invalid.
        AtomicBoolean invalidated = new AtomicBoolean();
        QueueView queueView = acquireQueueView();
        this.eventLoop.addHandler(() -> {
            if (invalidated.get()) {
                throw new InvalidEventHandlerException();
            }
            queueView.get((charSequence, obj) -> {
                try {
                    topicSubscriber.onMessage(toT(charSequence), obj);
                } catch (InvalidSubscriberException e) {
                    invalidated.set(true);
                }
            });
            return true;
        });
    }

    /**
     * Resolves the queue view of the owning asset.
     *
     * @throws IllegalStateException if this subscription was created without an asset
     */
    private QueueView acquireQueueView() {
        if (this.asset == null || this.eventLoop == null) {
            throw new IllegalStateException("No asset is attached to this subscription");
        }
        return (QueueView) this.asset.acquireView(QueueView.class);
    }

    /**
     * Converts a queue topic name to the configured topic type.
     *
     * @param charSequence the topic name as read from the queue
     * @return the same sequence, its string form, or a {@link WireKey} whose single abstract
     *         method returns the string form, depending on the topic type
     * @throws UnsupportedOperationException for any other topic type
     */
    @SuppressWarnings("unchecked")
    private T toT(CharSequence charSequence) {
        if (this.topicType == CharSequence.class) {
            return (T) charSequence;
        }
        if (this.topicType == String.class) {
            return (T) charSequence.toString();
        }
        if (this.topicType == WireKey.class) {
            // A lambda needs a functional-interface target type before it can be cast to T.
            return (T) (WireKey) () -> charSequence.toString();
        }
        throw new UnsupportedOperationException("unable to convert " + ((Object) charSequence) + " to type " + this.topicType);
    }

    /** Adds a downstream consumer. */
    @Override // net.openhft.chronicle.engine.map.KVSSubscription
    public void registerDownstream(@NotNull EventConsumer<T, M> eventConsumer) {
        this.downstream.add(eventConsumer);
    }

    /** Removes a downstream consumer; a consumer that is not registered is ignored. */
    public void unregisterDownstream(EventConsumer<T, M> eventConsumer) {
        this.downstream.remove(eventConsumer);
    }

    /**
     * Removes a subscriber (or the delegate recorded for it), decrements the statistics if it
     * was registered, and always signals end-of-subscription to it.
     *
     * @param subscriber the subscriber to remove
     */
    @Override // net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public void unregisterSubscriber(@NotNull Subscriber subscriber) {
        Subscriber delegate = this.subscriptionDelegate.get(subscriber);
        Subscriber target = delegate != null ? delegate : subscriber;
        if (this.subscribers.remove(target)) {
            removeFromStats(SUBSCRIPTION);
        }
        target.onEndOfSubscription();
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public int keySubscriberCount() {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // net.openhft.chronicle.engine.map.KVSSubscription
    public void registerKeySubscriber(@NotNull RequestContext requestContext, @NotNull Subscriber<T> subscriber, @NotNull Filter<T> filter) {
        throw new UnsupportedOperationException();
    }

    /**
     * Removes a topic subscriber and always signals end-of-subscription to it. Statistics are
     * decremented only when the subscriber was actually registered.
     *
     * <p>NOTE(review): the event-loop handler installed at registration is not removed here.
     *
     * @param topicSubscriber the subscriber to remove
     */
    @Override // net.openhft.chronicle.engine.map.KVSSubscription
    public void unregisterTopicSubscriber(@NotNull TopicSubscriber topicSubscriber) {
        if (this.topicSubscribers.remove(topicSubscriber)) {
            removeFromStats(TOPIC_SUBSCRIPTION);
        }
        topicSubscriber.onEndOfSubscription();
    }

    /**
     * Returns the monitoring map, resolving it from the root asset on first successful lookup.
     *
     * @return the map view found at {@code proc/subscriptions}, or {@code null} if that asset or
     *         its map view does not exist yet
     */
    @SuppressWarnings("unchecked")
    private Map<String, SubscriptionStat> getSubscriptionMap() {
        if (this.subscriptionMonitoringMap == null) {
            Asset statsAsset = this.asset.root().getAsset(SUBSCRIPTIONS_ASSET);
            if (statsAsset != null) {
                Object view = statsAsset.getView(MapView.class);
                if (view != null) {
                    this.subscriptionMonitoringMap = (Map<String, SubscriptionStat>) view;
                }
            }
        }
        return this.subscriptionMonitoringMap;
    }

    /**
     * Records a new subscription of kind {@code str} for the current session's user. Does
     * nothing when there is no session provider, no current session, or no monitoring map.
     */
    private void addToStats(String str) {
        if (this.sessionProvider == null) {
            return;
        }
        SessionDetails sessionDetails = this.sessionProvider.get();
        if (sessionDetails == null) {
            return;
        }
        Map<String, SubscriptionStat> stats = getSubscriptionMap();
        if (stats == null) {
            return;
        }
        String key = sessionDetails.userId() + "~" + str;
        SubscriptionStat stat = stats.get(key);
        if (stat == null) {
            stat = new SubscriptionStat();
            stat.setFirstSubscribed(LocalTime.now());
        }
        stat.setTotalSubscriptions(stat.getTotalSubscriptions() + 1);
        stat.setActiveSubscriptions(stat.getActiveSubscriptions() + 1);
        stat.setRecentlySubscribed(LocalTime.now());
        // Written back explicitly rather than relying on in-place mutation of the fetched value.
        stats.put(key, stat);
    }

    /**
     * Records the end of a subscription of kind {@code str} for the current session's user.
     * Does nothing when there is no session provider, no current session, or no monitoring map.
     *
     * <p>NOTE(review): this also updates the "recently subscribed" time on removal - confirm
     * that is intended.
     *
     * @throws AssertionError if no statistic exists for the user and kind
     */
    private void removeFromStats(String str) {
        if (this.sessionProvider == null) {
            return;
        }
        SessionDetails sessionDetails = this.sessionProvider.get();
        if (sessionDetails == null) {
            return;
        }
        Map<String, SubscriptionStat> stats = getSubscriptionMap();
        if (stats == null) {
            return;
        }
        String key = sessionDetails.userId() + "~" + str;
        SubscriptionStat stat = stats.get(key);
        if (stat == null) {
            throw new AssertionError("There should be an active subscription");
        }
        stat.setActiveSubscriptions(stat.getActiveSubscriptions() - 1);
        stat.setRecentlySubscribed(LocalTime.now());
        stats.put(key, stat);
    }
}
