package co.cask.cdap.internal.app.runtime.batch;

import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.dataset.lib.PartitionDetail;
import co.cask.cdap.api.dataset.lib.PartitionFilter;
import co.cask.cdap.api.dataset.lib.PartitionKey;
import co.cask.cdap.api.dataset.lib.PartitionedFileSet;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.internal.app.deploy.pipeline.ApplicationWithPrograms;
import co.cask.cdap.internal.app.runtime.BasicArguments;
import co.cask.cdap.internal.app.runtime.batch.AppWithMapReduceUsingAvroDynamicPartitioner;
import co.cask.cdap.proto.Notification;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.test.XSlowTests;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.tephra.TransactionAware;
import org.apache.tephra.TransactionExecutor;
import org.apache.twill.filesystem.Location;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({XSlowTests.class})
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/batch/DynamicPartitionerWithAvroTest.class */
public class DynamicPartitionerWithAvroTest extends MapReduceRunnerTestBase {
    @Test
    public void testDynamicPartitionerWithAvroMultiWriter() throws Exception {
        runDynamicPartitionerMapReduce(ImmutableList.of(createRecord("bob", 95111), createRecord("sally", 98123), createRecord("jane", 84125), createRecord("john", 84125)), true, true);
    }

    @Test
    public void testDynamicPartitionerWithAvroSingleWriter() throws Exception {
        runDynamicPartitionerMapReduce(ImmutableList.of(createRecord("bob", 95111), createRecord("sally", 98123), createRecord("jane", 84125), createRecord("john", 84125)), false, true);
    }

    @Test
    public void testDynamicPartitionerWithAvroSingleWriterWithUnorderedData() throws Exception {
        runDynamicPartitionerMapReduce(ImmutableList.of(createRecord("bob", 95111), createRecord("jane", 84125), createRecord("sally", 98123), createRecord("john", 84125)), false, false);
    }

    private void runDynamicPartitionerMapReduce(final List<? extends GenericRecord> list, boolean z, boolean z2) throws Exception {
        ApplicationWithPrograms deployApp = deployApp(AppWithMapReduceUsingAvroDynamicPartitioner.class);
        final long currentTimeMillis = System.currentTimeMillis();
        final Multimap<PartitionKey, GenericRecord> groupByPartitionKey = groupByPartitionKey(list, currentTimeMillis);
        final KeyValueTable dataset = datasetCache.getDataset(AppWithMapReduceUsingAvroDynamicPartitioner.INPUT_DATASET);
        Transactions.createTransactionExecutor(txExecutorFactory, dataset).execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.batch.DynamicPartitionerWithAvroTest.1
            public void apply() {
                for (int i = 0; i < list.size(); i++) {
                    dataset.write(Integer.toString(i), ((GenericRecord) list.get(i)).toString());
                }
            }
        });
        ImmutableMap of = ImmutableMap.of("output.partition.key", Long.toString(currentTimeMillis), "dataset.OUTPUT_DATASET_NAME.output.dynamic.partitioner.allow.concurrency", Boolean.toString(z));
        long currentTimeMillis2 = System.currentTimeMillis();
        Assert.assertEquals(Boolean.valueOf(z2), Boolean.valueOf(runProgram(deployApp, AppWithMapReduceUsingAvroDynamicPartitioner.DynamicPartitioningMapReduce.class, new BasicArguments(of))));
        if (z2) {
            List<Notification> dataNotifications = getDataNotifications(currentTimeMillis2);
            Assert.assertEquals(1L, dataNotifications.size());
            Assert.assertEquals(NamespaceId.DEFAULT.dataset(AppWithMapReduceUsingAvroDynamicPartitioner.OUTPUT_DATASET), DatasetId.fromString((String) dataNotifications.get(0).getProperties().get("datasetId")));
            final TransactionAware transactionAware = (PartitionedFileSet) datasetCache.getDataset(AppWithMapReduceUsingAvroDynamicPartitioner.OUTPUT_DATASET);
            final Location baseLocation = transactionAware.getEmbeddedFileSet().getBaseLocation();
            Transactions.createTransactionExecutor(txExecutorFactory, transactionAware).execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.batch.DynamicPartitionerWithAvroTest.2
                public void apply() throws IOException {
                    HashMap hashMap = new HashMap();
                    for (PartitionDetail partitionDetail : transactionAware.getPartitions((PartitionFilter) null)) {
                        hashMap.put(partitionDetail.getPartitionKey(), partitionDetail);
                        Assert.assertEquals(AppWithMapReduceUsingAvroDynamicPartitioner.DynamicPartitioningMapReduce.METADATA, partitionDetail.getMetadata().asMap());
                    }
                    Assert.assertEquals(3L, hashMap.size());
                    Assert.assertEquals(groupByPartitionKey.keySet(), hashMap.keySet());
                    for (Map.Entry entry : hashMap.entrySet()) {
                        PartitionDetail partitionDetail2 = (PartitionDetail) entry.getValue();
                        String relativePath = partitionDetail2.getRelativePath();
                        Assert.assertEquals(Long.toString(currentTimeMillis) + "/" + ((Integer) ((PartitionKey) entry.getKey()).getField("zip")).intValue(), relativePath);
                        Assert.assertEquals(baseLocation.append(relativePath), partitionDetail2.getLocation());
                    }
                    for (Map.Entry entry2 : groupByPartitionKey.asMap().entrySet()) {
                        Assert.assertEquals(new HashSet((Collection) entry2.getValue()), DynamicPartitionerWithAvroTest.this.readOutput(((PartitionDetail) hashMap.get(entry2.getKey())).getLocation()));
                    }
                }
            });
        }
    }

    private Multimap<PartitionKey, GenericRecord> groupByPartitionKey(List<? extends GenericRecord> list, long j) {
        HashMultimap create = HashMultimap.create();
        for (GenericRecord genericRecord : list) {
            create.put(PartitionKey.builder().addLongField("time", j).addIntField("zip", ((Integer) genericRecord.get("zip")).intValue()).build(), genericRecord);
        }
        return create;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Set<GenericRecord> readOutput(Location location) throws IOException {
        GenericDatumReader genericDatumReader = new GenericDatumReader(AppWithMapReduceUsingAvroDynamicPartitioner.SCHEMA);
        HashSet hashSet = new HashSet();
        for (Location location2 : location.list()) {
            if (location2.getName().endsWith(".avro")) {
                DataFileStream dataFileStream = new DataFileStream(location2.getInputStream(), genericDatumReader);
                Iterables.addAll(hashSet, dataFileStream);
                dataFileStream.close();
            }
        }
        return hashSet;
    }

    private GenericData.Record createRecord(String str, int i) {
        GenericData.Record record = new GenericData.Record(AppWithMapReduceUsingAvroDynamicPartitioner.SCHEMA);
        record.put("name", str);
        record.put("zip", Integer.valueOf(i));
        return record;
    }
}
