/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.mqtt.test.client;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttTopicSubscription;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
 * Exercises the keep-alive behaviour of {@link MqttClient} against an in-process
 * {@link MqttServer}.
 *
 * <p>Every test installs its own endpoint handler on the server, starts the server via
 * {@link #startServer(TestContext)}, connects a client and then checks ping / pong counters
 * from the client's close handler.
 *
 * <p>Assertions made inside asynchronous handlers go through the {@link TestContext} so that a
 * failed expectation is reported to the running test instead of being thrown from a callback.
 */
@RunWith(VertxUnitRunner.class)
public class MqttClientKeepAliveTest {
    /**
     * Port the clients connect to. The server is started with {@code listen(handler)} and no
     * explicit port, so this is presumably the server's default port (verify against
     * MqttServerOptions).
     */
    private static final int MQTT_PORT = 1883;

    /** Host the clients connect to. */
    private static final String MQTT_HOST = "localhost";

    private Vertx vertx;
    private MqttServer server;

    /**
     * Starts {@link #server} and blocks the calling thread until listening succeeded.
     *
     * @param ctx test context used to fail the test if the listen operation fails
     */
    private void startServer(TestContext ctx) {
        Async listening = ctx.async();
        this.server.listen(ctx.asyncAssertSuccess(started -> listening.complete()));
        // Wait (up to 10 s) so that the test body can connect a client right after this call.
        listening.awaitSuccess(10000L);
    }

    /**
     * Creates a fresh Vert.x instance and an (unstarted) MQTT server for each test.
     *
     * @param ctx test context (unused)
     */
    @Before
    public void before(TestContext ctx) {
        this.vertx = Vertx.vertx();
        this.server = MqttServer.create(this.vertx);
    }

    /**
     * Closes the server and, once that succeeded, the Vert.x instance.
     *
     * @param ctx test context used to assert that both close operations succeed
     */
    @After
    public void after(TestContext ctx) {
        this.server.close(ctx.asyncAssertSuccess(v -> this.vertx.close(ctx.asyncAssertSuccess())));
    }

    /**
     * Client and server endpoint both have auto keep-alive enabled; the client disconnects after
     * its fourth ping response and the test expects exactly four pings and four pongs.
     *
     * @param ctx test context
     */
    @Test
    public void autoKeepAlive(TestContext ctx) {
        AtomicInteger pings = new AtomicInteger();
        this.server.endpointHandler(endpoint -> {
            endpoint.accept(false);
            endpoint.autoKeepAlive(true);
            endpoint.pingHandler(v -> pings.incrementAndGet());
        });
        this.startServer(ctx);

        MqttClientOptions options = new MqttClientOptions();
        options.setAutoKeepAlive(true);
        options.setKeepAliveInterval(1);
        MqttClient client = MqttClient.create(this.vertx, options);

        client.connect(MQTT_PORT, MQTT_HOST, ctx.asyncAssertSuccess(ack -> {
            // Created inside the connect callback: keeps the test open until the client closes.
            Async async = ctx.async();
            AtomicInteger pongs = new AtomicInteger();
            client.pingResponseHandler(v -> {
                if (pongs.incrementAndGet() == 4) {
                    client.disconnect();
                }
            });
            client.closeHandler(v -> {
                ctx.assertEquals(4, pings.get());
                ctx.assertEquals(4, pongs.get());
                async.complete();
            });
        }));
    }

    /**
     * The server endpoint has auto keep-alive disabled, so the test's ping handler only counts
     * pings. The test waits for the client to close and expects the server to have seen exactly
     * two pings by then.
     *
     * @param ctx test context
     */
    @Test
    public void clientWillDisconnectOnMissingPingResponse(TestContext ctx) {
        AtomicInteger pings = new AtomicInteger();
        this.server.endpointHandler(endpoint -> {
            endpoint.autoKeepAlive(false);
            endpoint.accept(false);
            endpoint.pingHandler(v -> pings.incrementAndGet());
        });
        this.startServer(ctx);

        MqttClientOptions options = new MqttClientOptions();
        options.setKeepAliveInterval(1);
        MqttClient client = MqttClient.create(this.vertx, options);

        client.connect(MQTT_PORT, MQTT_HOST, ctx.asyncAssertSuccess(ack -> {
            Async async = ctx.async();
            client.closeHandler(v -> {
                ctx.assertEquals(2, pings.get());
                async.complete();
            });
        }));
    }

    /**
     * Auto keep-alive is disabled on both sides and the client sends a single manual ping. The
     * test waits for the client to close and expects one ping on the server and no ping response
     * on the client.
     *
     * @param ctx test context
     */
    @Test
    public void clientWillDisconnectOnMissingManualPingResponse(TestContext ctx) {
        AtomicInteger pings = new AtomicInteger();
        this.server.endpointHandler(endpoint -> {
            endpoint.accept(false);
            endpoint.autoKeepAlive(false);
            endpoint.pingHandler(v -> pings.incrementAndGet());
        });
        this.startServer(ctx);

        MqttClientOptions options = new MqttClientOptions();
        options.setKeepAliveInterval(2);
        options.setAutoKeepAlive(false);
        MqttClient client = MqttClient.create(this.vertx, options);

        client.connect(MQTT_PORT, MQTT_HOST, ctx.asyncAssertSuccess(ack -> {
            Async async = ctx.async();
            AtomicInteger pongs = new AtomicInteger();
            client.pingResponseHandler(v -> pongs.incrementAndGet());
            client.ping();
            client.closeHandler(v -> {
                ctx.assertEquals(0, pongs.get());
                ctx.assertEquals(1, pings.get());
                async.complete();
            });
        }));
    }

    /**
     * The client publishes a message every 500 ms with a keep-alive interval of 2; the server
     * closes the endpoint on the fourth message. On close the test expects that no ping reached
     * the server and no ping response reached the client.
     *
     * <p>NOTE(review): the method name says regular messages do "not prevent" client pings, yet
     * the assertions expect zero pings and zero pongs. Confirm which of the two is intended.
     *
     * @param ctx test context
     */
    @Test
    public void clientSendingRegularMessageDoesNotPreventClientPings(TestContext ctx) {
        AtomicInteger pings = new AtomicInteger();
        AtomicInteger messages = new AtomicInteger();
        this.server.endpointHandler(endpoint -> {
            endpoint.accept(false);
            endpoint.pingHandler(v -> pings.incrementAndGet());
            endpoint.publishHandler(msg -> {
                if (messages.incrementAndGet() == 4) {
                    endpoint.close();
                }
            });
        });
        this.startServer(ctx);

        MqttClientOptions options = new MqttClientOptions();
        options.setKeepAliveInterval(2);
        options.setAutoKeepAlive(true);
        MqttClient client = MqttClient.create(this.vertx, options);

        client.connect(MQTT_PORT, MQTT_HOST, ctx.asyncAssertSuccess(ack -> {
            Async async = ctx.async();
            long timerID = this.vertx.setPeriodic(500L,
                id -> client.publish("greetings", Buffer.buffer("hello"), MqttQoS.AT_MOST_ONCE, false, false));
            AtomicInteger pongs = new AtomicInteger();
            client.pingResponseHandler(v -> pongs.incrementAndGet());
            client.closeHandler(v -> {
                // Stop publishing before asserting; the connection is gone at this point.
                this.vertx.cancelTimer(timerID);
                ctx.assertEquals(0, pings.get());
                ctx.assertEquals(0, pongs.get());
                async.complete();
            });
        }));
    }

    /**
     * After the client subscribes, the server publishes to the subscribed topic every 500 ms.
     * The client unsubscribes after five received messages, upon which the server stops the
     * publisher and closes the endpoint. On close the test expects at least one ping on the
     * server and at least one ping response on the client.
     *
     * @param ctx test context
     */
    @Test
    public void serverSendingRegularMessageDoesNotPreventClientPings(TestContext ctx) {
        AtomicInteger pings = new AtomicInteger();
        this.server.endpointHandler(endpoint -> {
            endpoint.accept(false);
            endpoint.subscribeHandler(subscribe -> {
                // Grant every subscription the QoS level it asked for.
                List<MqttQoS> grantedQosLevels = subscribe.topicSubscriptions().stream()
                    .map(MqttTopicSubscription::qualityOfService)
                    .collect(Collectors.toList());
                endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);

                // Publish periodically to the first subscribed topic at its requested QoS.
                MqttTopicSubscription first = subscribe.topicSubscriptions().get(0);
                long id = this.vertx.setPeriodic(500L,
                    tick -> endpoint.publish(first.topicName(), Buffer.buffer("hello"), first.qualityOfService(), false, false));
                endpoint.unsubscribeHandler(unsub -> {
                    this.vertx.cancelTimer(id);
                    endpoint.close();
                });
            });
            endpoint.pingHandler(v -> pings.incrementAndGet());
        });
        this.startServer(ctx);

        MqttClientOptions options = new MqttClientOptions();
        options.setKeepAliveInterval(1);
        options.setAutoKeepAlive(true);
        MqttClient client = MqttClient.create(this.vertx, options);

        client.connect(MQTT_PORT, MQTT_HOST, ctx.asyncAssertSuccess(ack -> {
            Async async = ctx.async();
            AtomicInteger pongs = new AtomicInteger();
            client.subscribe("topic/topic", 0);
            AtomicInteger count = new AtomicInteger();
            client.publishHandler(msg -> {
                if (count.incrementAndGet() == 5) {
                    client.unsubscribe("topic/topic");
                }
            });
            client.pingResponseHandler(v -> pongs.incrementAndGet());
            client.closeHandler(v -> {
                ctx.assertTrue(pings.get() > 0);
                ctx.assertTrue(pongs.get() > 0);
                async.complete();
            });
        }));
    }
}

