package fr.inria.eventcloud.proxies;

import com.hp.hpl.jena.graph.Node;
import com.hp.hpl.jena.graph.NodeFactory;
import com.hp.hpl.jena.sparql.engine.binding.Binding;
import fr.inria.eventcloud.api.CompoundEvent;
import fr.inria.eventcloud.api.Quadruple;
import fr.inria.eventcloud.api.QuadruplePattern;
import fr.inria.eventcloud.api.SubscriptionId;
import fr.inria.eventcloud.api.listeners.BindingNotificationListener;
import fr.inria.eventcloud.api.listeners.CompoundEventNotificationListener;
import fr.inria.eventcloud.api.listeners.NotificationListener;
import fr.inria.eventcloud.api.listeners.NotificationListenerType;
import fr.inria.eventcloud.api.listeners.SignalNotificationListener;
import fr.inria.eventcloud.api.properties.AlterableElaProperty;
import fr.inria.eventcloud.configuration.EventCloudProperties;
import fr.inria.eventcloud.formatters.QuadruplesFormatter;
import fr.inria.eventcloud.messages.request.ReconstructCompoundEventRequest;
import fr.inria.eventcloud.messages.request.RemoveEphemeralSubscriptionRequest;
import fr.inria.eventcloud.messages.request.UnsubscribeRequest;
import fr.inria.eventcloud.messages.response.QuadruplePatternResponse;
import fr.inria.eventcloud.pubsub.PublishSubscribeUtils;
import fr.inria.eventcloud.pubsub.Subscription;
import fr.inria.eventcloud.pubsub.Subsubscription;
import fr.inria.eventcloud.pubsub.notifications.BindingNotification;
import fr.inria.eventcloud.pubsub.notifications.Notification;
import fr.inria.eventcloud.pubsub.notifications.NotificationId;
import fr.inria.eventcloud.pubsub.notifications.PollingSignalNotification;
import fr.inria.eventcloud.pubsub.notifications.QuadruplesNotification;
import fr.inria.eventcloud.pubsub.notifications.SignalNotification;
import fr.inria.eventcloud.pubsub.solutions.BindingSolution;
import fr.inria.eventcloud.pubsub.solutions.QuadruplesSolution;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Element;
import net.sf.ehcache.config.CacheConfiguration;
import net.sf.ehcache.config.Configuration;
import net.sf.ehcache.config.DiskStoreConfiguration;
import net.sf.ehcache.config.PersistenceConfiguration;
import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.eviction.EvictionStrategy;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.util.concurrent.IsolationLevel;
import org.objectweb.proactive.Body;
import org.objectweb.proactive.annotation.multiactivity.MemberOf;
import org.objectweb.proactive.api.PAActiveObject;
import org.objectweb.proactive.api.PAFuture;
import org.objectweb.proactive.core.component.body.ComponentEndActive;
import org.objectweb.proactive.extensions.p2p.structured.configuration.P2PStructuredProperties;
import org.objectweb.proactive.extensions.p2p.structured.utils.Files;
import org.objectweb.proactive.extensions.p2p.structured.utils.UnicodeUtils;
import org.objectweb.proactive.multiactivity.component.ComponentMultiActiveService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:fr/inria/eventcloud/proxies/SubscribeProxyImpl.class */
public class SubscribeProxyImpl extends EventCloudProxy implements ComponentEndActive, SubscribeProxy, SubscribeProxyAttributeController {
    private static final long serialVersionUID = 151;
    public static final String SUBSCRIBE_PROXY_ADL = "fr.inria.eventcloud.proxies.SubscribeProxy";
    public static final String SUBSCRIBE_SERVICES_ITF = "subscribe-services";
    public static final String SUBSCRIBE_PROXY_VN = "SubscribeProxyVN";
    private static final Logger log = LoggerFactory.getLogger(SubscribeProxyImpl.class);
    private EventsDeliveredCache eventsDeliveredCache;
    private ConcurrentMap<SubscriptionId, SubscriptionEntry<?>> subscriptions;
    private ConcurrentMap<NotificationId, BindingSolution> bindingSolutions;
    private ConcurrentMap<NotificationId, QuadruplesSolution> quadruplesSolutions;

