/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.mqtt.test.client;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import java.util.LinkedList;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
 * Checks that the MQTT client reports publish completions in the order in which the
 * server acknowledges them, not in the order in which the messages were published.
 *
 * <p>The client publishes {@value #MESSAGE_COUNT} messages; the in-test server waits for the
 * last one and then acknowledges them as 2, 1, 3. The client's publish completion handler must
 * observe exactly that sequence.
 */
@RunWith(VertxUnitRunner.class)
public class MqttClientOutOfOrderAcksTest {
    private static final Logger log = LoggerFactory.getLogger(MqttClientOutOfOrderAcksTest.class);
    private static final String MQTT_TOPIC = "/my_topic";
    private static final String MQTT_MESSAGE = "Hello Vert.x MQTT Client";
    private static final String MQTT_HOST = "localhost";
    // NOTE(review): the server is started without explicit options, so this is presumably
    // its default listen port - confirm against MqttServerOptions.
    private static final int MQTT_PORT = 1883;
    /** Number of messages published per test; the server logic below is written for exactly three. */
    private static final int MESSAGE_COUNT = 3;

    Vertx vertx = Vertx.vertx();
    MqttServer server;

    /** QoS 1: the server sends PUBACK out of order. */
    @Test
    public void publishQoS1OutOfOrderAcks(TestContext context) throws InterruptedException {
        clientSendThreePublishMessages(MqttQoS.AT_LEAST_ONCE, context);
    }

    /** QoS 2: the server sends PUBCOMP out of order. */
    @Test
    public void publishQoS2OutOfOrderAcks(TestContext context) throws InterruptedException {
        clientSendThreePublishMessages(MqttQoS.EXACTLY_ONCE, context);
    }

    /**
     * Connects a client, publishes {@value #MESSAGE_COUNT} messages with the given QoS and
     * asserts that publish completions arrive for message ids 2, 1, 3 in that order.
     *
     * @param mqttQoS QoS level used for every published message
     * @param context test context used for assertions and completion tracking
     */
    private void clientSendThreePublishMessages(MqttQoS mqttQoS, TestContext context) {
        Async async = context.async(MESSAGE_COUNT);
        MqttClient client = MqttClient.create(this.vertx);

        // Order in which the server (see serverLogic) acknowledges the messages.
        LinkedList<Integer> expectOrder = new LinkedList<Integer>();
        expectOrder.add(2);
        expectOrder.add(1);
        expectOrder.add(3);

        client.publishCompletionHandler(messageId -> {
            // expected first, actual second, so a mismatch is reported the right way round
            context.assertEquals(expectOrder.poll(), messageId);
            log.info("[CLIENT] Publish completed for message with id: " + messageId);
            async.countDown();
        });

        // Fail the test with the real cause if the connection cannot be established,
        // instead of silently never publishing anything.
        client.connect(MQTT_PORT, MQTT_HOST, context.asyncAssertSuccess(connAck -> {
            for (int i = 0; i < MESSAGE_COUNT; ++i) {
                client.publish(MQTT_TOPIC, Buffer.buffer(MQTT_MESSAGE), mqttQoS, false, false, sent -> {
                    if (sent.failed()) {
                        context.fail(sent.cause());
                        return;
                    }
                    log.info("[CLIENT] publishing message id = " + sent.result());
                });
            }
        }));

        async.await();
        client.disconnect();
    }

    /** Starts the MQTT server with {@link #serverLogic} as endpoint handler before each test. */
    @Before
    public void before(TestContext context) {
        this.server = MqttServer.create(this.vertx);
        // Propagate the throwable so the test report shows why the server failed.
        this.server.exceptionHandler(t -> context.fail(t));
        this.server.endpointHandler(MqttClientOutOfOrderAcksTest::serverLogic).listen(context.asyncAssertSuccess());
    }

    /** Closes the server and then the Vert.x instance after each test. */
    @After
    public void after(TestContext ctx) {
        this.server.close(ctx.asyncAssertSuccess(v -> this.vertx.close(ctx.asyncAssertSuccess())));
    }

    /**
     * Server side of the test: accepts the client and, once the publish with message id 3 has
     * been received, acknowledges all three messages in a deliberately shuffled order.
     *
     * @param mqttEndpoint endpoint representing the connected client
     */
    private static void serverLogic(MqttEndpoint mqttEndpoint) {
        log.info("[SERVER] Client connected");

        mqttEndpoint.publishHandler(p -> {
            log.info("[SERVER] Received PUBLISH with message id = " + p.messageId());
            // Only react once the publish with the highest id has been received.
            boolean lastPublish = p.messageId() == MESSAGE_COUNT;
            if (lastPublish && p.qosLevel() == MqttQoS.EXACTLY_ONCE) {
                // QoS 2, step 1: PUBREC for every message; id 1 goes last.
                mqttEndpoint.publishReceived(3);
                mqttEndpoint.publishReceived(2);
                mqttEndpoint.publishReceived(1);
            }
            if (lastPublish && p.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
                // QoS 1: PUBACK in the order the client is expected to report.
                mqttEndpoint.publishAcknowledge(2);
                mqttEndpoint.publishAcknowledge(1);
                mqttEndpoint.publishAcknowledge(3);
            }
        });

        mqttEndpoint.publishReleaseHandler(pr -> {
            log.info("[SERVER] Receive PUBREL with message id = " + pr);
            // QoS 2, step 2: the PUBREL for id 1 triggers PUBCOMP for all three messages
            // (presumably it is the last PUBREL to arrive, since PUBREC for id 1 was sent last).
            if (pr == 1) {
                mqttEndpoint.publishComplete(2);
                mqttEndpoint.publishComplete(1);
                mqttEndpoint.publishComplete(3);
            }
        });

        mqttEndpoint.disconnectHandler(d -> log.info("[SERVER] Client disconnected"));
        mqttEndpoint.accept(false);
    }
}

