package co.cask.cdap.partitioned;

import co.cask.cdap.api.ProgramLifecycle;
import co.cask.cdap.api.ProgramStatus;
import co.cask.cdap.api.Resources;
import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.annotation.UseDataSet;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.data.batch.BatchWritable;
import co.cask.cdap.api.data.batch.Output;
import co.cask.cdap.api.dataset.Dataset;
import co.cask.cdap.api.dataset.DatasetSpecification;
import co.cask.cdap.api.dataset.lib.AbstractDataset;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.dataset.lib.PartitionDetail;
import co.cask.cdap.api.dataset.lib.PartitionKey;
import co.cask.cdap.api.dataset.lib.PartitionOutput;
import co.cask.cdap.api.dataset.lib.PartitionedFileSet;
import co.cask.cdap.api.dataset.lib.PartitionedFileSetArguments;
import co.cask.cdap.api.dataset.lib.PartitionedFileSetProperties;
import co.cask.cdap.api.dataset.lib.Partitioning;
import co.cask.cdap.api.dataset.lib.partitioned.KVTableStatePersistor;
import co.cask.cdap.api.dataset.lib.partitioned.PartitionBatchInput;
import co.cask.cdap.api.dataset.lib.partitioned.TransactionalPartitionConsumer;
import co.cask.cdap.api.dataset.module.EmbeddedDataset;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.mapreduce.MapReduceTaskContext;
import co.cask.cdap.api.service.AbstractService;
import co.cask.cdap.api.service.http.AbstractHttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceRequest;
import co.cask.cdap.api.service.http.HttpServiceResponder;
import co.cask.cdap.api.worker.AbstractWorker;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.tephra.TransactionFailureException;
import org.apache.twill.filesystem.Location;

/* loaded from: input_file:co/cask/cdap/partitioned/AppWithPartitionConsumers.class */
public class AppWithPartitionConsumers extends AbstractApplication {

    /* loaded from: input_file:co/cask/cdap/partitioned/AppWithPartitionConsumers$DatasetService.class */
    /**
     * Service named "DatasetService" that registers a single {@link DatasetServingHandler}.
     */
    public static class DatasetService extends AbstractService {

        /**
         * HTTP handler that writes request bodies as new partitions of the "lines" dataset and serves
         * per-word counts read from the "counts" dataset.
         */
        public static class DatasetServingHandler extends AbstractHttpServiceHandler {

            @UseDataSet("lines")
            private PartitionedFileSet lines;

            @UseDataSet("counts")
            private IncrementingKeyValueTable keyValueTable;

            /**
             * Writes the request content to the location of a new partition keyed by {@code time} and then
             * registers that partition.
             *
             * <p>Responds with 200 on success, and with 400 when the {@code time} query parameter is missing
             * or when writing the content fails with an {@link IOException}.
             *
             * @param httpServiceRequest request whose content becomes the partition data
             * @param httpServiceResponder responder used to send the status or the error
             * @param l value of the {@code time} query parameter, used as the partition key field "time"
             */
            @Path("lines")
            @PUT
            public void write(HttpServiceRequest httpServiceRequest, HttpServiceResponder httpServiceResponder, @QueryParam("time") Long l) {
                // Without this check a missing parameter fails with a NullPointerException on unboxing.
                if (l == null) {
                    httpServiceResponder.sendError(400, "Missing required query parameter 'time'");
                    return;
                }
                PartitionKey partitionKey = PartitionKey.builder().addLongField("time", l.longValue()).build();
                PartitionOutput partitionOutput = this.lines.getPartitionOutput(partitionKey);
                Location location = partitionOutput.getLocation();
                // try-with-resources closes the channel on both the success and the failure path.
                try (WritableByteChannel channel = Channels.newChannel(location.getOutputStream())) {
                    channel.write(httpServiceRequest.getContent());
                } catch (IOException e) {
                    httpServiceResponder.sendError(400, String.format("Unable to write path '%s'", location));
                    return;
                }
                // Register the partition only after the data has been written and the channel closed.
                partitionOutput.addPartition();
                httpServiceResponder.sendStatus(200);
            }

