package co.cask.cdap.template.etl.batch.source;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.FormatSpecification;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.data.stream.Stream;
import co.cask.cdap.api.data.stream.StreamBatchReadable;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.stream.GenericStreamEventData;
import co.cask.cdap.api.templates.plugins.PluginConfig;
import co.cask.cdap.template.etl.api.Emitter;
import co.cask.cdap.template.etl.api.PipelineConfigurer;
import co.cask.cdap.template.etl.api.batch.BatchSource;
import co.cask.cdap.template.etl.api.batch.BatchSourceContext;
import co.cask.cdap.template.etl.common.ETLUtils;
import co.cask.cdap.template.etl.common.Properties;
import com.google.common.base.Objects;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.hadoop.io.LongWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name("Stream")
@Description("Batch source for a Stream")
@Plugin(type = "source")
/* loaded from: input_file:co/cask/cdap/template/etl/batch/source/StreamBatchSource.class */
public class StreamBatchSource extends BatchSource<LongWritable, Object, StructuredRecord> {
    private static final String FORMAT_SETTING_PREFIX = "format.setting.";
    private static final String NAME_DESCRIPTION = "Name of the stream. Must be a valid stream name.";
    private static final String DURATION_DESCRIPTION = "Size of the time window to read with each run of the pipeline. The format is expected to be a number followed by a 's', 'm', 'h', or 'd' specifying the time unit, with 's' for seconds, 'm' for minutes, 'h' for hours, and 'd' for days. For example, a value of '5m' means each run of the pipeline will read 5 minutes of events from the stream.";
    private static final String DELAY_DESCRIPTION = "Optional delay for reading stream events. The value must be of the same format as the duration value. For example, a duration of '5m' and a delay of '10m' means each run of the pipeline will read events from 15 minutes before its logical start time to 10 minutes before its logical start time. The default value is 0.";
    private static final String FORMAT_DESCRIPTION = "Optional format of the stream. Any format supported by CDAP is also supported. For example, a value of 'csv' will attempt to parse stream events as comma separated values. If no format is given, event bodies will be treated as bytes, resulting in a three field schema: 'ts' of type long, 'headers' of type map of string to string, and 'body' of type bytes.";
    private static final String SCHEMA_DESCRIPTION = "Optional schema for the body of stream events. Schema is used in conjunction with format to parse stream events. Some formats like the avro format require schema, while others do not. The schema given is for the body of the stream, so the final schema of records output by the source will contain an additional field named 'ts' for the timestamp and a field named 'headers' for the headers as as the first and second fields of the schema.";
    private StreamBatchConfig streamBatchConfig;
    private Map<Schema, Schema> schemaCache = Maps.newHashMap();
    private static final Logger LOG = LoggerFactory.getLogger(StreamBatchSource.class);
    private static final Schema DEFAULT_SCHEMA = Schema.recordOf("event", new Schema.Field[]{Schema.Field.of("ts", Schema.of(Schema.Type.LONG)), Schema.Field.of(Properties.Stream.DEFAULT_HEADERS_FIELD, Schema.mapOf(Schema.of(Schema.Type.STRING), Schema.of(Schema.Type.STRING))), Schema.Field.of(Properties.Stream.DEFAULT_BODY_FIELD, Schema.of(Schema.Type.BYTES))});

    /* loaded from: input_file:co/cask/cdap/template/etl/batch/source/StreamBatchSource$StreamBatchConfig.class */
    public static class StreamBatchConfig extends PluginConfig {

        @Description(StreamBatchSource.NAME_DESCRIPTION)
        private String name;

        @Description(StreamBatchSource.DURATION_DESCRIPTION)
        private String duration;

        @Description(StreamBatchSource.DELAY_DESCRIPTION)
        @Nullable
        private String delay;

        @Description(StreamBatchSource.FORMAT_DESCRIPTION)
        @Nullable
        private String format;

        @Description(StreamBatchSource.SCHEMA_DESCRIPTION)
        @Nullable
        private String schema;

