package co.cask.hydrator.plugin.batch.source;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Macro;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.batch.Input;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.batch.BatchRuntimeContext;
import co.cask.cdap.etl.api.batch.BatchSourceContext;
import co.cask.hydrator.common.LineageRecorder;
import co.cask.hydrator.common.ReferenceBatchSource;
import co.cask.hydrator.common.ReferencePluginConfig;
import co.cask.hydrator.common.SourceInputFormatProvider;
import co.cask.hydrator.plugin.BSONConverter;
import com.google.common.base.Strings;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.splitter.MongoSplitter;
import com.mongodb.hadoop.splitter.StandaloneMongoSplitter;
import com.mongodb.hadoop.util.MongoConfigUtil;
import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.bson.BSONObject;

/**
 * Batch source plugin that reads documents from MongoDB and emits each one as a
 * {@link StructuredRecord} built according to the configured output schema.
 *
 * <p>Input is wired up through the MongoDB Hadoop connector ({@link MongoInputFormat});
 * the BSON-to-record conversion is delegated to {@link BSONConverter}.
 */
@Name("MongoDB")
@Description("MongoDB Batch Source will read documents from MongoDB and convert each document into a StructuredRecord with the help of the specified Schema. ")
@Plugin(type = "batchsource")
public class MongoDBBatchSource extends ReferenceBatchSource<Object, BSONObject, StructuredRecord> {
    /** Plugin configuration supplied when the source is constructed. */
    private final MongoDBConfig config;

    /** Converter created in {@link #initialize(BatchRuntimeContext)} from the configured schema. */
    private BSONConverter bsonConverter;

    /**
     * Configuration of the MongoDB batch source. Property names are declared in {@link Properties}.
     */
    public static class MongoDBConfig extends ReferencePluginConfig {

        @Name(Properties.CONNECTION_STRING)
        @Description("MongoDB Connection String (see http://docs.mongodb.org/manual/reference/connection-string); Example: 'mongodb://localhost:27017/analytics.users'.")
        @Macro
        private String connectionString;

        @Name(Properties.AUTH_CONNECTION_STRING)
        @Macro
        @Nullable
        @Description("Auxiliary MongoDB connection string to authenticate against when constructing splits.")
        private String authConnectionString;

        @Name(Properties.SCHEMA)
        @Description("The schema for the data as it will be formatted in CDAP. Sample schema: {\n    \"type\": \"record\",\n    \"name\": \"schemaBody\",\n    \"fields\": [\n        {\n            \"name\": \"name\",\n            \"type\": \"string\"\n        },\n        {\n            \"name\": \"age\",\n            \"type\": \"int\"\n        }    ]\n}")
        private String schema;

        @Name(Properties.INPUT_QUERY)
        @Macro
        @Nullable
        @Description("Optionally filter the input collection with a query. This query must be represented in JSON format, and use the MongoDB extended JSON format to represent non-native JSON data types.")
        private String inputQuery;

        @Name(Properties.INPUT_FIELDS)
        @Macro
        @Nullable
        @Description("A projection document limiting the fields that appear in each document. If no projection document is provided, all fields will be read.")
        private String inputFields;

        @Name(Properties.SPLITTER_CLASS)
        @Macro
        @Nullable
        @Description("The name of the Splitter class to use. If left empty, the MongoDB Hadoop Connector will attempt to make a best guess as to what Splitter to use.")
        private String splitterClass;

        /**
         * Creates a configuration with every property given explicitly.
         *
         * @param referenceName        reference name handed to {@link ReferencePluginConfig}
         * @param connectionString     MongoDB connection string used as the input URI
         * @param authConnectionString auxiliary connection string for authentication; may be {@code null}
         * @param schema               output schema as a JSON string
         * @param inputQuery           query used to filter the input collection; may be {@code null}
         * @param inputFields          projection document limiting the fields read; may be {@code null}
         * @param splitterClass        simple name of the splitter class to use; may be {@code null}
         */
        public MongoDBConfig(String referenceName, String connectionString, String authConnectionString,
                             String schema, String inputQuery, String inputFields, String splitterClass) {
            super(referenceName);
            this.connectionString = connectionString;
            this.authConnectionString = authConnectionString;
            this.schema = schema;
            this.inputQuery = inputQuery;
            this.inputFields = inputFields;
            this.splitterClass = splitterClass;
        }

        /**
         * Parses the configured schema string.
         *
         * @return the parsed output schema
         * @throws IllegalArgumentException if the schema is {@code null} or cannot be parsed
         */
        public Schema getSchema() {
            if (this.schema == null) {
                throw new IllegalArgumentException("Schema cannot be null.");
            }
            try {
                return Schema.parseJson(this.schema);
            } catch (IOException e) {
                // Keep the original exception as the cause so the parse failure stays diagnosable.
                throw new IllegalArgumentException(
                    String.format("Unable to parse schema '%s'. Reason: %s", this.schema, e.getMessage()), e);
            }
        }
    }