            /**
             * Responds with the count stored for {@code word} as JSON, or with 0 when no count is stored.
             *
             * @param httpServiceRequest the incoming request (not inspected)
             * @param httpServiceResponder responder used to send the JSON value
             * @param str value of the {@code word} query parameter
             */
            @GET
            @Path("counts")
            public void get(HttpServiceRequest httpServiceRequest, HttpServiceResponder httpServiceResponder, @QueryParam("word") String str) {
                Long count = this.keyValueTable.read(str);
                if (count == null) {
                    count = 0L;
                }
                httpServiceResponder.sendJson(count);
            }
        }

        /** Names the service and registers its handler. */
        protected void configure() {
            setName("DatasetService");
            addHandler(new DatasetServingHandler());
        }
    }

    /* loaded from: input_file:co/cask/cdap/partitioned/AppWithPartitionConsumers$IncrementingKeyValueTable.class */
    /**
     * Dataset backed by an embedded {@link KeyValueTable} named "store". Batch writes increment the
     * value stored under the key, and reads decode the stored bytes as a long.
     */
    public static class IncrementingKeyValueTable extends AbstractDataset implements BatchWritable<byte[], Long> {
        private final KeyValueTable keyValueTable;

        /**
         * @param datasetSpecification specification supplying the dataset instance name
         * @param keyValueTable the embedded "store" table that holds the counters
         */
        public IncrementingKeyValueTable(DatasetSpecification datasetSpecification, @EmbeddedDataset("store") KeyValueTable keyValueTable) {
            super(datasetSpecification.getName(), keyValueTable, new Dataset[0]);
            this.keyValueTable = keyValueTable;
        }

        /**
         * Increments the value stored under {@code bArr} by {@code l}.
         *
         * @param bArr key of the counter
         * @param l amount to add
         */
        public void write(byte[] bArr, Long l) {
            long delta = l.longValue();
            this.keyValueTable.increment(bArr, delta);
        }

        /**
         * Reads the value stored under {@code str}.
         *
         * @param str key of the counter
         * @return the stored value decoded as a long, or {@code null} when the table returns no value
         */
        @Nullable
        public Long read(String str) {
            byte[] stored = this.keyValueTable.read(str);
            return stored != null ? Long.valueOf(Bytes.toLong(stored)) : null;
        }
    }

    /* loaded from: input_file:co/cask/cdap/partitioned/AppWithPartitionConsumers$WordCountMapReduce.class */
    /**
     * MapReduce program that consumes partitions of the "lines" dataset through
     * {@link PartitionBatchInput}, counts space-separated tokens, and writes the totals to the "counts"
     * and "outputLines" datasets.
     */
    public static class WordCountMapReduce extends AbstractMapReduce {
        public static final String NAME = "WordCountMapReduce";

        // Assigned in initialize(); used in destroy() to report whether the consumed partitions were processed.
        private PartitionBatchInput.BatchPartitionCommitter batchPartitionCommitter;

        /**
         * Reducer that sums the counts emitted for a word and writes the total to the "counts" and
         * "outputLines" named outputs of the task context.
         */
        public static class Counter extends Reducer<Text, IntWritable, byte[], Long> implements ProgramLifecycle<MapReduceTaskContext<byte[], Long>> {
            private MapReduceTaskContext<byte[], Long> mapReduceTaskContext;

            /** Stores the task context so that {@code reduce} can write to named outputs. */
            public void initialize(MapReduceTaskContext<byte[], Long> mapReduceTaskContext) throws Exception {
                this.mapReduceTaskContext = mapReduceTaskContext;
            }

            /** No resources to release. */
            public void destroy() {
            }

