package co.cask.cdap.template.etl.batch.sink;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.table.Put;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.templates.plugins.PluginConfig;
import co.cask.cdap.template.etl.api.Emitter;
import co.cask.cdap.template.etl.api.PipelineConfigurer;
import co.cask.cdap.template.etl.api.batch.BatchSinkContext;
import co.cask.cdap.template.etl.common.Properties;
import co.cask.cdap.template.etl.common.RecordPutTransformer;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;

/**
 * Batch sink plugin registered under the name "Table".
 *
 * <p>Each incoming {@link StructuredRecord} is converted to a {@link Put} by a
 * {@link RecordPutTransformer} built from the configured row field, and is emitted keyed by the
 * row of that {@code Put}. The dataset properties handed to the superclass carry the configured
 * table name and {@link Table} as the dataset type.
 */
@Name("Table")
@Description("CDAP Table Dataset Batch Sink")
@Plugin(type = "sink")
public class TableSink extends BatchWritableSink<StructuredRecord, byte[], Put> {
    private static final String NAME_DESC = "Name of the table. If the table does not already exist, one will be created.";
    // The trailing stray quote character that used to end this description has been removed.
    private static final String PROPERTY_SCHEMA_DESC = "Optional schema of the table as a JSON Object. If the table does not already exist, one will be created with this schema, which will allow the table to be explored through Hive.";
    private static final String PROPERTY_SCHEMA_ROW_FIELD_DESC = "The name of the record field that should be used as the row key when writing to the table.";

    /** Created in {@link #initialize(BatchSinkContext)}; {@code null} until then. */
    private RecordPutTransformer recordPutTransformer;
    private final TableConfig tableConfig;

    /**
     * Plugin configuration for {@link TableSink}: table name, optional schema and row-key field.
     */
    public static class TableConfig extends PluginConfig {

        @Description(TableSink.NAME_DESC)
        private String name;

        @Name("schema")
        @Description(TableSink.PROPERTY_SCHEMA_DESC)
        @Nullable
        String schemaStr;

        @Name(Properties.Table.PROPERTY_SCHEMA_ROW_FIELD)
        @Description(TableSink.PROPERTY_SCHEMA_ROW_FIELD_DESC)
        String rowField;

        /**
         * @param name      name of the table
         * @param schemaStr optional schema string; may be {@code null}
         * @param rowField  name of the record field used as the row key
         */
        public TableConfig(String name, String schemaStr, String rowField) {
            this.name = name;
            this.schemaStr = schemaStr;
            this.rowField = rowField;
        }
    }

    /**
     * @param tableConfig configuration this sink reads its table name and row field from
     */
    public TableSink(TableConfig tableConfig) {
        this.tableConfig = tableConfig;
    }

    /**
     * Delegates to the superclass and then verifies that a row field was configured.
     *
     * @throws IllegalArgumentException if the row field is {@code null} or empty
     */
    @Override // co.cask.cdap.template.etl.batch.sink.BatchWritableSink
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        Preconditions.checkArgument(!Strings.isNullOrEmpty(this.tableConfig.rowField), "Row field must be given as a property.");
    }

    /**
     * Runs the superclass initialization and then creates the record-to-{@link Put} transformer
     * from the configured row field.
     */
    public void initialize(BatchSinkContext batchSinkContext) throws Exception {
        super.initialize(batchSinkContext);
        this.recordPutTransformer = new RecordPutTransformer(this.tableConfig.rowField);
    }

    /**
     * Builds the property map for this sink: a mutable copy of the plugin properties with the
     * "name" and "type" entries set to the configured table name and the {@link Table} class name.
     *
     * @return a new map; changes to it do not affect the plugin configuration
     */
    @Override // co.cask.cdap.template.etl.batch.sink.BatchWritableSink
    protected Map<String, String> getProperties() {
        Map<String, String> properties = Maps.newHashMap(this.tableConfig.getProperties().getProperties());
        properties.put("name", this.tableConfig.name);
        properties.put("type", Table.class.getName());
        return properties;
    }

    /**
     * Converts the record to a {@link Put} and emits it keyed by the {@code Put}'s row.
     *
     * <p>The erased {@code transform(Object, Emitter)} bridge for this method is generated by the
     * compiler and must not be declared by hand.
     *
     * @param structuredRecord record to write
     * @param emitter          receives the resulting key/value pair
     */
    public void transform(StructuredRecord structuredRecord, Emitter<KeyValue<byte[], Put>> emitter) throws Exception {
        Put put = this.recordPutTransformer.toPut(structuredRecord);
        emitter.emit(new KeyValue<byte[], Put>(put.getRow(), put));
    }
}
