package co.cask.hydrator.format.plugin;

import co.cask.cdap.api.data.batch.Input;
import co.cask.cdap.api.data.batch.InputFormatProvider;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.plugin.PluginConfig;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.api.batch.BatchSourceContext;
import co.cask.hydrator.common.LineageRecorder;
import co.cask.hydrator.common.SourceInputFormatProvider;
import co.cask.hydrator.common.batch.JobUtils;
import co.cask.hydrator.format.FileFormat;
import co.cask.hydrator.format.RegexPathFilter;
import co.cask.hydrator.format.input.EmptyInputFormat;
import co.cask.hydrator.format.plugin.FileSourceProperties;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/* loaded from: input_file:lib/format-common-2.1.2.jar:co/cask/hydrator/format/plugin/AbstractFileSource.class */
/**
 * Base class for batch sources that read {@link StructuredRecord}s from files.
 *
 * <p>The concrete file format is supplied by an {@code inputformat} plugin that is registered
 * under the id {@code "format"} at configure time and instantiated again when the run is
 * prepared. Subclasses may contribute file system settings through
 * {@link #getFileSystemProperties(BatchSourceContext)} and customise lineage through
 * {@link #recordLineage(LineageRecorder, List)}.
 *
 * @param <T> plugin configuration type; it must also expose the {@link FileSourceProperties}
 */
public abstract class AbstractFileSource<T extends PluginConfig & FileSourceProperties> extends BatchSource<NullWritable, StructuredRecord, StructuredRecord> {
    private static final String FORMAT_PLUGIN_ID = "format";
    private final T config;

    /**
     * Creates a source backed by the given configuration.
     *
     * @param config the plugin configuration used by every lifecycle method of this source
     */
    protected AbstractFileSource(T config) {
        this.config = config;
    }

