/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.proxy;

import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.awaitility.Awaitility;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Integration test that publishes a message through the WebSocket producer
 * endpoint exposed on the Pulsar proxy's HTTP service URL.
 */
public class TestProxyWithWebSocket
extends PulsarTestSuite {
    private static final Logger log = LoggerFactory.getLogger(TestProxyWithWebSocket.class);

    /**
     * Adds {@code webSocketServiceEnabled=true} to the proxy environment and
     * then delegates to the superclass hook.
     *
     * @param clusterName name of the cluster being set up
     * @param specBuilder cluster spec builder to customise
     * @return the builder returned by the superclass hook
     */
    @Override
    protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
        HashMap<String, String> envs = new HashMap<>();
        envs.put("webSocketServiceEnabled", "true");
        specBuilder.proxyEnvs(envs);
        return super.beforeSetupCluster(clusterName, specBuilder);
    }

    /**
     * Creates a fresh tenant and namespace, connects a WebSocket producer via
     * the proxy, sends one message and waits for a response containing "ok".
     *
     * @throws Exception if admin calls, the WebSocket connection, or client
     *                   start/stop fail
     */
    @Test
    public void testWebSocket() throws Exception {
        String tenant = "proxy-test-" + TestProxyWithWebSocket.randomName(10);
        String namespace = tenant + "/ns1";
        // try-with-resources closes the admin client even when an assertion fails.
        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(this.pulsarCluster.getHttpServiceUrl()).build()) {
            admin.tenants().createTenant(tenant, new TenantInfoImpl(Collections.emptySet(), Collections.singleton(this.pulsarCluster.getClusterName())));
            admin.namespaces().createNamespace(namespace, Collections.singleton(this.pulsarCluster.getClusterName()));

            HttpClient httpClient = new HttpClient();
            WebSocketClient webSocketClient = new WebSocketClient(httpClient);
            try {
                webSocketClient.start();
                MyWebSocket myWebSocket = new MyWebSocket();
                // Same host/port as the proxy HTTP endpoint, with the scheme switched to ws(s).
                String webSocketUri = this.pulsarCluster.getProxy().getHttpServiceUrl().replaceFirst("http", "ws") + "/ws/v2/producer/persistent/" + namespace + "/my-topic";
                Future<Session> sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri));
                // The payload is the Base64 encoding of "Hello World".
                sessionFuture.get().getRemote().sendString("{\n  \"payload\": \"SGVsbG8gV29ybGQ=\",\n  \"properties\": {\"key1\": \"value1\", \"key2\": \"value2\"},\n  \"context\": \"1\"\n}");
                // Each attempt consumes one queued message; retry until one contains "ok".
                Awaitility.await().untilAsserted(() -> {
                    String response = myWebSocket.getResponse();
                    Assert.assertNotNull(response);
                    Assert.assertTrue(response.contains("ok"));
                });
            }
            finally {
                // Stop the client so it does not stay running after the test finishes.
                webSocketClient.stop();
            }
        }
    }

    /**
     * WebSocket listener that buffers incoming text frames so the test can
     * poll them.
     */
    @WebSocket
    public static class MyWebSocket
    implements WebSocketListener {
        // Bounded buffer: add() throws IllegalStateException once 10 messages are unread.
        final Queue<String> incomingMessages = new ArrayBlockingQueue<String>(10);

        /** Binary frames are ignored. */
        @Override
        public void onWebSocketBinary(byte[] bytes, int i, int i1) {
        }

        /** Queues the received text frame for later retrieval via {@link #getResponse()}. */
        @Override
        public void onWebSocketText(String s) {
            this.incomingMessages.add(s);
        }

        /** Close notifications are ignored. */
        @Override
        public void onWebSocketClose(int i, String s) {
        }

        /** Connect notifications are ignored. */
        @Override
        public void onWebSocketConnect(Session session) {
        }

        /** Logs the error so a connection failure is visible in the test output. */
        @Override
        public void onWebSocketError(Throwable throwable) {
            log.warn("WebSocket error", throwable);
        }

        /**
         * Removes and returns the oldest buffered text message.
         *
         * @return the next message, or {@code null} if none is buffered
         */
        public String getResponse() {
            return this.incomingMessages.poll();
        }
    }
}

