package io.github.devlibx.miscellaneous.flink.store.aerospike;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Host;
import com.aerospike.client.Record;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.WritePolicy;
import com.google.common.base.Strings;
import io.gitbub.devlibx.easy.helper.json.JsonUtils;
import io.gitbub.devlibx.easy.helper.map.StringObjectMap;
import io.github.devlibx.easy.flink.utils.v2.config.AerospikeConfig;
import io.github.devlibx.easy.flink.utils.v2.config.Configuration;
import io.github.devlibx.miscellaneous.flink.store.GenericState;
import io.github.devlibx.miscellaneous.flink.store.IGenericStateStore;
import io.github.devlibx.miscellaneous.flink.store.Key;
import java.io.Serializable;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/devlibx/miscellaneous/flink/store/aerospike/AerospikeBackedStateStore.class */
/**
 * {@link IGenericStateStore} implementation that keeps {@link GenericState} payloads in Aerospike.
 *
 * <p>Each state is stored under the user key {@code <key>#<subKey>} in the namespace/set taken from
 * {@link AerospikeConfig}, using two bins: {@code data} (the state data serialized as JSON) and
 * {@code updated_at} (epoch millis at write time).
 *
 * <p>When the config is disabled or has no hosts, no client is created: {@link #persist} becomes a
 * no-op and {@link #get} always returns {@code null}.
 *
 * <p>Optional tuning is read from the config's extra properties (all optional):
 * {@code timeout}, {@code writePolicy.socketTimeout}, {@code readPolicy.socketTimeout},
 * {@code writePolicy.timeout}, {@code readPolicy.timeout} (all default to 1000),
 * {@code enable-send-key}, {@code debug-aerospike-enabled-write}, {@code debug-aerospike-enabled-read},
 * {@code throwExceptionOnWriteError}, {@code throwExceptionOnReadError} (all default to false).
 */
public class AerospikeBackedStateStore implements IGenericStateStore, Serializable {
    private static final Logger log = LoggerFactory.getLogger(AerospikeBackedStateStore.class);
    private final AerospikeConfig aerospikeConfig;
    private final Configuration configuration;
    private final AerospikeClient aerospikeClient;
    private final boolean throwExceptionOnWriteError;
    private final boolean throwExceptionOnReadError;
    // Never null: falls back to an empty map when the config carries no extra properties.
    private final StringObjectMap aerospikeExtraProperties;

    /**
     * Creates the store and, if Aerospike is enabled and at least one host is configured, connects
     * an {@link AerospikeClient} to the configured hosts.
     *
     * @param aerospikeConfig Aerospike connection settings (hosts, namespace, set, credentials, extra properties)
     * @param configuration   overall job configuration; only retained, not otherwise used by this class
     */
    public AerospikeBackedStateStore(AerospikeConfig aerospikeConfig, Configuration configuration) {
        this.aerospikeConfig = aerospikeConfig;
        this.configuration = configuration;
        this.aerospikeExtraProperties = aerospikeConfig.getProperties() == null ? new StringObjectMap() : aerospikeConfig.getProperties();

        boolean noHosts = aerospikeConfig.hosts == null || aerospikeConfig.hosts.isEmpty();
        if (!aerospikeConfig.isEnabled() || noHosts) {
            // Disabled store: every operation short-circuits on the null client.
            this.aerospikeClient = null;
            this.throwExceptionOnWriteError = false;
            this.throwExceptionOnReadError = false;
            return;
        }

        Host[] hostArr = new Host[aerospikeConfig.hosts.size()];
        for (int i = 0; i < hostArr.length; i++) {
            AerospikeConfig.Host configured = (AerospikeConfig.Host) aerospikeConfig.hosts.get(i);
            hostArr[i] = new Host(configured.getHost(), configured.getPort());
        }

        this.throwExceptionOnWriteError = this.aerospikeExtraProperties.getBoolean("throwExceptionOnWriteError", false).booleanValue();
        this.throwExceptionOnReadError = this.aerospikeExtraProperties.getBoolean("throwExceptionOnReadError", false).booleanValue();
        this.aerospikeClient = new AerospikeClient(buildClientPolicy(aerospikeConfig), hostArr);
    }

    /** Builds the client policy (cluster name, credentials, timeouts) from the config and extra properties. */
    private ClientPolicy buildClientPolicy(AerospikeConfig config) {
        ClientPolicy clientPolicy = new ClientPolicy();
        if (!Strings.isNullOrEmpty(config.getClusterName())) {
            clientPolicy.clusterName = config.getClusterName();
        }
        if (!Strings.isNullOrEmpty(config.getUser())) {
            clientPolicy.user = config.getUser();
        }
        if (!Strings.isNullOrEmpty(config.getPassword())) {
            clientPolicy.password = config.getPassword();
        }
        clientPolicy.timeout = this.aerospikeExtraProperties.getInt("timeout", 1000).intValue();
        clientPolicy.writePolicyDefault = new WritePolicy();
        clientPolicy.writePolicyDefault.socketTimeout = this.aerospikeExtraProperties.getInt("writePolicy.socketTimeout", 1000).intValue();
        clientPolicy.readPolicyDefault = new Policy();
        clientPolicy.readPolicyDefault.socketTimeout = this.aerospikeExtraProperties.getInt("readPolicy.socketTimeout", 1000).intValue();
        return clientPolicy;
    }

