package net.intelie.liverig.plugin.normalizer;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import net.intelie.live.Event;
import net.intelie.live.EventIterator;
import net.intelie.live.EventLobby;
import net.intelie.live.ExtensionQualifier;
import net.intelie.live.Live;
import net.intelie.live.ProgressInfo;
import net.intelie.live.StorageQuery;
import net.intelie.live.StorageQueryOptions;
import net.intelie.liverig.plugin.curves.NormalizerConfigFieldService;
import net.intelie.liverig.plugin.curves.StandardCurves;
import net.intelie.liverig.plugin.guava.base.Strings;
import net.intelie.pipes.time.Clock;
import net.intelie.pipes.time.TimeSpan;
import net.intelie.pipes.time.TimeSpanParser;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/intelie/liverig/plugin/normalizer/NormalizerReprocessService.class */
/**
 * Runs normalizer reprocess requests in the background and tracks their outcome in a
 * {@link NormalizerReprocessStatus}.
 *
 * <p>Each request is submitted to an executor obtained from {@code live.system()}. A run optionally
 * purges previously stored events of the normalizer's event type, then delegates to
 * {@link NormalizerConfig#reprocess} and waits on the {@link NormalizationLatch} it publishes.
 * Start, end, disable and error events are emitted through a {@link NormalizerReprocessReporter}.
 */
public class NormalizerReprocessService {
    private static final Logger LOGGER = LoggerFactory.getLogger(NormalizerReprocessService.class);
    private static final String TIMESTAMP = "timestamp";
    private static final String INDEX_TIMESTAMP = "index_timestamp";
    private static final String RAW_INDEX_TIMESTAMP = "liverig__index__timestamp";
    private static final String RAW_LIVERIG_TIMESTAMP = "liverig__timestamp";
    private final Live live;
    private final StandardCurves standardCurves;
    private final DataNormalizationService normalizer;
    private final ExecutorService executor;
    private final Clock clock;
    private final EventLobby lobby;
    private final NormalizerConfigFieldService normalizerConfigFieldService;

    /**
     * Creates the service and requests its background executor, clock and event lobby from
     * {@code live}.
     *
     * @param live the platform entry point used for storage, time, executor and lobby access
     * @param standardCurves passed through to the normalizer configuration when reprocessing
     * @param dataNormalizationService passed through to the normalizer configuration when reprocessing
     * @param normalizerConfigFieldService used to register the configuration fields before a run
     * @throws Exception if any of the platform lookups fails
     */
    public NormalizerReprocessService(Live live, StandardCurves standardCurves, DataNormalizationService dataNormalizationService, NormalizerConfigFieldService normalizerConfigFieldService) throws Exception {
        this.live = live;
        this.standardCurves = standardCurves;
        this.normalizer = dataNormalizationService;
        // NOTE(review): the meaning of (1, 50, "reprocess", true) is defined by requestExecutor — confirm there.
        this.executor = live.system().requestExecutor(1, 50, "reprocess", true);
        this.clock = live.time().clock();
        this.lobby = live.engine().getEventLobby();
        this.normalizerConfigFieldService = normalizerConfigFieldService;
    }

    /**
     * Converts an event field to a timestamp.
     *
     * @param obj the raw field value, possibly {@code null}
     * @return the value as a {@code long} when it is a {@link Number}, otherwise {@code 0}
     */
    private static long maybeTimestamp(Object obj) {
        if (obj instanceof Number) {
            return ((Number) obj).longValue();
        }
        return 0L;
    }

    /**
     * Builds the filter handed to the normalizer when the request asks to use source timestamps.
     *
     * @param start inclusive lower bound
     * @param end exclusive upper bound
     * @return a filter matching either raw timestamp field within {@code [start, end)}
     */
    private static String sourceTimestampFilter(long start, long end) {
        String range = ":[" + start + ", " + end + ")";
        return "(" + RAW_INDEX_TIMESTAMP + range + " || " + RAW_LIVERIG_TIMESTAMP + range + ")";
    }

