/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.mqtt.test.server;

import io.netty.handler.codec.mqtt.MqttProperties;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttTopicSubscription;
import io.vertx.mqtt.test.server.MqttServerBaseTest;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttClientPersistence;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class Mqtt5ServerPublishTest
extends MqttServerBaseTest {
    private static final Logger log = LoggerFactory.getLogger(Mqtt5ServerPublishTest.class);
    private Async async;
    private static final String MQTT_TOPIC = "/my_topic";
    private static final String MQTT_MESSAGE = "Hello Vert.x MQTT Server";
    private String topic;
    private String message;
    private io.netty.handler.codec.mqtt.MqttProperties properties;
    private AtomicInteger nextMessageId = new AtomicInteger(0);
    private AtomicReference<MqttProperties> lastMessageProperties = new AtomicReference<Object>(null);

    @Before
    public void before(TestContext context) {
        this.setUp(context);
    }

    @After
    public void after(TestContext context) {
        this.tearDown(context);
    }

    @Test
    public void publishQos0(TestContext context) {
        io.netty.handler.codec.mqtt.MqttProperties props = new io.netty.handler.codec.mqtt.MqttProperties();
        String expectedContentType = "plain/text";
        props.add((MqttProperties.MqttProperty)new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.CONTENT_TYPE.value(), expectedContentType));
        this.publish(context, MQTT_TOPIC, MQTT_MESSAGE, 0, props);
        context.assertNotNull((Object)this.lastMessageProperties.get());
        context.assertEquals((Object)expectedContentType, (Object)this.lastMessageProperties.get().getContentType());
    }

    @Test
    public void publishQos1(TestContext context) {
        io.netty.handler.codec.mqtt.MqttProperties props = new io.netty.handler.codec.mqtt.MqttProperties();
        props.add((MqttProperties.MqttProperty)new MqttProperties.UserProperty("priority", "fast"));
        this.publish(context, MQTT_TOPIC, MQTT_MESSAGE, 1, props);
        context.assertNotNull((Object)this.lastMessageProperties.get());
        UserProperty userProp = (UserProperty)this.lastMessageProperties.get().getUserProperties().get(0);
        context.assertNotNull((Object)userProp);
        context.assertEquals((Object)userProp.getKey(), (Object)"priority");
        context.assertEquals((Object)userProp.getValue(), (Object)"fast");
    }

    @Test
    public void publishQos2(TestContext context) {
        io.netty.handler.codec.mqtt.MqttProperties props = new io.netty.handler.codec.mqtt.MqttProperties();
        int expectedMessageExpiry = 234;
        props.add((MqttProperties.MqttProperty)new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value(), Integer.valueOf(expectedMessageExpiry)));
        this.publish(context, MQTT_TOPIC, MQTT_MESSAGE, 2, props);
        context.assertNotNull((Object)this.lastMessageProperties.get());
        context.assertEquals((Object)expectedMessageExpiry, (Object)this.lastMessageProperties.get().getMessageExpiryInterval());
    }

    private void publish(TestContext context, String topic, String message, int qos, io.netty.handler.codec.mqtt.MqttProperties properties) {
        this.topic = topic;
        this.message = message;
        this.properties = properties;
        this.async = context.async(2);
        try {
            MemoryPersistence persistence = new MemoryPersistence();
            MqttClient client = new MqttClient(String.format("tcp://%s:%d", "localhost", 1883), "12345", (MqttClientPersistence)persistence);
            client.connect();
            MqttSubscription[] subscriptions = new MqttSubscription[]{new MqttSubscription(topic, qos)};
            client.subscribe(subscriptions, new IMqttMessageListener[]{new IMqttMessageListener(){

                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                    log.info((Object)("Just received message [" + mqttMessage.toString() + "] on topic [" + topic + "] with QoS [" + mqttMessage.getQos() + "] and properties [" + mqttMessage.getProperties() + "]"));
                    Mqtt5ServerPublishTest.this.lastMessageProperties.set(mqttMessage.getProperties());
                    Mqtt5ServerPublishTest.this.async.countDown();
                    if (mqttMessage.getQos() == 0) {
                        Mqtt5ServerPublishTest.this.async.complete();
                    }
                }
            }});
            this.async.await();
        }
        catch (MqttException e) {
            context.fail((Throwable)e);
        }
    }

    @Override
    protected void endpointHandler(MqttEndpoint endpoint, TestContext context) {
        endpoint.subscribeHandler(subscribe -> {
            endpoint.subscribeAcknowledge(subscribe.messageId(), subscribe.topicSubscriptions().stream().map(MqttTopicSubscription::qualityOfService).collect(Collectors.toList()));
            endpoint.publish(this.topic, Buffer.buffer((String)this.message), ((MqttTopicSubscription)subscribe.topicSubscriptions().get(0)).qualityOfService(), false, false, this.nextMessageId.getAndIncrement(), this.properties, publishSent -> context.assertTrue(publishSent.succeeded()));
        }).publishAcknowledgeHandler(messageId -> {
            log.info((Object)("Message [" + messageId + "] acknowledged"));
            this.async.countDown();
        }).publishReceivedHandler(messageId -> endpoint.publishRelease(messageId.intValue())).publishCompletionHandler(messageId -> {
            log.info((Object)("Message [" + messageId + "] completed"));
            this.async.countDown();
        });
        endpoint.accept(false);
    }
}

