package io.github.devlibx.miscellaneous.flink.store;

import io.gitbub.devlibx.easy.helper.json.JsonUtils;
import io.gitbub.devlibx.easy.helper.map.StringObjectMap;
import io.github.devlibx.miscellaneous.flink.common.KeyPair;
import io.github.devlibx.miscellaneous.flink.store.GenericState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/devlibx/miscellaneous/flink/store/GenericTimeWindowAggregationStoreSink.class */
/**
 * Flink sink that writes each incoming aggregation record to an {@link IGenericStateStore}.
 *
 * <p>Every record is a {@link StringObjectMap} from which this sink reads three entries:
 * <ul>
 *   <li>{@code "key_pair"} - a {@link KeyPair}; its {@code buildKey()} result is the key passed to the store;</li>
 *   <li>{@code "aggregation"} - the payload, round-tripped through JSON into a {@link StringObjectMap}
 *       and stored as the state data;</li>
 *   <li>{@code "ttl"} (optional) - either a {@link Number} or a {@link DateTime}, stored as the state TTL.</li>
 * </ul>
 *
 * <p>The store's {@code open}, {@code persist} and {@code finish} calls are driven from the
 * corresponding sink lifecycle methods.
 */
public class GenericTimeWindowAggregationStoreSink extends RichSinkFunction<StringObjectMap> {
    private static final Logger log = LoggerFactory.getLogger(GenericTimeWindowAggregationStoreSink.class);

    /** Record entry holding the {@link KeyPair} used to build the store key. */
    private static final String KEY_PAIR_FIELD = "key_pair";
    /** Record entry holding the aggregation payload to persist. */
    private static final String AGGREGATION_FIELD = "aggregation";
    /** Optional record entry holding the TTL of the persisted state. */
    private static final String TTL_FIELD = "ttl";

    private final IGenericStateStore genericStateStore;

    /**
     * @param iGenericStateStore store that receives every record handled by this sink
     */
    public GenericTimeWindowAggregationStoreSink(IGenericStateStore iGenericStateStore) {
        this.genericStateStore = iGenericStateStore;
    }

    /**
     * Runs the parent {@code open} and then opens the underlying store with the same configuration.
     *
     * @param configuration configuration forwarded to the parent class and to the store
     * @throws Exception if the parent or the store fails to open
     */
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.genericStateStore.open(configuration);
    }

    /**
     * Builds a {@link GenericState} from the record and persists it under the record's key.
     *
     * @param stringObjectMap record to persist; must contain a {@code "key_pair"} entry
     * @param context         sink context, forwarded to the parent implementation only
     * @throws NullPointerException if the record has no {@code "key_pair"} entry
     * @throws Exception            if the parent implementation or the store fails
     */
    public void invoke(StringObjectMap stringObjectMap, SinkFunction.Context context) throws Exception {
        super.invoke(stringObjectMap, context);

        KeyPair keyPair = (KeyPair) stringObjectMap.get(KEY_PAIR_FIELD, KeyPair.class);
        if (keyPair == null) {
            // Same exception type the dereference below would raise, but raised before any
            // conversion work and with a message that names the missing entry.
            throw new NullPointerException(
                    "Record has no '" + KEY_PAIR_FIELD + "' entry; cannot build the key for the state store");
        }

        // The aggregation is serialized to JSON and parsed back so the stored data is a StringObjectMap.
        GenericState.GenericStateBuilder data = GenericState.builder()
                .data(JsonUtils.convertAsStringObjectMap(JsonUtils.asJson(stringObjectMap.get(AGGREGATION_FIELD))));
        applyTtl(stringObjectMap, data);

        this.genericStateStore.persist(keyPair.buildKey(), data.build());
    }

    /**
     * Runs the parent {@code finish} and then lets the underlying store finish.
     *
     * @throws Exception if the parent or the store fails to finish
     */
    public void finish() throws Exception {
        super.finish();
        this.genericStateStore.finish();
    }

    /** Copies the optional {@code "ttl"} entry of the record onto the state builder. */
    private void applyTtl(StringObjectMap stringObjectMap, GenericState.GenericStateBuilder data) {
        Object ttlValue = stringObjectMap.get(TTL_FIELD);
        if (ttlValue == null) {
            // Absent (or explicitly null) TTL: the state is persisted without one.
            return;
        }
        if (ttlValue instanceof Number) {
            // NOTE(review): presumably the number is epoch milliseconds - confirm against
            // StringObjectMap#getDateTimeFromMiles.
            data.ttl(stringObjectMap.getDateTimeFromMiles(TTL_FIELD));
        } else if (ttlValue instanceof DateTime) {
            data.ttl((DateTime) stringObjectMap.get(TTL_FIELD, DateTime.class));
        } else {
            // Previously dropped silently; the state is still persisted without a TTL, but the
            // unexpected type is now visible in the logs.
            log.warn("Ignoring '{}' of unsupported type {}; expected Number or DateTime",
                    TTL_FIELD, ttlValue.getClass().getName());
        }
    }
}
