package net.intelie.liverig.plugin.collectors;

import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import net.intelie.live.Live;
import net.intelie.live.Query;
import net.intelie.live.QueryEvent;
import net.intelie.live.QueryListener;
import net.intelie.liverig.plugin.assets.Asset;
import net.intelie.liverig.plugin.assets.AssetKey;
import net.intelie.liverig.plugin.assets.AssetLoadedObserver;
import net.intelie.liverig.plugin.assets.AssetNormalizerObserver;
import net.intelie.liverig.plugin.assets.AssetTypeService;
import net.intelie.liverig.plugin.guava.annotations.VisibleForTesting;
import net.intelie.liverig.plugin.normalizer.NormalizerConfig;
import net.intelie.liverig.util.SafeConsumer;
import net.intelie.pipes.time.Clock;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/intelie/liverig/plugin/collectors/AssetObserverToUpdateCollectorStatus.class */
public class AssetObserverToUpdateCollectorStatus implements AssetNormalizerObserver, AssetLoadedObserver {
    private static final Logger LOGGER = LoggerFactory.getLogger(AssetObserverToUpdateCollectorStatus.class);

    /** Query runner obtained from {@code live.queries()} in the constructor; used to start every query of this class. */
    @NotNull
    private final Live.Queries queries;

    /** Service of the observed asset type: lists assets, resolves an asset by id and provides its normalizer config. */
    @NotNull
    private final AssetTypeService assetTypeService;

    /** Asset type name used as the first component of every {@link AssetKey} built by this class. */
    @NotNull
    private final String assetType;

    /** Receives the events of the global "status offline" query. */
    @NotNull
    private final AssetOfflineEventListener assetOfflineEventListener;

    /** Receives the events of the global "internal" query; shares both caches declared below. */
    @NotNull
    private final AssetInternalEventListener assetInternalEventListener;

    /** Second asset type service passed to the constructor; source of the {@code CollectorPartService}. */
    @NotNull
    private final AssetTypeService collectorTypeService;
    // NOTE(review): the constants below are not referenced inside this class; presumably they are
    // collector state names / summary keys consumed by sibling classes -- confirm against callers.
    static final String OFFLINE = "OFFLINE";
    static final String ONLINE = "ONLINE";
    static final String BACKLOG = "BACKLOG";
    static final String ALL_SOURCES_OFFLINE = "ALL_SOURCES_OFFLINE";
    static final String ERROR = "ERROR";
    public static final String COLLECTORS_SUMMARY = "collector_summary";
    static final String ASSETS = "assets";
    /** Provides the "collectors feature disabled" flag and notifies this class when collector settings change. */
    private final CollectorService collectorService;
    /** Handle of the currently registered global queries; only accessed from the synchronized runGlobalQueriesToUpdateAsset(). */
    private Live.Action globalQueriesAction;
    /** Handles of the running per-asset raw filter queries, keyed by asset id. */
    private final ConcurrentHashMap<String, Live.Action> collectorRawQueries = new ConcurrentHashMap<>();

    /**
     * Per-asset cache handed to {@code AssetInternalEventListener} and {@code AssetRawEventListener.processRawEvent};
     * the entry of an asset is dropped in clearCacheByAsset().
     * NOTE(review): the meaning of the two nested String keys is not visible in this file -- confirm in the listeners.
     */
    @VisibleForTesting
    final ConcurrentHashMap<AssetKey, Map<String, Map<String, CollectorSourceSummary>>> cacheCollectorSourcesTime = new ConcurrentHashMap<>();

    /**
     * Maps an {@link InstanceSource} to the asset keys related to it; shared with the same listeners as
     * {@link #cacheCollectorSourcesTime}. clearCacheByAsset() removes an asset key from every set.
     */
    @VisibleForTesting
    final ConcurrentHashMap<InstanceSource, Set<AssetKey>> cacheCollectorAssetsRelation = new ConcurrentHashMap<>();

    /* loaded from: input_file:net/intelie/liverig/plugin/collectors/AssetObserverToUpdateCollectorStatus$InstanceSource.class */
    /**
     * Immutable value object made of an instance name plus an optional source and an optional rig name.
     * It is used as the key of {@code cacheCollectorAssetsRelation}, so equality and hash code are
     * computed from all three components.
     */
    public static class InstanceSource {

        @NotNull
        private final String instance;

        @Nullable
        private final String source;

