package co.cask.hydrator.plugin.spark;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Macro;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.FormatSpecification;
import co.cask.cdap.api.data.format.Formats;
import co.cask.cdap.api.data.format.RecordFormat;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.streaming.StreamingContext;
import co.cask.cdap.format.RecordFormats;
import co.cask.hydrator.common.ReferencePluginConfig;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.Files;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

@Name("File")
@Description("File streaming source. Streams data from files that are atomically moved into a specified directory.")
@Plugin(type = "streamingsource")
/* loaded from: input_file:co/cask/hydrator/plugin/spark/FileStreamingSource.class */
public class FileStreamingSource extends ReferenceStreamingSource<StructuredRecord> {
    private final Conf conf;

    /**
     * Plugin configuration for {@link FileStreamingSource}.
     *
     * <p>Holds the directory to watch, the record format and schema used to parse each line, the age (in seconds)
     * after which files are ignored, and an optional comma separated list of accepted file extensions.
     */
    public static class Conf extends ReferencePluginConfig {
        /** Record formats accepted by {@link #validate()}. */
        private static final Set<String> FORMATS = ImmutableSet.of(Formats.TEXT, Formats.CSV, Formats.TSV, Formats.COMBINED_LOG_FORMAT, Formats.GROK, Formats.SYSLOG);

        @Description("The format of the source files. Must be text, csv, tsv, clf, grok, or syslog. Defaults to text.")
        @Macro
        @Nullable
        private String format;

        @Description("The schema of the source files.")
        private String schema;

        @Description("The path to the directory containing source files to stream.")
        @Macro
        private String path;

        @Description("Ignore files after they are older than this many seconds. Defaults to 60.")
        @Macro
        @Nullable
        private Integer ignoreThreshold;

        @Description("Comma separated list of file extensions to accept. If not specified, all files in the directory will be read. Otherwise, only files with an extension in this list will be read.")
        @Macro
        @Nullable
        private String extensions;

        /**
         * Creates a configuration populated with defaults: an empty path, the text format, no schema,
         * a 60 second ignore threshold and no extension filter. {@code null} is passed to the superclass constructor.
         */
        public Conf() {
            super(null);
            this.path = "";
            this.format = Formats.TEXT;
            this.schema = null;
            this.ignoreThreshold = 60;
            this.extensions = null;
        }

        /**
         * Checks that the configured format is one of the supported formats and that the schema can be parsed.
         *
         * <p>The format check is skipped while the {@code format} property still contains a macro.
         *
         * @throws IllegalArgumentException if the format is not supported or the schema cannot be parsed
         */
        public void validate() {
            // containsMacro takes the property NAME; passing the field's value would never match a macro field.
            if (!containsMacro("format") && !FORMATS.contains(this.format)) {
                throw new IllegalArgumentException(String.format("Invalid format '%s'. Must be one of %s", this.format, Joiner.on(',').join(FORMATS)));
            }
            getSchema();
        }

        /**
         * Parses the configured schema string.
         *
         * @return the parsed schema
         * @throws IllegalArgumentException if the schema string cannot be parsed; the underlying
         *     {@link IOException} is attached as the cause
         */
        public Schema getSchema() {
            try {
                return Schema.parseJson(this.schema);
            } catch (IOException e) {
                // Keep the original exception as the cause so the parse failure's stack trace is not lost.
                throw new IllegalArgumentException("Unable to parse schema. Reason: " + e.getMessage(), e);
            }
        }

        /**
         * Splits the comma separated {@code extensions} property into a set, trimming whitespace around each entry.
         *
         * @return a new mutable set of extensions; empty when no extensions are configured
         */
        public Set<String> getExtensions() {
            Set<String> result = new HashSet<String>();
            if (this.extensions == null) {
                return result;
            }
            for (String extension : Splitter.on(',').trimResults().split(this.extensions)) {
                result.add(extension);
            }
            return result;
        }
    }

    /* loaded from: input_file:co/cask/hydrator/plugin/spark/FileStreamingSource$ExtensionFilter.class */
    private static class ExtensionFilter implements Function<Path, Boolean> {
        private final Set<String> extensions;