    /**
     * Schedules a reprocess run and returns immediately with the status object that the background
     * task will update.
     *
     * <p>If the executor rejects the task, the returned status is already marked as failed.
     *
     * @param extensionQualifier forwarded to the normalizer configuration
     * @param normalizerConfig the normalizer to reprocess
     * @param normalizerReprocessRequest the request describing span, purge and timestamp options
     * @param extraFieldsSource optional source of extra fields, forwarded as-is
     * @return the status tracking this request
     */
    public NormalizerReprocessStatus reprocess(@NotNull ExtensionQualifier extensionQualifier, @NotNull NormalizerConfig normalizerConfig, @NotNull NormalizerReprocessRequest normalizerReprocessRequest, @Nullable ExtraFieldsSource extraFieldsSource) {
        NormalizerReprocessStatus status = new NormalizerReprocessStatus(normalizerReprocessRequest);
        try {
            this.executor.submit(() -> {
                reprocess(status, extensionQualifier, normalizerConfig, normalizerReprocessRequest, extraFieldsSource);
            });
        } catch (RejectedExecutionException e) {
            error(status, normalizerReprocessRequest, e, this.clock.now());
        }
        return status;
    }

    /**
     * Executes one reprocess run on the calling thread. Never throws: every failure is reported as
     * an error event and recorded on {@code normalizerReprocessStatus}.
     */
    private void reprocess(@NotNull NormalizerReprocessStatus normalizerReprocessStatus, @NotNull ExtensionQualifier extensionQualifier, @NotNull NormalizerConfig normalizerConfig, @NotNull NormalizerReprocessRequest normalizerReprocessRequest, @Nullable ExtraFieldsSource extraFieldsSource) {
        NormalizerReprocessReporter reporter = new NormalizerReprocessReporter(this.lobby);
        long startedAt = this.clock.now();
        try {
            int requestId = Integer.parseInt(normalizerReprocessRequest.getId());
            reporter.reportEventStart(normalizerReprocessRequest.getParentId(), requestId, normalizerReprocessRequest.getType(), normalizerConfig.name());
            if (!normalizerConfig.enabled()) {
                reporter.reportEventDisable(normalizerReprocessRequest.getParentId(), requestId, normalizerReprocessRequest.getType(), normalizerConfig.name(), this.clock.now() - startedAt);
                normalizerReprocessStatus.setDisable(this.clock.now());
                LOGGER.info("Reprocess {} disabled", normalizerReprocessRequest);
                return;
            }

            boolean useSourceTimestamp = normalizerReprocessRequest.useSourceTimestamp();
            // The span is only widened when filtering by source timestamp.
            long expandSpan = useSourceTimestamp ? normalizerReprocessRequest.getExpandSpan() : 0L;
            TimeSpan span = new TimeSpanParser().parse(normalizerReprocessRequest.getSpan());
            long start = span.start(startedAt);
            long end = span.end(startedAt);
            if (start > end) {
                throw new IllegalArgumentException("startTime>endTime");
            }
            long expandedStart = start - expandSpan;
            long expandedEnd = end + expandSpan;
            LOGGER.info("Reprocessing {} start {} end {} (expanded start {} end {})", normalizerReprocessRequest, start, end, expandedStart, expandedEnd);

            if (normalizerReprocessRequest.isPurge()) {
                // NOTE(review): the purge scan ends at the task start time, not at expandedEnd — confirm intended.
                purge(normalizerReprocessStatus, normalizerConfig, useSourceTimestamp, start, end, expandedStart, startedAt);
            }

            String registration = this.normalizerConfigFieldService.registerIfNoCollisions(normalizerConfig.normalizationConfigFields(this.standardCurves));
            AtomicReference<NormalizationLatch> latchRef = new AtomicReference<>();
            String description = "from ts " + expandedStart + " to ts " + expandedEnd;
            String filter = useSourceTimestamp ? sourceTimestampFilter(start, end) : null;
            // try-with-resources guarantees the action is closed even when waiting or reporting fails.
            try (Live.Action ignored = normalizerConfig.reprocess(this.live, extensionQualifier, this.standardCurves, this.normalizer, extraFieldsSource, description, normalizerReprocessStatus, filter, latchRef, registration)) {
                NormalizationLatch latch = latchRef.get();
                if (latch == null) {
                    throw new IllegalStateException("Reprocess did not provide a normalization latch");
                }
                String failureReason = latch.awaitWithReason();
                if (Strings.isNullOrEmpty(failureReason)) {
                    reporter.reportEventEnd(normalizerReprocessRequest.getParentId(), requestId, normalizerReprocessRequest.getType(), normalizerConfig.name(), this.clock.now() - startedAt);
                    normalizerReprocessStatus.setDone(true, this.clock.now());
                } else {
                    reporter.reportEventError(normalizerReprocessRequest.getParentId(), requestId, normalizerReprocessRequest.getType(), normalizerConfig.name(), this.clock.now() - startedAt, failureReason);
                    normalizerReprocessStatus.setError(failureReason, this.clock.now());
                }
            }
        } catch (Throwable failure) {
            if (failure instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            reportFailure(reporter, normalizerReprocessRequest, normalizerConfig, startedAt, failure);
            error(normalizerReprocessStatus, normalizerReprocessRequest, failure, this.clock.now());
        }
    }

    /**
     * Deletes stored events of the normalizer's event type whose effective timestamp lies in
     * {@code [start, end)}, publishing purge progress on {@code status} along the way.
     *
     * @param status receives purge progress updates
     * @param config supplies the event type to scan
     * @param useSourceTimestamp when {@code true} the {@code index_timestamp} field decides whether
     *     an event is purged; otherwise the event's own timestamp does
     * @param start inclusive lower bound of the purge window
     * @param end exclusive upper bound of the purge window
     * @param queryStart lower bound passed to the storage query
     * @param queryEnd upper bound passed to the storage query
     * @throws Exception if querying, deleting or closing the iterator fails
     */
    private void purge(NormalizerReprocessStatus status, NormalizerConfig config, boolean useSourceTimestamp, long start, long end, long queryStart, long queryEnd) throws Exception {
        StorageQuery storageQuery = new StorageQuery(config.event_type())
                .withSelect(new String[]{TIMESTAMP, INDEX_TIMESTAMP})
                .withFlags(new StorageQuery.Flag[]{StorageQuery.Flag.NOASYNC, StorageQuery.Flag.NOCACHE});
        // try-with-resources guarantees the iterator is closed even when the scan fails midway.
        try (EventIterator events = this.live.engine().getMainStorage().query(storageQuery, queryStart, queryEnd, new StorageQueryOptions(true, true))) {
            int total = events.count();
            int visited = 0;
            int lastReported = 0;
            status.setPurgeProgress(new ProgressInfo(0L, total, 0L));
            while (events.moveNext()) {
                Event event = events.current();
                long eventTime = useSourceTimestamp ? maybeTimestamp(event.get(INDEX_TIMESTAMP)) : event.timestamp();
                if (eventTime >= start && eventTime < end) {
                    events.delete();
                }
                visited++;
                // Throttle progress updates to roughly one per 0.1% of the total.
                if (visited - lastReported > total / 1000) {
                    lastReported = visited;
                    status.setPurgeProgress(new ProgressInfo(Math.min(visited, total), total, 0L));
                }
            }
            status.setPurgeProgress(new ProgressInfo(total, total, 0L));
        }
    }

    /**
     * Emits the error event for a failed run. Reporting problems are logged and swallowed so the
     * caller can still record the original failure on the status.
     */
    private void reportFailure(NormalizerReprocessReporter reporter, NormalizerReprocessRequest request, NormalizerConfig config, long startedAt, Throwable failure) {
        try {
            reporter.reportEventError(request.getParentId(), Integer.parseInt(request.getId()), request.getType(), config.name(), this.clock.now() - startedAt, failure.getMessage());
        } catch (RuntimeException reportingFailure) {
            LOGGER.warn("Could not report reprocess error event for {}", request, reportingFailure);
        }
    }

    /**
     * Marks the status as failed with {@code "<exception class>: <message>"} and logs the failure.
     *
     * @param normalizerReprocessStatus the status to update
     * @param normalizerReprocessRequest the request, used for logging only
     * @param th the failure cause
     * @param j the time to record on the status
     */
    private void error(@NotNull NormalizerReprocessStatus normalizerReprocessStatus, @NotNull NormalizerReprocessRequest normalizerReprocessRequest, Throwable th, long j) {
        normalizerReprocessStatus.setError(th.getClass().getName() + ": " + th.getMessage(), j);
        LOGGER.error("Error reprocessing {}", normalizerReprocessRequest, th);
    }
}
