/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.assertj.core.util.Sets;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class BrokerEntryMetadataE2ETest
extends BrokerTestBase {
    private static final String BATCH_HEADER = "X-Pulsar-num-batch-message";
    private static final String BATCH_SIZE_HEADER = "X-Pulsar-batch-size";

    /**
     * Data provider listing the subscription types that
     * {@code testProduceAndConsume} is run against.
     *
     * @return a one-dimensional array holding one {@link SubscriptionType} per test invocation
     */
    @DataProvider(name = "subscriptionTypes")
    public static Object[] subscriptionTypes() {
        Object[] types = new Object[]{
            SubscriptionType.Exclusive,
            SubscriptionType.Failover,
            SubscriptionType.Shared,
            SubscriptionType.Key_Shared,
        };
        return types;
    }

    /**
     * Configures the broker with the timestamp and index broker-entry-metadata
     * interceptors, exposes that metadata to clients, and then runs the shared
     * base setup.
     *
     * @throws Exception if the base setup fails
     */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        Set<String> interceptors = Sets.newTreeSet(
                "org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor",
                "org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor");
        conf.setBrokerEntryMetadataInterceptors(interceptors);
        conf.setExposingBrokerEntryMetadataToClientEnabled(true);
        baseSetup();
    }

    /**
     * Runs after all tests in this class (even when some failed) and delegates
     * to the base class cleanup routine.
     *
     * @throws Exception if the delegated cleanup fails
     */
    @Override
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        internalCleanup();
    }

    /**
     * Publishes a fixed number of messages and verifies that a consumer using the
     * given subscription type receives all of them, in publish order.
     *
     * @param subType subscription type supplied by the {@code subscriptionTypes} data provider
     * @throws Exception if producing, consuming, or closing a client resource fails
     */
    @Test(dataProvider = "subscriptionTypes")
    public void testProduceAndConsume(SubscriptionType subType) throws Exception {
        final String topic = newTopicName();
        final int messages = 10;

        // try-with-resources closes the consumer first and the producer second, and keeps
        // an assertion failure as the primary exception should close() throw as well.
        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
             Consumer<byte[]> consumer = pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionType(subType)
                     .subscriptionName("my-sub")
                     .subscribe()) {
            for (int i = 0; i < messages; i++) {
                producer.send(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
            }

            int receives = 0;
            for (int i = 0; i < messages; i++) {
                // This test has no TestNG timeOut, so bound the wait: a lost message
                // fails the test instead of blocking it forever.
                Message<byte[]> received = consumer.receive(10, TimeUnit.SECONDS);
                Assert.assertNotNull(received, "timed out waiting for message " + i);
                ++receives;
                // TestNG argument order is (actual, expected).
                int payload = Integer.parseInt(new String(received.getValue(), StandardCharsets.UTF_8));
                Assert.assertEquals(payload, i);
            }
            Assert.assertEquals(receives, messages);
        }
    }

    /**
     * Sends one message and checks that peeking it through the admin API returns the
     * payload, event time, deliver-at time and publish time, together with broker
     * entry metadata (index 0 and a broker timestamp not earlier than the send time).
     *
     * @throws Exception if a client or admin call fails
     */
    @Test(timeOut = 20000)
    public void testPeekMessage() throws Exception {
        final String topic = newTopicName();
        final String subscription = "my-sub";
        final long eventTime = 200L;
        final long deliverAtTime = 300L;
        final byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

        // try-with-resources keeps an assertion failure as the primary exception
        // even if closing the producer throws too.
        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) {
            final long sendTime = System.currentTimeMillis();
            producer.newMessage()
                    .eventTime(eventTime)
                    .deliverAt(deliverAtTime)
                    .value(payload)
                    .send();

            admin.topics().createSubscription(topic, subscription, MessageId.earliest);
            List<Message<byte[]>> messages = admin.topics().peekMessages(topic, subscription, 1);
            Assert.assertEquals(messages.size(), 1);

            MessageImpl<byte[]> message = (MessageImpl<byte[]>) messages.get(0);
            Assert.assertEquals(message.getData(), payload);
            Assert.assertEquals(message.getEventTime(), eventTime);
            Assert.assertEquals(message.getDeliverAtTime(), deliverAtTime);
            Assert.assertTrue(message.getPublishTime() >= sendTime);

            BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata();
            Assert.assertEquals(entryMetadata.getIndex(), 0L);
            Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
        }
    }

    /**
     * Sends one message and checks that fetching it by ledger id and entry id through
     * the admin API returns the payload, event time, deliver-at time and publish time,
     * together with broker entry metadata (index 0 and a broker timestamp not earlier
     * than the send time).
     *
     * @throws Exception if a client or admin call fails
     */
    @Test(timeOut = 20000)
    public void testGetMessageById() throws Exception {
        final String topic = newTopicName();
        final String subscription = "my-sub";
        final long eventTime = 200L;
        final long deliverAtTime = 300L;
        final byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

        // try-with-resources keeps an assertion failure as the primary exception
        // even if closing the producer throws too.
        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) {
            final long sendTime = System.currentTimeMillis();
            MessageIdImpl messageId = (MessageIdImpl) producer.newMessage()
                    .eventTime(eventTime)
                    .deliverAt(deliverAtTime)
                    .value(payload)
                    .send();

            admin.topics().createSubscription(topic, subscription, MessageId.earliest);
            MessageImpl<byte[]> message = (MessageImpl<byte[]>) admin.topics()
                    .getMessageById(topic, messageId.getLedgerId(), messageId.getEntryId());

            Assert.assertEquals(message.getData(), payload);
            Assert.assertEquals(message.getEventTime(), eventTime);
            Assert.assertEquals(message.getDeliverAtTime(), deliverAtTime);
            Assert.assertTrue(message.getPublishTime() >= sendTime);

            BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata();
            Assert.assertEquals(entryMetadata.getIndex(), 0L);
            Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
        }
    }

    /**
     * Sends one message and checks that examining the first message from the
     * "earliest" position through the admin API returns the payload, event time,
     * deliver-at time and publish time, together with broker entry metadata
     * (index 0 and a broker timestamp not earlier than the send time).
     *
     * @throws Exception if a client or admin call fails
     */
    @Test(timeOut = 20000)
    public void testExamineMessage() throws Exception {
        final String topic = newTopicName();
        final String subscription = "my-sub";
        final long eventTime = 200L;
        final long deliverAtTime = 300L;
        final byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

        // try-with-resources keeps an assertion failure as the primary exception
        // even if closing the producer throws too.
        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) {
            final long sendTime = System.currentTimeMillis();
            producer.newMessage()
                    .eventTime(eventTime)
                    .deliverAt(deliverAtTime)
                    .value(payload)
                    .send();

            admin.topics().createSubscription(topic, subscription, MessageId.earliest);
            MessageImpl<byte[]> message =
                    (MessageImpl<byte[]>) admin.topics().examineMessage(topic, "earliest", 1L);

            Assert.assertEquals(message.getData(), payload);
            Assert.assertEquals(message.getEventTime(), eventTime);
            Assert.assertEquals(message.getDeliverAtTime(), deliverAtTime);
            Assert.assertTrue(message.getPublishTime() >= sendTime);

            BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata();
            Assert.assertEquals(entryMetadata.getIndex(), 0L);
            Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
        }
    }

    /**
     * Sends one message through a batching producer and checks that the three admin
     * read paths (peek, get-by-id, examine) each return it with the expected payload,
     * publish time, broker entry metadata and batch headers.
     *
     * @throws Exception if a client or admin call fails
     */
    @Test(timeOut = 20000)
    public void testBatchMessage() throws Exception {
        final String topic = newTopicName();
        final String subscription = "my-sub";
        final long eventTime = 200L;
        final byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

        // try-with-resources keeps an assertion failure as the primary exception
        // even if closing the producer throws too.
        try (Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(true)
                .create()) {
            final long sendTime = System.currentTimeMillis();
            MessageIdImpl messageId = (MessageIdImpl) producer.newMessage()
                    .eventTime(eventTime)
                    .value(payload)
                    .send();

            admin.topics().createSubscription(topic, subscription, MessageId.earliest);

            // 1) peek
            List<Message<byte[]>> messages = admin.topics().peekMessages(topic, subscription, 1);
            Assert.assertEquals(messages.size(), 1);
            assertSingleMessageBatch((MessageImpl<byte[]>) messages.get(0), payload, sendTime);

            // 2) get by ledger id / entry id
            assertSingleMessageBatch(
                    (MessageImpl<byte[]>) admin.topics()
                            .getMessageById(topic, messageId.getLedgerId(), messageId.getEntryId()),
                    payload, sendTime);

            // 3) examine from the earliest position
            assertSingleMessageBatch(
                    (MessageImpl<byte[]>) admin.topics().examineMessage(topic, "earliest", 1L),
                    payload, sendTime);
        }
    }

    /**
     * Asserts that {@code message} carries the expected payload, was published and
     * broker-stamped no earlier than {@code sendTime}, has broker entry index 0, and
     * exposes batch headers describing a batch of exactly one message with a positive size.
     */
    private static void assertSingleMessageBatch(MessageImpl<byte[]> message, byte[] payload, long sendTime) {
        Assert.assertEquals(message.getData(), payload);
        Assert.assertTrue(message.getPublishTime() >= sendTime);

        BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata();
        Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
        Assert.assertEquals(entryMetadata.getIndex(), 0L);

        // Diagnostic output retained from the original test.
        System.out.println(message.getProperties());
        Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
        Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
    }

    /**
     * Publishes one message, subscribes, and checks that the consumer can obtain the
     * last message id of the topic while broker entry metadata is enabled.
     *
     * @throws Exception if a client call fails
     */
    @Test(timeOut = 20000)
    public void testGetLastMessageId() throws Exception {
        final String topic = "persistent://prop/ns-abc/topic-test";
        final String subscription = "my-sub";

        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) {
            producer.newMessage().value("hello".getBytes(StandardCharsets.UTF_8)).send();

            // The consumer is opened only after the message has been published.
            try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscriptionName(subscription)
                    .subscribe()) {
                // Previously the test closed the consumer without ever requesting the
                // last message id, so it verified nothing its name promises.
                MessageId lastMessageId = consumer.getLastMessageId();
                Assert.assertNotNull(lastMessageId);
            }
        }
    }

    /**
     * With batching disabled, publishes a fixed number of messages and checks that each
     * received message exposes a broker publish time not earlier than the send time
     * and an index equal to its position in the publish sequence.
     *
     * @throws Exception if a client call fails
     */
    @Test(timeOut = 20000)
    public void testConsumerGetBrokerEntryMetadataForIndividualMessage() throws Exception {
        final String topic = newTopicName();
        final String subscription = "my-sub";
        final int messages = 10;

        // Both resources are closed exactly once by try-with-resources (consumer first,
        // then producer); the former explicit close() calls were redundant with the
        // finally blocks and are gone.
        try (Producer<byte[]> producer = pulsarClient.newProducer()
                     .topic(topic)
                     .enableBatching(false)
                     .create();
             Consumer<byte[]> consumer = pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionType(SubscriptionType.Exclusive)
                     .subscriptionName(subscription)
                     .subscribe()) {
            final long sendTime = System.currentTimeMillis();
            for (int i = 0; i < messages; i++) {
                producer.send(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
            }

            for (int i = 0; i < messages; i++) {
                Message<byte[]> received = consumer.receive();

                // One condition per assertion so a failure names the broken property.
                Assert.assertTrue(received.hasBrokerPublishTime());
                long brokerPublishTime = received.getBrokerPublishTime().orElse(-1L);
                Assert.assertTrue(brokerPublishTime >= sendTime);

                Assert.assertTrue(received.hasIndex());
                long index = received.getIndex().orElse(-1L);
                Assert.assertEquals(index, (long) i);
            }
        }
    }

    /**
     * With batching enabled and a one-minute max publish delay, sends two groups of
     * messages, flushing after each group, and checks that each received message
     * exposes a broker publish time not earlier than the send time and an index equal
     * to its position in the overall publish sequence.
     *
     * @throws Exception if a client call fails
     */
    @Test(timeOut = 20000)
    public void testConsumerGetBrokerEntryMetadataForBatchMessage() throws Exception {
        final String topic = newTopicName();
        final String subscription = "my-sub";
        final int messagesPerFlush = 5;
        final int flushes = 2;
        final int numOfMessages = messagesPerFlush * flushes;

        // Both resources are closed exactly once by try-with-resources (consumer first,
        // then producer); the former explicit close() calls were redundant with the
        // finally blocks and are gone.
        try (Producer<byte[]> producer = pulsarClient.newProducer()
                     .topic(topic)
                     .enableBatching(true)
                     .batchingMaxPublishDelay(1L, TimeUnit.MINUTES)
                     .create();
             Consumer<byte[]> consumer = pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionType(SubscriptionType.Exclusive)
                     .subscriptionName(subscription)
                     .subscribe()) {
            final long sendTime = System.currentTimeMillis();

            int sent = 0;
            for (int flush = 0; flush < flushes; flush++) {
                for (int j = 0; j < messagesPerFlush; j++, sent++) {
                    producer.sendAsync(String.valueOf(sent).getBytes(StandardCharsets.UTF_8));
                }
                // Explicit flush after each group; the configured publish delay is one minute.
                producer.flush();
            }

            for (int i = 0; i < numOfMessages; i++) {
                Message<byte[]> received = consumer.receive();

                // One condition per assertion so a failure names the broken property.
                Assert.assertTrue(received.hasBrokerPublishTime());
                long brokerPublishTime = received.getBrokerPublishTime().orElse(-1L);
                Assert.assertTrue(brokerPublishTime >= sendTime);

                Assert.assertTrue(received.hasIndex());
                long index = received.getIndex().orElse(-1L);
                Assert.assertEquals(index, (long) i);
            }
        }
    }

    /**
     * Publishes messages to a fresh topic, forces a ledger rollover, and checks that
     * the managed ledger's total size equals the size recorded for the first ledger
     * once two ledgers exist.
     *
     * @throws Exception if a client, admin, or managed-ledger call fails
     */
    @Test
    public void testManagedLedgerTotalSize() throws Exception {
        final String topic = newTopicName();
        final int messages = 10;

        admin.topics().createNonPartitionedTopic(topic);
        admin.lookups().lookupTopic(topic);

        final ManagedLedgerImpl managedLedger = pulsar.getBrokerService().getTopicIfExists(topic).get()
                .map(topicObject -> (ManagedLedgerImpl) ((PersistentTopic) topicObject).getManagedLedger())
                .orElse(null);
        Assert.assertNotNull(managedLedger);

        final ManagedCursor cursor = managedLedger.openCursor("cursor");
        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create()) {
            for (int i = 0; i < messages; i++) {
                producer.send("msg-" + i);
            }
            Assert.assertTrue(managedLedger.getTotalSize() > 0L);

            // Lower the rollover thresholds so the explicit roll call below takes effect.
            managedLedger.getConfig().setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
            managedLedger.getConfig().setMaxEntriesPerLedger(1);
            managedLedger.rollCurrentLedgerIfFull();

            Awaitility.await()
                    .atMost(Duration.ofSeconds(3L))
                    .until(() -> managedLedger.getLedgersInfo().size() > 1);

            List<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerInfoList =
                    managedLedger.getLedgersInfoAsList();
            Assert.assertEquals(ledgerInfoList.size(), 2);
            Assert.assertEquals(ledgerInfoList.get(0).getSize(), managedLedger.getTotalSize());
        } finally {
            // Close the cursor on every path; previously a failed assertion skipped it.
            cursor.close();
        }
    }
}

