/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.mqtt.test.server;

import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttTopicSubscription;
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;
import io.vertx.mqtt.test.server.MqttServerBaseTest;
import java.util.ArrayList;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttClientPersistence;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
 * Tests how the MQTT server handles SUBSCRIBE requests sent by an MQTT 5 client.
 *
 * <p>Each subscribe test connects a Paho MQTT 5 client, sends a single subscription and lets
 * {@link #endpointHandler(MqttEndpoint, TestContext)} verify on the server side that the
 * subscription options arrived exactly as requested. The expected values are handed from the
 * test method to the handler through the {@code requested*} fields.
 */
@RunWith(VertxUnitRunner.class)
public class Mqtt5ServerSubscribeTest extends MqttServerBaseTest {
    private static final Logger log = LoggerFactory.getLogger(Mqtt5ServerSubscribeTest.class);

    private static final String MQTT_TOPIC = "/my_topic";
    private static final String MQTT_TOPIC_FAILURE = "/my_topic/failure";
    private static final String MQTT_FAILURE_REASON = "test reason";

    /** Reason code (0x8F = 143) the client is expected to report for the rejected topic. */
    private static final int REASON_CODE_TOPIC_FILTER_INVALID = 0x8F;

    /** Completed by the server-side subscribe handler once it has acknowledged the request. */
    private Async async;

    // Values requested by the running test; compared by the subscribe handler with what it receives.
    private int requestedQos;
    private boolean requestedRetainAsPublished;
    private boolean requestedNoLocal;
    private int requestedRetainHandling;

    @Before
    public void before(TestContext context) {
        this.setUp(context);
    }

    @After
    public void after(TestContext context) {
        this.tearDown(context);
    }

    @Test
    public void subscribeQos0(TestContext context) {
        this.subscribe(context, MQTT_TOPIC, 0, true, false, 0);
    }

    @Test
    public void subscribeQos1(TestContext context) {
        this.subscribe(context, MQTT_TOPIC, 1, false, true, 1);
    }

    @Test
    public void subscribeQos2(TestContext context) {
        this.subscribe(context, MQTT_TOPIC, 2, true, true, 2);
    }

    @Test
    public void subscribeFailure(TestContext context) {
        this.subscribe(context, MQTT_TOPIC_FAILURE, 0, false, false, 0);
    }

    /**
     * Connects a Paho MQTT 5 client, subscribes to {@code topic} with the given options and
     * blocks until the server-side subscribe handler has completed {@link #async}.
     *
     * <p>An {@link MqttException} is only acceptable for {@link #MQTT_TOPIC_FAILURE}; there its
     * reason code and message are checked. For any other topic the exception fails the test
     * right away instead of being swallowed.
     *
     * @param context           test context used for assertions and async coordination
     * @param topic             topic filter to subscribe to
     * @param expectedQos       QoS level to request
     * @param retainAsPublished "retain as published" subscription option to request
     * @param noLocal           "no local" subscription option to request
     * @param retainHandling    "retain handling" subscription option to request
     */
    private void subscribe(TestContext context, String topic, int expectedQos, boolean retainAsPublished, boolean noLocal, int retainHandling) {
        this.async = context.async();
        try {
            MemoryPersistence persistence = new MemoryPersistence();
            MqttClient client = new MqttClient(String.format("tcp://%s:%d", "localhost", 1883), "12345", persistence);
            client.connect();

            // Publish the expectations before the SUBSCRIBE is sent so the handler can compare them.
            this.requestedQos = expectedQos;
            this.requestedNoLocal = noLocal;
            this.requestedRetainAsPublished = retainAsPublished;
            this.requestedRetainHandling = retainHandling;

            MqttSubscription subscription = new MqttSubscription(topic, expectedQos);
            subscription.setNoLocal(noLocal);
            subscription.setRetainAsPublished(retainAsPublished);
            subscription.setRetainHandling(retainHandling);

            client.subscribe(new MqttSubscription[]{subscription});
            this.async.await();
        } catch (MqttException e) {
            if (MQTT_TOPIC_FAILURE.equals(topic)) {
                // The server rejects this topic on purpose; check what the client reports.
                log.info("Subscription to " + topic + " was rejected", e);
                context.assertEquals(REASON_CODE_TOPIC_FILTER_INVALID, e.getReasonCode());
                context.assertEquals(MQTT_FAILURE_REASON, e.getMessage());
            } else {
                // Report the real cause instead of letting the test time out on the pending async.
                log.error("Unexpected MQTT error while subscribing to " + topic, e);
                context.fail(e);
            }
        }
    }

    /**
     * Sends a raw CONNECT packet that declares protocol level 6 and expects the server to reply
     * with the bytes {@code 0x20 0x02 0x00 0x01} (a CONNACK carrying return code 1) before the
     * socket is closed.
     */
    @Test
    public void subscribeUnsupportedMqttVersion(TestContext context) {
        Async async = context.async();
        NetClient client = this.vertx.createNetClient();
        client.connect(1883, "localhost", context.asyncAssertSuccess(so -> {
            // CONNECT: fixed header (16, 17), protocol name "MQTT", level 6, flags 2,
            // keep-alive 60, client id "12345".
            so.write(Buffer.buffer(new byte[]{16, 17, 0, 4, 77, 81, 84, 84, 6, 2, 0, 60, 0, 5, 49, 50, 51, 52, 53}));
            Buffer received = Buffer.buffer();
            so.handler(received::appendBuffer);
            so.exceptionHandler(context::fail);
            // Compare only once the socket is closed, so every received byte is accounted for.
            so.closeHandler(v -> {
                Buffer expected = Buffer.buffer(new byte[]{32, 2, 0, 1});
                context.assertEquals(expected, received);
                async.complete();
            });
        }));
    }

    /**
     * Installs the subscribe handler on a connecting endpoint and accepts the connection.
     *
     * <p>The handler checks the first topic subscription against the values requested by the
     * running test, acknowledges it (with {@code TOPIC_FILTER_INVALID} plus a reason string for
     * {@link #MQTT_TOPIC_FAILURE}, otherwise with the granted QoS) and then completes
     * {@link #async}.
     */
    @Override
    protected void endpointHandler(MqttEndpoint endpoint, TestContext context) {
        endpoint.subscribeHandler(subscribe -> {
            MqttTopicSubscription subscription = subscribe.topicSubscriptions().get(0);

            context.assertEquals(this.requestedQos, subscription.subscriptionOption().qos().value());
            context.assertEquals(this.requestedNoLocal, subscription.subscriptionOption().isNoLocal());
            context.assertEquals(this.requestedRetainAsPublished, subscription.subscriptionOption().isRetainAsPublished());
            context.assertEquals(this.requestedRetainHandling, subscription.subscriptionOption().retainHandling().value());

            ArrayList<MqttSubAckReasonCode> reasonCodes = new ArrayList<>();
            MqttProperties subackProperties = new MqttProperties();
            if (MQTT_TOPIC_FAILURE.equals(subscription.topicName())) {
                reasonCodes.add(MqttSubAckReasonCode.TOPIC_FILTER_INVALID);
                subackProperties.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.REASON_STRING.value(), MQTT_FAILURE_REASON));
            } else {
                reasonCodes.add(MqttSubAckReasonCode.qosGranted(subscription.qualityOfService()));
            }

            endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, subackProperties);
            this.async.complete();
        });
        endpoint.accept(false);
    }
}

