package co.cask.hydrator.plugin.db.batch.source;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Macro;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.batch.Input;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.plugin.EndpointPluginContext;
import co.cask.cdap.api.plugin.PluginProperties;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.batch.BatchRuntimeContext;
import co.cask.cdap.etl.api.batch.BatchSourceContext;
import co.cask.hydrator.common.ReferenceBatchSource;
import co.cask.hydrator.common.ReferencePluginConfig;
import co.cask.hydrator.common.SourceInputFormatProvider;
import co.cask.hydrator.plugin.DBConfig;
import co.cask.hydrator.plugin.DBManager;
import co.cask.hydrator.plugin.DBRecord;
import co.cask.hydrator.plugin.DBUtils;
import co.cask.hydrator.plugin.DriverCleanup;
import co.cask.hydrator.plugin.FieldCase;
import co.cask.hydrator.plugin.StructuredRecordUtils;
import com.google.common.base.Strings;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import javax.annotation.Nullable;
import javax.ws.rs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name("Database")
@Description("Reads from a database table(s) using a configurable SQL query. Outputs one record for each row returned by the query.")
@Plugin(type = "batchsource")
/* loaded from: input_file:co/cask/hydrator/plugin/db/batch/source/DBSource.class */
public class DBSource extends ReferenceBatchSource<LongWritable, DBRecord, StructuredRecord> {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(DBSource.class);
    // Plugin configuration handed to the constructor; read by every lifecycle method.
    private final DBSourceConfig sourceConfig;
    // Built from the configuration in the constructor; validates the JDBC plugin in
    // configurePipeline() and is released in destroy().
    private final DBManager dbManager;
    // JDBC driver class loaded in initialize() and passed to DBUtils.cleanup() in destroy().
    // Stays null until initialize() has run.
    private Class<? extends Driver> driverClass;

    /* loaded from: input_file:co/cask/hydrator/plugin/db/batch/source/DBSource$DBSourceConfig.class */
    /**
     * Configuration of the {@link DBSource} plugin: the import query, the optional bounding query,
     * the split column, the number of splits and an optional schema override.
     */
    public static class DBSourceConfig extends DBConfig {
        public static final String IMPORT_QUERY = "importQuery";
        public static final String BOUNDING_QUERY = "boundingQuery";
        public static final String SPLIT_BY = "splitBy";
        public static final String NUM_SPLITS = "numSplits";
        public static final String SCHEMA = "schema";

        @Name(IMPORT_QUERY)
        @Description("The SELECT query to use to import data from the specified table. You can specify an arbitrary number of columns to import, or import all columns using *. The Query should contain the '$CONDITIONS' string unless numSplits is set to one. For example, 'SELECT * FROM table WHERE $CONDITIONS'. The '$CONDITIONS' string will be replaced by 'splitBy' field limits specified by the bounding query.")
        @Macro
        String importQuery;

        @Name(BOUNDING_QUERY)
        @Macro
        @Nullable
        @Description("Bounding Query should return the min and max of the values of the 'splitBy' field. For example, 'SELECT MIN(id),MAX(id) FROM table'. This is required unless numSplits is set to one.")
        String boundingQuery;

        @Name(SPLIT_BY)
        @Macro
        @Nullable
        @Description("Field Name which will be used to generate splits. This is required unless numSplits is set to one.")
        String splitBy;

        @Name(NUM_SPLITS)
        @Macro
        @Nullable
        @Description("The number of splits to generate. If set to one, the boundingQuery is not needed, and no $CONDITIONS string needs to be specified in the importQuery. If not specified, the execution framework will pick a value.")
        Integer numSplits;

        @Name(SCHEMA)
        @Description("The schema of records output by the source. This will be used in place of whatever schema comes back from the query. This should only be used if there is a bug in your jdbc driver. For example, if a column is not correctly getting marked as nullable.")
        @Nullable
        String schema;

        /**
         * @return the import query after being passed through {@code cleanQuery}
         */
        public String getImportQuery() {
            return cleanQuery(this.importQuery);
        }

        /**
         * @return the bounding query after being passed through {@code cleanQuery}
         */
        public String getBoundingQuery() {
            return cleanQuery(this.boundingQuery);
        }