        /* JADX INFO: Access modifiers changed from: private */
        public void validate() {
            if (!Strings.isNullOrEmpty(this.schema)) {
                parseSchema();
            }
            ETLUtils.parseDuration(this.duration);
            if (Strings.isNullOrEmpty(this.delay)) {
                return;
            }
            ETLUtils.parseDuration(this.delay);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public FormatSpecification getFormatSpec() {
            FormatSpecification formatSpecification = null;
            if (!Strings.isNullOrEmpty(this.format)) {
                Schema parseSchema = parseSchema();
                ImmutableMap.Builder builder = ImmutableMap.builder();
                for (Map.Entry entry : getProperties().getProperties().entrySet()) {
                    if (((String) entry.getKey()).startsWith(StreamBatchSource.FORMAT_SETTING_PREFIX)) {
                        String str = (String) entry.getKey();
                        builder.put(str.substring(StreamBatchSource.FORMAT_SETTING_PREFIX.length(), str.length()), entry.getValue());
                    }
                }
                formatSpecification = new FormatSpecification(this.format, parseSchema, builder.build());
            }
            return formatSpecification;
        }

        private Schema parseSchema() {
            try {
                if (this.schema == null) {
                    return null;
                }
                return Schema.parseJson(this.schema);
            } catch (IOException e) {
                throw new IllegalArgumentException("Invalid schema: " + e.getMessage());
            }
        }
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        this.streamBatchConfig.validate();
        pipelineConfigurer.addStream(new Stream(this.streamBatchConfig.name));
    }

    public void prepareRun(BatchSourceContext batchSourceContext) {
        long parseDuration = ETLUtils.parseDuration(this.streamBatchConfig.duration);
        long logicalStartTime = batchSourceContext.getLogicalStartTime() - (Strings.isNullOrEmpty(this.streamBatchConfig.delay) ? 0L : ETLUtils.parseDuration(this.streamBatchConfig.delay));
        long j = logicalStartTime - parseDuration;
        LOG.info("Setting input to Stream : {}", this.streamBatchConfig.name);
        FormatSpecification formatSpec = this.streamBatchConfig.getFormatSpec();
        batchSourceContext.setInput(formatSpec == null ? new StreamBatchReadable(this.streamBatchConfig.name, j, logicalStartTime) : new StreamBatchReadable(this.streamBatchConfig.name, j, logicalStartTime, formatSpec));
    }

    public void transform(KeyValue<LongWritable, Object> keyValue, Emitter<StructuredRecord> emitter) throws Exception {
        if (this.streamBatchConfig.format == null) {
            StreamEvent streamEvent = (StreamEvent) keyValue.getValue();
            emitter.emit(StructuredRecord.builder(DEFAULT_SCHEMA).set("ts", Long.valueOf(((LongWritable) keyValue.getKey()).get())).set(Properties.Stream.DEFAULT_HEADERS_FIELD, (Map) Objects.firstNonNull(streamEvent.getHeaders(), ImmutableMap.of())).set(Properties.Stream.DEFAULT_BODY_FIELD, streamEvent.getBody()).build());
            return;
        }
        GenericStreamEventData genericStreamEventData = (GenericStreamEventData) keyValue.getValue();
        StructuredRecord structuredRecord = (StructuredRecord) genericStreamEventData.getBody();
        Schema schema = structuredRecord.getSchema();
        Schema schema2 = this.schemaCache.get(schema);
        if (schema2 == null) {
            ArrayList newArrayList = Lists.newArrayList();
            newArrayList.add(DEFAULT_SCHEMA.getField("ts"));
            newArrayList.add(DEFAULT_SCHEMA.getField(Properties.Stream.DEFAULT_HEADERS_FIELD));
            newArrayList.addAll(schema.getFields());
            schema2 = Schema.recordOf(schema.getRecordName(), newArrayList);
            this.schemaCache.put(schema, schema2);
        }
        Map map = (Map) Objects.firstNonNull(genericStreamEventData.getHeaders(), ImmutableMap.of());
        StructuredRecord.Builder builder = StructuredRecord.builder(schema2);
        builder.set("ts", Long.valueOf(((LongWritable) keyValue.getKey()).get()));
        builder.set(Properties.Stream.DEFAULT_HEADERS_FIELD, map);
        Iterator it = schema.getFields().iterator();
        while (it.hasNext()) {
            String name = ((Schema.Field) it.next()).getName();
            builder.set(name, structuredRecord.get(name));
        }
        emitter.emit(builder.build());
    }

    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        transform((KeyValue<LongWritable, Object>) obj, (Emitter<StructuredRecord>) emitter);
    }
}
