/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin;

import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.util.Utf8;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.rest.Topics;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerAck;
import org.apache.pulsar.websocket.data.ProducerAcks;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.apache.pulsar.websocket.data.ProducerMessages;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class TopicsTest
extends MockedPulsarServiceBaseTest {
    // REST resource under test; setup() replaces it with a freshly spied instance before each test method.
    private Topics topics;
    // Names of the cluster, tenant, namespace and topic used by the tests.
    // The same values appear as inline literals in setup() and in the test methods below.
    private final String testLocalCluster = "test";
    private final String testTenant = "my-tenant";
    private final String testNamespace = "my-namespace";
    private final String testTopicName = "my-topic";

    /**
     * Runs the base-class setup, installs a spied {@link Topics} resource with a stubbed domain,
     * client app id and client auth data, and then creates the cluster, tenant and namespace
     * that the test methods publish to.
     *
     * @throws Exception if the base setup or any admin call fails
     */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();

        // Spy (rather than mock) so the real produce logic runs while selected calls are stubbed.
        this.topics = PowerMockito.spy(new Topics());
        Topics spiedTopics = this.topics;
        spiedTopics.setPulsar(this.pulsar);
        PowerMockito.doReturn(TopicDomain.persistent.value()).when(spiedTopics).domain();
        PowerMockito.doReturn("test-app").when(spiedTopics).clientAppId();
        PowerMockito.doReturn(PowerMockito.mock(AuthenticationDataHttps.class)).when(spiedTopics).clientAuthData();

        // Provision cluster -> tenant -> namespace, in that order.
        this.admin.clusters().createCluster("test", new ClusterDataImpl());
        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
        this.admin.tenants().createTenant("my-tenant", tenantInfo);
        this.admin.namespaces().createNamespace("my-tenant/my-namespace", Sets.newHashSet("test"));
    }

    /**
     * Runs after every test method and delegates all teardown to the base class's
     * {@code internalCleanup()}.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override
    @AfterMethod
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Publishes three string messages to a non-partitioned topic through the REST resource and
     * verifies a 200 response whose acks all carry error code 0 and a message id whose third
     * ':'-separated field is -1.
     */
    @Test
    public void testProduceToNonPartitionedTopic() throws Exception {
        this.admin.topics().createNonPartitionedTopic("persistent://my-tenant/my-namespace/my-topic");
        AsyncResponse asyncResponse = PowerMockito.mock(AsyncResponse.class);

        // Key and value are both described by the UTF-8 string schema.
        StringSchema stringSchema = StringSchema.utf8();
        ProducerMessages request = new ProducerMessages();
        request.setKeySchema(ObjectMapperFactory.getThreadLocal().writeValueAsString(stringSchema.getSchemaInfo()));
        request.setValueSchema(ObjectMapperFactory.getThreadLocal().writeValueAsString(stringSchema.getSchemaInfo()));
        String payloadJson = "[{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}]";
        List<ProducerMessage> messages = ObjectMapperFactory.getThreadLocal().readValue(payloadJson, new TypeReference<List<ProducerMessage>>() {});
        request.setMessages(messages);

        this.topics.produceOnPersistentTopic(asyncResponse, "my-tenant", "my-namespace", "my-topic", false, request);

        // The resource answers asynchronously; wait up to 5s for exactly one resume(...) call.
        ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(captor.capture());
        Assert.assertEquals(captor.getValue().getStatus(), Response.Status.OK.getStatusCode());
        Object entity = captor.getValue().getEntity();
        Assert.assertTrue(entity instanceof ProducerAcks);
        ProducerAcks acks = (ProducerAcks) entity;
        Assert.assertEquals(acks.getMessagePublishResults().size(), 3);
        Assert.assertEquals(acks.getSchemaVersion(), 0L);
        for (Object result : acks.getMessagePublishResults()) {
            ProducerAck ack = (ProducerAck) result;
            int thirdIdField = Integer.parseInt(ack.getMessageId().split(":")[2]);
            Assert.assertEquals(thirdIdField, -1);
            Assert.assertEquals(ack.getErrorCode(), 0);
            Assert.assertTrue(ack.getMessageId().length() > 0);
        }
    }

    /**
     * Publishes ten string messages to a five-partition topic through the REST resource and
     * verifies that every ack succeeds and that no partition index (third ':'-separated field
     * of the message id) receives more than two messages.
     */
    @Test
    public void testProduceToPartitionedTopic() throws Exception {
        this.admin.topics().createPartitionedTopic("persistent://my-tenant/my-namespace/my-topic-p", 5);
        AsyncResponse asyncResponse = PowerMockito.mock(AsyncResponse.class);

        StringSchema stringSchema = StringSchema.utf8();
        ProducerMessages request = new ProducerMessages();
        request.setKeySchema(ObjectMapperFactory.getThreadLocal().writeValueAsString(stringSchema.getSchemaInfo()));
        request.setValueSchema(ObjectMapperFactory.getThreadLocal().writeValueAsString(stringSchema.getSchemaInfo()));
        String payloadJson = "[{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3},{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4},{\"key\":\"my-key\",\"payload\":\"RestProducer:5\",\"eventTime\":1603045262772,\"sequenceId\":5},{\"key\":\"my-key\",\"payload\":\"RestProducer:6\",\"eventTime\":1603045262772,\"sequenceId\":6},{\"key\":\"my-key\",\"payload\":\"RestProducer:7\",\"eventTime\":1603045262772,\"sequenceId\":7},{\"key\":\"my-key\",\"payload\":\"RestProducer:8\",\"eventTime\":1603045262772,\"sequenceId\":8},{\"key\":\"my-key\",\"payload\":\"RestProducer:9\",\"eventTime\":1603045262772,\"sequenceId\":9},{\"key\":\"my-key\",\"payload\":\"RestProducer:10\",\"eventTime\":1603045262772,\"sequenceId\":10}]";
        List<ProducerMessage> messages = ObjectMapperFactory.getThreadLocal().readValue(payloadJson, new TypeReference<List<ProducerMessage>>() {});
        request.setMessages(messages);

        this.topics.produceOnPersistentTopic(asyncResponse, "my-tenant", "my-namespace", "my-topic-p", false, request);

        // Wait up to 5s for exactly one asynchronous resume(...) call.
        ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(captor.capture());
        Assert.assertEquals(captor.getValue().getStatus(), Response.Status.OK.getStatusCode());
        Object entity = captor.getValue().getEntity();
        Assert.assertTrue(entity instanceof ProducerAcks);
        ProducerAcks acks = (ProducerAcks) entity;
        Assert.assertEquals(acks.getMessagePublishResults().size(), 10);
        Assert.assertEquals(acks.getSchemaVersion(), 0L);

        // Tally how many acks landed on each of the five partitions.
        int[] perPartition = new int[5];
        for (Object result : acks.getMessagePublishResults()) {
            ProducerAck ack = (ProducerAck) result;
            int partition = Integer.parseInt(ack.getMessageId().split(":")[2]);
            perPartition[partition]++;
            Assert.assertEquals(ack.getErrorCode(), 0);
            Assert.assertTrue(ack.getMessageId().length() > 0);
        }
        for (int count : perPartition) {
            Assert.assertTrue(count <= 2);
        }
    }

    /**
     * Publishes four string messages to partition 2 of a five-partition topic through the REST
     * resource and verifies that every ack succeeds and reports 2 as the third ':'-separated
     * field of its message id.
     */
    @Test
    public void testProduceToPartitionedTopicSpecificPartition() throws Exception {
        this.admin.topics().createPartitionedTopic("persistent://my-tenant/my-namespace/my-topic", 5);
        AsyncResponse asyncResponse = PowerMockito.mock(AsyncResponse.class);

        StringSchema stringSchema = StringSchema.utf8();
        ProducerMessages request = new ProducerMessages();
        request.setKeySchema(ObjectMapperFactory.getThreadLocal().writeValueAsString(stringSchema.getSchemaInfo()));
        request.setValueSchema(ObjectMapperFactory.getThreadLocal().writeValueAsString(stringSchema.getSchemaInfo()));
        String payloadJson = "[{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3},{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4}]";
        List<ProducerMessage> messages = ObjectMapperFactory.getThreadLocal().readValue(payloadJson, new TypeReference<List<ProducerMessage>>() {});
        request.setMessages(messages);

        this.topics.produceOnPersistentTopicPartition(asyncResponse, "my-tenant", "my-namespace", "my-topic", 2, false, request);

        // Wait up to 5s for exactly one asynchronous resume(...) call.
        ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(captor.capture());
        Assert.assertEquals(captor.getValue().getStatus(), Response.Status.OK.getStatusCode());
        Object entity = captor.getValue().getEntity();
        Assert.assertTrue(entity instanceof ProducerAcks);
        ProducerAcks acks = (ProducerAcks) entity;
        Assert.assertEquals(acks.getMessagePublishResults().size(), 4);
        Assert.assertEquals(acks.getSchemaVersion(), 0L);
        for (Object result : acks.getMessagePublishResults()) {
            ProducerAck ack = (ProducerAck) result;
            int partition = Integer.parseInt(ack.getMessageId().split(":")[2]);
            Assert.assertEquals(partition, 2);
            Assert.assertEquals(ack.getErrorCode(), 0);
            Assert.assertTrue(ack.getMessageId().length() > 0);
        }
    }

    /**
     * Verifies per-message error reporting when some publishes fail.
     *
     * <p>The real topic is replaced by a spy whose {@code publishMessage} completes the first two
     * publish contexts successfully and every later one with a {@code TopicFencedException}.
     * Four messages are produced; the response must still be 200 with four acks, two of which
     * carry error code 2 and the fake exception text.
     *
     * <p>Any failure raised inside the callback is re-reported through
     * {@link Assert#fail(String, Throwable)} so the original cause and stack trace are kept.
     */
    @Test
    public void testProduceFailed() throws Exception {
        String topicName = "persistent://my-tenant/my-namespace/my-topic";
        this.admin.topics().createNonPartitionedTopic(topicName);
        this.pulsar.getBrokerService().getTopic(topicName, false).thenAccept(topic -> {
            try {
                PersistentTopic spiedTopic = PowerMockito.spy((PersistentTopic) topic.get());
                AtomicInteger publishCalls = new AtomicInteger();
                // First two publishes succeed, every later one is completed with a fenced-topic error.
                PowerMockito.doAnswer(invocation -> {
                    Topic.PublishContext publishContext = invocation.getArgument(1);
                    if (publishCalls.getAndIncrement() < 2) {
                        publishContext.completed(null, -1L, -1L);
                    } else {
                        publishContext.completed(new BrokerServiceException.TopicFencedException("Fake exception"), -1L, -1L);
                    }
                    return null;
                }).when(spiedTopic).publishMessage(ArgumentMatchers.any(), ArgumentMatchers.any());

                // Route every topic lookup on the broker service to the spied topic.
                BrokerService spiedBrokerService = PowerMockito.spy(this.pulsar.getBrokerService());
                PowerMockito.doReturn(CompletableFuture.completedFuture(Optional.of(spiedTopic)))
                        .when(spiedBrokerService).getTopic(ArgumentMatchers.anyString(), ArgumentMatchers.anyBoolean());
                PowerMockito.doReturn(spiedBrokerService).when(this.pulsar).getBrokerService();

                AsyncResponse asyncResponse = PowerMockito.mock(AsyncResponse.class);
                StringSchema schema = StringSchema.utf8();
                ProducerMessages producerMessages = new ProducerMessages();
                producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal().writeValueAsString(schema.getSchemaInfo()));
                producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().writeValueAsString(schema.getSchemaInfo()));
                String message = "[{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3},{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4}]";
                List<ProducerMessage> messages = ObjectMapperFactory.getThreadLocal().readValue(message, new TypeReference<List<ProducerMessage>>() {});
                producerMessages.setMessages(messages);

                this.topics.produceOnPersistentTopic(asyncResponse, "my-tenant", "my-namespace", "my-topic", false, producerMessages);

                // Wait up to 5s for exactly one asynchronous resume(...) call.
                ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
                Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
                Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
                Object responseEntity = responseCaptor.getValue().getEntity();
                Assert.assertTrue(responseEntity instanceof ProducerAcks);
                ProducerAcks response = (ProducerAcks) responseEntity;
                Assert.assertEquals(response.getMessagePublishResults().size(), 4);

                int errorResponse = 0;
                for (Object result : response.getMessagePublishResults()) {
                    ProducerAck ack = (ProducerAck) result;
                    int errorCode = ack.getErrorCode();
                    if (errorCode == 0) {
                        Assert.assertEquals(Integer.parseInt(ack.getMessageId().split(":")[2]), -1);
                        Assert.assertTrue(ack.getMessageId().length() > 0);
                        continue;
                    }
                    ++errorResponse;
                    Assert.assertEquals(errorCode, 2);
                    Assert.assertEquals(ack.getErrorMsg(), "org.apache.pulsar.broker.service.BrokerServiceException$TopicFencedException: Fake exception");
                }
                // assertEquals (not assertTrue) so a mismatch reports the actual count.
                Assert.assertEquals(errorResponse, 2);
            } catch (Throwable e) {
                // Keep the cause: the message alone may be null and loses the failing stack trace.
                Assert.fail(e.getMessage(), e);
            }
        }).get();
    }

    /**
     * Creates the topic through the primary broker's admin client, then points the REST resource
     * at a second broker and produces to the same topic. The response must be a
     * {@code TEMPORARY_REDIRECT} whose location equals the stubbed request URI.
     *
     * <p>The second broker is closed in a {@code finally} block so it does not outlive the test.
     */
    @Test
    public void testLookUpWithRedirect() throws Exception {
        String topicName = "persistent://my-tenant/my-namespace/my-topic";
        URI requestPath = URI.create(this.pulsar.getWebServiceAddress() + "/topics/my-tenant/my-namespace/my-topic");
        this.admin.topics().createNonPartitionedTopic(topicName);

        PulsarService pulsar2 = this.startBroker(TopicsTest.getDefaultConf());
        try {
            PowerMockito.doReturn(false).when(this.topics).isRequestHttps();
            UriInfo uriInfo = PowerMockito.mock(UriInfo.class);
            PowerMockito.doReturn(requestPath).when(uriInfo).getRequestUri();
            Whitebox.setInternalState(this.topics, "uri", uriInfo);
            // From here on the REST resource talks to the second broker.
            this.topics.setPulsar(pulsar2);

            AsyncResponse asyncResponse = PowerMockito.mock(AsyncResponse.class);
            ProducerMessages producerMessages = new ProducerMessages();
            producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().writeValueAsString(Schema.INT64.getSchemaInfo()));
            String message = "[]";
            List<ProducerMessage> messages = ObjectMapperFactory.getThreadLocal().readValue(message, new TypeReference<List<ProducerMessage>>() {});
            producerMessages.setMessages(messages);

            this.topics.produceOnPersistentTopic(asyncResponse, "my-tenant", "my-namespace", "my-topic", false, producerMessages);

            // Wait up to 5s for exactly one asynchronous resume(...) call.
            ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
            Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
            Assert.assertEquals((Object) responseCaptor.getValue().getStatusInfo(), (Object) Response.Status.TEMPORARY_REDIRECT);
            Assert.assertEquals(responseCaptor.getValue().getLocation().toString(), requestPath.toString());
        } finally {
            // NOTE(review): nothing visible in this class stops the second broker, so close it here;
            // confirm the base class does not already track brokers started via startBroker(...).
            pulsar2.close();
        }
    }

    /**
     * Stubs the namespace service so the topic is reported as existing but the broker-URL lookup
     * fails, then verifies the REST resource resumes with a {@link RestException} whose message
     * is "Can't find owner of given topic.".
     */
    @Test
    public void testLookUpWithException() throws Exception {
        String topicName = "persistent://my-tenant/my-namespace/my-topic";
        this.admin.topics().createNonPartitionedTopic(topicName);

        // Lookup future fails; existence check succeeds with true.
        CompletableFuture<Object> failedLookup = new CompletableFuture<>();
        failedLookup.completeExceptionally(new BrokerServiceException("Fake Exception"));
        CompletableFuture<Boolean> topicExists = CompletableFuture.completedFuture(true);

        NamespaceService stubbedNamespaceService = PowerMockito.mock(NamespaceService.class);
        PowerMockito.doReturn(failedLookup).when(stubbedNamespaceService)
                .getBrokerServiceUrlAsync(ArgumentMatchers.any(), ArgumentMatchers.any());
        PowerMockito.doReturn(topicExists).when(stubbedNamespaceService).checkTopicExists(ArgumentMatchers.any());
        PowerMockito.doReturn(stubbedNamespaceService).when(this.pulsar).getNamespaceService();

        AsyncResponse asyncResponse = PowerMockito.mock(AsyncResponse.class);
        ProducerMessages request = new ProducerMessages();
        request.setValueSchema(ObjectMapperFactory.getThreadLocal().writeValueAsString(Schema.INT64.getSchemaInfo()));
        String payloadJson = "[]";
        List<ProducerMessage> messages = ObjectMapperFactory.getThreadLocal().readValue(payloadJson, new TypeReference<List<ProducerMessage>>() {});
        request.setMessages(messages);

        this.topics.produceOnPersistentTopic(asyncResponse, "my-tenant", "my-namespace", "my-topic", false, request);

        // The failure is delivered through the Throwable overload of resume(...).
        ArgumentCaptor<RestException> captor = ArgumentCaptor.forClass(RestException.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume((Throwable) captor.capture());
        Assert.assertEquals(captor.getValue().getMessage(), "Can't find owner of given topic.");
    }

    /**
     * Stubs the namespace service so the topic is reported as missing, then verifies the REST
     * resource resumes with a {@link RestException} whose message is
     * "Fail to publish message: Topic not exist".
     */
    @Test
    public void testLookUpTopicNotExist() throws Exception {
        // Existence check completes immediately with false; the topic is never created.
        CompletableFuture<Boolean> topicMissing = CompletableFuture.completedFuture(false);
        NamespaceService stubbedNamespaceService = PowerMockito.mock(NamespaceService.class);
        PowerMockito.doReturn(topicMissing).when(stubbedNamespaceService).checkTopicExists(ArgumentMatchers.any());
        PowerMockito.doReturn(stubbedNamespaceService).when(this.pulsar).getNamespaceService();

        AsyncResponse asyncResponse = PowerMockito.mock(AsyncResponse.class);
        ProducerMessages request = new ProducerMessages();
        request.setValueSchema(ObjectMapperFactory.getThreadLocal().writeValueAsString(Schema.INT64.getSchemaInfo()));
        String payloadJson = "[]";
        List<ProducerMessage> messages = ObjectMapperFactory.getThreadLocal().readValue(payloadJson, new TypeReference<List<ProducerMessage>>() {});
        request.setMessages(messages);

        this.topics.produceOnPersistentTopic(asyncResponse, "my-tenant", "my-namespace", "my-topic", false, request);

        // The failure is delivered through the Throwable overload of resume(...).
        ArgumentCaptor<RestException> captor = ArgumentCaptor.forClass(RestException.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume((Throwable) captor.capture());
        Assert.assertEquals(captor.getValue().getMessage(), "Fail to publish message: Topic not exist");
    }

    /**
     * Produces five messages with an INT64 value schema through the REST resource and then reads
     * them back with an INT64 consumer, checking payload order and key.
     *
     * <p>The consumer is opened in try-with-resources so it is closed even when an assertion
     * fails, and a receive timeout is reported as an assertion failure instead of a
     * {@code NullPointerException}.
     */
    @Test
    public void testProduceWithLongSchema() throws Exception {
        String topicName = "persistent://my-tenant/my-namespace/my-topic";
        this.admin.topics().createNonPartitionedTopic(topicName);
        AsyncResponse asyncResponse = PowerMockito.mock(AsyncResponse.class);
        try (Consumer<Long> consumer = this.pulsarClient.newConsumer(Schema.INT64)
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()) {
            ProducerMessages producerMessages = new ProducerMessages();
            producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().writeValueAsString(Schema.INT64.getSchemaInfo()));
            String message = "[{\"key\":\"my-key\",\"payload\":\"111111111111\",\"eventTime\":1603045262772,\"sequenceId\":1},{\"key\":\"my-key\",\"payload\":\"222222222222\",\"eventTime\":1603045262772,\"sequenceId\":2},{\"key\":\"my-key\",\"payload\":\"333333333333\",\"eventTime\":1603045262772,\"sequenceId\":3},{\"key\":\"my-key\",\"payload\":\"444444444444\",\"eventTime\":1603045262772,\"sequenceId\":4},{\"key\":\"my-key\",\"payload\":\"555555555555\",\"eventTime\":1603045262772,\"sequenceId\":5}]";
            List<ProducerMessage> messages = ObjectMapperFactory.getThreadLocal().readValue(message, new TypeReference<List<ProducerMessage>>() {});
            producerMessages.setMessages(messages);

            this.topics.produceOnPersistentTopic(asyncResponse, "my-tenant", "my-namespace", "my-topic", false, producerMessages);

            // Wait up to 5s for exactly one asynchronous resume(...) call.
            ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
            Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
            Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
            Object responseEntity = responseCaptor.getValue().getEntity();
            Assert.assertTrue(responseEntity instanceof ProducerAcks);
            ProducerAcks response = (ProducerAcks) responseEntity;
            Assert.assertEquals(response.getMessagePublishResults().size(), 5);
            Assert.assertEquals(response.getSchemaVersion(), 0L);
            for (Object result : response.getMessagePublishResults()) {
                ProducerAck ack = (ProducerAck) result;
                Assert.assertEquals(Integer.parseInt(ack.getMessageId().split(":")[2]), -1);
                Assert.assertEquals(ack.getErrorCode(), 0);
                Assert.assertTrue(ack.getMessageId().length() > 0);
            }

            // Read the messages back and compare in publish order (TestNG order: actual, expected).
            List<Long> expectedMsg = Arrays.asList(111111111111L, 222222222222L, 333333333333L, 444444444444L, 555555555555L);
            for (Long expected : expectedMsg) {
                Message<Long> msg = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNotNull(msg, "Timed out waiting for message with payload " + expected);
                Assert.assertEquals(Schema.INT64.decode(msg.getData()).longValue(), expected.longValue());
                Assert.assertEquals(msg.getKey(), "my-key");
            }
        }
    }

    /**
     * Produces five messages without setting any key or value schema on the request and then
     * reads them back with a UTF-8 string consumer, checking payload order and key.
     *
     * <p>The consumer is opened in try-with-resources so it is closed even when an assertion
     * fails, and a receive timeout is reported as an assertion failure instead of a
     * {@code NullPointerException}.
     */
    @Test
    public void testProduceNoSchema() throws Exception {
        String topicName = "persistent://my-tenant/my-namespace/my-topic";
        this.admin.topics().createNonPartitionedTopic(topicName);
        AsyncResponse asyncResponse = PowerMockito.mock(AsyncResponse.class);
        try (Consumer<String> consumer = this.pulsarClient.newConsumer(StringSchema.utf8())
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()) {
            // No schema is set on the request on purpose.
            ProducerMessages producerMessages = new ProducerMessages();
            String message = "[{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3},{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4},{\"key\":\"my-key\",\"payload\":\"RestProducer:5\",\"eventTime\":1603045262772,\"sequenceId\":5}]";
            List<ProducerMessage> messages = ObjectMapperFactory.getThreadLocal().readValue(message, new TypeReference<List<ProducerMessage>>() {});
            producerMessages.setMessages(messages);

            this.topics.produceOnPersistentTopic(asyncResponse, "my-tenant", "my-namespace", "my-topic", false, producerMessages);

            // Wait up to 5s for exactly one asynchronous resume(...) call.
            ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
            Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
            Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
            Object responseEntity = responseCaptor.getValue().getEntity();
            Assert.assertTrue(responseEntity instanceof ProducerAcks);
            ProducerAcks response = (ProducerAcks) responseEntity;
            Assert.assertEquals(response.getMessagePublishResults().size(), 5);
            Assert.assertEquals(response.getSchemaVersion(), 0L);
            for (Object result : response.getMessagePublishResults()) {
                ProducerAck ack = (ProducerAck) result;
                Assert.assertEquals(Integer.parseInt(ack.getMessageId().split(":")[2]), -1);
                Assert.assertEquals(ack.getErrorCode(), 0);
                Assert.assertTrue(ack.getMessageId().length() > 0);
            }

            // Read the messages back and compare in publish order (TestNG order: actual, expected).
            List<String> expectedMsg = Arrays.asList("RestProducer:1", "RestProducer:2", "RestProducer:3", "RestProducer:4", "RestProducer:5");
            for (String expected : expectedMsg) {
                Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNotNull(msg, "Timed out waiting for message with payload " + expected);
                Assert.assertEquals(StringSchema.utf8().decode(msg.getData()), expected);
                Assert.assertEquals(msg.getKey(), "my-key");
            }
        }
    }

    /**
     * Produces two JSON-encoded {@code PC} payloads through the REST produce endpoint and verifies
     * that a client consumer subscribed with the generic JSON schema reads back the same field
     * values and message key.
     */
    @Test
    public void testProduceWithJsonSchema() throws Exception {
        String topicName = "persistent://my-tenant/my-namespace/my-topic";
        this.admin.topics().createNonPartitionedTopic(topicName);
        AsyncResponse asyncResponse = PowerMockito.mock(AsyncResponse.class);

        // Build the generic JSON schema from the schema info of the PC POJO.
        SchemaDefinition pojoDefinition = SchemaDefinition.builder().withPojo(PC.class).build();
        SchemaInfo pojoSchemaInfo = JSONSchema.of(pojoDefinition).getSchemaInfo();
        GenericSchemaImpl jsonSchema = GenericJsonSchema.of(pojoSchemaInfo);

        PC pc = new PC("dell", "alienware", 2021, GPU.AMD, new Seller("WA", "main street", 98004L));
        PC anotherPc = new PC("asus", "rog", 2020, GPU.NVIDIA, new Seller("CA", "back street", 90232L));

        Consumer consumer = this.pulsarClient.newConsumer((Schema) jsonSchema)
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        ProducerMessages producerMessages = new ProducerMessages();
        producerMessages.setValueSchema(
                ObjectMapperFactory.getThreadLocal().writeValueAsString(jsonSchema.getSchemaInfo()));

        // Each payload is a JSON document embedded as a string value, so its quotes are escaped first.
        String firstPayload = ObjectMapperFactory.getThreadLocal().writeValueAsString(pc).replace("\"", "\\\"");
        String secondPayload = ObjectMapperFactory.getThreadLocal().writeValueAsString(anotherPc).replace("\"", "\\\"");
        String message = "[{\"key\":\"my-key\",\"payload\":\"" + firstPayload
                + "\",\"eventTime\":1603045262772,\"sequenceId\":1},{\"key\":\"my-key\",\"payload\":\"" + secondPayload
                + "\",\"eventTime\":1603045262772,\"sequenceId\":2}]";
        List<ProducerMessage> parsedMessages = ObjectMapperFactory.getThreadLocal()
                .readValue(message, new TypeReference<List<ProducerMessage>>() {});
        producerMessages.setMessages(parsedMessages);

        this.topics.produceOnPersistentTopic(asyncResponse, "my-tenant", "my-namespace", "my-topic", false, producerMessages);

        // The endpoint answers asynchronously; wait for the single resume(...) call and inspect it.
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
        Response restResponse = responseCaptor.getValue();
        Assert.assertEquals(restResponse.getStatus(), Response.Status.OK.getStatusCode());
        Object responseEntity = restResponse.getEntity();
        Assert.assertTrue(responseEntity instanceof ProducerAcks);
        ProducerAcks response = (ProducerAcks) responseEntity;
        Assert.assertEquals((int) response.getMessagePublishResults().size(), 2);
        Assert.assertEquals((long) response.getSchemaVersion(), 0L);
        for (Object publishResult : response.getMessagePublishResults()) {
            ProducerAck ack = (ProducerAck) publishResult;
            String messageId = ack.getMessageId();
            // Third ':'-separated segment of the message id is expected to be -1.
            Assert.assertEquals(Integer.parseInt(messageId.split(":")[2]), -1);
            Assert.assertEquals((int) ack.getErrorCode(), 0);
            Assert.assertTrue(messageId.length() > 0);
        }

        // Messages must come back in publish order with the fields of the original POJOs.
        for (PC expectedPc : Arrays.asList(pc, anotherPc)) {
            Message msg = consumer.receive(2, TimeUnit.SECONDS);
            GenericJsonRecord jsonRecord = (GenericJsonRecord) jsonSchema.decode(msg.getData());
            PC msgPc = ObjectMapperFactory.getThreadLocal().treeToValue(jsonRecord.getJsonNode(), PC.class);
            Assert.assertEquals(msgPc.brand, expectedPc.brand);
            Assert.assertEquals(msgPc.model, expectedPc.model);
            Assert.assertEquals(msgPc.year, expectedPc.year);
            Assert.assertEquals((Object) msgPc.gpu, (Object) expectedPc.gpu);
            Assert.assertEquals(msgPc.seller.state, expectedPc.seller.state);
            Assert.assertEquals(msgPc.seller.street, expectedPc.seller.street);
            Assert.assertEquals(msgPc.seller.zipCode, expectedPc.seller.zipCode);
            Assert.assertEquals("my-key", msg.getKey());
        }
    }

    /**
     * Produces two {@code PC} payloads, rendered with Avro's JSON encoder, through the REST produce
     * endpoint and verifies that a client consumer subscribed with the generic Avro schema reads
     * back the same field values and message key.
     */
    @Test
    public void testProduceWithAvroSchema() throws Exception {
        String topicName = "persistent://my-tenant/my-namespace/my-topic";
        this.admin.topics().createNonPartitionedTopic(topicName);
        AsyncResponse asyncResponse = PowerMockito.mock(AsyncResponse.class);

        // Build the generic Avro schema from the schema info of the PC POJO.
        SchemaDefinition pojoDefinition = SchemaDefinition.builder().withPojo(PC.class).build();
        SchemaInfo pojoSchemaInfo = AvroSchema.of(pojoDefinition).getSchemaInfo();
        GenericSchemaImpl avroSchema = GenericAvroSchema.of(pojoSchemaInfo);

        PC pc = new PC("dell", "alienware", 2021, GPU.AMD, new Seller("WA", "main street", 98004L));
        PC anotherPc = new PC("asus", "rog", 2020, GPU.NVIDIA, new Seller("CA", "back street", 90232L));

        Consumer consumer = this.pulsarClient.newConsumer((Schema) avroSchema)
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        ProducerMessages producerMessages = new ProducerMessages();
        producerMessages.setValueSchema(
                ObjectMapperFactory.getThreadLocal().writeValueAsString(avroSchema.getSchemaInfo()));

        // Render both POJOs through Avro's JSON encoder, one output buffer per record.
        ReflectDatumWriter datumWriter = new ReflectDatumWriter(avroSchema.getAvroSchema());
        ByteArrayOutputStream firstEncoded = new ByteArrayOutputStream();
        ByteArrayOutputStream secondEncoded = new ByteArrayOutputStream();
        JsonEncoder firstEncoder = EncoderFactory.get().jsonEncoder(avroSchema.getAvroSchema(), firstEncoded);
        JsonEncoder secondEncoder = EncoderFactory.get().jsonEncoder(avroSchema.getAvroSchema(), secondEncoded);
        datumWriter.write(pc, firstEncoder);
        firstEncoder.flush();
        datumWriter.write(anotherPc, secondEncoder);
        secondEncoder.flush();

        // Each payload is a JSON document embedded as a string value, so its quotes are escaped first.
        String firstPayload = firstEncoded.toString().replace("\"", "\\\"");
        String secondPayload = secondEncoded.toString().replace("\"", "\\\"");
        String message = "[{\"key\":\"my-key\",\"payload\":\"" + firstPayload
                + "\",\"eventTime\":1603045262772,\"sequenceId\":1},{\"key\":\"my-key\",\"payload\":\"" + secondPayload
                + "\",\"eventTime\":1603045262772,\"sequenceId\":2}]";
        List<ProducerMessage> parsedMessages = ObjectMapperFactory.getThreadLocal()
                .readValue(message, new TypeReference<List<ProducerMessage>>() {});
        producerMessages.setMessages(parsedMessages);

        this.topics.produceOnPersistentTopic(asyncResponse, "my-tenant", "my-namespace", "my-topic", false, producerMessages);

        // The endpoint answers asynchronously; wait for the single resume(...) call and inspect it.
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
        Response restResponse = responseCaptor.getValue();
        Assert.assertEquals(restResponse.getStatus(), Response.Status.OK.getStatusCode());
        Object responseEntity = restResponse.getEntity();
        Assert.assertTrue(responseEntity instanceof ProducerAcks);
        ProducerAcks response = (ProducerAcks) responseEntity;
        Assert.assertEquals((int) response.getMessagePublishResults().size(), 2);
        Assert.assertEquals((long) response.getSchemaVersion(), 0L);
        for (Object publishResult : response.getMessagePublishResults()) {
            ProducerAck ack = (ProducerAck) publishResult;
            String messageId = ack.getMessageId();
            // Third ':'-separated segment of the message id is expected to be -1.
            Assert.assertEquals(Integer.parseInt(messageId.split(":")[2]), -1);
            Assert.assertEquals((int) ack.getErrorCode(), 0);
            Assert.assertTrue(messageId.length() > 0);
        }

        // Messages must come back in publish order with the fields of the original POJOs.
        for (PC expectedPc : Arrays.asList(pc, anotherPc)) {
            Message msg = consumer.receive(2, TimeUnit.SECONDS);
            GenericAvroRecord avroRecord = (GenericAvroRecord) avroSchema.decode(msg.getData());
            GenericRecord avroFields = avroRecord.getAvroRecord();
            Assert.assertEquals(((Utf8) avroFields.get("brand")).toString(), expectedPc.brand);
            Assert.assertEquals(((Utf8) avroFields.get("model")).toString(), expectedPc.model);
            Assert.assertEquals((int) ((Integer) avroFields.get("year")), expectedPc.year);
            Assert.assertEquals(((GenericData.EnumSymbol) avroFields.get("gpu")).toString(), expectedPc.gpu.toString());
            GenericRecord sellerFields = (GenericRecord) avroFields.get("seller");
            Assert.assertEquals(((Utf8) sellerFields.get("state")).toString(), expectedPc.seller.state);
            Assert.assertEquals(((Utf8) sellerFields.get("street")).toString(), expectedPc.seller.street);
            Assert.assertEquals((Object) sellerFields.get("zipCode"), (Object) expectedPc.seller.zipCode);
            Assert.assertEquals("my-key", msg.getKey());
        }
    }

    /**
     * Publishes three messages with a client producer and three more through the REST produce
     * endpoint, all using a SEPARATED key/value string schema, then verifies a client consumer
     * receives the six payloads in that order with the expected key.
     */
    @Test
    public void testProduceWithRestAndClientThenConsumeWithClient() throws Exception {
        String topicName = "persistent://my-tenant/my-namespace/my-topic";
        this.admin.topics().createNonPartitionedTopic(topicName);
        AsyncResponse asyncResponse = PowerMockito.mock(AsyncResponse.class);
        Schema keyValueSchema = KeyValueSchemaImpl.of(StringSchema.utf8(), StringSchema.utf8(), KeyValueEncodingType.SEPARATED);
        Producer producer = this.pulsarClient.newProducer(keyValueSchema).topic(topicName).create();
        Consumer consumer = this.pulsarClient.newConsumer(keyValueSchema)
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        // Client-side messages go first so they precede the REST-produced ones on the topic.
        for (int sent = 0; sent < 3; ++sent) {
            KeyValue clientMessage = new KeyValue("my-key", "ClientProducer:" + sent);
            producer.newMessage(keyValueSchema).value(clientMessage).send();
        }

        ProducerMessages producerMessages = new ProducerMessages();
        String stringSchemaJson = ObjectMapperFactory.getThreadLocal().writeValueAsString(StringSchema.utf8().getSchemaInfo());
        producerMessages.setKeySchema(stringSchemaJson);
        producerMessages.setValueSchema(
                ObjectMapperFactory.getThreadLocal().writeValueAsString(StringSchema.utf8().getSchemaInfo()));
        String message = "[{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}]";
        List<ProducerMessage> parsedMessages = ObjectMapperFactory.getThreadLocal()
                .readValue(message, new TypeReference<List<ProducerMessage>>() {});
        producerMessages.setMessages(parsedMessages);

        this.topics.produceOnPersistentTopic(asyncResponse, "my-tenant", "my-namespace", "my-topic", false, producerMessages);

        // The endpoint answers asynchronously; wait for the single resume(...) call and inspect it.
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
        Response restResponse = responseCaptor.getValue();
        Assert.assertEquals(restResponse.getStatus(), Response.Status.OK.getStatusCode());
        Object responseEntity = restResponse.getEntity();
        Assert.assertTrue(responseEntity instanceof ProducerAcks);
        ProducerAcks response = (ProducerAcks) responseEntity;
        Assert.assertEquals((int) response.getMessagePublishResults().size(), 3);
        Assert.assertEquals((long) response.getSchemaVersion(), 0L);
        for (Object publishResult : response.getMessagePublishResults()) {
            ProducerAck ack = (ProducerAck) publishResult;
            String messageId = ack.getMessageId();
            // Third ':'-separated segment of the message id is expected to be -1.
            Assert.assertEquals(Integer.parseInt(messageId.split(":")[2]), -1);
            Assert.assertEquals((int) ack.getErrorCode(), 0);
            Assert.assertTrue(messageId.length() > 0);
        }

        List<String> expectedMsg = Arrays.asList("ClientProducer:0", "ClientProducer:1", "ClientProducer:2", "RestProducer:1", "RestProducer:2", "RestProducer:3");
        for (String expectedPayload : expectedMsg) {
            Message msg = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertEquals(expectedPayload, StringSchema.utf8().decode(msg.getData()));
            // "bXkta2V5" is the Base64 form of "my-key".
            Assert.assertEquals("bXkta2V5", msg.getKey());
        }
    }

    /**
     * Publishes two batches of five key/value messages through the REST produce endpoint and
     * verifies that a client consumer receives all ten payloads in order. The first batch carries
     * explicit key and value schemas; the second batch only sets the schema version returned in
     * the acknowledgement of the first.
     */
    @Test
    public void testProduceWithRestThenConsumeWithClient() throws Exception {
        String topicName = "persistent://my-tenant/my-namespace/my-topic";
        this.admin.topics().createNonPartitionedTopic(topicName);
        Schema keyValueSchema = KeyValueSchemaImpl.of(StringSchema.utf8(), StringSchema.utf8(), KeyValueEncodingType.SEPARATED);
        Consumer consumer = this.pulsarClient.newConsumer(keyValueSchema)
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        // First batch: schemas are supplied inline.
        ProducerMessages firstBatch = new ProducerMessages();
        firstBatch.setKeySchema(ObjectMapperFactory.getThreadLocal().writeValueAsString(StringSchema.utf8().getSchemaInfo()));
        firstBatch.setValueSchema(ObjectMapperFactory.getThreadLocal().writeValueAsString(StringSchema.utf8().getSchemaInfo()));
        String firstJson = "[{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:5\",\"eventTime\":1603045262772,\"sequenceId\":5}]";
        List<ProducerMessage> firstMessages = ObjectMapperFactory.getThreadLocal()
                .readValue(firstJson, new TypeReference<List<ProducerMessage>>() {});
        firstBatch.setMessages(firstMessages);
        ProducerAcks firstAcks = this.produceBatchAndAwaitAcks(firstBatch, 5);

        // Second batch: no inline schemas, only the schema version reported for the first batch.
        ProducerMessages secondBatch = new ProducerMessages();
        secondBatch.setSchemaVersion(firstAcks.getSchemaVersion());
        String secondJson = "[{\"key\":\"my-key\",\"payload\":\"RestProducer:6\",\"eventTime\":1603045262772,\"sequenceId\":1},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:7\",\"eventTime\":1603045262772,\"sequenceId\":2},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:8\",\"eventTime\":1603045262772,\"sequenceId\":3},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:9\",\"eventTime\":1603045262772,\"sequenceId\":4},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:10\",\"eventTime\":1603045262772,\"sequenceId\":5}]";
        List<ProducerMessage> secondMessages = ObjectMapperFactory.getThreadLocal()
                .readValue(secondJson, new TypeReference<List<ProducerMessage>>() {});
        secondBatch.setMessages(secondMessages);
        this.produceBatchAndAwaitAcks(secondBatch, 5);

        List<String> expectedMsg = Arrays.asList("RestProducer:1", "RestProducer:2", "RestProducer:3", "RestProducer:4", "RestProducer:5", "RestProducer:6", "RestProducer:7", "RestProducer:8", "RestProducer:9", "RestProducer:10");
        for (String expectedPayload : expectedMsg) {
            Message msg = consumer.receive(2, TimeUnit.SECONDS);
            // receive(...) yields null on timeout; fail with an assertion instead of a bare NPE.
            Assert.assertNotNull(msg);
            Assert.assertEquals(expectedPayload, StringSchema.utf8().decode(msg.getData()));
            // "bXkta2V5" is the Base64 form of "my-key".
            Assert.assertEquals("bXkta2V5", msg.getKey());
        }
    }

    /**
     * Sends one batch to {@code my-tenant/my-namespace/my-topic} through the REST produce endpoint,
     * waits up to five seconds for the asynchronous reply and asserts that it is an OK response
     * carrying {@code expectedAckCount} acknowledgements with schema version 0, error code 0 and a
     * message id whose third ':'-separated segment is -1.
     *
     * <p>A fresh {@link AsyncResponse} mock and captor are used for every request. Reusing one mock
     * across requests makes a {@code times(1)} verification ambiguous: it can pass immediately
     * against the reply of an earlier request, or fail once a second reply has been recorded.
     *
     * @param batch the messages (plus schema or schema version) to publish
     * @param expectedAckCount number of acknowledgements the reply must contain
     * @return the acknowledgements extracted from the reply
     */
    private ProducerAcks produceBatchAndAwaitAcks(ProducerMessages batch, int expectedAckCount) throws Exception {
        AsyncResponse asyncResponse = PowerMockito.mock(AsyncResponse.class);
        this.topics.produceOnPersistentTopic(asyncResponse, "my-tenant", "my-namespace", "my-topic", false, batch);

        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
        Response restResponse = responseCaptor.getValue();
        Assert.assertEquals(restResponse.getStatus(), Response.Status.OK.getStatusCode());
        Object responseEntity = restResponse.getEntity();
        Assert.assertTrue(responseEntity instanceof ProducerAcks);
        ProducerAcks acks = (ProducerAcks) responseEntity;
        Assert.assertEquals((int) acks.getMessagePublishResults().size(), expectedAckCount);
        Assert.assertEquals((long) acks.getSchemaVersion(), 0L);
        for (Object publishResult : acks.getMessagePublishResults()) {
            ProducerAck ack = (ProducerAck) publishResult;
            String messageId = ack.getMessageId();
            Assert.assertEquals(Integer.parseInt(messageId.split(":")[2]), -1);
            Assert.assertEquals((int) ack.getErrorCode(), 0);
            Assert.assertTrue(messageId.length() > 0);
        }
        return acks;
    }

    /**
     * Registers a plain string schema on the topic via a client producer, then attempts a REST
     * produce with a key/value schema and verifies the request is resumed with a
     * {@code RestException} whose message reports that the schema could not be added.
     */
    @Test
    public void testProduceWithInCompatibleSchema() throws Exception {
        String topicName = "persistent://my-tenant/my-namespace/my-topic";
        this.admin.topics().createNonPartitionedTopic(topicName);
        AsyncResponse asyncResponse = PowerMockito.mock(AsyncResponse.class);
        Producer producer = this.pulsarClient.newProducer((Schema) StringSchema.utf8()).topic(topicName).create();
        for (int sent = 0; sent < 3; ++sent) {
            producer.send("message");
        }

        ProducerMessages producerMessages = new ProducerMessages();
        producerMessages.setKeySchema(
                ObjectMapperFactory.getThreadLocal().writeValueAsString(StringSchema.utf8().getSchemaInfo()));
        producerMessages.setValueSchema(
                ObjectMapperFactory.getThreadLocal().writeValueAsString(StringSchema.utf8().getSchemaInfo()));
        String message = "[{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}]";
        List<ProducerMessage> parsedMessages = ObjectMapperFactory.getThreadLocal()
                .readValue(message, new TypeReference<List<ProducerMessage>>() {});
        producerMessages.setMessages(parsedMessages);

        this.topics.produceOnPersistentTopic(asyncResponse, "my-tenant", "my-namespace", "my-topic", false, producerMessages);

        // The failure is delivered through the Throwable overload of resume(...).
        ArgumentCaptor<RestException> failureCaptor = ArgumentCaptor.forClass(RestException.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume((Throwable) failureCaptor.capture());
        String failureMessage = failureCaptor.getValue().getMessage();
        // The middle of the message contains a timestamp, so only its prefix and suffix are checked.
        Assert.assertTrue(failureMessage.startsWith("Fail to publish message:java.util.concurrent.ExecutionException: org.apache.pulsar.broker.service.schema.exceptions.SchemaException: Unable to add schema SchemaData(type=KEY_VALUE, isDeleted=false, timestamp="));
        Assert.assertTrue(failureMessage.endsWith("user=Rest Producer, data=[0, 0, 0, 0, 0, 0, 0, 0], props={key.schema.properties={\"__charset\":\"UTF-8\"}, value.schema.properties={\"__charset\":\"UTF-8\"}, value.schema.type=STRING, key.schema.name=String, value.schema.name=String, kv.encoding.type=SEPARATED, key.schema.type=STRING}) to topic persistent://my-tenant/my-namespace/my-topic"));
    }

    private static enum GPU {
        AMD,
        NVIDIA;

    }

    /**
     * Mutable test POJO describing a computer; the schema tests derive their JSON and Avro schemas
     * from this class. Equality, hash code and string form cover all five fields.
     */
    private static class PC {
        public String brand;
        public String model;
        public int year;
        public GPU gpu;
        public Seller seller;

        /** Creates an instance with all fields left at their default values. */
        public PC() {
        }

        /** Creates an instance with every field set. */
        public PC(String brand, String model, int year, GPU gpu, Seller seller) {
            this.brand = brand;
            this.model = model;
            this.year = year;
            this.gpu = gpu;
            this.seller = seller;
        }

        public String getBrand() {
            return this.brand;
        }

        public String getModel() {
            return this.model;
        }

        public int getYear() {
            return this.year;
        }

        public GPU getGpu() {
            return this.gpu;
        }

        public Seller getSeller() {
            return this.seller;
        }

        public void setBrand(String brand) {
            this.brand = brand;
        }

        public void setModel(String model) {
            this.model = model;
        }

        public void setYear(int year) {
            this.year = year;
        }

        public void setGpu(GPU gpu) {
            this.gpu = gpu;
        }

        public void setSeller(Seller seller) {
            this.seller = seller;
        }

        /**
         * Two instances are equal when the other side accepts this one via {@code canEqual} and
         * year, brand, model, gpu and seller all match (nulls only match nulls).
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof PC)) {
                return false;
            }
            PC that = (PC) o;
            return that.canEqual(this)
                    && this.getYear() == that.getYear()
                    && sameValue(this.getBrand(), that.getBrand())
                    && sameValue(this.getModel(), that.getModel())
                    && sameValue(this.getGpu(), that.getGpu())
                    && sameValue(this.getSeller(), that.getSeller());
        }

        /** Hook that lets the other operand of {@code equals} veto the comparison. */
        protected boolean canEqual(Object other) {
            return other instanceof PC;
        }

        /** Combines the fields with multiplier 59, counting a null reference as 43. */
        @Override
        public int hashCode() {
            int hash = 1;
            hash = hash * 59 + this.getYear();
            hash = hash * 59 + valueHash(this.getBrand());
            hash = hash * 59 + valueHash(this.getModel());
            hash = hash * 59 + valueHash(this.getGpu());
            hash = hash * 59 + valueHash(this.getSeller());
            return hash;
        }

        @Override
        public String toString() {
            return new StringBuilder("TopicsTest.PC(brand=").append(this.getBrand())
                    .append(", model=").append(this.getModel())
                    .append(", year=").append(this.getYear())
                    .append(", gpu=").append(this.getGpu())
                    .append(", seller=").append(this.getSeller())
                    .append(")")
                    .toString();
        }

        /** Null-safe equality: a null left side matches only a null right side. */
        private static boolean sameValue(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Hash contribution of one reference field; null contributes 43. */
        private static int valueHash(Object value) {
            return value == null ? 43 : value.hashCode();
        }
    }

    /**
     * Mutable test POJO describing the seller nested inside {@code PC}. Equality, hash code and
     * string form cover all three fields.
     */
    private static class Seller {
        public String state;
        public String street;
        public long zipCode;

        /** Creates an instance with all fields left at their default values. */
        public Seller() {
        }

        /** Creates an instance with every field set. */
        public Seller(String state, String street, long zipCode) {
            this.state = state;
            this.street = street;
            this.zipCode = zipCode;
        }

        public String getState() {
            return this.state;
        }

        public String getStreet() {
            return this.street;
        }

        public long getZipCode() {
            return this.zipCode;
        }

        public void setState(String state) {
            this.state = state;
        }

        public void setStreet(String street) {
            this.street = street;
        }

        public void setZipCode(long zipCode) {
            this.zipCode = zipCode;
        }

        /**
         * Two instances are equal when the other side accepts this one via {@code canEqual} and
         * zip code, state and street all match (nulls only match nulls).
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof Seller)) {
                return false;
            }
            Seller that = (Seller) o;
            return that.canEqual(this)
                    && this.getZipCode() == that.getZipCode()
                    && sameValue(this.getState(), that.getState())
                    && sameValue(this.getStreet(), that.getStreet());
        }

        /** Hook that lets the other operand of {@code equals} veto the comparison. */
        protected boolean canEqual(Object other) {
            return other instanceof Seller;
        }

        /** Combines the fields with multiplier 59, counting a null reference as 43. */
        @Override
        public int hashCode() {
            int hash = 1;
            // Long.hashCode folds the upper 32 bits into the lower ones: (int) (v ^ (v >>> 32)).
            hash = hash * 59 + Long.hashCode(this.getZipCode());
            hash = hash * 59 + valueHash(this.getState());
            hash = hash * 59 + valueHash(this.getStreet());
            return hash;
        }

        @Override
        public String toString() {
            return new StringBuilder("TopicsTest.Seller(state=").append(this.getState())
                    .append(", street=").append(this.getStreet())
                    .append(", zipCode=").append(this.getZipCode())
                    .append(")")
                    .toString();
        }

        /** Null-safe equality: a null left side matches only a null right side. */
        private static boolean sameValue(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Hash contribution of one reference field; null contributes 43. */
        private static int valueHash(Object value) {
            return value == null ? 43 : value.hashCode();
        }
    }
}