        @Nullable
        private final String rigName;

        /**
         * Creates a key without rig name.
         *
         * @param str  instance name
         * @param str2 source name, may be {@code null}
         */
        InstanceSource(@NotNull String str, @Nullable String str2) {
            this(str, str2, null);
        }

        /**
         * Creates a fully specified key.
         *
         * @param str  instance name
         * @param str2 source name, may be {@code null}
         * @param str3 rig name, may be {@code null}
         */
        public InstanceSource(@NotNull String str, @Nullable String str2, @Nullable String str3) {
            this.instance = str;
            this.source = str2;
            this.rigName = str3;
        }

        /** @return the instance name */
        @NotNull
        public String getInstance() {
            return this.instance;
        }

        /** @return the source name, or {@code null} when none was given */
        @Nullable
        public String getSource() {
            return this.source;
        }

        /** @return the rig name, or {@code null} when none was given */
        @Nullable
        public String getRigName() {
            return this.rigName;
        }

        /** Two keys are equal when they have the exact same class and equal instance, source and rig name. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            InstanceSource other = (InstanceSource) obj;
            return Objects.equals(this.instance, other.instance)
                    && Objects.equals(this.source, other.source)
                    && Objects.equals(this.rigName, other.rigName);
        }

        /** Hash code consistent with {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            return Objects.hash(this.instance, this.source, this.rigName);
        }

        /** Readable representation, so that the key is meaningful when it shows up in logs or debuggers. */
        @Override
        public String toString() {
            return "InstanceSource{instance='" + this.instance + "', source='" + this.source + "', rigName='" + this.rigName + "'}";
        }
    }

    /**
     * Builds the observer, creates both event listeners, asks the offline listener to switch every
     * instance to offline (a failure there is logged and does not abort construction), registers
     * itself for collector settings changes and finally starts the global queries.
     *
     * @param live              platform entry point; supplies the query runner and the clock
     * @param assetTypeService  service of the observed asset type
     * @param assetTypeService2 service used as collector type service; must provide a {@code CollectorPartService}
     * @param str               asset type name used when building asset keys
     * @param collectorService  source of the feature flag and of settings change notifications
     * @throws Exception declared by the signature; which of the calls below may raise it is not visible here
     */
    public AssetObserverToUpdateCollectorStatus(@NotNull Live live, @NotNull AssetTypeService assetTypeService, @NotNull AssetTypeService assetTypeService2, @NotNull String str, @NotNull CollectorService collectorService) throws Exception {
        this.queries = live.queries();
        this.assetTypeService = assetTypeService;
        this.collectorTypeService = assetTypeService2;
        this.assetType = str;
        this.collectorService = collectorService;

        Clock liveClock = live.time().clock();
        CollectorPartService partService = (CollectorPartService) Objects.requireNonNull(assetTypeService2.getPartService(CollectorPartService.class));
        this.assetOfflineEventListener = new AssetOfflineEventListener(assetTypeService, assetTypeService2, str);
        this.assetInternalEventListener = new AssetInternalEventListener(
                liveClock,
                assetTypeService,
                str,
                assetTypeService2,
                partService,
                this.cacheCollectorSourcesTime,
                this.cacheCollectorAssetsRelation);

        // Best effort: a failure while resetting the states must not prevent the observer from starting.
        try {
            LOGGER.info("will update collectors of assets to offline");
            this.assetOfflineEventListener.changeAllInstancesToOfflineWhenPluginStarted();
        } catch (Exception resetFailure) {
            LOGGER.error("Error updating collector of assets states to offline", resetFailure);
        }

        collectorService.registerObserver(live, this::onCollectorSettingsChange);
        this.runGlobalQueriesToUpdateAsset();
    }

