package net.pincette.mongo.streams;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Optional;
import javax.json.JsonObject;
import javax.json.JsonValue;
import net.pincette.json.JsonUtil;
import org.apache.kafka.streams.kstream.KStream;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:net/pincette/mongo/streams/Probe.class */
/**
 * Pipeline stage that counts the messages flowing through a stream and publishes a per-minute
 * summary to a separate topic.
 *
 * <p>The stage expression must be a JSON object with the string fields {@code name} (copied into
 * every summary) and {@code topic} (the topic the summaries are written to). Each summary has the
 * fields {@code name}, {@code minute} and {@code count}.
 *
 * <p>Minutes are derived from the wall clock ({@link Instant#now()}) at the moment a message is
 * processed. A summary for a minute is only produced when a message arrives in a later minute, so
 * minutes without any traffic produce no record.
 */
public class Probe {
    /** Key of the probe name, both in the stage expression and in the emitted summaries. */
    private static final String NAME = "name";

    /** Key in the stage expression that names the topic receiving the summaries. */
    private static final String TOPIC = "topic";

    /** Key in the emitted summaries holding the start of the reported minute. */
    private static final String MINUTE = "minute";

    /** Key in the emitted summaries holding the number of messages counted in that minute. */
    private static final String COUNT = "count";

    /**
     * Mutable counter state of one probe stage: the minute currently being counted and the number
     * of messages seen in it so far. The fields are plain, unsynchronized fields.
     *
     * <p>The decompiler notes that this class was originally private; the modifier is kept as
     * found in this source.
     */
    public static class Running {
        private long count = 0;
        private Instant minute = Instant.now().truncatedTo(ChronoUnit.MINUTES);

        private Running() {
        }
    }

    private Probe() {
    }

    /**
     * Attaches a message counter to the given stream.
     *
     * <p>The input stream itself is not transformed: a side branch is derived from it that emits
     * one summary record per elapsed minute to the configured topic.
     *
     * @param kStream the stream whose messages are counted.
     * @param jsonValue the stage expression, a JSON object with the string fields {@code name}
     *     and {@code topic}.
     * @return the given {@code kStream}, unchanged.
     * @throws NullPointerException if {@code name} or {@code topic} is absent from the expression
     *     (per the {@code JsonObject.getString} contract).
     */
    public static KStream<String, JsonObject> stage(KStream<String, JsonObject> kStream, JsonValue jsonValue) {
        net.pincette.util.Util.must(JsonUtil.isObject(jsonValue));

        // Resolve both settings before touching the stream, so that an incomplete expression
        // fails without leaving a half-built side branch in the topology.
        JsonObject expression = jsonValue.asJsonObject();
        String name = expression.getString(NAME);
        String topic = expression.getString(TOPIC);
        Running running = new Running();

        // Every message updates the counter; only a minute roll-over yields a value, all other
        // messages map to null and are dropped before reaching the topic.
        kStream
            .mapValues(value -> updateRunning(running, name).orElse(null))
            .filter((key, summary) -> summary != null)
            .to(topic);

        return kStream;
    }

    /**
     * Builds the summary record for the minute currently held in {@code running}.
     *
     * @param running the counter state to report.
     * @param str the probe name to put in the record.
     * @return an object with the fields {@code name}, {@code minute} (ISO-8601 text of the
     *     instant) and {@code count}.
     */
    private static JsonObject toJson(Running running, String str) {
        return JsonUtil.createObjectBuilder()
            .add(NAME, str)
            .add(MINUTE, running.minute.toString())
            .add(COUNT, running.count)
            .build();
    }

    /**
     * Registers one message in the counter state.
     *
     * @param running the counter state to update.
     * @param str the probe name used when a summary is produced.
     * @return the summary of the previous minute when this message is the first one seen in a
     *     later minute, otherwise an empty optional.
     */
    private static Optional<JsonObject> updateRunning(Running running, String str) {
        Instant currentMinute = Instant.now().truncatedTo(ChronoUnit.MINUTES);

        // Both instants are truncated to the minute, so anything above 59 seconds means the
        // clock has moved on to a later minute.
        if (Duration.between(running.minute, currentMinute).getSeconds() <= 59) {
            running.count++;
            return Optional.empty();
        }

        JsonObject summary = toJson(running, str);

        // The message that triggers the roll-over belongs to the new minute, so the new count
        // starts at one; resetting to zero would silently drop it from every report.
        running.count = 1L;
        running.minute = currentMinute;
        return Optional.of(summary);
    }
}