            /**
             * Sums all values for {@code text} and writes the total to both outputs.
             *
             * @param text the word being reduced
             * @param iterable the counts emitted for that word
             * @param context the Hadoop reducer context (not used; output goes through the task context)
             */
            public void reduce(Text text, Iterable<IntWritable> iterable, Reducer<Text, IntWritable, byte[], Long>.Context context) throws IOException, InterruptedException {
                long total = 0L;
                for (IntWritable count : iterable) {
                    total += count.get();
                }
                // Text.getBytes() exposes the backing array, which may be longer than the valid data,
                // so the key is built from the decoded string instead.
                byte[] wordBytes = Bytes.toBytes(text.toString());
                this.mapReduceTaskContext.write("counts", wordBytes, Long.valueOf(total));
                // The "outputLines" record carries only the total; its key is null.
                this.mapReduceTaskContext.write("outputLines", (Object) null, Long.valueOf(total));
            }
        }

        /**
         * Mapper that splits each input line on single spaces and emits every token with a count of one.
         */
        public static class Tokenizer extends Mapper<LongWritable, Text, Text, IntWritable> {
            private static final IntWritable ONE = new IntWritable(1);

            // Reused for every emitted token.
            private final Text word = new Text();

            /**
             * Emits {@code (token, 1)} for every token of {@code text}.
             *
             * @param longWritable the input key (not used)
             * @param text the input line
             * @param context the Hadoop mapper context that receives the output
             */
            public void map(LongWritable longWritable, Text text, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
                for (String token : text.toString().split(" ")) {
                    this.word.set(token);
                    context.write(this.word, ONE);
                }
            }
        }

        /** Requests {@code Resources(1024)} for both mappers and reducers. */
        public void configure() {
            setMapperResources(new Resources(1024));
            setReducerResources(new Resources(1024));
        }

        /**
         * Sets "lines" as partition-consuming input with state persisted under key "state.key" of the
         * "consumingState" dataset, adds the "outputLines" and "counts" outputs, and configures the
         * Hadoop job with {@link Tokenizer}, {@link Counter} and a single reduce task.
         */
        public void initialize() throws Exception {
            MapReduceContext context = getContext();
            this.batchPartitionCommitter = PartitionBatchInput.setInput(context, "lines", new KVTableStatePersistor("consumingState", "state.key"));

            // The output partition of "outputLines" is keyed by the logical start time of this run.
            Map<String, String> outputArguments = new HashMap<>();
            PartitionKey outputKey = PartitionKey.builder().addLongField("time", context.getLogicalStartTime()).build();
            PartitionedFileSetArguments.setOutputPartitionKey(outputArguments, outputKey);
            context.addOutput(Output.ofDataset("outputLines", outputArguments));
            context.addOutput(Output.ofDataset("counts"));

            Job job = (Job) context.getHadoopJob();
            job.setMapperClass(Tokenizer.class);
            job.setReducerClass(Counter.class);
            job.setNumReduceTasks(1);
        }

        /**
         * Reports to the partition committer whether the program status is {@link ProgramStatus#COMPLETED}.
         */
        public void destroy() {
            // Guard against the committer never having been assigned by initialize().
            if (this.batchPartitionCommitter == null) {
                return;
            }
            boolean succeeded = getContext().getState().getStatus() == ProgramStatus.COMPLETED;
            this.batchPartitionCommitter.onFinish(succeeded);
        }
    }

    /* loaded from: input_file:co/cask/cdap/partitioned/AppWithPartitionConsumers$WordCountWorker.class */
    /**
     * Worker that consumes available partitions of the "lines" dataset, counts space-separated tokens,
     * increments the "counts" dataset, and writes the counts as a new partition of "outputLines".
     */
    public static class WordCountWorker extends AbstractWorker {
        public static final String NAME = "WordCountWorker";

