/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.mqtt.test.server;

import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
 * Tests for {@link MqttServer} start-up: listening without an endpoint handler, and
 * distribution of client connections across several servers bound to the same host/port.
 *
 * <p>Each test runs against a fresh {@link Vertx} instance that is closed afterwards.
 */
@RunWith(VertxUnitRunner.class)
public class MqttServerTest {
    private static final Logger log = LoggerFactory.getLogger(MqttServerTest.class);
    protected static final String MQTT_SERVER_HOST = "localhost";
    protected static final int MQTT_SERVER_PORT = 1883;
    /** Number of client connections opened per server in the shared-server test. */
    private static final int CONNECTIONS_PER_SERVER = 20;
    /** Maximum time, in seconds, to wait on each latch before the test is failed. */
    private static final long AWAIT_TIMEOUT_SECONDS = 10L;
    private Vertx vertx;

    /** Creates the Vert.x instance used by the test. */
    @Before
    public void before() {
        this.vertx = Vertx.vertx();
    }

    /**
     * Closes the Vert.x instance and asserts that the close succeeded.
     *
     * @param context test context used to assert the asynchronous close result
     */
    @After
    public void after(TestContext context) {
        this.vertx.close(context.asyncAssertSuccess());
    }

    /**
     * Asserts that {@code listen} fails with an {@link IllegalStateException} when no
     * endpoint handler has been set on the server.
     *
     * @param context test context used for the asynchronous assertion
     */
    @Test
    public void listenWithoutEndpointHandler(TestContext context) {
        MqttServer server = MqttServer.create(this.vertx, serverOptions());
        server.listen(context.asyncAssertFailure(
            err -> context.assertEquals(IllegalStateException.class, err.getClass())));
    }

    /**
     * Starts several servers on the same host/port, opens {@link #CONNECTIONS_PER_SERVER}
     * client connections per server, and asserts that every server received a connection
     * and that each one accepted exactly {@link #CONNECTIONS_PER_SERVER} of them.
     *
     * <p>Clients are disconnected and closed before the servers are closed, even when an
     * assertion fails part-way through.
     *
     * @param context test context used for assertions
     */
    @Test
    public void sharedServersRoundRobin(TestContext context) {
        // NOTE(review): if this evaluates to 0 every loop below is skipped and the test
        // passes without exercising anything.
        int numServers = VertxOptions.DEFAULT_EVENT_LOOP_POOL_SIZE / 2 - 1;
        int numConnections = numServers * CONNECTIONS_PER_SERVER;
        List<MqttServer> servers = new ArrayList<>();
        List<MqttClient> clients = new ArrayList<>();
        Set<MqttServer> connectedServers = new ConcurrentHashSet<>();
        Map<MqttServer, Integer> connectCount = new ConcurrentHashMap<>();
        CountDownLatch latchListen = new CountDownLatch(numServers);
        CountDownLatch latchConns = new CountDownLatch(numConnections);
        try {
            try {
                for (int i = 0; i < numServers; ++i) {
                    MqttServer server = MqttServer.create(this.vertx, serverOptions());
                    servers.add(server);
                    server.endpointHandler(endpoint -> {
                        connectedServers.add(server);
                        // Atomic per-key increment on the ConcurrentHashMap.
                        connectCount.merge(server, 1, Integer::sum);
                        endpoint.accept(false);
                        latchConns.countDown();
                    }).listen(ar -> {
                        if (ar.succeeded()) {
                            log.info("MQTT server listening on port " + ar.result().actualPort());
                            latchListen.countDown();
                        } else {
                            // The latch is not counted down, so the await below fails the test.
                            log.error("Error starting MQTT server", ar.cause());
                        }
                    });
                }
                context.assertTrue(latchListen.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));

                connectClients(context, numConnections, clients);
                context.assertTrue(latchConns.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));

                context.assertEquals(numServers, connectedServers.size());
                for (MqttServer server : servers) {
                    context.assertTrue(connectedServers.contains(server));
                }
                context.assertEquals(numServers, connectCount.size());
                for (int count : connectCount.values()) {
                    context.assertEquals(CONNECTIONS_PER_SERVER, count);
                }
            } finally {
                // Release client resources while the servers are still up.
                disconnectClients(clients);
            }
            closeServers(context, servers);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fail instead of silently passing.
            Thread.currentThread().interrupt();
            context.fail(e);
        }
    }

    /** Builds server options bound to {@link #MQTT_SERVER_HOST}:{@link #MQTT_SERVER_PORT}. */
    private static MqttServerOptions serverOptions() {
        return new MqttServerOptions().setHost(MQTT_SERVER_HOST).setPort(MQTT_SERVER_PORT);
    }

    /**
     * Opens {@code numConnections} Paho client connections, recording each created
     * client in {@code clients} so it can be released later. Fails the test on the
     * first connection error.
     */
    private static void connectClients(TestContext context, int numConnections, List<MqttClient> clients) {
        String serverUri = String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT);
        for (int i = 0; i < numConnections; ++i) {
            String clientId = String.format("client-%d", i);
            try {
                MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence());
                // Track before connecting so a client whose connect fails is still closed.
                clients.add(client);
                client.connect();
                log.info("Client connected " + clientId);
            } catch (MqttException e) {
                log.error("Error on connecting client " + clientId, e);
                context.fail(e);
            }
        }
    }

    /** Disconnects (when connected) and closes every client; failures are logged only. */
    private static void disconnectClients(List<MqttClient> clients) {
        for (MqttClient client : clients) {
            try {
                if (client.isConnected()) {
                    client.disconnect();
                }
                client.close();
            } catch (MqttException e) {
                log.error("Error on closing client " + client.getClientId(), e);
            }
        }
    }

    /**
     * Closes every server, asserting each close succeeded and that all of them complete
     * within {@link #AWAIT_TIMEOUT_SECONDS}.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private static void closeServers(TestContext context, List<MqttServer> servers) throws InterruptedException {
        CountDownLatch closeLatch = new CountDownLatch(servers.size());
        for (MqttServer server : servers) {
            server.close(ar -> {
                context.assertTrue(ar.succeeded());
                closeLatch.countDown();
            });
        }
        context.assertTrue(closeLatch.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
    }
}

