/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.mqtt.test.server;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttTopicSubscription;
import io.vertx.mqtt.test.server.MqttServerBaseTest;
import java.util.stream.Collectors;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests publishing from the MQTT server endpoint to a subscribed Paho client
 * at QoS 0, 1 and 2.
 *
 * <p>Each test connects a Paho client, subscribes to a topic and waits until two
 * events have happened: the endpoint reported the publish as sent, and the
 * delivery was confirmed for the requested QoS (message arrival on the client for
 * QoS 0, the acknowledge handler for QoS 1, the completion handler for QoS 2).
 */
public class MqttServerPublishTest
extends MqttServerBaseTest {
    private static final Logger log = LoggerFactory.getLogger(MqttServerPublishTest.class);
    private static final String MQTT_TOPIC = "/my_topic";
    private static final String MQTT_MESSAGE = "Hello Vert.x MQTT Server";

    /** Two-step latch shared between the test thread and the endpoint/client callbacks. */
    private Async async;
    /** Topic the endpoint publishes on once the client has subscribed. */
    private String topic;
    /** Payload the endpoint publishes once the client has subscribed. */
    private String message;

    /**
     * Delegates to {@code setUp} before each test.
     *
     * @param context the test context supplied by the runner
     */
    @Before
    public void before(TestContext context) {
        this.setUp(context);
    }

    /**
     * Delegates to {@code tearDown} after each test.
     *
     * @param context the test context supplied by the runner
     */
    @After
    public void after(TestContext context) {
        this.tearDown(context);
    }

    /** Publishes to a subscriber that requested QoS 0. */
    @Test
    public void publishQos0(TestContext context) {
        this.publish(context, MQTT_TOPIC, MQTT_MESSAGE, 0);
    }

    /** Publishes to a subscriber that requested QoS 1. */
    @Test
    public void publishQos1(TestContext context) {
        this.publish(context, MQTT_TOPIC, MQTT_MESSAGE, 1);
    }

    /** Publishes to a subscriber that requested QoS 2. */
    @Test
    public void publishQos2(TestContext context) {
        this.publish(context, MQTT_TOPIC, MQTT_MESSAGE, 2);
    }

    /**
     * Connects a Paho client, subscribes it to {@code topic} with the given QoS and
     * blocks until both expected events have been counted down.
     *
     * @param context the test context used to create the latch and report failures
     * @param topic   the topic to subscribe to; also the topic the endpoint publishes on
     * @param message the payload the endpoint publishes
     * @param qos     the QoS level requested in the subscription
     */
    private void publish(TestContext context, String topic, String message, int qos) {
        this.topic = topic;
        this.message = message;
        // Two events are awaited: "publish sent" plus the QoS-specific delivery confirmation.
        this.async = context.async(2);

        MqttClient client = null;
        try {
            MemoryPersistence persistence = new MemoryPersistence();
            // NOTE(review): host/port are literals here; presumably they match what setUp starts - confirm.
            client = new MqttClient(String.format("tcp://%s:%d", "localhost", 1883), "12345", persistence);
            client.connect();
            client.subscribe(topic, qos, (receivedTopic, mqttMessage) -> {
                log.info("Just received message [" + mqttMessage.toString() + "] on topic [" + receivedTopic + "] with QoS [" + mqttMessage.getQos() + "]");
                // For QoS 0 no acknowledgement reaches the endpoint handlers, so arrival is the confirmation.
                if (mqttMessage.getQos() == 0) {
                    this.async.countDown();
                }
            });
            this.async.await();
        } catch (MqttException e) {
            // Report the real cause instead of an anonymous "assertTrue(false)".
            context.fail(e);
        } finally {
            releaseClient(client);
        }
    }

    /**
     * Best-effort shutdown of the Paho client so a test does not leave a connection behind.
     * Cleanup problems are logged and never fail the test.
     *
     * @param client the client to release; may be {@code null} if construction failed
     */
    private static void releaseClient(MqttClient client) {
        if (client == null) {
            return;
        }
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
            client.close();
        } catch (MqttException e) {
            log.warn("Failed to release MQTT client", e);
        }
    }

    /**
     * Installs the endpoint handlers for the test and accepts the connection.
     *
     * <p>On subscribe, the endpoint acknowledges with the requested QoS levels and
     * publishes the stored message at the QoS of the first topic subscription. Each
     * confirmation counts the latch down once.
     *
     * @param endpoint the endpoint representing the connected client
     * @param context  the test context used for assertions
     */
    @Override
    protected void endpointHandler(MqttEndpoint endpoint, TestContext context) {
        endpoint.subscribeHandler(subscribe -> {
            endpoint.subscribeAcknowledge(
                subscribe.messageId(),
                subscribe.topicSubscriptions().stream()
                    .map(MqttTopicSubscription::qualityOfService)
                    .collect(Collectors.toList()));
            endpoint.publish(this.topic, Buffer.buffer(this.message), subscribe.topicSubscriptions().get(0).qualityOfService(), false, false, publishSent -> {
                context.assertTrue(publishSent.succeeded());
                this.async.countDown();
            });
        }).publishAcknowledgeHandler(messageId -> {
            log.info("Message [" + messageId + "] acknowledged");
            this.async.countDown();
        }).publishReceivedHandler(messageId -> endpoint.publishRelease(messageId)).publishCompletionHandler(messageId -> {
            log.info("Message [" + messageId + "] acknowledged");
            this.async.countDown();
        });
        endpoint.accept(false);
    }
}

