package co.cask.hydrator.plugin.realtime;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.plugin.PluginConfig;
import co.cask.cdap.etl.api.realtime.DataWriter;
import co.cask.cdap.etl.api.realtime.RealtimeContext;
import co.cask.cdap.etl.api.realtime.RealtimeSink;
import co.cask.cdap.format.StructuredRecordStringConverter;
import co.cask.hydrator.plugin.batch.ESProperties;
import com.google.common.base.Strings;
import javax.annotation.Nullable;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Realtime sink that converts each incoming {@link StructuredRecord} to a JSON string and indexes it
 * into Elasticsearch through a {@link TransportClient}, one bulk request per {@link #write} call.
 *
 * <p>Lifecycle as implemented here: {@link #initialize} builds the client and registers the configured
 * transport addresses, {@link #write} sends a batch of records, and {@link #destroy} closes the client.
 */
@Name("Elasticsearch")
@Description("CDAP Elasticsearch Realtime Sink takes the structured record from the input source and converts it to a JSON string, then indexes it in Elasticsearch using the index, type, and id specified by the user.")
@Plugin(type = "realtimesink")
public class RealtimeElasticsearchSink extends RealtimeSink<StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(RealtimeElasticsearchSink.class);
    private static final String INDEX_DESCRIPTION = "The name of the index where the data will be stored. If the index does not already exist, it will be created using Elasticsearch's default properties.";
    private static final String TYPE_DESCRIPTION = "The name of the type where the data will be stored. If it does not already exist, it will be created.";
    private static final String ID_DESCRIPTION = "The field that will determine the id for the document. It should match a field name in the structured record of the input.";
    private static final String TRANSPORT_ADDRESS_DESCRIPTION = "The addresses for nodes. Specify the address for at least one node, and separate others by commas. Other nodes will be sniffed out. For example: host1:9300,host2:9300.";
    private static final String CLUSTER_DESCRIPTION = "The name of the cluster to connect to. Defaults to 'elasticsearch'.";

    /** Cluster name used when the configuration leaves the cluster null or empty. */
    private static final String DEFAULT_CLUSTER = "elasticsearch";

    private final RealtimeESSinkConfig realtimeESSinkConfig;

    /** Created in {@link #initialize}; stays {@code null} until then and if initialization fails. */
    private TransportClient client;

    /**
     * Plugin configuration for {@link RealtimeElasticsearchSink}. The property names come from
     * {@link ESProperties}.
     */
    public static class RealtimeESSinkConfig extends PluginConfig {

        @Name(ESProperties.INDEX_NAME)
        @Description(RealtimeElasticsearchSink.INDEX_DESCRIPTION)
        private String index;

        @Name(ESProperties.TYPE_NAME)
        @Description(RealtimeElasticsearchSink.TYPE_DESCRIPTION)
        private String type;

        @Name(ESProperties.ID_FIELD)
        @Description(RealtimeElasticsearchSink.ID_DESCRIPTION)
        @Nullable
        private String idField;

        @Name(ESProperties.TRANSPORT_ADDRESSES)
        @Description(RealtimeElasticsearchSink.TRANSPORT_ADDRESS_DESCRIPTION)
        private String transportAddresses;

        @Name(ESProperties.CLUSTER)
        @Description(RealtimeElasticsearchSink.CLUSTER_DESCRIPTION)
        @Nullable
        private String cluster;

        /**
         * Creates a configuration with explicit values.
         *
         * @param str  name of the index to write to
         * @param str2 name of the type to write to
         * @param str3 record field whose value is used as the document id; may be {@code null}
         * @param str4 comma-separated {@code host:port} transport addresses
         * @param str5 cluster name; may be {@code null}
         */
        public RealtimeESSinkConfig(String str, String str2, @Nullable String str3, String str4, @Nullable String str5) {
            this.index = str;
            this.type = str2;
            this.idField = str3;
            this.transportAddresses = str4;
            this.cluster = str5;
        }
    }

    /**
     * Creates the sink with the given configuration. No connection is made until {@link #initialize}.
     *
     * @param realtimeESSinkConfig configuration for this sink
     */
    public RealtimeElasticsearchSink(RealtimeESSinkConfig realtimeESSinkConfig) {
        this.realtimeESSinkConfig = realtimeESSinkConfig;
    }

    /**
     * Builds the transport client and registers every configured transport address.
     *
     * <p>A null or empty cluster name is replaced by {@code "elasticsearch"} and written back to the
     * configuration. Blank entries in the address list are ignored, and whitespace around each entry
     * is trimmed.
     *
     * @param realtimeContext runtime context; not used by this implementation
     * @throws IllegalArgumentException if no usable address is configured, or an address is not of the
     *     form {@code host:port} with a numeric port
     */
    public void initialize(RealtimeContext realtimeContext) {
        RealtimeESSinkConfig config = this.realtimeESSinkConfig;
        if (Strings.isNullOrEmpty(config.cluster)) {
            config.cluster = DEFAULT_CLUSTER;
        }
        if (Strings.isNullOrEmpty(config.transportAddresses)) {
            throw new IllegalArgumentException("At least one transport address must be specified, for example host1:9300.");
        }

        TransportClient transportClient = new TransportClient(ImmutableSettings.settingsBuilder()
            .put("node.name", "cdap")
            .put(ClusterName.SETTING, config.cluster)
            .put("client.transport.sniff", true)
            .build());
        try {
            int registered = 0;
            for (String address : config.transportAddresses.split(StringUtils.DEFAULT_DELIMITER)) {
                if (address.trim().isEmpty()) {
                    // Tolerate stray delimiters such as "host1:9300,,host2:9300".
                    continue;
                }
                transportClient.addTransportAddress(parseTransportAddress(address));
                registered++;
            }
            if (registered == 0) {
                throw new IllegalArgumentException("No usable transport address found in '" + config.transportAddresses + "'.");
            }
        } catch (RuntimeException e) {
            // Do not leak the half-configured client when the address list is invalid.
            transportClient.close();
            throw e;
        }
        this.client = transportClient;
    }

    /**
     * Parses one {@code host:port} entry into a transport address.
     *
     * @param address a single entry from the configured address list
     * @return the transport address for the host and port in {@code address}
     * @throws IllegalArgumentException if the port is missing or is not an integer
     */
    private static InetSocketTransportAddress parseTransportAddress(String address) {
        String[] hostAndPort = address.trim().split(":");
        if (hostAndPort.length < 2) {
            throw new IllegalArgumentException("Invalid transport address '" + address + "': expected host:port, for example host1:9300.");
        }
        int port;
        try {
            port = Integer.parseInt(hostAndPort[1].trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid port in transport address '" + address + "'.", e);
        }
        return new InetSocketTransportAddress(hostAndPort[0].trim(), port);
    }

    /**
     * Indexes the given records with a single bulk request.
     *
     * <p>When no id field is configured, documents are indexed without an explicit id. Otherwise the
     * string form of the record's id field value is used as the document id, and records whose id
     * field is {@code null} are skipped. No request is sent when there is nothing to index.
     *
     * @param iterable   records to index
     * @param dataWriter not used by this implementation
     * @return the number of records that were sent and not reported as failed by the bulk response;
     *     skipped records are not counted
     * @throws Exception declared by the sink contract
     */
    public int write(Iterable<StructuredRecord> iterable, DataWriter dataWriter) throws Exception {
        RealtimeESSinkConfig config = this.realtimeESSinkConfig;
        boolean useGeneratedId = Strings.isNullOrEmpty(config.idField);
        BulkRequestBuilder bulkRequest = this.client.prepareBulk();
        int queued = 0;

        for (StructuredRecord record : iterable) {
            if (useGeneratedId) {
                bulkRequest.add(this.client.prepareIndex(config.index, config.type)
                    .setSource(StructuredRecordStringConverter.toJsonString(record)));
            } else {
                Object id = record.get(config.idField);
                if (id == null) {
                    LOG.debug("Found null data in id field. Skipping record.");
                    // A skipped record is never sent, so it must not be counted as written.
                    continue;
                }
                bulkRequest.add(this.client.prepareIndex(config.index, config.type, id.toString())
                    .setSource(StructuredRecordStringConverter.toJsonString(record)));
            }
            queued++;
        }

        if (queued == 0) {
            // Executing an empty bulk request fails request validation, so skip the round trip.
            return 0;
        }

        int written = queued;
        BulkResponse response = bulkRequest.execute().actionGet();
        if (response.hasFailures()) {
            for (BulkItemResponse item : response.getItems()) {
                if (item.isFailed()) {
                    written--;
                    LOG.debug(item.getFailureMessage());
                }
            }
        }
        return written;
    }

    /**
     * Closes the transport client, if one was created by {@link #initialize}.
     */
    public void destroy() {
        if (this.client != null) {
            this.client.close();
        }
    }
}
