package co.cask.hydrator.plugin.batch.sink;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.batch.Output;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.batch.BatchSinkContext;
import co.cask.cdap.format.StructuredRecordStringConverter;
import co.cask.hydrator.common.ReferenceBatchSink;
import co.cask.hydrator.common.ReferencePluginConfig;
import co.cask.hydrator.common.batch.JobUtils;
import co.cask.hydrator.common.batch.sink.SinkOutputFormatProvider;
import co.cask.hydrator.plugin.batch.ESProperties;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

@Name("Elasticsearch")
@Description("Elasticsearch Batch Sink takes the structured record from the input source and converts it to a JSON string, then indexes it in Elasticsearch using the index, type, and id specified by the user.")
@Plugin(type = "batchsink")
/* loaded from: input_file:co/cask/hydrator/plugin/batch/sink/BatchElasticsearchSink.class */
public class BatchElasticsearchSink extends ReferenceBatchSink<StructuredRecord, Writable, Writable> {
    private static final String INDEX_DESCRIPTION = "The name of the index where the data will be stored. If the index does not already exist, it will be created using Elasticsearch's default properties.";
    private static final String TYPE_DESCRIPTION = "The name of the type where the data will be stored. If it does not already exist, it will be created.";
    private static final String ID_DESCRIPTION = "The field that will determine the id for the document. It should match a fieldname in the structured record of the input.";
    private static final String HOST_DESCRIPTION = "The hostname and port for the Elasticsearch server; such as localhost:9200.";
    private final ESConfig config;

    /* loaded from: input_file:co/cask/hydrator/plugin/batch/sink/BatchElasticsearchSink$ESConfig.class */
    public static class ESConfig extends ReferencePluginConfig {

        @Name("es.host")
        @Description(BatchElasticsearchSink.HOST_DESCRIPTION)
        private String hostname;

        @Name(ESProperties.INDEX_NAME)
        @Description(BatchElasticsearchSink.INDEX_DESCRIPTION)
        private String index;

        @Name(ESProperties.TYPE_NAME)
        @Description(BatchElasticsearchSink.TYPE_DESCRIPTION)
        private String type;

        @Name(ESProperties.ID_FIELD)
        @Description(BatchElasticsearchSink.ID_DESCRIPTION)
        private String idField;

        public ESConfig(String str, String str2, String str3, String str4, String str5) {
            super(str);
            this.hostname = str2;
            this.index = str3;
            this.type = str4;
            this.idField = str5;
        }
    }

    public BatchElasticsearchSink(ESConfig eSConfig) {
        super(eSConfig);
        this.config = eSConfig;
    }

    public void prepareRun(BatchSinkContext batchSinkContext) throws IOException {
        Job createInstance = JobUtils.createInstance();
        Configuration configuration = createInstance.getConfiguration();
        createInstance.setSpeculativeExecution(false);
        configuration.set(ConfigurationOptions.ES_NODES, this.config.hostname);
        configuration.set(ConfigurationOptions.ES_RESOURCE, String.format("%s/%s", this.config.index, this.config.type));
        configuration.set(ConfigurationOptions.ES_INPUT_JSON, "yes");
        configuration.set(ConfigurationOptions.ES_MAPPING_ID, this.config.idField);
        batchSinkContext.addOutput(Output.of(this.config.referenceName, new SinkOutputFormatProvider((Class<? extends OutputFormat>) EsOutputFormat.class, configuration)).alias(this.config.index));
    }

    public void transform(StructuredRecord structuredRecord, Emitter<KeyValue<Writable, Writable>> emitter) throws Exception {
        emitter.emit(new KeyValue(new Text(StructuredRecordStringConverter.toJsonString(structuredRecord)), new Text(StructuredRecordStringConverter.toJsonString(structuredRecord))));
    }

    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        transform((StructuredRecord) obj, (Emitter<KeyValue<Writable, Writable>>) emitter);
    }
}
