/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TlsProducerConsumerBase;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * TLS producer/consumer tests that run against the {@code pulsar} broker and
 * {@code pulsarClient} fixtures inherited from {@link TlsProducerConsumerBase}.
 *
 * <p>Covered scenarios: large payloads over TLS, mandatory client certificates over
 * the binary and HTTP service URLs, and TLS material supplied through stream
 * {@link Supplier}s (including swapping invalid material for valid material at runtime).
 */
@Test(groups={"broker-api"})
public class TlsProducerConsumerTest extends TlsProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(TlsProducerConsumerTest.class);

    private static final String TOPIC = "persistent://my-property/use/my-ns/my-topic1";
    private static final String SUBSCRIPTION = "my-subscriber-name";

    /** One byte more than 16 KiB (presumably to exceed a single TLS record - TODO confirm). */
    private static final int MESSAGE_SIZE = 16 * 1024 + 1;
    private static final int MESSAGE_COUNT = 10;

    private static final String CLIENT_CERT_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
    private static final String CLIENT_KEY_PATH = "./src/test/resources/authentication/tls/client-key.pem";
    private static final String TRUST_CERT_PATH = "./src/test/resources/authentication/tls/cacert.pem";

    /**
     * Sends {@link #MESSAGE_COUNT} payloads of {@link #MESSAGE_SIZE} bytes over the TLS
     * broker URL and verifies that they are received intact and in order.
     */
    @Test(timeOut=30000L)
    public void testTlsLargeSizeMessage() throws Exception {
        log.info("-- Starting {} test --", methodName);
        log.info("-- message size -- {}", MESSAGE_SIZE);
        internalSetUpForClient(true, pulsar.getBrokerServiceUrlTls());
        internalSetUpForNamespace();
        // try-with-resources releases both endpoints even when an assertion fails.
        try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(TOPIC).subscriptionName(SUBSCRIPTION).subscribe();
             Producer<byte[]> producer = pulsarClient.newProducer().topic(TOPIC).create()) {
            for (int i = 0; i < MESSAGE_COUNT; ++i) {
                producer.send(filledPayload(i));
            }
            Message<byte[]> msg = null;
            for (int i = 0; i < MESSAGE_COUNT; ++i) {
                msg = consumer.receive(5, TimeUnit.SECONDS);
                Assert.assertNotNull(msg, "no message received for index " + i);
                Assert.assertEquals(msg.getData(), filledPayload(i));
            }
            consumer.acknowledgeCumulative(msg);
        }
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Verifies over the binary (broker service) TLS URL that subscribing fails without
     * client certificates and succeeds with them.
     */
    @Test(timeOut=30000L)
    public void testTlsClientAuthOverBinaryProtocol() throws Exception {
        log.info("-- Starting {} test --", methodName);
        log.info("-- message size -- {}", MESSAGE_SIZE);
        internalSetUpForNamespace();
        assertClientCertificateRequired(pulsar.getBrokerServiceUrlTls());
    }

    /**
     * Verifies over the HTTP (web service) TLS URL that subscribing fails without
     * client certificates and succeeds with them.
     */
    @Test(timeOut=30000L)
    public void testTlsClientAuthOverHTTPProtocol() throws Exception {
        log.info("-- Starting {} test --", methodName);
        log.info("-- message size -- {}", MESSAGE_SIZE);
        internalSetUpForNamespace();
        assertClientCertificateRequired(pulsar.getWebServiceAddressTls());
    }

    /**
     * Builds a client whose TLS cert, key and trust store come from stream suppliers and
     * checks that it can still produce and consume after the topic is closed on the broker.
     */
    @Test(timeOut=60000L)
    public void testTlsCertsFromDynamicStream() throws Exception {
        log.info("-- Starting {} test --", methodName);
        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrlTls()).enableTls(true).allowTlsInsecureConnection(false).operationTimeout(1000, TimeUnit.MILLISECONDS);
        AtomicInteger index = new AtomicInteger(0);
        ByteArrayInputStream certStream = createByteInputStream(CLIENT_CERT_PATH);
        ByteArrayInputStream keyStream = createByteInputStream(CLIENT_KEY_PATH);
        ByteArrayInputStream trustStoreStream = createByteInputStream(TRUST_CERT_PATH);
        // Each supplier deliberately hands out the same stream instance on every call.
        Supplier<ByteArrayInputStream> certProvider = () -> getStream(index, certStream);
        Supplier<ByteArrayInputStream> keyProvider = () -> getStream(index, keyStream);
        Supplier<ByteArrayInputStream> trustStoreProvider = () -> getStream(index, trustStoreStream);
        clientBuilder.authentication(new AuthenticationTls(certProvider, keyProvider, trustStoreProvider));
        try (PulsarClient client = clientBuilder.build();
             Consumer<byte[]> consumer = client.newConsumer().topic(TOPIC).subscriptionName(SUBSCRIPTION).subscribe()) {
            // Close the topic broker-side; presumably this forces the client to reconnect
            // and read the supplied TLS streams again - TODO confirm.
            PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(TOPIC).get();
            topicRef.close(false);
            try (Producer<byte[]> producer = client.newProducer().topic(TOPIC).createAsync().get(30L, TimeUnit.SECONDS)) {
                for (int i = 0; i < MESSAGE_COUNT; ++i) {
                    producer.send(("test" + i).getBytes());
                }
                Message<byte[]> msg = null;
                for (int i = 0; i < MESSAGE_COUNT; ++i) {
                    msg = consumer.receive(5, TimeUnit.SECONDS);
                    Assert.assertNotNull(msg, "no message received for index " + i);
                    Assert.assertEquals(msg.getData(), ("test" + i).getBytes());
                }
                consumer.acknowledgeCumulative(msg);
            }
            log.info("-- Exiting {} test --", methodName);
        }
    }

    /**
     * Starts with the key stream standing in for both the client cert and the trust store
     * (so both are invalid), then switches each supplier to the real stream and checks that
     * subscribing only succeeds once both are valid.
     */
    @Test
    public void testTlsCertsFromDynamicStreamExpiredAndRenewCert() throws Exception {
        log.info("-- Starting {} test --", methodName);
        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrlTls()).enableTls(true).allowTlsInsecureConnection(false).operationTimeout(1000, TimeUnit.MILLISECONDS);
        // Index 1 selects the key stream as a deliberately wrong substitute; index 0 the real one.
        AtomicInteger certIndex = new AtomicInteger(1);
        AtomicInteger keyIndex = new AtomicInteger(0);
        AtomicInteger trustStoreIndex = new AtomicInteger(1);
        ByteArrayInputStream certStream = createByteInputStream(CLIENT_CERT_PATH);
        ByteArrayInputStream keyStream = createByteInputStream(CLIENT_KEY_PATH);
        ByteArrayInputStream trustStoreStream = createByteInputStream(TRUST_CERT_PATH);
        Supplier<ByteArrayInputStream> certProvider = () -> getStream(certIndex, certStream, keyStream);
        Supplier<ByteArrayInputStream> keyProvider = () -> getStream(keyIndex, keyStream);
        Supplier<ByteArrayInputStream> trustStoreProvider = () -> getStream(trustStoreIndex, trustStoreStream, keyStream);
        clientBuilder.authentication(new AuthenticationTls(certProvider, keyProvider, trustStoreProvider));
        try (PulsarClient client = clientBuilder.build()) {
            // Both the client cert and the trust store are invalid.
            assertSubscribeFails(client, "should have failed due to invalid tls cert");
            // Client cert is now valid, trust store is still invalid.
            certIndex.set(0);
            assertSubscribeFails(client, "should have failed due to invalid tls trust cert");
            // Everything valid: subscribing must succeed.
            trustStoreIndex.set(0);
            client.newConsumer().topic(TOPIC).subscriptionName(SUBSCRIPTION).subscribe().close();
            log.info("-- Exiting {} test --", methodName);
        }
    }

    /**
     * Re-creates the shared client against {@code serviceUrl}, first with the setup flag
     * {@code false} and then {@code true} (per the assertion messages: without and with
     * client certificates), asserting that an exclusive subscribe fails and then succeeds.
     *
     * @param serviceUrl TLS service URL the client should connect to
     */
    private void assertClientCertificateRequired(String serviceUrl) throws Exception {
        internalSetUpForClient(false, serviceUrl);
        try {
            subscribeExclusive();
            // AssertionError is not an Exception, so the catch below does not swallow it.
            Assert.fail("Server should have failed the TLS handshake since the client didn't send a certificate.");
        }
        catch (Exception expected) {
            log.info("Subscribe without client certificate failed as expected: {}", expected.getMessage());
        }
        internalSetUpForClient(true, serviceUrl);
        try {
            subscribeExclusive().close();
        }
        catch (Exception ex) {
            // Keep the cause so the report shows why the connection was rejected.
            Assert.fail("Should not fail since certs are sent.", ex);
        }
    }

    /** Subscribes exclusively to the test topic using the shared {@code pulsarClient}. */
    private Consumer<byte[]> subscribeExclusive() throws PulsarClientException {
        return pulsarClient.newConsumer().topic(TOPIC).subscriptionName(SUBSCRIPTION).subscriptionType(SubscriptionType.Exclusive).subscribe();
    }

    /**
     * Asserts that subscribing through {@code client} throws a {@link PulsarClientException}.
     *
     * @param client         client to subscribe with
     * @param failureMessage assertion message used when the subscribe unexpectedly succeeds
     */
    private static void assertSubscribeFails(PulsarClient client, String failureMessage) {
        try {
            // A consumer created here by mistake is left for the caller's client close.
            client.newConsumer().topic(TOPIC).subscriptionName(SUBSCRIPTION).subscribe();
            Assert.fail(failureMessage);
        }
        catch (PulsarClientException expected) {
            log.info("Subscribe failed as expected: {}", expected.getMessage());
        }
    }

    /** Returns a {@link #MESSAGE_SIZE}-byte array with every byte set to {@code (byte) value}. */
    private static byte[] filledPayload(int value) {
        byte[] payload = new byte[MESSAGE_SIZE];
        Arrays.fill(payload, (byte) value);
        return payload;
    }

    /**
     * Reads the whole file into memory and wraps it in a {@link ByteArrayInputStream}.
     *
     * @param filePath path of the file to load
     * @return an in-memory stream over the file's bytes
     * @throws IOException if the file cannot be opened or read
     */
    private ByteArrayInputStream createByteInputStream(String filePath) throws IOException {
        try (InputStream inStream = new FileInputStream(filePath)) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            IOUtils.copy(inStream, baos);
            return new ByteArrayInputStream(baos.toByteArray());
        }
    }

    /**
     * Picks the stream at the position currently held by {@code index}.
     *
     * @param index   holder of the position to select; read on every call
     * @param streams candidate streams
     * @return {@code streams[index]} (the same instance, not a copy)
     */
    private ByteArrayInputStream getStream(AtomicInteger index, ByteArrayInputStream ... streams) {
        return streams[index.intValue()];
    }
}