        /**
         * Checks the split-related properties for consistency. Properties that still contain a
         * macro are skipped because their final value is not known yet.
         *
         * @throws IllegalArgumentException if numSplits is below 1, or if numSplits is not known to
         *     be 1 and the import query lacks '$CONDITIONS', or splitBy / boundingQuery is missing
         */
        public void validate() {
            boolean singleSplit = false;
            if (!containsMacro(NUM_SPLITS) && this.numSplits != null) {
                if (this.numSplits < 1) {
                    throw new IllegalArgumentException("Invalid value for numSplits. Must be at least 1, but got " + this.numSplits);
                }
                singleSplit = this.numSplits == 1;
            }
            // With exactly one split no range partitioning happens, so none of the
            // split-related properties below are required.
            if (singleSplit) {
                return;
            }
            if (!containsMacro(IMPORT_QUERY) && !getImportQuery().contains("$CONDITIONS")) {
                throw new IllegalArgumentException(String.format("Import Query %s must contain the string '$CONDITIONS'.", this.importQuery));
            }
            if (!containsMacro(SPLIT_BY) && (this.splitBy == null || this.splitBy.isEmpty())) {
                throw new IllegalArgumentException("The splitBy must be specified if numSplits is not set to 1.");
            }
            if (!containsMacro(BOUNDING_QUERY) && (this.boundingQuery == null || this.boundingQuery.isEmpty())) {
                throw new IllegalArgumentException("The boundingQuery must be specified if numSplits is not set to 1.");
            }
        }

