package net.pincette.mongo.streams;

import java.time.Duration;
import java.util.Optional;
import java.util.function.Function;
import javax.json.JsonObject;
import javax.json.JsonValue;
import net.pincette.function.SideEffect;
import net.pincette.jes.util.Kafka;
import net.pincette.json.JsonUtil;
import net.pincette.mongo.Expression;
import net.pincette.util.Pair;
import net.pincette.util.ScheduledCompletionStage;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;

/* loaded from: input_file:net/pincette/mongo/streams/Delay.class */
/**
 * Implements the <code>$delay</code> pipeline stage: records for which the stage expression
 * yields a numeric duration and a string topic are taken out of the stream and sent to that
 * topic after the duration has elapsed. All other records pass through unchanged.
 */
class Delay {
    private static final String DURATION = "duration";
    private static final String TOPIC = "topic";

    private Delay() {
    }

    /**
     * Schedules the asynchronous publication of a record on a Kafka topic.
     *
     * <p>The send is started after <code>delayMillis</code> milliseconds through
     * <code>ScheduledCompletionStage.composeAsyncAfter</code>. It is wrapped in
     * <code>tryToGetForever</code> with <code>Util.RETRY</code>; exceptions are reported through
     * <code>Util.exceptionLogger</code> under the name <code>$delay</code>. The boolean outcome
     * of the send is checked with <code>must</code>. The resulting completion stage is not
     * returned, so callers do not wait for the delivery.
     *
     * <p>NOTE(review): the decompiler reported this method as originally private; the visibility
     * is left as found.
     *
     * @param key the key of the record to send.
     * @param value the value of the record to send.
     * @param delayMillis the delay in milliseconds before the send is started.
     * @param topic the Kafka topic to send the record to.
     * @param context the stage context, which supplies the producer.
     */
    public static void sendAfter(String key, JsonObject value, long delayMillis, String topic, Context context) {
        ScheduledCompletionStage.composeAsyncAfter(
                () -> net.pincette.util.Util.tryToGetForever(
                        () -> Kafka.send(context.producer, new ProducerRecord<>(topic, key, value)),
                        Util.RETRY,
                        e -> Util.exceptionLogger(e, "$delay", context)),
                Duration.ofMillis(delayMillis))
            // The inner lambda must not reuse the outer parameter name: Java forbids a lambda
            // parameter that redeclares a variable of an enclosing scope.
            .thenApply(result -> net.pincette.util.Util.must(result, Boolean::booleanValue));
    }

    /**
     * Adds the delay stage to a stream.
     *
     * <p>The expression must be a JSON object. Its <code>duration</code> and <code>topic</code>
     * fields are compiled into functions that are evaluated against the value of every record.
     * When the duration evaluates to a long and the topic to a string, the record is handed to
     * {@link #sendAfter} and removed from the returned stream. Otherwise the record is passed
     * on as is.
     *
     * <p>NOTE(review): records that arrive with a <code>null</code> value are also dropped by
     * the final filter, presumably after the expression functions have been applied to
     * <code>null</code> - confirm how those functions treat it.
     *
     * @param stream the incoming stream.
     * @param expression the stage expression, an object with the fields <code>duration</code>
     *     and <code>topic</code>.
     * @param context the stage context, which supplies the expression features and the producer.
     * @return the stream without the records that have been scheduled for delayed sending.
     */
    public static KStream<String, JsonObject> stage(KStream<String, JsonObject> stream, JsonValue expression, Context context) {
        net.pincette.util.Util.must(JsonUtil.isObject(expression));

        final JsonObject expr = expression.asJsonObject();
        final Function<JsonObject, JsonValue> duration =
                Expression.function(expr.getValue("/" + DURATION), context.features);
        final Function<JsonObject, JsonValue> topic =
                Expression.function(expr.getValue("/" + TOPIC), context.features);

        return stream
            .map((key, value) ->
                Optional.of(Pair.pair(duration.apply(value), topic.apply(value)))
                    // Only a numeric duration combined with a string topic triggers a delay.
                    .filter(evaluated -> JsonUtil.isLong(evaluated.first) && JsonUtil.isString(evaluated.second))
                    .map(evaluated ->
                        SideEffect.<KeyValue<String, JsonObject>>run(() ->
                                sendAfter(
                                    key,
                                    value,
                                    JsonUtil.asLong(evaluated.first),
                                    JsonUtil.asString(evaluated.second).getString(),
                                    context))
                            // A null value marks the record as scheduled, so the filter below
                            // removes it from the stream.
                            .andThenGet(() -> new KeyValue<>(key, null)))
                    .orElseGet(() -> new KeyValue<>(key, value)))
            .filter((key, value) -> value != null);
    }
}
