/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.mqtt.it;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.it.MqttClientBaseIT;
import java.nio.charset.Charset;
import org.junit.Assert;
import org.junit.Test;

public class MqttClientSubscribeIT
extends MqttClientBaseIT {
    private static final Logger log = LoggerFactory.getLogger(MqttClientSubscribeIT.class);
    private static final String MQTT_TOPIC = "/my_topic";
    private static final String MQTT_MESSAGE = "Hello Vert.x MQTT Client";
    private int messageId = 0;

    @Test
    public void subscribeQos2AndReceive(TestContext context) throws InterruptedException {
        this.subscribeAndReceive(context, MqttQoS.EXACTLY_ONCE);
    }

    @Test
    public void subscribeQos1AndReceive(TestContext context) throws InterruptedException {
        this.subscribeAndReceive(context, MqttQoS.AT_LEAST_ONCE);
    }

    @Test
    public void subscribeQoS0AndReceive(TestContext context) throws InterruptedException {
        this.subscribeAndReceive(context, MqttQoS.AT_MOST_ONCE);
    }

    @Test
    public void subscribeQoS0(TestContext context) throws InterruptedException {
        this.subscribe(context, MqttQoS.AT_MOST_ONCE);
    }

    @Test
    public void subscribeQoS1(TestContext context) throws InterruptedException {
        this.subscribe(context, MqttQoS.AT_LEAST_ONCE);
    }

    @Test
    public void subscribeQoS2(TestContext context) throws InterruptedException {
        this.subscribe(context, MqttQoS.EXACTLY_ONCE);
    }

    @Test
    public void unsubscribedNoMessageReceived(TestContext context) throws InterruptedException {
        Async publish = context.async(2);
        Async async = context.async();
        MqttClient subscriber1 = MqttClient.create((Vertx)Vertx.vertx());
        MqttClient subscriber2 = MqttClient.create((Vertx)Vertx.vertx());
        MqttClient publisher = MqttClient.create((Vertx)Vertx.vertx());
        subscriber1.connect(this.port, this.host, ar -> {
            Assert.assertTrue((boolean)ar.succeeded());
            subscriber1.publishHandler(message -> {
                log.error((Object)("Subscriber " + subscriber1.clientId() + " received message " + new String(message.payload().getBytes())));
                context.fail();
            });
            subscriber1.subscribe(MQTT_TOPIC, MqttQoS.AT_MOST_ONCE.value(), ar1 -> {
                Assert.assertTrue((boolean)ar1.succeeded());
                log.info((Object)("Subscriber " + subscriber1.clientId() + " subscribed to " + MQTT_TOPIC));
                subscriber1.unsubscribe(MQTT_TOPIC, ar2 -> {
                    Assert.assertTrue((boolean)ar2.succeeded());
                    log.info((Object)("Subscriber " + subscriber1.clientId() + " un-subscribed from " + MQTT_TOPIC));
                    publish.countDown();
                });
            });
        });
        subscriber2.connect(this.port, this.host, ar -> {
            Assert.assertTrue((boolean)ar.succeeded());
            subscriber2.publishHandler(message -> {
                log.error((Object)("Subscriber " + subscriber2.clientId() + " received message " + new String(message.payload().getBytes())));
                async.complete();
            });
            subscriber2.subscribe(MQTT_TOPIC, MqttQoS.AT_MOST_ONCE.value(), ar1 -> {
                Assert.assertTrue((boolean)ar1.succeeded());
                log.info((Object)("Subscriber " + subscriber2.clientId() + " subscribed to " + MQTT_TOPIC));
                publish.countDown();
            });
        });
        publish.await();
        publisher.connect(this.port, this.host, ar -> publisher.publish(MQTT_TOPIC, Buffer.buffer((byte[])MQTT_MESSAGE.getBytes()), MqttQoS.AT_MOST_ONCE, false, false, ar1 -> {
            Assert.assertTrue((boolean)ar.succeeded());
            this.messageId = (Integer)ar1.result();
            log.info((Object)("Publishing message id = " + this.messageId));
        }));
        async.await();
    }

    private void subscribeAndReceive(TestContext context, MqttQoS qos) {
        Async async = context.async();
        MqttClient client = MqttClient.create((Vertx)Vertx.vertx());
        client.publishHandler(publish -> {
            Assert.assertTrue((publish.qosLevel() == qos ? 1 : 0) != 0);
            log.info((Object)("Just received message on [" + publish.topicName() + "] payload [" + publish.payload().toString(Charset.defaultCharset()) + "] with QoS [" + publish.qosLevel() + "]"));
            client.disconnect();
            async.countDown();
        });
        client.connect(this.port, this.host, ar -> {
            Assert.assertTrue((boolean)ar.succeeded());
            client.subscribe(MQTT_TOPIC, qos.value());
            client.publish(MQTT_TOPIC, Buffer.buffer((byte[])MQTT_MESSAGE.getBytes()), qos, false, false);
        });
        async.await();
    }

    private void subscribe(TestContext context, MqttQoS qos) {
        this.messageId = 0;
        Async async = context.async();
        MqttClient client = MqttClient.create((Vertx)Vertx.vertx());
        client.subscribeCompletionHandler(suback -> {
            Assert.assertTrue((suback.messageId() == this.messageId ? 1 : 0) != 0);
            Assert.assertTrue((boolean)suback.grantedQoSLevels().contains(qos.value()));
            log.info((Object)("subscribing complete for message id = " + suback.messageId() + " with QoS " + suback.grantedQoSLevels()));
            client.disconnect();
            async.countDown();
        });
        client.connect(this.port, this.host, ar -> {
            Assert.assertTrue((boolean)ar.succeeded());
            client.subscribe(MQTT_TOPIC, qos.value(), done -> {
                Assert.assertTrue((boolean)done.succeeded());
                this.messageId = (Integer)done.result();
                log.info((Object)("subscribing on [/my_topic] with QoS [" + qos.value() + "] message id = " + this.messageId));
            });
        });
        async.await();
    }
}