    /**
     * Validates the configuration, registers the input format plugin for the configured format
     * (when a format is available), checks the optional path field against the schema and
     * publishes the configured schema as the output schema of the stage.
     *
     * @param pipelineConfigurer configurer used to register the plugin and set the output schema
     * @throws IllegalArgumentException if the input format plugin cannot be found, or if the path
     *     field is missing from the schema or is not a (nullable) string
     */
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        this.config.validate();
        Schema schema = this.config.getSchema();
        FileFormat format = this.config.getFormat();
        if (format != null) {
            String formatName = formatPluginName(format);
            InputFormatProvider provider = pipelineConfigurer.usePlugin("inputformat", formatName, FORMAT_PLUGIN_ID, this.config.getProperties());
            if (provider == null) {
                throw new IllegalArgumentException(String.format("Could not find the '%s' input format.", formatName));
            }
        }
        String pathField = this.config.getPathField();
        if (pathField != null && schema != null) {
            validatePathField(schema, pathField);
        }
        pipelineConfigurer.getStageConfigurer().setOutputSchema(schema);
    }

    /**
     * Builds the Hadoop configuration for the run and registers it as the input of the source.
     *
     * <p>The method applies the optional file name filter and the recursive flag, records
     * lineage, applies the file system properties and then resolves the input path. When the
     * glob lookup for the path yields {@code null} the run either fails or, if empty input is
     * allowed by the configuration, falls back to {@link EmptyInputFormat}.
     *
     * @param batchSourceContext context of the run being prepared
     * @throws IOException if the input path does not exist and empty input is not allowed
     * @throws Exception if the plugin cannot be instantiated or the job cannot be created
     */
    public void prepareRun(BatchSourceContext batchSourceContext) throws Exception {
        this.config.validate();
        InputFormatProvider inputFormatProvider = batchSourceContext.newPluginInstance(FORMAT_PLUGIN_ID);
        Job job = JobUtils.createInstance();
        Configuration configuration = job.getConfiguration();

        Pattern filePattern = this.config.getFilePattern();
        if (filePattern != null) {
            RegexPathFilter.configure(configuration, filePattern);
            FileInputFormat.setInputPathFilter(job, RegexPathFilter.class);
        }
        FileInputFormat.setInputDirRecursive(job, this.config.shouldReadRecursively());

        Schema schema = this.config.getSchema();
        LineageRecorder lineageRecorder = new LineageRecorder(batchSourceContext, this.config.getReferenceName());
        lineageRecorder.createExternalDataset(schema);
        if (schema != null && schema.getFields() != null) {
            List<String> fieldNames = schema.getFields().stream()
                .map(Schema.Field::getName)
                .collect(Collectors.toList());
            recordLineage(lineageRecorder, fieldNames);
        }

        // Applied before the FileSystem lookup below so the lookup sees these settings.
        applyAll(configuration, getFileSystemProperties(batchSourceContext));

        Path path = new Path(this.config.getPath());
        String inputFormatClassName;
        if (FileSystem.get(path.toUri(), configuration).globStatus(path) != null) {
            FileInputFormat.addInputPath(job, path);
            FileInputFormat.setMaxInputSplitSize(job, this.config.getMaxSplitSize());
            inputFormatClassName = inputFormatProvider.getInputFormatClassName();
            applyAll(job.getConfiguration(), inputFormatProvider.getInputFormatConfiguration());
        } else {
            if (!this.config.shouldAllowEmptyInput()) {
                throw new IOException(String.format("Input path %s does not exist", path));
            }
            inputFormatClassName = EmptyInputFormat.class.getName();
        }

        // Applied a second time so these values win over anything written to the
        // configuration since the first pass.
        applyAll(configuration, getFileSystemProperties(batchSourceContext));

        batchSourceContext.setInput(Input.of(this.config.getReferenceName(), new SourceInputFormatProvider(inputFormatClassName, configuration)));
    }

    /**
     * Emits the record carried by the key/value pair unchanged; the key is ignored.
     *
     * @param keyValue pair whose value is the record read by the input format
     * @param emitter receiver of the record
     */
    public void transform(KeyValue<NullWritable, StructuredRecord> keyValue, Emitter<StructuredRecord> emitter) throws Exception {
        emitter.emit(keyValue.getValue());
    }

    /**
     * Returns extra properties to set on the Hadoop configuration. This implementation returns
     * an empty map.
     *
     * @param batchSourceContext context of the run being prepared
     * @return property names mapped to their values
     */
    protected Map<String, String> getFileSystemProperties(BatchSourceContext batchSourceContext) {
        return Collections.emptyMap();
    }

    /**
     * Records a "Read" lineage operation that covers the given output fields.
     *
     * @param lineageRecorder recorder that receives the operation
     * @param list names of the fields produced by this source
     */
    protected void recordLineage(LineageRecorder lineageRecorder, List<String> list) {
        lineageRecorder.recordRead("Read", String.format("Read from %s files.", formatPluginName(this.config.getFormat())), list);
    }

    /**
     * Lower-cases the enum constant name with {@link Locale#ROOT} so the result does not depend
     * on the default locale of the JVM.
     */
    private static String formatPluginName(FileFormat format) {
        return format.name().toLowerCase(Locale.ROOT);
    }

    /** Ensures the path field exists in the schema and is a string or a nullable string. */
    private static void validatePathField(Schema schema, String pathField) {
        Schema.Field field = schema.getField(pathField);
        if (field == null) {
            throw new IllegalArgumentException(String.format("Path field '%s' is not present in the schema. Please add it to the schema as a string field.", pathField));
        }
        Schema fieldSchema = field.getSchema();
        Schema.Type type = fieldSchema.isNullable() ? fieldSchema.getNonNullable().getType() : fieldSchema.getType();
        if (type != Schema.Type.STRING) {
            throw new IllegalArgumentException(String.format("Path field '%s' must be of type 'string', but found '%s'.", pathField, type));
        }
    }

    /** Copies every entry of {@code properties} into {@code target}. */
    private static void applyAll(Configuration target, Map<String, String> properties) {
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            target.set(entry.getKey(), entry.getValue());
        }
    }
}
