/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.schema.compatibility;

import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.schema.Schemas;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"schema"})
public class SchemaCompatibilityCheckTest
extends MockedPulsarServiceBaseTest {
    private static final Logger log = LoggerFactory.getLogger(SchemaCompatibilityCheckTest.class);
    private static final String CLUSTER_NAME = "test";

    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        TenantInfo tenantInfo = TenantInfo.builder().allowedClusters(Collections.singleton(CLUSTER_NAME)).build();
        this.admin.tenants().createTenant("public", tenantInfo);
    }

    @Override
    @AfterMethod(alwaysRun=true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    @DataProvider(name="CanReadLastSchemaCompatibilityStrategy")
    public Object[] canReadLastSchemaCompatibilityStrategy() {
        return new Object[]{SchemaCompatibilityStrategy.BACKWARD, SchemaCompatibilityStrategy.FORWARD_TRANSITIVE, SchemaCompatibilityStrategy.FORWARD, SchemaCompatibilityStrategy.FULL};
    }

    @DataProvider(name="ReadAllCheckSchemaCompatibilityStrategy")
    public Object[] readAllCheckSchemaCompatibilityStrategy() {
        return new Object[]{SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE, SchemaCompatibilityStrategy.FULL_TRANSITIVE};
    }

    @DataProvider(name="AllCheckSchemaCompatibilityStrategy")
    public Object[] allCheckSchemaCompatibilityStrategy() {
        return new Object[]{SchemaCompatibilityStrategy.BACKWARD, SchemaCompatibilityStrategy.FORWARD_TRANSITIVE, SchemaCompatibilityStrategy.FORWARD, SchemaCompatibilityStrategy.FULL, SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE, SchemaCompatibilityStrategy.FULL_TRANSITIVE};
    }

    @Test(dataProvider="CanReadLastSchemaCompatibilityStrategy")
    public void testConsumerCompatibilityCheckCanReadLastTest(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
        String tenant = "public";
        String topic = "test-consumer-compatibility";
        String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        String fqtn = TopicName.get((String)TopicDomain.persistent.value(), (String)"public", (String)namespace, (String)"test-consumer-compatibility").toString();
        NamespaceName namespaceName = NamespaceName.get((String)"public", (String)namespace);
        this.admin.namespaces().createNamespace("public/" + namespace, (Set)Sets.newHashSet((Object[])new String[]{CLUSTER_NAME}));
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
        this.admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
        this.admin.schemas().createSchema(fqtn, Schema.AVRO((SchemaDefinition)SchemaDefinition.builder().withAlwaysAllowNull(false).withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());
        Consumer consumerThree = this.pulsarClient.newConsumer(Schema.AVRO((SchemaDefinition)SchemaDefinition.builder().withAlwaysAllowNull(false).withSupportSchemaVersioning(true).withPojo(Schemas.PersonThree.class).build())).subscriptionName(CLUSTER_NAME).topic(new String[]{fqtn}).subscribe();
        Producer producerOne = this.pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class)).topic(fqtn).create();
        Schemas.PersonOne personOne = new Schemas.PersonOne();
        personOne.setId(1);
        producerOne.send((Object)personOne);
        Message message = null;
        try {
            message = consumerThree.receive();
            message.getValue();
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)(e instanceof SchemaSerializationException));
            consumerThree.acknowledge(message);
        }
        Producer producerTwo = this.pulsarClient.newProducer(Schema.AVRO((SchemaDefinition)SchemaDefinition.builder().withAlwaysAllowNull(false).withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())).topic(fqtn).create();
        Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
        personTwo.setId(1);
        personTwo.setName("Jerry");
        producerTwo.send((Object)personTwo);
        message = consumerThree.receive();
        Schemas.PersonThree personThree = (Schemas.PersonThree)message.getValue();
        consumerThree.acknowledge(message);
        Assert.assertEquals((int)personThree.getId(), (int)1);
        Assert.assertEquals((String)personThree.getName(), (String)"Jerry");
        consumerThree.close();
        producerOne.close();
        producerTwo.close();
    }

    @Test(dataProvider="ReadAllCheckSchemaCompatibilityStrategy")
    public void testConsumerCompatibilityReadAllCheckTest(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
        String tenant = "public";
        String topic = "test-consumer-compatibility";
        String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        String fqtn = TopicName.get((String)TopicDomain.persistent.value(), (String)"public", (String)namespace, (String)"test-consumer-compatibility").toString();
        NamespaceName namespaceName = NamespaceName.get((String)"public", (String)namespace);
        this.admin.namespaces().createNamespace("public/" + namespace, (Set)Sets.newHashSet((Object[])new String[]{CLUSTER_NAME}));
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
        this.admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
        this.admin.schemas().createSchema(fqtn, Schema.AVRO((SchemaDefinition)SchemaDefinition.builder().withAlwaysAllowNull(false).withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());
        try {
            this.pulsarClient.newConsumer(Schema.AVRO((SchemaDefinition)SchemaDefinition.builder().withAlwaysAllowNull(false).withSupportSchemaVersioning(true).withPojo(Schemas.PersonThree.class).build())).subscriptionName(CLUSTER_NAME).topic(new String[]{fqtn}).subscribe();
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Unable to read schema"));
        }
    }

    @Test(dataProvider="AllCheckSchemaCompatibilityStrategy")
    public void testBrokerAllowAutoUpdateSchemaDisabled(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
        String tenant = "public";
        String topic = "test-consumer-compatibility";
        String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        String fqtn = TopicName.get((String)TopicDomain.persistent.value(), (String)"public", (String)namespace, (String)"test-consumer-compatibility").toString();
        NamespaceName namespaceName = NamespaceName.get((String)"public", (String)namespace);
        this.admin.namespaces().createNamespace("public/" + namespace, (Set)Sets.newHashSet((Object[])new String[]{CLUSTER_NAME}));
        Assert.assertEquals((Object)this.admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()), (Object)SchemaCompatibilityStrategy.UNDEFINED);
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
        this.admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
        this.pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);
        ProducerBuilder producerThreeBuilder = this.pulsarClient.newProducer(Schema.AVRO((SchemaDefinition)SchemaDefinition.builder().withAlwaysAllowNull(false).withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())).topic(fqtn);
        try {
            producerThreeBuilder.create();
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Schema not found and schema auto updating is disabled."));
        }
        this.pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(true);
        Policies policies = this.admin.namespaces().getPolicies(namespaceName.toString());
        Assert.assertTrue((boolean)policies.is_allow_auto_update_schema);
        ConsumerBuilder comsumerBuilder = this.pulsarClient.newConsumer(Schema.AVRO((SchemaDefinition)SchemaDefinition.builder().withAlwaysAllowNull(false).withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())).subscriptionName(CLUSTER_NAME).topic(new String[]{fqtn});
        Producer producer = producerThreeBuilder.create();
        Consumer consumerTwo = comsumerBuilder.subscribe();
        producer.send((Object)new Schemas.PersonTwo(2, "Lucy"));
        Message message = consumerTwo.receive();
        Schemas.PersonTwo personTwo = (Schemas.PersonTwo)message.getValue();
        consumerTwo.acknowledge(message);
        Assert.assertEquals((int)personTwo.getId(), (int)2);
        Assert.assertEquals((String)personTwo.getName(), (String)"Lucy");
        producer.close();
        consumerTwo.close();
        this.pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);
        producer = producerThreeBuilder.create();
        consumerTwo = comsumerBuilder.subscribe();
        producer.send((Object)new Schemas.PersonTwo(2, "Lucy"));
        message = consumerTwo.receive();
        personTwo = (Schemas.PersonTwo)message.getValue();
        consumerTwo.acknowledge(message);
        Assert.assertEquals((int)personTwo.getId(), (int)2);
        Assert.assertEquals((String)personTwo.getName(), (String)"Lucy");
        consumerTwo.close();
        producer.close();
    }

    @Test(dataProvider="AllCheckSchemaCompatibilityStrategy")
    public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
        String tenant = "public";
        String topic = "test-consumer-compatibility";
        String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        String fqtn = TopicName.get((String)TopicDomain.persistent.value(), (String)"public", (String)namespace, (String)"test-consumer-compatibility").toString();
        NamespaceName namespaceName = NamespaceName.get((String)"public", (String)namespace);
        this.admin.namespaces().createNamespace("public/" + namespace, (Set)Sets.newHashSet((Object[])new String[]{CLUSTER_NAME}));
        Assert.assertEquals((Object)this.admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()), (Object)SchemaCompatibilityStrategy.UNDEFINED);
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
        this.admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
        this.admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
        ProducerBuilder producerThreeBuilder = this.pulsarClient.newProducer(Schema.AVRO((SchemaDefinition)SchemaDefinition.builder().withAlwaysAllowNull(false).withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())).topic(fqtn);
        try {
            producerThreeBuilder.create();
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Schema not found and schema auto updating is disabled."));
        }
        this.admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), true);
        ConsumerBuilder comsumerBuilder = this.pulsarClient.newConsumer(Schema.AVRO((SchemaDefinition)SchemaDefinition.builder().withAlwaysAllowNull(false).withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())).subscriptionName(CLUSTER_NAME).topic(new String[]{fqtn});
        Producer producer = producerThreeBuilder.create();
        Consumer consumerTwo = comsumerBuilder.subscribe();
        producer.send((Object)new Schemas.PersonTwo(2, "Lucy"));
        Message message = consumerTwo.receive();
        Schemas.PersonTwo personTwo = (Schemas.PersonTwo)message.getValue();
        consumerTwo.acknowledge(message);
        Assert.assertEquals((int)personTwo.getId(), (int)2);
        Assert.assertEquals((String)personTwo.getName(), (String)"Lucy");
        producer.close();
        consumerTwo.close();
        this.admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
        producer = producerThreeBuilder.create();
        consumerTwo = comsumerBuilder.subscribe();
        producer.send((Object)new Schemas.PersonTwo(2, "Lucy"));
        message = consumerTwo.receive();
        personTwo = (Schemas.PersonTwo)message.getValue();
        consumerTwo.acknowledge(message);
        Assert.assertEquals((int)personTwo.getId(), (int)2);
        Assert.assertEquals((String)personTwo.getName(), (String)"Lucy");
        consumerTwo.close();
        producer.close();
    }

    @Test
    public void testSchemaComparison() throws Exception {
        String tenant = "public";
        String topic = "test-schema-comparison";
        String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        String fqtn = TopicName.get((String)TopicDomain.persistent.value(), (String)"public", (String)namespace, (String)"test-schema-comparison").toString();
        NamespaceName namespaceName = NamespaceName.get((String)"public", (String)namespace);
        this.admin.namespaces().createNamespace("public/" + namespace, (Set)Sets.newHashSet((Object[])new String[]{CLUSTER_NAME}));
        Assert.assertEquals((Object)this.admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()), (Object)SchemaCompatibilityStrategy.UNDEFINED);
        byte[] changeSchemaBytes = (new String(Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo().getSchema(), StandardCharsets.UTF_8) + "/n   /n   /n").getBytes();
        SchemaInfo schemaInfo = SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
        this.admin.schemas().createSchema(fqtn, schemaInfo);
        this.admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
        ProducerBuilder producerOneBuilder = this.pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class)).topic(fqtn);
        producerOneBuilder.create().close();
        Assert.assertEquals((byte[])changeSchemaBytes, (byte[])this.admin.schemas().getSchemaInfo(fqtn).getSchema());
        ProducerBuilder producerThreeBuilder = this.pulsarClient.newProducer(Schema.AVRO(Schemas.PersonThree.class)).topic(fqtn);
        try {
            producerThreeBuilder.create();
            Assert.fail();
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Schema not found and schema auto updating is disabled."));
        }
    }

    @Test(dataProvider="AllCheckSchemaCompatibilityStrategy")
    public void testProducerSendWithOldSchemaAndConsumerCanRead(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
        String tenant = "public";
        String topic = "test-consumer-compatibility";
        String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        String fqtn = TopicName.get((String)TopicDomain.persistent.value(), (String)"public", (String)namespace, (String)"test-consumer-compatibility").toString();
        NamespaceName namespaceName = NamespaceName.get((String)"public", (String)namespace);
        this.admin.namespaces().createNamespace("public/" + namespace, (Set)Sets.newHashSet((Object[])new String[]{CLUSTER_NAME}));
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
        this.admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
        this.admin.schemas().createSchema(fqtn, Schema.AVRO((SchemaDefinition)SchemaDefinition.builder().withAlwaysAllowNull(false).withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());
        Producer producerOne = this.pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class)).topic(fqtn).create();
        Schemas.PersonOne personOne = new Schemas.PersonOne(10);
        Consumer consumerOne = this.pulsarClient.newConsumer(Schema.AVRO((SchemaDefinition)SchemaDefinition.builder().withAlwaysAllowNull(false).withSupportSchemaVersioning(true).withPojo(Schemas.PersonOne.class).build())).subscriptionName(CLUSTER_NAME).topic(new String[]{fqtn}).subscribe();
        producerOne.send((Object)personOne);
        Message message = consumerOne.receive();
        personOne = (Schemas.PersonOne)message.getValue();
        Assert.assertEquals((int)personOne.getId(), (int)10);
        consumerOne.close();
        producerOne.close();
    }

    @Test
    public void testAutoProduceSchemaAlwaysCompatible() throws Exception {
        String tenant = "public";
        String topic = "topic" + SchemaCompatibilityCheckTest.randomName(16);
        String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        String topicName = TopicName.get((String)TopicDomain.persistent.value(), (String)"public", (String)namespace, (String)topic).toString();
        NamespaceName namespaceName = NamespaceName.get((String)"public", (String)namespace);
        this.admin.namespaces().createNamespace("public/" + namespace, (Set)Sets.newHashSet((Object[])new String[]{CLUSTER_NAME}));
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
        Producer producer = this.pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
        Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).subscriptionName("my-sub").topic(new String[]{topicName}).subscribe();
        producer.close();
        consumer.close();
    }

    @Test(dataProvider="CanReadLastSchemaCompatibilityStrategy")
    public void testConsumerWithNotCompatibilitySchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
        String tenant = "public";
        String topic = "test-consumer-compatibility";
        String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        String fqtn = TopicName.get((String)TopicDomain.persistent.value(), (String)"public", (String)namespace, (String)"test-consumer-compatibility").toString();
        NamespaceName namespaceName = NamespaceName.get((String)"public", (String)namespace);
        this.admin.namespaces().createNamespace("public/" + namespace, (Set)Sets.newHashSet((Object[])new String[]{CLUSTER_NAME}));
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
        this.admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
        this.admin.schemas().createSchema(fqtn, Schema.AVRO((SchemaDefinition)SchemaDefinition.builder().withAlwaysAllowNull(false).withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());
        try {
            this.pulsarClient.newConsumer(Schema.AVRO((SchemaDefinition)SchemaDefinition.builder().withAlwaysAllowNull(false).withSupportSchemaVersioning(true).withPojo(Schemas.PersonFour.class).build())).subscriptionName(CLUSTER_NAME).topic(new String[]{fqtn}).subscribe();
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Unable to read schema"));
        }
    }

    public static String randomName(int numChars) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < numChars; ++i) {
            sb.append((char)(ThreadLocalRandom.current().nextInt(26) + 97));
        }
        return sb.toString();
    }
}

