package co.cask.cdap.template.etl.batch.source;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.dataset.lib.FileSetProperties;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSet;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSetArguments;
import co.cask.cdap.api.templates.plugins.PluginConfig;
import co.cask.cdap.template.etl.api.Emitter;
import co.cask.cdap.template.etl.api.PipelineConfigurer;
import co.cask.cdap.template.etl.api.batch.BatchSource;
import co.cask.cdap.template.etl.api.batch.BatchSourceContext;
import co.cask.cdap.template.etl.common.AvroToStructuredTransformer;
import co.cask.cdap.template.etl.common.ETLUtils;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;

/**
 * Batch source that reads Avro records from a {@link TimePartitionedFileSet} and emits each one as a
 * {@link StructuredRecord}.
 *
 * <p>At configure time the dataset is declared through {@link PipelineConfigurer#createDataset} with
 * Avro key input/output formats and Hive explore properties built from the configured schema. On each
 * run, the source reads the partitions whose time falls in a window of length {@code duration} that
 * ends {@code delay} before the run's logical start time.
 */
@Name("TPFSAvro")
@Description("Reads from a TimePartitionedFileSet whose data is in Avro format.")
@Plugin(type = "source")
public class TimePartitionedFileSetDatasetAvroSource
    extends BatchSource<AvroKey<GenericRecord>, NullWritable, StructuredRecord> {
    private static final String SCHEMA_DESC = "The Avro schema of the record being read from the source as a JSON Object.";
    private static final String TPFS_NAME_DESC = "Name of the TimePartitionedFileSet to read.";
    private static final String BASE_PATH_DESC = "Base path for the TimePartitionedFileSet. Defaults to the name of the dataset.";
    private static final String DURATION_DESC = "Size of the time window to read with each run of the pipeline. The format is expected to be a number followed by an 's', 'm', 'h', or 'd' specifying the time unit, with 's' for seconds, 'm' for minutes, 'h' for hours, and 'd' for days. For example, a value of '5m' means each run of the pipeline will read 5 minutes of events from the TPFS source.";
    private static final String DELAY_DESC = "Optional delay for reading from TPFS source. The value must be of the same format as the duration value. For example, a duration of '5m' and a delay of '10m' means each run of the pipeline will read 5 minutes of data from 15 minutes before its logical start time to 10 minutes before its logical start time. The default value is 0.";

    /** Converts each Avro {@link GenericRecord} read from the dataset into a {@link StructuredRecord}. */
    private final AvroToStructuredTransformer recordTransformer = new AvroToStructuredTransformer();
    private final TPFSAvroSourceConfig tpfsAvroSourceConfig;

    /**
     * Plugin configuration for {@link TimePartitionedFileSetDatasetAvroSource}.
     */
    public static class TPFSAvroSourceConfig extends PluginConfig {

        @Description(TPFS_NAME_DESC)
        private String name;

        @Description(SCHEMA_DESC)
        private String schema;

        @Description(BASE_PATH_DESC)
        @Nullable
        private String basePath;

        @Description(DURATION_DESC)
        private String duration;

        @Description(DELAY_DESC)
        @Nullable
        private String delay;

        /**
         * Validates the time-window settings.
         *
         * <p>Exceptions thrown by {@code ETLUtils.parseDuration} for malformed values are propagated
         * unchanged.
         *
         * @throws IllegalArgumentException if {@code duration} does not parse to a value greater than 0,
         *     or if a {@code delay} is set and parses to a negative value
         */
        private void validate() {
            Preconditions.checkArgument(ETLUtils.parseDuration(duration) > 0, "Duration must be greater than 0");
            if (!Strings.isNullOrEmpty(delay)) {
                // A negative delay would shift the read window past the logical start time.
                Preconditions.checkArgument(ETLUtils.parseDuration(delay) >= 0, "Delay must not be negative");
            }
        }

        /**
         * Returns the parsed {@code delay}, or {@code 0} when no delay is configured.
         *
         * <p>The value is in whatever unit {@code ETLUtils.parseDuration} returns. {@link #prepareRun}
         * subtracts it directly from the logical start time.
         */
        private long getDelay() {
            return Strings.isNullOrEmpty(delay) ? 0L : ETLUtils.parseDuration(delay);
        }
    }

    public TimePartitionedFileSetDatasetAvroSource(TPFSAvroSourceConfig tPFSAvroSourceConfig) {
        this.tpfsAvroSourceConfig = tPFSAvroSourceConfig;
    }

    /**
     * Validates the configuration and declares the {@link TimePartitionedFileSet} to read from.
     *
     * <p>The dataset's base path is the configured {@code basePath}. If that is null or empty, the
     * dataset name is used. Explore is enabled with the Hive Avro SerDe, and the configured schema is
     * passed as the {@code avro.schema.literal} table property.
     *
     * @param pipelineConfigurer configurer used to declare the dataset
     */
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        tpfsAvroSourceConfig.validate();

        String tpfsName = tpfsAvroSourceConfig.name;
        String basePath = Strings.isNullOrEmpty(tpfsAvroSourceConfig.basePath)
            ? tpfsName
            : tpfsAvroSourceConfig.basePath;

        pipelineConfigurer.createDataset(tpfsName, TimePartitionedFileSet.class.getName(), FileSetProperties.builder()
            .setBasePath(basePath)
            .setInputFormat(AvroKeyInputFormat.class)
            .setOutputFormat(AvroKeyOutputFormat.class)
            .setEnableExploreOnCreate(true)
            .setSerDe("org.apache.hadoop.hive.serde2.avro.AvroSerDe")
            .setExploreInputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat")
            .setExploreOutputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat")
            .setTableProperty("avro.schema.literal", tpfsAvroSourceConfig.schema)
            .build());
    }

    /**
     * Sets the dataset's input time range for this run and registers it as the job input.
     *
     * <p>The window ends at {@code logicalStartTime - delay} and starts {@code duration} earlier. This
     * method also sets the configured Avro schema as the Hadoop job's input key schema.
     *
     * @param context the batch source context for the current run
     */
    @Override
    public void prepareRun(BatchSourceContext context) {
        long duration = ETLUtils.parseDuration(tpfsAvroSourceConfig.duration);
        long endTime = context.getLogicalStartTime() - tpfsAvroSourceConfig.getDelay();
        long startTime = endTime - duration;

        Map<String, String> arguments = Maps.newHashMap();
        TimePartitionedFileSetArguments.setInputStartTime(arguments, startTime);
        TimePartitionedFileSetArguments.setInputEndTime(arguments, endTime);

        context.setInput(tpfsAvroSourceConfig.name, context.getDataset(tpfsAvroSourceConfig.name, arguments));
        AvroJob.setInputKeySchema((Job) context.getHadoopJob(), new Schema.Parser().parse(tpfsAvroSourceConfig.schema));
    }

    /**
     * Converts the Avro record wrapped in the input key into a {@link StructuredRecord} and emits it.
     *
     * @param input key/value pair read from the dataset; only the key is used
     * @param emitter receives the converted record
     * @throws Exception if the Avro-to-structured conversion fails
     */
    @Override
    public void transform(KeyValue<AvroKey<GenericRecord>, NullWritable> input,
                          Emitter<StructuredRecord> emitter) throws Exception {
        emitter.emit(recordTransformer.transform(input.getKey().datum()));
    }
}
