/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.schema;

import java.util.Collections;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.service.BkEnsemblesTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Broker test that checks a partitioned topic using a {@code STRING} schema can be
 * written to, have its namespace unloaded, and then be re-opened and read back.
 */
@Test(groups={"broker"})
public class PartitionedTopicsSchemaTest
extends BkEnsemblesTestBase {
    /**
     * Publishes a fixed number of string messages to a 16-partition topic, unloads the
     * namespace, re-opens a producer and a consumer concurrently, and asserts that every
     * published message is received on the existing subscription.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test
    public void partitionedTopicWithSchema() throws Exception {
        final String namespace = "prop/my-test";
        final String topicName = "prop/my-test/my-topic";
        final int messageCount = 10;

        this.admin.namespaces().createNamespace(namespace, Collections.singleton("usc"));
        this.admin.topics().createPartitionedTopic(topicName, 16);

        // try-with-resources closes the client on every exit path, including assertion failures.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.pulsar.getBrokerServiceUrl()).build()) {
            // Open producer and consumer concurrently and wait for both to be ready.
            CompletableFuture<Producer<String>> producerFuture = client.newProducer(Schema.STRING)
                    .topic(topicName)
                    .createAsync();
            CompletableFuture<Consumer<String>> consumerFuture = client.newConsumer(Schema.STRING)
                    .topic(topicName)
                    .subscriptionName("sub")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribeAsync();
            CompletableFuture.allOf(producerFuture, consumerFuture).get();

            Producer<String> producer = producerFuture.get();
            Consumer<String> consumer = consumerFuture.get();

            for (int i = 0; i < messageCount; ++i) {
                producer.send("Hello-" + i);
            }

            // Nothing is consumed or acknowledged here; the subscription "sub" is only
            // created so the messages can be read after the namespace is unloaded.
            consumer.close();
            producer.close();

            this.admin.namespaces().unload(namespace);

            // Re-open both ends at the same time; both futures must complete successfully.
            // The re-opened producer is never used for sending.
            producerFuture = client.newProducer(Schema.STRING)
                    .topic(topicName)
                    .createAsync();
            consumerFuture = client.newConsumer(Schema.STRING)
                    .topic(topicName)
                    .subscriptionName("sub")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribeAsync();
            CompletableFuture.allOf(producerFuture, consumerFuture).get();

            consumer = consumerFuture.get();

            // A set is used because the check below is order-independent.
            TreeSet<String> messages = new TreeSet<String>();
            for (int i = 0; i < messageCount; ++i) {
                Message<String> msg = consumer.receive();
                messages.add(msg.getValue());
                consumer.acknowledge(msg);
            }

            Assert.assertEquals(messages.size(), messageCount);
            for (int i = 0; i < messageCount; ++i) {
                Assert.assertTrue(messages.contains("Hello-" + i));
            }
        }
    }
}