    /**
     * (Re)starts the two global follow queries: the "internal" query feeding
     * {@code assetInternalEventListener} and the "status offline" query feeding
     * {@code assetOfflineEventListener}.
     * <p>
     * Any previously registered queries are undone first. When the collectors feature is disabled
     * nothing is started. Failures while undoing or starting are logged and never propagated.
     */
    private synchronized void runGlobalQueriesToUpdateAsset() {
        // Detach the old handle before undoing it. Otherwise, when the feature is disabled or the new
        // queries fail to start, the already undone handle would stay in the field and be undone again
        // on the next invocation.
        Live.Action previous = this.globalQueriesAction;
        this.globalQueriesAction = null;
        if (previous != null) {
            try {
                previous.undo();
            } catch (Exception e) {
                LOGGER.error("Unexpected error", e);
            }
        }
        if (this.collectorService.getCollectorsFeatureDisabled()) {
            return;
        }
        try {
            Query internalEvents = new Query(CollectorExpressions.internalQuery())
                    .description("Monitoring internal events to create collector state")
                    .preloadWindow(false)
                    .follow()
                    .listenWith(new QueryListener.Empty() {
                        public void onEvent(QueryEvent queryEvent, boolean z) {
                            queryEvent.forEach(SafeConsumer.safeConsumer(
                                    AssetObserverToUpdateCollectorStatus.this.assetInternalEventListener::processInternalEvent));
                        }
                    });
            Query statusOfflineEvents = new Query(CollectorExpressions.statusOfflineQuery())
                    .description("Monitoring internal events to change collector state to offline")
                    .preloadWindow(false)
                    .follow()
                    .listenWith(new QueryListener.Empty() {
                        public void onEvent(QueryEvent queryEvent, boolean z) {
                            queryEvent.forEach(SafeConsumer.safeConsumer(
                                    AssetObserverToUpdateCollectorStatus.this.assetOfflineEventListener::processStatusOfflineEvent));
                        }
                    });
            this.globalQueriesAction = this.queries.run(new Query[]{internalEvents, statusOfflineEvents});
        } catch (Exception e2) {
            LOGGER.error("Error starting query to update collector states ", e2);
        }
    }

    /**
     * Callback registered with the collector service: restarts the global queries and then
     * refreshes every asset returned by the asset type service.
     */
    private void onCollectorSettingsChange() {
        this.runGlobalQueriesToUpdateAsset();
        this.assetTypeService.list().forEach(asset -> update(asset));
    }

    /**
     * Starts (or restarts) the raw filter query of one asset.
     * <p>
     * The asset's running query is stopped and its cache cleared, without starting a new query, when
     * the collectors feature is disabled, when the asset has no enabled normalizer, or when the
     * normalizer has no filter. Otherwise the previous query of the asset is stopped and a new follow
     * query built from the normalizer filter is started; its events are forwarded to
     * {@code AssetRawEventListener.processRawEvent} together with both caches.
     *
     * @param str   asset id; key under which the running query is tracked
     * @param asset asset whose normalizer defines the filter
     */
    private void runQueriesByAsset(@NotNull String str, @NotNull Asset asset) {
        final AssetKey assetKey = new AssetKey(this.assetType, str, asset.getName());
        if (this.collectorService.getCollectorsFeatureDisabled()) {
            stopQueriesAndClearCacheByAsset(assetKey);
            return;
        }
        NormalizerConfig normalizer = this.assetTypeService.getNormalizer(asset.getId());
        // A missing or disabled normalizer is handled exactly like a normalizer without filter.
        String filter = (normalizer != null && normalizer.enabled()) ? normalizer.filter() : null;
        if (filter == null) {
            stopQueriesAndClearCacheByAsset(assetKey);
            return;
        }
        Query rawQuery = new Query(CollectorExpressions.assetFilterRawQuery(filter))
                .description("Raw filter query to update collector state for asset " + filter)
                .preloadWindow(false)
                .follow()
                .listenWith(new QueryListener.Empty() {
                    public void onEvent(QueryEvent queryEvent, boolean z) {
                        queryEvent.forEach(map -> {
                            AssetRawEventListener.processRawEvent(
                                    map,
                                    AssetObserverToUpdateCollectorStatus.this.cacheCollectorSourcesTime,
                                    AssetObserverToUpdateCollectorStatus.this.cacheCollectorAssetsRelation,
                                    assetKey);
                        });
                    }
                });
        stopAssetFilterRawQueries(str);
        try {
            this.collectorRawQueries.put(str, this.queries.run(new Query[]{rawQuery}));
        } catch (Exception e) {
            // Pass the exception as last argument so SLF4J logs the cause instead of silently dropping it.
            LOGGER.warn("Could not perform queries {}", rawQuery, e);
        }
    }

    /**
     * Refreshes one asset: drops its cached data, then (re)evaluates whether its raw query must run.
     *
     * @param asset asset to refresh
     */
    private void update(@NotNull Asset asset) {
        String assetId = asset.getId();
        AssetKey staleKey = new AssetKey(this.assetType, assetId);
        clearCacheByAsset(staleKey);
        runQueriesByAsset(assetId, asset);
    }