        ExtensionFilter(Set<String> set) {
            this.extensions = set;
        }

        public Boolean call(Path path) throws Exception {
            return Boolean.valueOf(this.extensions.contains(Files.getFileExtension(path.getName())));
        }
    }

    /* loaded from: input_file:co/cask/hydrator/plugin/spark/FileStreamingSource$FormatFunction.class */
    private static class FormatFunction implements Function<Tuple2<LongWritable, Text>, StructuredRecord> {
        private final String format;
        private final String schemaStr;
        private transient Schema schema;
        private transient RecordFormat<StreamEvent, StructuredRecord> recordFormat;

        FormatFunction(String str, String str2) {
            this.format = str;
            this.schemaStr = str2;
        }

        public StructuredRecord call(Tuple2<LongWritable, Text> tuple2) throws Exception {
            if (this.recordFormat == null) {
                this.schema = Schema.parseJson(this.schemaStr);
                this.recordFormat = RecordFormats.createInitializedFormat(new FormatSpecification(this.format, this.schema, new HashMap()));
            }
            StructuredRecord.Builder builder = StructuredRecord.builder(this.schema);
            StructuredRecord read = this.recordFormat.read(new StreamEvent(ByteBuffer.wrap(((Text) tuple2._2()).copyBytes())));
            Iterator<Schema.Field> it = read.getSchema().getFields().iterator();
            while (it.hasNext()) {
                String name = it.next().getName();
                builder.set(name, read.get(name));
            }
            return builder.build();
        }
    }

    /* loaded from: input_file:co/cask/hydrator/plugin/spark/FileStreamingSource$NoFilter.class */
    private static class NoFilter implements Function<Path, Boolean> {
        private NoFilter() {
        }

        public Boolean call(Path path) throws Exception {
            return true;
        }
    }

    /**
     * Creates the source from its plugin configuration.
     *
     * @param conf configuration, passed to the superclass and kept for use when configuring and creating the stream
     */
    public FileStreamingSource(Conf conf) {
        super(conf);
        this.conf = conf;
    }

    /**
     * Validates the configuration and declares the configured schema as this stage's output schema.
     *
     * @param pipelineConfigurer configurer whose stage configurer receives the output schema
     * @throws IllegalArgumentException if the configuration is invalid
     */
    @Override // co.cask.hydrator.plugin.spark.ReferenceStreamingSource
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
        super.configurePipeline(pipelineConfigurer);
        conf.validate();
        pipelineConfigurer.getStageConfigurer().setOutputSchema(conf.getSchema());
    }

    /**
     * Creates the stream of records read from files in the configured directory.
     *
     * <p>Validates the configuration, registers lineage under the reference name, sets Spark's
     * {@code spark.streaming.fileStream.minRememberDuration} from the ignore threshold (in seconds), and maps every
     * line of the text file stream to a {@link StructuredRecord} with the configured format and schema.
     *
     * @param streamingContext context providing the Spark streaming context
     * @return stream of parsed records
     * @throws Exception if validation or stream creation fails
     */
    @Override
    public JavaDStream<StructuredRecord> getStream(StreamingContext streamingContext) throws Exception {
        this.conf.validate();
        streamingContext.registerLineage(this.conf.referenceName);
        JavaStreamingContext sparkStreamingContext = streamingContext.getSparkStreamingContext();

        // Parameterized (not raw) so the fileStream(...).map(...) chain below stays fully type-checked.
        Function<Path, Boolean> filter;
        if (this.conf.extensions == null) {
            filter = new NoFilter();
        } else {
            filter = new ExtensionFilter(this.conf.getExtensions());
        }

        // Set before the file stream is created.
        sparkStreamingContext.ssc().conf().set("spark.streaming.fileStream.minRememberDuration", this.conf.ignoreThreshold + "s");
        return sparkStreamingContext
            .fileStream(this.conf.path, LongWritable.class, Text.class, TextInputFormat.class, filter, false)
            .map(new FormatFunction(this.conf.format, this.conf.schema));
    }
}
