/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.mqtt.test.client;

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetSocket;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttServer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(value=VertxUnitRunner.class)
public class MqttConnectTest {
    private Vertx vertx;
    private MqttServer server;
    private NetServer proxyServer;

    @Before
    public void before() {
        this.vertx = Vertx.vertx();
        this.server = MqttServer.create((Vertx)this.vertx);
        this.proxyServer = this.vertx.createNetServer();
    }

    @After
    public void after(TestContext ctx) {
        this.proxyServer.close(ctx.asyncAssertSuccess(v1 -> this.server.close(ctx.asyncAssertSuccess(v2 -> this.vertx.close(ctx.asyncAssertSuccess())))));
    }

    @Test
    public void concurrentConnect(TestContext ctx) {
        this.server.endpointHandler(endpoint -> {
            endpoint.accept(false);
            endpoint.publish("test", Buffer.buffer(), MqttQoS.AT_LEAST_ONCE, false, false);
        });
        Async serverLatch = ctx.async();
        this.server.listen(1883, ctx.asyncAssertSuccess(v -> serverLatch.complete()));
        serverLatch.awaitSuccess(10000L);
        MqttClient client = MqttClient.create((Vertx)this.vertx);
        Async msglatch = ctx.async();
        client.connect(1883, "localhost", ctx.asyncAssertSuccess(ack -> client.publishHandler(msg -> msglatch.complete())));
        client.connect(1883, "localhost", ctx.asyncAssertFailure(err -> ctx.assertEquals(IllegalStateException.class, err.getClass())));
    }

    @Test
    public void reconnectInCloseHandler(TestContext ctx) {
        this.server.endpointHandler(endpoint -> endpoint.accept(false));
        Async serverLatch = ctx.async();
        this.server.listen(0, ctx.asyncAssertSuccess(v -> serverLatch.complete()));
        serverLatch.awaitSuccess(10000L);
        int port = this.server.actualPort();
        MqttClient client = MqttClient.create((Vertx)this.vertx);
        NetClient proxyClient = this.vertx.createNetClient();
        this.proxyServer.connectHandler(so1 -> {
            so1.pause();
            proxyClient.connect(port, "localhost", ar -> {
                if (ar.succeeded()) {
                    NetSocket so2 = (NetSocket)ar.result();
                    this.vertx.setTimer(1000L, id -> so1.close());
                    so1.handler(arg_0 -> ((NetSocket)so2).write(arg_0));
                    so2.handler(arg_0 -> ((NetSocket)so1).write(arg_0));
                    so1.closeHandler(v -> so2.close());
                    so2.closeHandler(v -> so1.close());
                } else {
                    so1.close();
                }
                so1.resume();
            });
        });
        Async proxyLatch = ctx.async();
        this.proxyServer.listen(1883, "localhost", ctx.asyncAssertSuccess(v -> proxyLatch.complete()));
        proxyLatch.awaitSuccess(10000L);
        Async async = ctx.async();
        client.connect(1883, "localhost", ctx.asyncAssertSuccess(ack1 -> client.closeHandler(v1 -> client.connect(1883, "localhost", ctx.asyncAssertSuccess(ack2 -> client.closeHandler(v2 -> async.complete()))))));
    }

    @Test
    public void disconnectThenReconnect(TestContext ctx) {
        this.server.endpointHandler(endpoint -> endpoint.accept(false));
        Async serverLatch = ctx.async();
        this.server.listen(1883, ctx.asyncAssertSuccess(v -> serverLatch.complete()));
        serverLatch.awaitSuccess(10000L);
        MqttClient client = MqttClient.create((Vertx)this.vertx);
        client.connect(1883, "localhost", ctx.asyncAssertSuccess(ack1 -> client.disconnect(ctx.asyncAssertSuccess(v -> {
            ctx.assertFalse(client.isConnected());
            client.connect(1883, "localhost", ctx.asyncAssertSuccess(ack2 -> ctx.assertTrue(client.isConnected())));
        }))));
    }

    @Test
    public void disconnectBeforeConnAck(TestContext ctx) {
        MqttClient client = MqttClient.create((Vertx)this.vertx);
        Async async = ctx.async();
        this.server.endpointHandler(endpoint -> client.disconnect(ctx.asyncAssertSuccess(v -> async.complete())));
        Async serverLatch = ctx.async();
        this.server.listen(1883, ctx.asyncAssertSuccess(v -> serverLatch.complete()));
        serverLatch.awaitSuccess(10000L);
        client.connect(1883, "localhost", ctx.asyncAssertFailure(err -> {}));
    }

    @Test
    public void disconnectWhenConnecting(TestContext ctx) {
        MqttClient client = MqttClient.create((Vertx)this.vertx);
        AtomicBoolean accept = new AtomicBoolean();
        this.server.endpointHandler(endpoint -> {
            if (accept.get()) {
                endpoint.accept(false);
            }
        });
        Async serverLatch = ctx.async();
        this.server.listen(1883, ctx.asyncAssertSuccess(v -> serverLatch.complete()));
        serverLatch.awaitSuccess(10000L);
        client.connect(1883, "localhost", ctx.asyncAssertFailure(err -> {}));
        client.disconnect(ctx.asyncAssertSuccess(v -> {
            accept.set(true);
            client.connect(1883, "localhost", ctx.asyncAssertSuccess(err -> {}));
        }));
    }

    @Test
    public void rejectThenAccept(TestContext ctx) {
        MqttClient client = MqttClient.create((Vertx)this.vertx);
        AtomicBoolean rejectedOnce = new AtomicBoolean();
        this.server.endpointHandler(endpoint -> {
            if (rejectedOnce.getAndSet(true)) {
                endpoint.accept(false);
            } else {
                endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
            }
        });
        Async serverLatch = ctx.async();
        this.server.listen(1883, ctx.asyncAssertSuccess(v -> serverLatch.complete()));
        serverLatch.awaitSuccess(10000L);
        client.connect(1883, "localhost", ctx.asyncAssertFailure(err -> client.connect(1883, "localhost", ctx.asyncAssertSuccess(msg -> {}))));
    }

    @Test
    public void mqttClientReconnectAfterFailure(TestContext ctx) throws Exception {
        MqttClientOptions options = new MqttClientOptions();
        MqttClient mqttClient = MqttClient.create((Vertx)Vertx.vertx(), (MqttClientOptions)options);
        Async async = ctx.async();
        mqttClient.connect(1883, "localhost", ctx.asyncAssertFailure(err -> async.complete()));
        async.awaitSuccess(10000L);
        Async serverLatch = ctx.async();
        this.server.endpointHandler(endpoint -> endpoint.accept(false));
        this.server.listen(1883, "localhost", ctx.asyncAssertSuccess(v -> serverLatch.complete()));
        serverLatch.awaitSuccess(10000L);
        mqttClient.connect(1883, "localhost", ctx.asyncAssertSuccess());
    }
}