    /**
     * Names of the plugin properties exposed by {@link MongoDBConfig}.
     */
    public static class Properties {
        public static final String AUTH_CONNECTION_STRING = "authConnectionString";
        public static final String CONNECTION_STRING = "connectionString";
        public static final String SCHEMA = "schema";
        public static final String INPUT_QUERY = "inputQuery";
        public static final String INPUT_FIELDS = "inputFields";
        public static final String SPLITTER_CLASS = "splitterClass";
    }

    /**
     * Creates the source with the given configuration.
     *
     * @param mongoDBConfig plugin configuration
     */
    public MongoDBBatchSource(MongoDBConfig mongoDBConfig) {
        super(mongoDBConfig);
        this.config = mongoDBConfig;
    }

    /**
     * Validates the configured schema with {@link BSONConverter#validateSchema} and registers it
     * as the output schema of this stage.
     *
     * @param pipelineConfigurer configurer of the pipeline this stage belongs to
     * @throws IllegalArgumentException if the schema is missing or cannot be parsed
     */
    @Override // co.cask.hydrator.common.ReferenceBatchSource
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        Schema schema = this.config.getSchema();
        BSONConverter.validateSchema(schema);
        pipelineConfigurer.getStageConfigurer().setOutputSchema(schema);
    }

    /**
     * Builds the Hadoop configuration for the MongoDB input format, records the external dataset
     * for lineage and sets the resulting input on the context.
     *
     * <p>Optional settings (query, auth URI, fields, splitter) are applied only when the
     * corresponding configuration value is neither {@code null} nor empty.
     *
     * @param batchSourceContext context of the run being prepared
     * @throws Exception if the splitter class cannot be loaded or is not a {@link MongoSplitter},
     *                   or if any of the invoked APIs fails
     */
    @Override
    public void prepareRun(BatchSourceContext batchSourceContext) throws Exception {
        Configuration configuration = new Configuration();
        // Drop everything the constructor loaded so only the settings below are carried along.
        configuration.clear();

        MongoConfigUtil.setInputFormat(configuration, MongoInputFormat.class);
        MongoConfigUtil.setInputURI(configuration, this.config.connectionString);
        if (!Strings.isNullOrEmpty(this.config.inputQuery)) {
            MongoConfigUtil.setQuery(configuration, this.config.inputQuery);
        }
        if (!Strings.isNullOrEmpty(this.config.authConnectionString)) {
            MongoConfigUtil.setAuthURI(configuration, this.config.authConnectionString);
        }
        if (!Strings.isNullOrEmpty(this.config.inputFields)) {
            MongoConfigUtil.setFields(configuration, this.config.inputFields);
        }
        if (!Strings.isNullOrEmpty(this.config.splitterClass)) {
            MongoConfigUtil.setSplitterClass(configuration, loadSplitterClass(this.config.splitterClass));
        }

        LineageRecorder lineageRecorder = new LineageRecorder(batchSourceContext, this.config.referenceName);
        lineageRecorder.createExternalDataset(this.config.getSchema());

        SourceInputFormatProvider inputFormatProvider =
            new SourceInputFormatProvider(MongoConfigUtil.getInputFormat(configuration), configuration);
        batchSourceContext.setInput(Input.of(this.config.referenceName, inputFormatProvider));
    }

    /**
     * Creates the {@link BSONConverter} used by {@link #transform(KeyValue, Emitter)}.
     *
     * @param batchRuntimeContext runtime context passed on to the superclass
     * @throws Exception if the superclass initialization fails or the schema is invalid
     */
    @Override
    public void initialize(BatchRuntimeContext batchRuntimeContext) throws Exception {
        super.initialize(batchRuntimeContext);
        this.bsonConverter = new BSONConverter(this.config.getSchema());
    }

    /**
     * Converts the BSON document held in the value of the input pair and emits the result.
     * The key of the pair is not used.
     *
     * @param keyValue input pair whose value is the document to convert
     * @param emitter  emitter receiving the converted record
     * @throws Exception if the conversion fails
     */
    @Override
    public void transform(KeyValue<Object, BSONObject> keyValue, Emitter<StructuredRecord> emitter) throws Exception {
        emitter.emit(this.bsonConverter.transform(keyValue.getValue()));
    }

    /**
     * Loads a splitter class by its simple name from the package that contains
     * {@link StandaloneMongoSplitter}, using this class's class loader.
     *
     * @param simpleName simple (unqualified) name of the splitter class
     * @return the loaded class, typed as a {@link MongoSplitter} subclass
     * @throws ClassNotFoundException if no such class exists in that package
     * @throws ClassCastException     if the loaded class is not a {@link MongoSplitter}
     */
    private Class<? extends MongoSplitter> loadSplitterClass(String simpleName) throws ClassNotFoundException {
        String splitterPackage = StandaloneMongoSplitter.class.getPackage().getName();
        String qualifiedName = String.format("%s.%s", splitterPackage, simpleName);
        return getClass().getClassLoader().loadClass(qualifiedName).asSubclass(MongoSplitter.class);
    }
}
