package net.intelie.live;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import net.intelie.live.QueryListener;
import net.intelie.live.ScheduledQuerier;
import net.intelie.live.StopReason;
import net.intelie.live.util.QuerySpan;
import net.intelie.pipes.time.ClockScheduler;
import net.intelie.pipes.time.Period;
import net.intelie.pipes.time.SchedulerContext;
import net.intelie.pipes.time.TaskHandle;
import net.intelie.pipes.time.TimeSpan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/intelie/live/ScheduledQueryHandle.class */
/**
 * Drives a single {@link Query} through a {@link ScheduledQuerier}.
 *
 * <p>On {@link #start()} the handle optionally replays history for the query's span (when the
 * querier is a {@link ScheduledQuerier.WithHistory} and the span is not a point), and then
 * either executes the query once (non-follow queries) or registers a recurring execution on a
 * {@link ClockScheduler} context using the supplied {@link Period} (follow queries).
 *
 * <p>Events and control messages are relayed to the query's own listener, and a small status
 * map describing each execution is pushed to the {@link EventSender} given at construction.
 */
public class ScheduledQueryHandle {
    private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledQueryHandle.class);

    /** Wrapper around the query's listener; every outgoing message goes through it. */
    private final Listener listener;
    private final ScheduledQuerier querier;
    private final ClockScheduler scheduler;
    /** Value of {@code query.getFollow()} captured at construction. */
    private final boolean follow;
    /** Schedule used for follow queries; checked for {@code null} in {@link #start()}. */
    private final Period period;
    /** Sink for the per-execution status maps. */
    private final EventSender status;
    /** Value of {@code query.getExpression()}, reported in every status map. */
    private final String expression;
    /** Set once an {@code EndHistoryInfo} was emitted by {@link #maybeEndHistory}; cleared on stop. */
    private final AtomicBoolean sentEndHistory = new AtomicBoolean(false);
    /** Set once the querier metadata was emitted by {@link #maybeStart}; cleared on stop. */
    private final AtomicBoolean started = new AtomicBoolean(false);
    private Query query;
    /** Parsed from {@code query.getSpan()} at the beginning of {@link #start()}. */
    private TimeSpan span;
    /** Handle of the recurring task; stays {@code null} unless a follow query was scheduled. */
    private TaskHandle handle;

    /**
     * Listener handed to the rest of this class in place of the query's own listener.
     *
     * <p>Events and custom control events are forwarded untouched. A stop additionally clears
     * the {@code started} and {@code sentEndHistory} flags of the enclosing handle, so the next
     * execution emits the querier metadata (and, for follow queries, the end-of-history marker)
     * again.
     */
    public class Listener extends QueryListener.Empty {
        private final QueryListener listener;

        /**
         * @param queryListener the listener that ultimately receives everything
         */
        public Listener(QueryListener queryListener) {
            this.listener = queryListener;
        }

        /** Passes the event and its flag straight to the wrapped listener. */
        @Override
        public void onEvent(QueryEvent queryEvent, boolean z) throws Exception {
            this.listener.onEvent(queryEvent, z);
        }

        /** Resets the enclosing handle's flags, then forwards the stop as a control event. */
        @Override
        public void onStop(StopInfo stopInfo) throws Exception {
            // Flags are cleared before forwarding, exactly in this order.
            ScheduledQueryHandle.this.started.set(false);
            ScheduledQueryHandle.this.sentEndHistory.set(false);
            this.listener.onControl(stopInfo);
        }

        /** Forwards any other control event to the wrapped listener. */
        @Override
        public void onCustom(ControlEvent controlEvent) throws Exception {
            this.listener.onControl(controlEvent);
        }
    }

    /**
     * Callback given to the querier for one execution.
     *
     * <p>It stamps and relays incoming events, keeps a running event count, and reports the
     * outcome of the execution (status map plus, for non-follow queries, the closing control
     * events).
     */
    public class MyListener implements ScheduledQuerier.Listener {
        /** Timestamp written into events that do not carry one. */
        private final long timestamp;
        /** Running total of {@code queryEvent.size()} over all events received. */
        private final AtomicInteger size;
        /** Scheduler time at which this execution began; used to compute durations. */
        private final long start;

        /**
         * @param j             timestamp used for events lacking one
         * @param atomicInteger counter accumulating the number of events received
         * @param j2            scheduler time at which the execution started
         */
        public MyListener(long j, AtomicInteger atomicInteger, long j2) {
            this.timestamp = j;
            this.size = atomicInteger;
            this.start = j2;
        }

        /**
         * Relays the event (with timestamps filled in) to the handle's listener and adds its
         * size to the counter. The boolean sent along is {@code true} for as long as the
         * enclosing handle has not emitted its end-of-history marker.
         */
        @Override
        public void onEvent(QueryEvent queryEvent) {
            ScheduledQueryHandle outer = ScheduledQueryHandle.this;
            QueryEvent stamped = outer.maybeAddTimestamp(queryEvent, this.timestamp);
            boolean endHistoryNotSentYet = !outer.sentEndHistory.get();
            SilentListener.send(outer.listener, stamped, endHistoryNotSentYet);
            this.size.addAndGet(queryEvent.size());
        }

        /**
         * Publishes a {@code "success"} status map and, for non-follow queries, closes the
         * query by sending {@code EndHistoryInfo}, {@code StopInfo(NoRealtime)} and
         * {@code DestroyInfo}, in that order.
         */
        @Override
        public void onSuccess() {
            ScheduledQueryHandle outer = ScheduledQueryHandle.this;
            long now = outer.scheduler.now();
            long elapsed = now - this.start;

            outer.status.send(new MapBuilder()
                    .put("expression", outer.expression)
                    .put("status", "success")
                    .put("events", Integer.valueOf(this.size.get()))
                    .put("duration", Long.valueOf(elapsed))
                    .ok());

            if (!outer.follow) {
                SilentListener.send(outer.listener, new EndHistoryInfo(now, elapsed));
                SilentListener.send(outer.listener, new StopInfo(new StopReason.NoRealtime()));
                SilentListener.send(outer.listener, new DestroyInfo());
            }
        }

        /** Delegates to {@link ScheduledQueryHandle#sendError} with this execution's start time. */
        @Override
        public void onError(Throwable th) {
            ScheduledQueryHandle.this.sendError(th, this.start);
        }
    }

    /**
     * Creates a handle for the given query. Nothing is executed until {@link #start()}.
     *
     * @param query            query to run; its listener, follow flag and expression are read here
     * @param scheduledQuerier querier that performs the executions
     * @param period           schedule for follow queries; may be {@code null}, in which case
     *                         starting a follow query fails
     * @param clockScheduler   time source and scheduler
     * @param eventSender      receiver of the per-execution status maps
     */
    public ScheduledQueryHandle(Query query, ScheduledQuerier scheduledQuerier, Period period, ClockScheduler clockScheduler, EventSender eventSender) {
        this.query = query;
        this.querier = scheduledQuerier;
        this.scheduler = clockScheduler;
        this.period = period;
        this.status = eventSender;
        this.listener = new Listener(query.getListener());
        this.follow = query.getFollow();
        this.expression = query.getExpression();
    }

    /**
     * Starts the query.
     *
     * <p>History is replayed first when the querier supports it and the span is not a point.
     * A non-follow query is then executed exactly once. A follow query requires a non-null
     * period; it announces start/end-of-history, touches the querier and registers a recurring
     * task on a fresh scheduler context.
     *
     * <p>Any failure is reported to the listener as {@code StopInfo(Error)} and logged; nothing
     * is rethrown.
     */
    public void start() {
        try {
            this.span = QuerySpan.parse(this.query.getSpan());
            final long now = this.scheduler.now();

            boolean supportsHistory = this.querier instanceof ScheduledQuerier.WithHistory;
            if (supportsHistory && !this.span.isPoint()) {
                executeHistory();
            }

            if (this.follow) {
                Preconditions.checkArgument(this.period != null, "No valid @cron expression found");
                maybeSendStartAndEndHistory(now);
                this.querier.touch(now);
                SchedulerContext context = this.scheduler.newContext();
                // Only the second callback argument is used; it is passed on as the
                // execution timestamp. NOTE(review): meaning of the first argument is not
                // visible from this file.
                this.handle = context.schedule(this.period, (unused, tick) -> {
                    executeOnce(tick);
                });
                context.start();
            } else {
                executeOnce(now);
            }
        } catch (Throwable th) {
            SilentListener.send(this.listener, new StopInfo(new StopReason.Error(th)));
            LOGGER.info("Could not start query. Not scheduling.", th);
        }
    }

    /**
     * Runs one execution of the querier for the given timestamp, announcing start and
     * end-of-history beforehand when still pending. Failures go to {@link #sendError}.
     */
    private void executeOnce(long timestamp) {
        final long startedAt = this.scheduler.now();
        try {
            maybeSendStartAndEndHistory(startedAt);
            MyListener callback = new MyListener(timestamp, new AtomicInteger(), startedAt);
            this.querier.execute(timestamp, callback);
        } catch (Throwable th) {
            sendError(th, startedAt);
        }
    }

    /**
     * Replays history over the query span resolved against the current scheduler time, then
     * emits the end-of-history marker if applicable. Failures go to {@link #sendError}.
     * Callers must have checked that the querier is a {@link ScheduledQuerier.WithHistory}.
     */
    private void executeHistory() {
        final long startedAt = this.scheduler.now();
        try {
            maybeStart();
            long from = this.span.start(startedAt);
            long to = this.span.end(startedAt);
            ScheduledQuerier.WithHistory historical = (ScheduledQuerier.WithHistory) this.querier;
            historical.executeHistory(from, to, new MyListener(to, new AtomicInteger(), startedAt));
            maybeEndHistory(to, startedAt);
        } catch (Throwable th) {
            sendError(th, startedAt);
        }
    }

    /**
     * Returns a new {@link QueryEvent} in which every row lacking a non-null
     * {@code Event.TIMESTAMP} value is replaced by a copy carrying the given timestamp.
     * Rows that already have one are reused as they are.
     *
     * @param queryEvent rows to inspect
     * @param j          timestamp to fill in
     * @return a new event holding the original and/or copied rows, in the same order
     */
    public QueryEvent maybeAddTimestamp(QueryEvent queryEvent, long j) {
        ArrayList rows = new ArrayList();
        for (Iterator cursor = queryEvent.iterator(); cursor.hasNext(); ) {
            Map row = (Map) cursor.next();
            if (row.get(Event.TIMESTAMP) != null) {
                rows.add(row);
                continue;
            }
            LinkedHashMap stamped = new LinkedHashMap();
            // Inserted first so the timestamp key leads the insertion order of the copy...
            stamped.put(Event.TIMESTAMP, Long.valueOf(j));
            stamped.putAll(row);
            // ...and written again because putAll may have replaced it with the row's null value.
            stamped.put(Event.TIMESTAMP, Long.valueOf(j));
            rows.add(stamped);
        }
        return new QueryEvent(rows);
    }

    /**
     * Reports a failed execution: logs it, publishes an {@code "error"} status map and sends
     * {@code StopInfo(Error)} to the listener.
     *
     * @param th failure to report
     * @param j  scheduler time at which the failed execution started
     */
    public void sendError(Throwable th, long j) {
        LOGGER.info("Could not execute query", th);
        this.status.send(new MapBuilder()
                .put("expression", this.expression)
                .put("status", "error")
                .put("duration", Long.valueOf(this.scheduler.now() - j))
                .put("message", th.getMessage())
                .ok());
        SilentListener.send(this.listener, new StopInfo(new StopReason.Error(th)));
    }

    /** Emits the metadata and the end-of-history marker, each only if still pending. */
    private void maybeSendStartAndEndHistory(long when) throws Exception {
        maybeStart();
        maybeEndHistory(when, when);
    }

    /**
     * For follow queries, sends {@code EndHistoryInfo} the first time it is called after
     * construction or after a stop; otherwise does nothing.
     *
     * @param timestamp first argument of the emitted {@code EndHistoryInfo}
     * @param since     scheduler time from which the elapsed duration is measured
     */
    private void maybeEndHistory(long timestamp, long since) {
        // The flag is only touched for follow queries.
        if (this.follow && this.sentEndHistory.compareAndSet(false, true)) {
            long elapsed = this.scheduler.now() - since;
            SilentListener.send(this.listener, new EndHistoryInfo(timestamp, elapsed));
        }
    }

    /**
     * Sends the querier's metadata to the listener the first time it is called after
     * construction or after a stop; otherwise does nothing.
     */
    private void maybeStart() throws Exception {
        if (this.started.compareAndSet(false, true)) {
            this.listener.onControl(this.querier.metadata());
        }
    }

    /**
     * Cancels the recurring task, if one was scheduled, and notifies the listener with a
     * {@code StopInfo} carrying the given reason.
     *
     * @param stopReason reason forwarded to the listener
     */
    public void stop(StopReason stopReason) {
        TaskHandle scheduled = this.handle;
        if (scheduled != null) {
            scheduled.cancel();
        }
        synchronized (this) {
            SilentListener.send(this.listener, new StopInfo(stopReason));
        }
    }
}
