package co.cask.hydrator.plugin.batch.source;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.batch.Input;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.batch.BatchRuntimeContext;
import co.cask.cdap.etl.api.batch.BatchSourceContext;
import co.cask.hydrator.common.ReferenceBatchSource;
import co.cask.hydrator.common.ReferencePluginConfig;
import co.cask.hydrator.common.SourceInputFormatProvider;
import co.cask.hydrator.common.batch.JobUtils;
import co.cask.hydrator.plugin.batch.ESProperties;
import co.cask.hydrator.plugin.batch.RecordWritableConverter;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.mr.EsInputFormat;

/**
 * Batch source that reads documents from an Elasticsearch index/type using a user-supplied query
 * and converts each document into a {@link StructuredRecord} with the configured schema.
 */
@Name("Elasticsearch")
@Description("Elasticsearch Batch Source pulls documents from Elasticsearch according to the query specified by the user and converts each document to a structured record ")
@Plugin(type = "batchsource")
public class ElasticsearchSource extends ReferenceBatchSource<Text, MapWritable, StructuredRecord> {
    private static final String INDEX_DESCRIPTION = "The name of the index to query.";
    private static final String TYPE_DESCRIPTION = "The name of the type where the data is stored.";
    private static final String QUERY_DESCRIPTION = "The query to use to import data from the specified index. See Elasticsearch for query examples.";
    private static final String HOST_DESCRIPTION = "The hostname and port for the Elasticsearch instance; for example, localhost:9200.";
    private static final String SCHEMA_DESCRIPTION = "The schema or mapping of the data in Elasticsearch.";

    private final ESConfig config;

    /** Output schema parsed from {@code config.schema} in {@link #initialize}; used by {@link #transform}. */
    private Schema schema;

    /**
     * User-facing configuration for {@link ElasticsearchSource}.
     */
    public static class ESConfig extends ReferencePluginConfig {

        /** Elasticsearch host and port, passed to es-hadoop as {@code es.nodes}. */
        @Name("es.host")
        @Description(HOST_DESCRIPTION)
        private String hostname;

        /** Index to read from; also used as the input alias. */
        @Name(ESProperties.INDEX_NAME)
        @Description(INDEX_DESCRIPTION)
        private String index;

        /** Document type within the index. */
        @Name(ESProperties.TYPE_NAME)
        @Description(TYPE_DESCRIPTION)
        private String type;

        /** Query passed to es-hadoop as {@code es.query}. */
        @Name("query")
        @Description(QUERY_DESCRIPTION)
        private String query;

        /** JSON schema of the emitted records. */
        @Name("schema")
        @Description(SCHEMA_DESCRIPTION)
        private String schema;

        /**
         * Creates a configuration.
         *
         * @param referenceName reference name forwarded to {@link ReferencePluginConfig}
         * @param hostname Elasticsearch host and port
         * @param index index to read from
         * @param type document type within the index
         * @param query query used to select documents
         * @param schema JSON output schema
         */
        public ESConfig(String referenceName, String hostname, String index, String type,
                        String query, String schema) {
            super(referenceName);
            this.hostname = hostname;
            this.index = index;
            this.type = type;
            this.query = query;
            this.schema = schema;
        }
    }

    /**
     * @param eSConfig plugin configuration; stored and also passed to the reference base class
     */
    public ElasticsearchSource(ESConfig eSConfig) {
        super(eSConfig);
        this.config = eSConfig;
    }

    /** Returns the es-hadoop resource string in the form {@code index/type}. */
    private String getResource() {
        return String.format("%s/%s", this.config.index, this.config.type);
    }

    /**
     * Parses the configured schema once per run so {@link #transform} can reuse it.
     *
     * @throws IllegalArgumentException if the configured schema is not valid JSON schema
     */
    @Override
    public void initialize(BatchRuntimeContext batchRuntimeContext) {
        this.schema = parseSchema();
    }

    /**
     * Validates the configured schema at deploy time and publishes it as this stage's output schema.
     *
     * @throws IllegalArgumentException if the configured schema cannot be parsed
     */
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        try {
            pipelineConfigurer.getStageConfigurer().setOutputSchema(Schema.parseJson(this.config.schema));
        } catch (IOException e) {
            throw new IllegalArgumentException("Invalid output schema : " + e.getMessage(), e);
        }
    }

    /**
     * Builds the Hadoop configuration for es-hadoop's {@link EsInputFormat} and registers it as the
     * input of this stage, aliased by the index name.
     */
    @Override
    public void prepareRun(BatchSourceContext batchSourceContext) throws Exception {
        Job job = JobUtils.createInstance();
        Configuration conf = job.getConfiguration();
        // Speculative execution is disabled, presumably so that duplicate task attempts do not
        // issue the same query against the cluster twice -- NOTE(review): confirm intent.
        job.setSpeculativeExecution(false);
        conf.set(ConfigurationOptions.ES_NODES, this.config.hostname);
        conf.set(ConfigurationOptions.ES_RESOURCE, getResource());
        conf.set(ConfigurationOptions.ES_QUERY, this.config.query);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MapWritable.class);
        batchSourceContext.setInput(
            Input.of(this.config.referenceName,
                     new SourceInputFormatProvider((Class<? extends InputFormat>) EsInputFormat.class, conf))
                .alias(this.config.index));
    }

    /**
     * Converts one document into a record using the schema parsed in {@link #initialize}.
     * The document key is ignored; only the {@link MapWritable} value is converted.
     */
    @Override
    public void transform(KeyValue<Text, MapWritable> keyValue, Emitter<StructuredRecord> emitter) throws Exception {
        emitter.emit(RecordWritableConverter.convertToRecord(keyValue.getValue(), this.schema));
    }

    /**
     * Parses {@code config.schema}.
     *
     * @throws IllegalArgumentException wrapping the underlying {@link IOException} (cause preserved)
     */
    private Schema parseSchema() {
        try {
            return Schema.parseJson(this.config.schema);
        } catch (IOException e) {
            // Keep the original exception as the cause so the parse failure location is not lost.
            throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e);
        }
    }
}
