package co.cask.hydrator.plugin.realtime.sink;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.etl.api.realtime.DataWriter;
import co.cask.cdap.etl.api.realtime.RealtimeContext;
import co.cask.hydrator.common.ReferencePluginConfig;
import co.cask.hydrator.common.ReferenceRealtimeSink;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import java.util.ArrayList;
import java.util.List;
import org.bson.Document;

/**
 * Realtime sink that copies each incoming {@link StructuredRecord} into a BSON
 * {@link Document} (one entry per schema field) and inserts the resulting batch into the
 * configured MongoDB collection.
 */
@Name("MongoDB")
@Description("CDAP MongoDB Realtime Sink takes StructuredRecord from the previous stage and converts it to BSONDocument and then writes to MongoDB")
@Plugin(type = "realtimesink")
public class MongoDBRealtimeSink extends ReferenceRealtimeSink<StructuredRecord> {
    private final MongoDBConfig config;
    private MongoClient mongoClient;
    private MongoDatabase mongoDatabase;

    /**
     * Plugin configuration: the connection string plus the database and collection to write to.
     */
    public static class MongoDBConfig extends ReferencePluginConfig {

        @Name(Properties.CONNECTION_STRING)
        @Description("MongoDB Connection String (see http://docs.mongodb.org/manual/reference/connection-string); Example: 'mongodb://localhost:27017/analytics.users'.")
        private String connectionString;

        @Name(Properties.DB_NAME)
        @Description("MongoDB Database Name")
        private String dbName;

        @Name(Properties.COLLECTION_NAME)
        @Description("MongoDB Collection Name")
        private String collectionName;

        /**
         * @param referenceName    value forwarded to the {@link ReferencePluginConfig} constructor
         * @param connectionString MongoDB connection string used to build the client
         * @param dbName           name of the database to write to
         * @param collectionName   name of the collection to write to
         */
        public MongoDBConfig(String referenceName, String connectionString, String dbName, String collectionName) {
            super(referenceName);
            this.connectionString = connectionString;
            this.dbName = dbName;
            this.collectionName = collectionName;
        }
    }

    /**
     * Names of the configuration properties used by {@link MongoDBConfig}.
     */
    public static class Properties {
        public static final String CONNECTION_STRING = "connectionString";
        public static final String DB_NAME = "dbName";
        public static final String COLLECTION_NAME = "collectionName";
    }

    /**
     * @param mongoDBConfig configuration holding the connection string, database and collection names
     */
    public MongoDBRealtimeSink(MongoDBConfig mongoDBConfig) {
        super(mongoDBConfig);
        this.config = mongoDBConfig;
    }

    /**
     * Creates the MongoDB client from the configured connection string and resolves the
     * target database.
     *
     * @param realtimeContext runtime context, passed through to the superclass
     * @throws Exception if the superclass initialization or the client setup fails
     */
    @Override // co.cask.hydrator.common.ReferenceRealtimeSink
    public void initialize(RealtimeContext realtimeContext) throws Exception {
        super.initialize(realtimeContext);
        this.mongoClient = new MongoClient(new MongoClientURI(this.config.connectionString));
        this.mongoDatabase = this.mongoClient.getDatabase(this.config.dbName);
    }

    /**
     * Converts every record of the batch to a {@link Document} and inserts them all with a
     * single {@code insertMany} call.
     *
     * @param iterable   records to write; may yield no elements
     * @param dataWriter not used by this sink
     * @return the number of records converted and handed to MongoDB
     * @throws Exception if the insert fails
     */
    public int write(Iterable<StructuredRecord> iterable, DataWriter dataWriter) throws Exception {
        List<Document> documents = new ArrayList<>();
        for (StructuredRecord structuredRecord : iterable) {
            Document document = new Document();
            for (Schema.Field field : structuredRecord.getSchema().getFields()) {
                document.append(field.getName(), structuredRecord.get(field.getName()));
            }
            documents.add(document);
        }
        // The MongoDB driver rejects an empty write list with IllegalArgumentException,
        // so an empty batch must skip the insert instead of failing the sink.
        if (!documents.isEmpty()) {
            MongoCollection<Document> collection = this.mongoDatabase.getCollection(this.config.collectionName);
            collection.insertMany(documents);
        }
        return documents.size();
    }

    /**
     * Closes the MongoDB client opened in {@link #initialize(RealtimeContext)} so its
     * connections are not leaked.
     *
     * <p>NOTE(review): assumes the superclass exposes {@code destroy()} as its teardown hook
     * and that the runtime calls it on shutdown; confirm against {@code RealtimeSink}.
     */
    @Override
    public void destroy() {
        // initialize() may have failed before the client was assigned.
        if (this.mongoClient != null) {
            this.mongoClient.close();
            this.mongoClient = null;
            this.mongoDatabase = null;
        }
    }
}