    /* loaded from: input_file:fr/inria/eventcloud/proxies/SubscribeProxyImpl$EhcacheEventsDeliveredCache.class */
    private static class EhcacheEventsDeliveredCache extends EventsDeliveredCache {
        private CacheManager cacheManager;
        private Cache cache;
        private String proxyIdentifier;

        public EhcacheEventsDeliveredCache(String str) {
            super(EventCloudProperties.getDefaultTemporaryPath() + "ehcache" + File.separatorChar + str);
            this.proxyIdentifier = str;
            this.cache = createCache();
        }

        private Cache createCache() {
            Cache cache = new Cache(new CacheConfiguration(this.proxyIdentifier, 0).memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU).eternal(true).persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.LOCALTEMPSWAP)).transactionalMode(CacheConfiguration.TransactionalMode.OFF));
            cache.disableDynamicFeatures();
            this.cacheManager = getOrCreateCacheManager(this.diskStorePath);
            this.cacheManager.addCache(cache);
            return this.cacheManager.getCache(this.proxyIdentifier);
        }

        private CacheManager getOrCreateCacheManager(String str) {
            CacheManager cacheManager = CacheManager.getCacheManager("default");
            if (cacheManager == null) {
                cacheManager = createCacheManager(str);
            }
            return cacheManager;
        }

        private CacheManager createCacheManager(String str) {
            return CacheManager.create(new Configuration().dynamicConfig(false).diskStore(new DiskStoreConfiguration().path(str)).name("default").updateCheck(false));
        }

        @Override // fr.inria.eventcloud.proxies.SubscribeProxyImpl.EventsDeliveredCache
        public void clear() {
            this.cacheManager.clearAllStartingWith(this.proxyIdentifier);
        }

        @Override // fr.inria.eventcloud.proxies.SubscribeProxyImpl.EventsDeliveredCache
        public void close() {
            this.cacheManager.shutdown();
            super.close();
        }

        @Override // fr.inria.eventcloud.proxies.SubscribeProxyImpl.EventsDeliveredCache
        public boolean contains(NotificationId notificationId) {
            return this.cache.getQuiet(notificationId) != null;
        }

        @Override // fr.inria.eventcloud.proxies.SubscribeProxyImpl.EventsDeliveredCache
        public boolean markAsDelivered(NotificationId notificationId, SubscriptionId subscriptionId) {
            return this.cache.putIfAbsent(new Element(notificationId, subscriptionId)) == null;
        }

        @Override // fr.inria.eventcloud.proxies.SubscribeProxyImpl.EventsDeliveredCache
        public void removeEntriesFor(SubscriptionId subscriptionId) {
            for (NotificationId notificationId : this.cache.getKeysNoDuplicateCheck()) {
                if (notificationId.isFor(subscriptionId)) {
                    this.cache.remove(notificationId);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:fr/inria/eventcloud/proxies/SubscribeProxyImpl$EventsDeliveredCache.class */
    public static abstract class EventsDeliveredCache {
        protected final String diskStorePath;

        public EventsDeliveredCache(String str) {
            this.diskStorePath = str;
        }

        public abstract void clear();

        public abstract boolean contains(NotificationId notificationId);

        public abstract boolean markAsDelivered(NotificationId notificationId, SubscriptionId subscriptionId);

        public abstract void removeEntriesFor(SubscriptionId subscriptionId);

        public void close() {
            try {
                Files.deleteDirectory(this.diskStorePath);
            } catch (IOException e) {
                throw new RuntimeException("There was an issue while trying to remove cache directory " + this.diskStorePath);
            }
        }
    }

    /* loaded from: input_file:fr/inria/eventcloud/proxies/SubscribeProxyImpl$InfinispanEventsDeliveredCache.class */
    private static class InfinispanEventsDeliveredCache extends EventsDeliveredCache {
        private final EmbeddedCacheManager cacheManager;
        private final org.infinispan.Cache<NotificationId, SubscriptionId> cache;

        public InfinispanEventsDeliveredCache(String str) {
            super(EventCloudProperties.getDefaultTemporaryPath() + "infinispan" + File.separatorChar + str);
            GlobalConfigurationBuilder globalConfigurationBuilder = new GlobalConfigurationBuilder();
            globalConfigurationBuilder.globalJmxStatistics().disable().allowDuplicateDomains(true);
            globalConfigurationBuilder.serialization().addAdvancedExternalizer(NotificationId.SERIALIZER).addAdvancedExternalizer(SubscriptionId.SERIALIZER);
            this.cacheManager = new DefaultCacheManager(globalConfigurationBuilder.build());
            this.cache = createCache();
        }

        private org.infinispan.Cache<NotificationId, SubscriptionId> createCache() {
            ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
            configurationBuilder.loaders().passivation(true).addFileCacheStore().location(this.diskStorePath).purgeOnStartup(true).async().eviction().maxEntries(((Integer) EventCloudProperties.SUBSCRIBER_CACHE_MAX_ENTRIES.getValue()).intValue()).strategy(EvictionStrategy.LRU).locking().isolationLevel(IsolationLevel.NONE);
            this.cacheManager.defineConfiguration("default", configurationBuilder.build());
            return this.cacheManager.getCache("default");
        }

        @Override // fr.inria.eventcloud.proxies.SubscribeProxyImpl.EventsDeliveredCache
        public void clear() {
            this.cache.stop();
            this.cache.start();
        }

        @Override // fr.inria.eventcloud.proxies.SubscribeProxyImpl.EventsDeliveredCache
        public void close() {
            this.cacheManager.stop();
            super.close();
        }

        @Override // fr.inria.eventcloud.proxies.SubscribeProxyImpl.EventsDeliveredCache
        public boolean contains(NotificationId notificationId) {
            return this.cache.containsKey(notificationId);
        }

        @Override // fr.inria.eventcloud.proxies.SubscribeProxyImpl.EventsDeliveredCache
        public boolean markAsDelivered(NotificationId notificationId, SubscriptionId subscriptionId) {
            return this.cache.putIfAbsent(notificationId, subscriptionId) == null;
        }

        @Override // fr.inria.eventcloud.proxies.SubscribeProxyImpl.EventsDeliveredCache
        public void removeEntriesFor(SubscriptionId subscriptionId) {
            for (NotificationId notificationId : this.cache.keySet()) {
                if (notificationId.isFor(subscriptionId)) {
                    this.cache.remove(notificationId);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:fr/inria/eventcloud/proxies/SubscribeProxyImpl$SubscriptionEntry.class */
    public static final class SubscriptionEntry<T extends NotificationListener<?>> {
        private final Subscription subscription;
        private final T listener;

        public SubscriptionEntry(Subscription subscription, T t) {
            this.subscription = subscription;
            this.listener = t;
        }
    }

    public void runComponentActivity(Body body) {
        ((EventCloudProxy) this).multiActiveService = new ComponentMultiActiveService(body);
        ((EventCloudProxy) this).multiActiveService.multiActiveServing(((Integer) EventCloudProperties.MAO_SOFT_LIMIT_SUBSCRIBE_PROXIES.getValue()).intValue(), false, false);
    }

    public void endComponentActivity(Body body) {
        this.eventsDeliveredCache.close();
    }

    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    public boolean clear() {
        this.bindingSolutions.clear();
        this.quadruplesSolutions.clear();
        this.subscriptions.clear();
        this.eventsDeliveredCache.clear();
        return true;
    }

    @Override // fr.inria.eventcloud.proxies.SubscribeProxyAttributeController
    public void setAttributes(EventCloudCache eventCloudCache, String str, AlterableElaProperty[] alterableElaPropertyArr) {
        if (this.eventCloudCache == null) {
            super.setAttributes(eventCloudCache.getTrackers());
            this.eventCloudCache = eventCloudCache;
            this.subscriptions = new ConcurrentHashMap(100, 0.9f, 2);
            this.bindingSolutions = new ConcurrentHashMap(((Integer) EventCloudProperties.MAO_SOFT_LIMIT_SUBSCRIBE_PROXIES.getValue()).intValue() * ((Integer) EventCloudProperties.AVERAGE_NB_QUADRUPLES_PER_COMPOUND_EVENT.getValue()).intValue(), 0.75f, ((Integer) EventCloudProperties.MAO_SOFT_LIMIT_SUBSCRIBE_PROXIES.getValue()).intValue());
            this.quadruplesSolutions = new ConcurrentHashMap(((Integer) EventCloudProperties.MAO_SOFT_LIMIT_SUBSCRIBE_PROXIES.getValue()).intValue() * ((Integer) EventCloudProperties.AVERAGE_NB_QUADRUPLES_PER_COMPOUND_EVENT.getValue()).intValue(), 0.75f, ((Integer) EventCloudProperties.MAO_SOFT_LIMIT_SUBSCRIBE_PROXIES.getValue()).intValue());
            String str2 = (String) EventCloudProperties.SUBSCRIBER_CACHE_ENGINE.getValue();
            if (str2.equals("ehcache")) {
                this.eventsDeliveredCache = new EhcacheEventsDeliveredCache(getComponentId());
            } else {
                if (!str2.equals("infinispan")) {
                    throw new IllegalStateException("Unknown cache engine: " + str2);
                }
                this.eventsDeliveredCache = new InfinispanEventsDeliveredCache(getComponentId());
            }
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { // from class: fr.inria.eventcloud.proxies.SubscribeProxyImpl.1
                @Override // java.lang.Runnable
                public void run() {
                    SubscribeProxyImpl.this.eventsDeliveredCache.close();
                }
            }));
        }
    }

    @MemberOf("parallelSelfCompatible")
    public void subscribe(fr.inria.eventcloud.api.Subscription subscription, BindingNotificationListener bindingNotificationListener) {
        indexSubscription(subscription, bindingNotificationListener);
    }

    @MemberOf("parallelSelfCompatible")
    public void subscribe(fr.inria.eventcloud.api.Subscription subscription, CompoundEventNotificationListener compoundEventNotificationListener) {
        indexSubscription(subscription, (EventCloudProperties.isSbce1PubSubAlgorithmUsed() || EventCloudProperties.isSbce2PubSubAlgorithmUsed()) ? PublishSubscribeUtils.removeResultVarsExceptGraphVar(subscription.getSparqlQuery()) : subscription.getSparqlQuery(), compoundEventNotificationListener);
    }

    @MemberOf("parallelSelfCompatible")
    public void subscribe(fr.inria.eventcloud.api.Subscription subscription, SignalNotificationListener signalNotificationListener) {
        indexSubscription(subscription, signalNotificationListener);
    }

    private void indexSubscription(fr.inria.eventcloud.api.Subscription subscription, NotificationListener<?> notificationListener) {
        indexSubscription(subscription, subscription.getSparqlQuery(), notificationListener);
    }

    private void indexSubscription(fr.inria.eventcloud.api.Subscription subscription, String str, NotificationListener<?> notificationListener) {
        Subscription createInternalSubscription = createInternalSubscription(subscription, ((EventCloudProxy) this).url, str, notificationListener.getType());
        if (this.subscriptions.putIfAbsent(subscription.getId(), new SubscriptionEntry<>(createInternalSubscription, notificationListener)) != null) {
            throw new IllegalArgumentException("Subscription already registered for subscription id: " + createInternalSubscription.getId());
        }
        super.m61selectPeer().subscribe(createInternalSubscription);
        log.info("New subscription has been registered from {} with id {}", PAActiveObject.getBodyOnThis().getUrl(), createInternalSubscription.getId());
    }

    private static Subscription createInternalSubscription(fr.inria.eventcloud.api.Subscription subscription, String str, String str2, NotificationListenerType notificationListenerType) {
        return new Subscription(subscription.getId(), null, subscription.getId(), subscription.getCreationTime(), str2, str, subscription.getSubscriptionDestination(), notificationListenerType);
    }

    @MemberOf("parallelSelfCompatible")
    public void unsubscribe(SubscriptionId subscriptionId) {
        SubscriptionEntry<?> remove = this.subscriptions.remove(subscriptionId);
        if (remove == null) {
            throw new IllegalArgumentException("No subscription registered with the specified subscription id: " + subscriptionId);
        }
        Subscription subscription = ((SubscriptionEntry) remove).subscription;
        for (Subsubscription subsubscription : subscription.getSubSubscriptions()) {
            super.send(new UnsubscribeRequest(subscription.getOriginalId(), subsubscription.getAtomicQuery(), subscription.getType() == NotificationListenerType.BINDING, false));
        }
        this.eventsDeliveredCache.removeEntriesFor(subscriptionId);
    }

    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    @MemberOf("parallelSelfCompatible")
    public void receiveSbce1Or2(BindingNotification bindingNotification) {
        logNotificationReception(bindingNotification);
        SubscriptionId subscriptionId = bindingNotification.getSubscriptionId();
        SubscriptionEntry<?> subscriptionEntry = this.subscriptions.get(subscriptionId);
        if (subscriptionEntry == null) {
            return;
        }
        BindingSolution bindingSolution = this.bindingSolutions.get(bindingNotification.getId());
        if (bindingSolution == null) {
            bindingSolution = new BindingSolution(((SubscriptionEntry) subscriptionEntry).subscription.getResultVars().size(), bindingNotification.getContent());
            BindingSolution putIfAbsent = this.bindingSolutions.putIfAbsent(bindingNotification.getId(), bindingSolution);
            if (putIfAbsent != null) {
                bindingSolution = putIfAbsent;
                bindingSolution.merge(bindingNotification.getContent());
            }
        } else {
            bindingSolution.merge(bindingNotification.getContent());
        }
        if (bindingSolution.isReady()) {
            if (this.eventsDeliveredCache.markAsDelivered(bindingNotification.getId(), subscriptionId)) {
                deliver((SubscriptionEntry<BindingNotificationListener>) subscriptionEntry, bindingSolution.getChunks());
            }
            this.bindingSolutions.remove(bindingNotification.getId());
        }
    }

    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    @MemberOf("parallelSelfCompatible")
    public void receiveSbce3(BindingNotification bindingNotification) {
        logNotificationReception(bindingNotification);
        SubscriptionId subscriptionId = bindingNotification.getSubscriptionId();
        SubscriptionEntry<?> subscriptionEntry = this.subscriptions.get(subscriptionId);
        if (subscriptionEntry == null || !this.eventsDeliveredCache.markAsDelivered(bindingNotification.getId(), subscriptionId)) {
            return;
        }
        deliver((SubscriptionEntry<BindingNotificationListener>) subscriptionEntry, bindingNotification.getContent());
    }

    private void deliver(SubscriptionEntry<BindingNotificationListener> subscriptionEntry, Binding binding) {
        ((SubscriptionEntry) subscriptionEntry).listener.onNotification(((SubscriptionEntry) subscriptionEntry).subscription.getId(), binding);
    }

    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    @MemberOf("parallelSelfCompatible")
    public void receiveSbce2(QuadruplesNotification quadruplesNotification) {
        logNotificationReception(quadruplesNotification);
        SubscriptionId subscriptionId = quadruplesNotification.getSubscriptionId();
        if (this.eventsDeliveredCache.contains(quadruplesNotification.getId())) {
            handleReceiveDuplicateSolution(quadruplesNotification);
            return;
        }
        if (log.isTraceEnabled()) {
            log.trace("Received quadruples notification subscriptionId=" + subscriptionId + ", contentSize=" + quadruplesNotification.getContent().size() + ", from=" + quadruplesNotification.getSource() + "\n" + QuadruplesFormatter.toString(quadruplesNotification.getContent(), true));
        }
        QuadruplesSolution quadruplesSolution = this.quadruplesSolutions.get(quadruplesNotification.getId());
        if (quadruplesSolution == null) {
            quadruplesSolution = new QuadruplesSolution(quadruplesNotification.getContent());
            QuadruplesSolution putIfAbsent = this.quadruplesSolutions.putIfAbsent(quadruplesNotification.getId(), quadruplesSolution);
            if (putIfAbsent != null) {
                quadruplesSolution = putIfAbsent;
                quadruplesSolution.merge((Collection<Quadruple>) quadruplesNotification.getContent());
            }
        } else {
            quadruplesSolution.merge((Collection<Quadruple>) quadruplesNotification.getContent());
        }
        if (quadruplesSolution.isReady() && this.eventsDeliveredCache.markAsDelivered(quadruplesNotification.getId(), subscriptionId)) {
            CompoundEvent compoundEvent = new CompoundEvent(quadruplesSolution.getChunks());
            SubscriptionEntry<?> subscriptionEntry = this.subscriptions.get(subscriptionId);
            if (subscriptionEntry != null) {
                deliver(subscriptionEntry, compoundEvent.getGraph().getURI(), compoundEvent);
            }
            this.quadruplesSolutions.remove(quadruplesNotification.getId());
        }
    }

    private void handleReceiveDuplicateSolution(QuadruplesNotification quadruplesNotification) {
        log.info("Received some quadruple duplicates for a CE that has already been delivered. They will be ignored:\n{}", quadruplesNotification);
        sendRemoveEphemeralSubscription(quadruplesNotification.getContent().get(0).getGraph(), quadruplesNotification.getSubscriptionId());
        this.quadruplesSolutions.remove(quadruplesNotification.getId());
    }

    private void sendRemoveEphemeralSubscription(Node node, SubscriptionId subscriptionId) {
        super.sendv(new RemoveEphemeralSubscriptionRequest(node, subscriptionId));
    }

    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    @MemberOf("parallelSelfCompatible")
    public void receiveSbce3(QuadruplesNotification quadruplesNotification) {
        logNotificationReception(quadruplesNotification);
        SubscriptionEntry<?> subscriptionEntry = this.subscriptions.get(quadruplesNotification.getSubscriptionId());
        if (subscriptionEntry == null || !this.eventsDeliveredCache.markAsDelivered(quadruplesNotification.getId(), quadruplesNotification.getSubscriptionId())) {
            return;
        }
        CompoundEvent compoundEvent = new CompoundEvent(quadruplesNotification.getContent());
        deliver(subscriptionEntry, compoundEvent.getGraph().getURI(), compoundEvent);
    }

    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    @MemberOf("parallelSelfCompatible")
    public void receiveSbce1Or2(SignalNotification signalNotification) {
        receive(signalNotification);
    }

    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    @MemberOf("parallelSelfCompatible")
    public void receiveSbce3(SignalNotification signalNotification) {
        receive(signalNotification);
    }

    private void receive(SignalNotification signalNotification) {
        logNotificationReception(signalNotification);
        SubscriptionEntry<?> subscriptionEntry = this.subscriptions.get(signalNotification.getSubscriptionId());
        if (subscriptionEntry == null || !this.eventsDeliveredCache.markAsDelivered(signalNotification.getId(), signalNotification.getSubscriptionId())) {
            return;
        }
        deliver((SubscriptionEntry<SignalNotificationListener>) subscriptionEntry, signalNotification.getMetaEventId());
    }

    private void deliver(SubscriptionEntry<SignalNotificationListener> subscriptionEntry, String str) {
        ((SubscriptionEntry) subscriptionEntry).listener.onNotification(((SubscriptionEntry) subscriptionEntry).subscription.getId(), str);
    }

    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    @MemberOf("parallelSelfCompatible")
    public void receiveSbce1(PollingSignalNotification pollingSignalNotification) {
        CompoundEvent reconstructCompoundEvent;
        logNotificationReception(pollingSignalNotification);
        SubscriptionId subscriptionId = pollingSignalNotification.getSubscriptionId();
        SubscriptionEntry<?> subscriptionEntry = this.subscriptions.get(subscriptionId);
        if (subscriptionEntry == null || (reconstructCompoundEvent = reconstructCompoundEvent(pollingSignalNotification.getId(), subscriptionId, NodeFactory.createURI(pollingSignalNotification.getMetaEventId()))) == null) {
            return;
        }
        deliver(subscriptionEntry, pollingSignalNotification.getMetaEventId(), reconstructCompoundEvent);
    }

    private void deliver(SubscriptionEntry<CompoundEventNotificationListener> subscriptionEntry, String str, CompoundEvent compoundEvent) {
        SubscriptionId id = ((SubscriptionEntry) subscriptionEntry).subscription.getId();
        CompoundEventNotificationListener compoundEventNotificationListener = ((SubscriptionEntry) subscriptionEntry).listener;
        compoundEventNotificationListener.onNotification(id, compoundEvent);
        sendInputOutputMonitoringReport(str, compoundEventNotificationListener.getSubscriberUrl());
        logIntegrationInformation(str);
    }

    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    @MemberOf("parallelSelfCompatible")
    public final CompoundEvent reconstructCompoundEvent(NotificationId notificationId, SubscriptionId subscriptionId, Node node) {
        if (!this.eventsDeliveredCache.markAsDelivered(notificationId, subscriptionId)) {
            return null;
        }
        int i = -1;
        ArrayList arrayList = new ArrayList();
        HashSet hashSet = new HashSet();
        QuadruplePattern quadruplePattern = new QuadruplePattern(node, Node.ANY, Node.ANY, Node.ANY);
        int i2 = 0;
        long currentTimeMillis = log.isTraceEnabled() ? System.currentTimeMillis() : 0L;
        while (arrayList.size() != i) {
            if (log.isInfoEnabled()) {
                log.info("Reconstructing compound event for subscription {} and graph value {} ({}/{})", new Object[]{subscriptionId, node, Integer.valueOf(arrayList.size()), Integer.valueOf(i)});
            }
            for (Quadruple quadruple : ((QuadruplePatternResponse) PAFuture.getFutureValue(super.send(new ReconstructCompoundEventRequest(quadruplePattern, hashSet)))).getResult()) {
                if (PublishSubscribeUtils.isMetaQuadruple(quadruple)) {
                    String literalLexicalForm = quadruple.getObject().getLiteralLexicalForm();
                    if (48 < ((Integer) P2PStructuredProperties.CAN_LOWER_BOUND.getValue()).intValue()) {
                        literalLexicalForm = UnicodeUtils.translate(literalLexicalForm, -(((Integer) P2PStructuredProperties.CAN_LOWER_BOUND.getValue()).intValue() - 48));
                    }
                    i = Integer.parseInt(literalLexicalForm);
                } else {
                    arrayList.add(quadruple);
                }
                hashSet.add(quadruple.hashValue());
            }
            if (arrayList.size() != i) {
                try {
                    Thread.sleep(((Integer) EventCloudProperties.RECONSTRUCTION_RETRY_THRESHOLD.getValue()).intValue());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
                i2++;
            }
        }
        if (log.isTraceEnabled()) {
            log.trace("Reconstruction for eventId " + node + " has required " + (i2 + 1) + " requests and " + (System.currentTimeMillis() - currentTimeMillis) + " ms");
        }
        return new CompoundEvent(arrayList);
    }

    private void sendInputOutputMonitoringReport(String str, String str2) {
        if (this.monitoringManager != null) {
            String publicationSource = Quadruple.getPublicationSource(str);
            if (publicationSource == null) {
                publicationSource = "http://0.0.0.0";
            }
            if (str2 == null) {
                str2 = ((EventCloudProxy) this).url;
            }
            this.monitoringManager.sendInputOutputMonitoringReport(publicationSource, str2, Quadruple.getPublicationTime(str));
        }
    }

    private void logIntegrationInformation(String str) {
        if (log.isTraceEnabled()) {
            String str2 = "EventCloud Exit";
            if (str != null) {
                str2 = (str2 + " ") + Quadruple.removeMetaInformation(NodeFactory.createURI(str));
            }
            log.trace((str2 + " ") + this.eventCloudCache.getId().getStreamUrl());
        }
    }

    private void logNotificationReception(Notification<?> notification) {
        if (log.isDebugEnabled()) {
            log.debug("New notification received {} on {} for subscription id {}", new Object[]{notification.getId(), ((EventCloudProxy) this).url, notification.getSubscriptionId()});
        }
    }

    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    @MemberOf("parallelSelfCompatible")
    public Subscription find(SubscriptionId subscriptionId) {
        return ((SubscriptionEntry) this.subscriptions.get(subscriptionId)).subscription;
    }

    @MemberOf("parallelSelfCompatible")
    public String getComponentUri() {
        return ((EventCloudProxy) this).url;
    }

    private String getComponentId() {
        return ((EventCloudProxy) this).url.substring(((EventCloudProxy) this).url.lastIndexOf(47) + 1, ((EventCloudProxy) this).url.length());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String prefixName() {
        return "subscribe-proxy";
    }
}