        /**
         * Parses the configured schema override.
         *
         * @return the schema parsed from the JSON text in {@code schema}
         * @throws IllegalArgumentException if the JSON cannot be parsed; the original
         *     {@link IOException} is kept as the cause
         */
        public Schema getSchema() {
            try {
                return Schema.parseJson(this.schema);
            } catch (IOException e) {
                throw new IllegalArgumentException(String.format("Unable to parse schema '%s'. Reason: %s", this.schema, e.getMessage()), e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:co/cask/hydrator/plugin/db/batch/source/DBSource$GetSchemaRequest.class */
    /**
     * Parameters of a {@code getSchema} call: how to reach the database, which JDBC plugin to
     * load, and the query whose result columns define the schema.
     */
    public class GetSchemaRequest {
        public String connectionString;

        @Nullable
        public String user;

        @Nullable
        public String password;
        public String jdbcPluginName;

        @Nullable
        public String jdbcPluginType;
        public String query;

        GetSchemaRequest() {
        }

        /**
         * @return the requested JDBC plugin type, or {@code "jdbc"} when none was supplied
         */
        public String getJDBCPluginType() {
            if (this.jdbcPluginType != null) {
                return this.jdbcPluginType;
            }
            return "jdbc";
        }
    }

    /**
     * Creates the source from its configuration.
     *
     * @param dBSourceConfig plugin configuration; its {@code referenceName} is forwarded to the
     *     superclass and the whole object is used to build the {@link DBManager}
     */
    public DBSource(DBSourceConfig dBSourceConfig) {
        super(new ReferencePluginConfig(dBSourceConfig.referenceName));
        this.dbManager = new DBManager(dBSourceConfig);
        this.sourceConfig = dBSourceConfig;
    }

    /**
     * Validates the JDBC plugin and the source configuration, and publishes the schema override
     * as the stage's output schema when one is configured.
     *
     * @param pipelineConfigurer configurer of the pipeline this stage belongs to
     */
    @Override // co.cask.hydrator.common.ReferenceBatchSource
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        this.dbManager.validateJDBCPluginPipeline(pipelineConfigurer, getJDBCPluginId());
        this.sourceConfig.validate();

        // Only an explicitly configured schema is published here.
        boolean schemaConfigured = !Strings.isNullOrEmpty(this.sourceConfig.schema);
        if (schemaConfigured) {
            pipelineConfigurer.getStageConfigurer().setOutputSchema(this.sourceConfig.getSchema());
        }
    }

    /**
     * Derives the output schema of a query by executing it (capped at one row) and inspecting
     * the result. A '$CONDITIONS' placeholder in the query is stripped before execution.
     *
     * <p>The connection and the statement are closed before the driver registration obtained
     * for the request is destroyed. Closing the statement also releases its result set.
     *
     * @param getSchemaRequest connection details, JDBC plugin coordinates and the query
     * @param endpointPluginContext context used to load the JDBC driver plugin class
     * @return a record schema named {@code outputSchema} built from the query's result columns
     * @throws IllegalArgumentException if the request carries no query
     * @throws IllegalAccessException if the driver cannot be loaded or registered
     * @throws InstantiationException if the driver cannot be loaded or registered
     * @throws SQLException if connecting to the database or running the query fails
     */
    @Path("getSchema")
    public Schema getSchema(GetSchemaRequest getSchemaRequest, EndpointPluginContext endpointPluginContext) throws IllegalAccessException, SQLException, InstantiationException {
        try {
            if (getSchemaRequest.query == null) {
                // Fail with a clear message instead of a NullPointerException further down.
                throw new IllegalArgumentException("A query must be provided to determine the schema.");
            }
            DriverCleanup driverCleanup = loadPluginClassAndGetDriver(getSchemaRequest, endpointPluginContext);
            try (Connection connection = getConnection(getSchemaRequest.connectionString, getSchemaRequest.user, getSchemaRequest.password);
                 Statement statement = connection.createStatement()) {
                // One row is enough: only the result metadata is of interest.
                statement.setMaxRows(1);
                String query = getSchemaRequest.query;
                if (query.contains("$CONDITIONS")) {
                    query = removeConditionsClause(query);
                }
                return Schema.recordOf("outputSchema", DBUtils.getSchemaFields(statement.executeQuery(query)));
            } finally {
                // Runs after the statement and connection above have been closed.
                driverCleanup.destroy();
            }
        } catch (Exception e) {
            LOG.error("Exception while performing getSchema", e);
            throw e;
        }
    }

    /**
     * Removes the '$CONDITIONS' placeholder, together with the keyword that joins it to the rest
     * of the query, so the query can be executed as-is.
     *
     * <p>Keywords are matched case-insensitively and may be separated by any whitespace. Apart
     * from the removed clause the query text is returned untouched, so quoted identifiers and
     * string literals keep their case and spacing. The first form that matches wins:
     * <ol>
     *   <li>{@code WHERE $CONDITIONS AND x} becomes {@code WHERE x}</li>
     *   <li>{@code WHERE $CONDITIONS} is removed entirely</li>
     *   <li>{@code AND $CONDITIONS} is removed entirely</li>
     * </ol>
     *
     * @param str the import query; must not be null
     * @return the query without the conditions clause, or {@code str} unchanged if none of the
     *     forms above occurs
     */
    private static String removeConditionsClause(String str) {
        // Group 1 keeps the original "WHERE" (and its whitespace) exactly as written.
        final String whereConditionsAnd = "(?i)\\b(WHERE\\s+)\\$CONDITIONS\\s+AND\\b\\s*";
        final String whereConditions = "(?i)\\bWHERE\\s+\\$CONDITIONS\\b";
        final String andConditions = "(?i)\\bAND\\s+\\$CONDITIONS\\b";

        String stripped = str.replaceAll(whereConditionsAnd, "$1");
        if (!stripped.equals(str)) {
            return stripped;
        }
        stripped = str.replaceAll(whereConditions, "");
        if (!stripped.equals(str)) {
            return stripped;
        }
        return str.replaceAll(andConditions, "");
    }

    /**
     * Loads the JDBC driver plugin class named in the request and makes sure the driver is
     * available for the request's connection string.
     *
     * @param getSchemaRequest supplies the plugin type, plugin name and connection string
     * @param endpointPluginContext context used to load the plugin class
     * @return the handle returned by {@code DBUtils.ensureJDBCDriverIsAvailable}
     * @throws InstantiationException if the plugin class cannot be found, or from driver setup
     * @throws IllegalAccessException from driver setup
     * @throws SQLException from driver setup
     */
    private DriverCleanup loadPluginClassAndGetDriver(GetSchemaRequest getSchemaRequest, EndpointPluginContext endpointPluginContext) throws IllegalAccessException, InstantiationException, SQLException {
        final String pluginType = getSchemaRequest.getJDBCPluginType();
        final String pluginName = getSchemaRequest.jdbcPluginName;

        Class driver = endpointPluginContext.loadPluginClass(pluginType, pluginName, PluginProperties.builder().build());
        if (driver != null) {
            try {
                return DBUtils.ensureJDBCDriverIsAvailable(driver, getSchemaRequest.connectionString, pluginType, pluginName);
            } catch (IllegalAccessException | InstantiationException | SQLException e) {
                // Log with context, then let the caller see the original exception.
                LOG.error("Unable to load or register driver {}", driver, e);
                throw e;
            }
        }
        throw new InstantiationException(String.format("Unable to load Driver class with plugin type %s and plugin name %s", pluginType, pluginName));
    }

    /**
     * Opens a JDBC connection through {@link DriverManager}.
     *
     * @param str JDBC connection string
     * @param str2 user name; when null the connection is opened from the connection string alone
     * @param str3 password, only used together with a non-null user name
     * @return the opened connection
     * @throws SQLException if {@link DriverManager} cannot open the connection
     */
    private Connection getConnection(String str, @Nullable String str2, @Nullable String str3) throws SQLException {
        if (str2 != null) {
            return DriverManager.getConnection(str, str2, str3);
        }
        return DriverManager.getConnection(str);
    }

    /**
     * Re-validates the configuration, builds the Hadoop configuration for
     * {@code DataDrivenETLDBInputFormat} and registers it as the input of this run.
     *
     * @param batchSourceContext context of the run; used to load the JDBC plugin class and to
     *     set the input
     * @throws IllegalArgumentException if validation fails, or if numSplits is not 1 and the
     *     import query lacks '$CONDITIONS'
     * @throws Exception declared by the method signature
     */
    public void prepareRun(BatchSourceContext batchSourceContext) throws Exception {
        this.sourceConfig.validate();

        final String importQuery = this.sourceConfig.getImportQuery();
        final String boundingQuery = this.sourceConfig.getBoundingQuery();
        LOG.debug("pluginType = {}; pluginName = {}; connectionString = {}; importQuery = {}; boundingQuery = {}",
                this.sourceConfig.jdbcPluginType, this.sourceConfig.jdbcPluginName, this.sourceConfig.connectionString, importQuery, boundingQuery);

        // Start from an empty configuration so only the settings made below are carried along.
        Configuration hConf = new Configuration();
        hConf.clear();

        Class driver = batchSourceContext.loadPluginClass(getJDBCPluginId());
        boolean noCredentials = this.sourceConfig.user == null && this.sourceConfig.password == null;
        if (noCredentials) {
            DBConfiguration.configureDB(hConf, driver.getName(), this.sourceConfig.connectionString);
        } else {
            DBConfiguration.configureDB(hConf, driver.getName(), this.sourceConfig.connectionString, this.sourceConfig.user, this.sourceConfig.password);
        }

        DataDrivenETLDBInputFormat.setInput(hConf, DBRecord.class, importQuery, boundingQuery, this.sourceConfig.getEnableAutoCommit().booleanValue());

        final Integer numSplits = this.sourceConfig.numSplits;
        boolean singleSplit = numSplits != null && numSplits.intValue() == 1;
        if (!singleSplit) {
            if (!importQuery.contains("$CONDITIONS")) {
                throw new IllegalArgumentException(String.format("Import Query %s must contain the string '$CONDITIONS'.", this.sourceConfig.importQuery));
            }
            hConf.set("mapreduce.jdbc.input.orderby", this.sourceConfig.splitBy);
        }
        if (numSplits != null) {
            hConf.setInt("mapreduce.job.maps", numSplits.intValue());
        }
        if (this.sourceConfig.schema != null) {
            hConf.set(DBUtils.OVERRIDE_SCHEMA, this.sourceConfig.schema);
        }

        batchSourceContext.setInput(Input.of(this.sourceConfig.referenceName, new SourceInputFormatProvider((Class<? extends InputFormat>) DataDrivenETLDBInputFormat.class, hConf)));
    }

    /**
     * Runs the superclass initialization and loads the JDBC driver plugin class, which is kept
     * so that {@link #destroy()} can clean it up.
     *
     * @param batchRuntimeContext runtime context used to load the plugin class
     * @throws Exception declared by the method signature
     */
    public void initialize(BatchRuntimeContext batchRuntimeContext) throws Exception {
        super.initialize(batchRuntimeContext);
        final String pluginId = getJDBCPluginId();
        this.driverClass = batchRuntimeContext.loadPluginClass(pluginId);
    }

    /**
     * Emits the record carried by the input value after converting its field names to the
     * configured column name case.
     *
     * @param keyValue input pair; only the value is used
     * @param emitter receives the converted record
     * @throws Exception declared by the method signature
     */
    public void transform(KeyValue<LongWritable, DBRecord> keyValue, Emitter<StructuredRecord> emitter) throws Exception {
        DBRecord dbRecord = keyValue.getValue();
        emitter.emit(
                StructuredRecordUtils.convertCase(
                        dbRecord.getRecord(),
                        FieldCase.toFieldCase(this.sourceConfig.columnNameCase)));
    }

    /**
     * Cleans up the loaded JDBC driver class and then destroys the {@link DBManager}. The
     * manager is destroyed even when the driver cleanup throws.
     */
    public void destroy() {
        final Class<? extends Driver> loadedDriver = this.driverClass;
        try {
            DBUtils.cleanup(loadedDriver);
        } finally {
            // Must run regardless of how the driver cleanup ended.
            this.dbManager.destroy();
        }
    }

    /**
     * @return the plugin id in the form {@code source.<jdbcPluginType>.<jdbcPluginName>}, used
     *     when validating and loading the JDBC plugin class
     */
    private String getJDBCPluginId() {
        return String.format(
                "%s.%s.%s",
                "source",
                this.sourceConfig.jdbcPluginType,
                this.sourceConfig.jdbcPluginName);
    }

    /**
     * Erased-signature overload that casts its arguments and delegates to
     * {@link #transform(KeyValue, Emitter)}.
     *
     * <p>NOTE(review): the bridge/synthetic markers suggest this is the compiler-generated
     * bridge method as rendered by the decompiler rather than hand-written code; confirm
     * whether it should exist in source form at all.
     */
    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        // Unchecked casts: the typed overload does the real work.
        transform((KeyValue<LongWritable, DBRecord>) obj, (Emitter<StructuredRecord>) emitter);
    }
}