    /**
     * Converts an absolute TTL instant into Aerospike's relative expiration in seconds.
     *
     * <p>The subtraction and division are done in {@code long} so TTLs further than ~24.8 days away
     * do not overflow. The result is clamped to {@code [1, Integer.MAX_VALUE]}: a value of 0 would
     * be interpreted by Aerospike as "use the namespace default TTL" instead of a short TTL.
     * Callers must only pass a {@code ttl} that is after {@code now}.
     */
    private static int expirationSeconds(DateTime ttl, DateTime now) {
        long seconds = (ttl.getMillis() - now.getMillis()) / 1000L;
        return (int) Math.max(1L, Math.min(seconds, (long) Integer.MAX_VALUE));
    }

    /**
     * Writes the state to Aerospike under {@code <key>#<subKey>}.
     *
     * <p>If the state has a TTL in the future it is applied as the record expiration; otherwise the
     * write policy's default expiration is left untouched. Does nothing when the store is disabled.
     *
     * @param key          logical key and sub-key identifying the record
     * @param genericState state whose data is serialized to JSON and stored
     * @throws RuntimeException wrapping the underlying failure, only when
     *                          {@code throwExceptionOnWriteError} is enabled; otherwise failures are logged
     */
    @Override // io.github.devlibx.miscellaneous.flink.store.IGenericStateStore
    public void persist(Key key, GenericState genericState) {
        if (this.aerospikeClient == null) {
            return;
        }
        String str = "";
        try {
            str = key.getKey() + "#" + key.getSubKey();
            com.aerospike.client.Key key2 = new com.aerospike.client.Key(this.aerospikeConfig.namespace, this.aerospikeConfig.set, str);
            Bin bin = new Bin("data", JsonUtils.asJson(genericState.getData()));
            Bin bin2 = new Bin("updated_at", System.currentTimeMillis());

            WritePolicy writePolicy = new WritePolicy();
            if (this.aerospikeExtraProperties.getBoolean("enable-send-key", false).booleanValue()) {
                writePolicy.sendKey = true;
            }
            // Use the null-safe property map; the raw config properties may be null.
            writePolicy.setTimeout(this.aerospikeExtraProperties.getInt("writePolicy.timeout", 1000).intValue());

            DateTime now = DateTime.now();
            DateTime ttl = genericState.getTtl();
            if (ttl != null && ttl.isAfter(now)) {
                writePolicy.expiration = expirationSeconds(ttl, now);
            }

            this.aerospikeClient.put(writePolicy, key2, new Bin[]{bin, bin2});
            if (this.aerospikeExtraProperties.getBoolean("debug-aerospike-enabled-write", false).booleanValue()) {
                log.info("write to AS: key={}, data={}", key2, bin);
            }
        } catch (Exception e) {
            // Trailing throwable is logged with its stack trace so the root cause is not lost.
            log.error("failed to write to AS: key={}, data={}", str, genericState, e);
            if (this.throwExceptionOnWriteError) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Reads the state stored under {@code <key>#<subKey>}.
     *
     * @param key logical key and sub-key identifying the record
     * @return the stored state (only its data is populated), or {@code null} when the store is
     *         disabled, the record does not exist, or the read failed and
     *         {@code throwExceptionOnReadError} is disabled
     * @throws RuntimeException wrapping the underlying failure, only when
     *                          {@code throwExceptionOnReadError} is enabled
     */
    @Override // io.github.devlibx.miscellaneous.flink.store.IGenericStateStore
    public GenericState get(Key key) {
        if (this.aerospikeClient == null) {
            return null;
        }
        String str = "";
        try {
            str = key.getKey() + "#" + key.getSubKey();
            com.aerospike.client.Key key2 = new com.aerospike.client.Key(this.aerospikeConfig.namespace, this.aerospikeConfig.set, str);
            Policy policy = new Policy();
            // Use the null-safe property map; the raw config properties may be null.
            policy.setTimeout(this.aerospikeExtraProperties.getInt("readPolicy.timeout", 1000).intValue());
            Record record = this.aerospikeClient.get(policy, key2);
            if (this.aerospikeExtraProperties.getBoolean("debug-aerospike-enabled-read", false).booleanValue()) {
                log.info("read from AS: key={}, data={}", str, record);
            }
            if (record == null) {
                // Key not found is a normal miss, not a read error.
                return null;
            }
            GenericState genericState = new GenericState();
            genericState.setData(JsonUtils.convertAsStringObjectMap(record.getString("data")));
            return genericState;
        } catch (Exception e) {
            // Trailing throwable is logged with its stack trace so the root cause is not lost.
            log.error("failed to read from AS: key={}", str, e);
            if (this.throwExceptionOnReadError) {
                throw new RuntimeException(e);
            }
            return null;
        }
    }
}