        /**
         * Consumes the partitions once and processes them in a single transaction. Returns immediately
         * when there are no partitions to consume.
         *
         * <p>A {@link TransactionFailureException} from the transaction is rethrown through
         * {@link Throwables#propagate(Throwable)}.
         */
        public void run() {
            final TransactionalPartitionConsumer partitionConsumer = new TransactionalPartitionConsumer(getContext(), "lines", new KVTableStatePersistor("consumingState", "state.key"));
            final List<PartitionDetail> partitions = partitionConsumer.consumePartitions().getPartitions();
            if (partitions.isEmpty()) {
                return;
            }
            try {
                getContext().execute(new TxRunnable() {
                    public void run(DatasetContext datasetContext) throws Exception {
                        Map<String, Long> wordCounts = countWords(partitions);

                        IncrementingKeyValueTable counts = datasetContext.getDataset("counts");
                        for (Map.Entry<String, Long> entry : wordCounts.entrySet()) {
                            counts.write(Bytes.toBytes(entry.getKey()), entry.getValue());
                        }

                        PartitionedFileSet outputLines = datasetContext.getDataset("outputLines");
                        PartitionKey outputKey = PartitionKey.builder().addLongField("time", System.currentTimeMillis()).build();
                        PartitionOutput partitionOutput = outputLines.getPartitionOutput(outputKey);
                        Location partitionDirectory = partitionOutput.getLocation();
                        partitionDirectory.mkdirs();
                        Location outputFile = partitionDirectory.append("file");
                        outputFile.createNew();
                        try (OutputStream outputStream = outputFile.getOutputStream()) {
                            outputStream.write(Bytes.toBytes(Joiner.on("\n").join(wordCounts.values())));
                        }
                        // Register the partition only after the file has been written and closed.
                        partitionOutput.addPartition();
                    }
                });
                // NOTE(review): on failure the consumed partitions are never reported through
                // onFinish(partitions, false) - confirm whether that is intended.
                partitionConsumer.onFinish(partitions, true);
            } catch (TransactionFailureException e) {
                throw Throwables.propagate(e);
            }
        }

        /** Reads every partition's location fully and counts the occurrences of each space-separated token. */
        private static Map<String, Long> countWords(List<PartitionDetail> partitions) throws IOException {
            Map<String, Long> wordCounts = new HashMap<>();
            for (PartitionDetail partition : partitions) {
                String content;
                // ByteStreams.toByteArray does not close the stream, so it is closed here.
                try (InputStream inputStream = partition.getLocation().getInputStream()) {
                    content = Bytes.toString(ByteStreams.toByteArray(inputStream));
                }
                for (String word : content.split(" ")) {
                    Long previous = wordCounts.get(word);
                    wordCounts.put(word, previous == null ? 1L : previous + 1L);
                }
            }
            return wordCounts;
        }
    }

    /**
     * Declares the application: its name and description, the "consumingState", "counts", "lines" and
     * "outputLines" datasets, and the MapReduce, worker and service programs.
     */
    public void configure() {
        setName("AppWithPartitionConsumers");
        setDescription("Application with MapReduce job and Worker consuming partitions of a PartitionedFileSet Dataset");

        createDataset("consumingState", KeyValueTable.class);
        createDataset("counts", IncrementingKeyValueTable.class);

        addMapReduce(new WordCountMapReduce());
        addWorker(new WordCountWorker());
        addService(new DatasetService());

        // Input dataset: partitioned by a single long field "time", text in and text out.
        Partitioning linesPartitioning = Partitioning.builder().addLongField("time").build();
        createDataset(
            "lines",
            PartitionedFileSet.class,
            PartitionedFileSetProperties.builder()
                .setPartitioning(linesPartitioning)
                .setInputFormat(TextInputFormat.class)
                .setOutputFormat(TextOutputFormat.class)
                .setOutputProperty(TextOutputFormat.SEPERATOR, ",")
                .build());

        // Output dataset: same layout as "lines", additionally configured for explore.
        Partitioning outputPartitioning = Partitioning.builder().addLongField("time").build();
        createDataset(
            "outputLines",
            PartitionedFileSet.class,
            PartitionedFileSetProperties.builder()
                .setPartitioning(outputPartitioning)
                .setInputFormat(TextInputFormat.class)
                .setOutputFormat(TextOutputFormat.class)
                .setOutputProperty(TextOutputFormat.SEPERATOR, ",")
                .setEnableExploreOnCreate(true)
                .setExploreFormat("text")
                .setExploreFormatProperty("delimiter", "\n")
                .setExploreSchema("record STRING")
                .build());
    }
}