    /**
     * Refreshes the asset whose normalizer was set. The new configuration argument itself is not
     * read here; the asset is looked up by id and must exist.
     *
     * @param str              id of the asset whose normalizer changed
     * @param normalizerConfig new normalizer configuration (unused)
     * @throws NullPointerException if no asset is found for the id
     */
    @Override // net.intelie.liverig.plugin.assets.AssetNormalizerObserver
    public void normalizerSet(@NotNull String str, @Nullable NormalizerConfig normalizerConfig) {
        Asset changedAsset = (Asset) Objects.requireNonNull(this.assetTypeService.get(str));
        update(changedAsset);
    }

    /**
     * Refreshes cache and raw query of an asset that has just been loaded.
     *
     * @param asset the loaded asset
     */
    @Override // net.intelie.liverig.plugin.assets.AssetLoadedObserver
    public void assetLoaded(@NotNull Asset asset) {
        this.update(asset);
    }

    /**
     * Removes a deleted asset from the collector state, then stops its raw query and clears its cache.
     *
     * @param str id of the deleted asset
     * @throws NullPointerException if the collector type service provides no {@code CollectorPartService}
     */
    @Override // net.intelie.liverig.plugin.assets.AssetObserver
    public void assetDeleted(@NotNull String str) {
        AssetKey deletedKey = new AssetKey(this.assetType, str);
        CollectorPartService partService = (CollectorPartService) Objects.requireNonNull(
                this.collectorTypeService.getPartService(CollectorPartService.class));
        CollectorSummaryUtil.removeAssetOfCollectorState(null, deletedKey, this.collectorTypeService, partService);
        stopQueriesAndClearCacheByAsset(deletedKey);
    }

    /**
     * Removes a saved asset from the collector state and then refreshes its cache and raw query.
     *
     * @param asset the saved asset
     * @throws NullPointerException if the collector type service provides no {@code CollectorPartService}
     */
    @Override // net.intelie.liverig.plugin.assets.AssetObserver
    public void assetSaved(@NotNull Asset asset) {
        AssetKey savedKey = new AssetKey(this.assetType, asset.getId());
        CollectorPartService partService = (CollectorPartService) Objects.requireNonNull(
                this.collectorTypeService.getPartService(CollectorPartService.class));
        CollectorSummaryUtil.removeAssetOfCollectorState(null, savedKey, this.collectorTypeService, partService);
        update(asset);
    }

    /**
     * Stops the raw query tracked under the key's asset id and clears the cached data of the key.
     *
     * @param assetKey key of the asset to shut down
     */
    private void stopQueriesAndClearCacheByAsset(@NotNull AssetKey assetKey) {
        String assetId = assetKey.getAssetId();
        stopAssetFilterRawQueries(assetId);
        clearCacheByAsset(assetKey);
    }

    /**
     * Stops tracking the raw query registered under the given asset id and undoes it, if there is
     * one. Any exception raised while doing so is logged and not propagated.
     *
     * @param str asset id the query was registered under
     */
    private void stopAssetFilterRawQueries(@NotNull String str) {
        try {
            Live.Action running = this.collectorRawQueries.remove(str);
            if (running == null) {
                return;
            }
            running.undo();
        } catch (Exception undoFailure) {
            LOGGER.error("Unexpected error", undoFailure);
        }
    }

    /**
     * Drops everything cached for one asset: the key is removed from every asset set of
     * {@code cacheCollectorAssetsRelation} and its entry is removed from {@code cacheCollectorSourcesTime}.
     *
     * @param assetKey key of the asset whose cached data must be discarded
     */
    private void clearCacheByAsset(@NotNull AssetKey assetKey) {
        // Single pass over the value sets: no raw-typed temporary collection, and no second lookup by
        // key that could return null (and throw) if an entry disappeared from the map in between.
        this.cacheCollectorAssetsRelation.values().forEach(relatedAssets -> relatedAssets.remove(assetKey));
        this.cacheCollectorSourcesTime.remove(assetKey);
    }

    /**
     * Exposes the live map of running per-asset raw queries (no copy is made).
     *
     * @return the internal map from asset id to query handle
     */
    @VisibleForTesting
    ConcurrentHashMap<String, Live.Action> getCollectorRawQueries() {
        return collectorRawQueries;
    }
}
