/*
 * Decompiled with CFR 0.152.
 */
package io.kgraph.rest.server.graph;

import io.kgraph.GraphAlgorithmState;
import io.kgraph.library.GraphAlgorithmType;
import io.kgraph.library.cf.CfLongId;
import io.kgraph.library.cf.EdgeCfLongIdFloatValueParser;
import io.kgraph.rest.server.KafkaGraphsApplication;
import io.kgraph.rest.server.graph.GraphAlgorithmCreateRequest;
import io.kgraph.rest.server.graph.GraphAlgorithmId;
import io.kgraph.rest.server.graph.GraphAlgorithmRunRequest;
import io.kgraph.rest.server.graph.GraphAlgorithmStatus;
import io.kgraph.rest.server.graph.GroupEdgesBySourceRequest;
import io.kgraph.rest.server.graph.KeyValue;
import io.kgraph.rest.server.utils.EdgeLongIdLongValueParser;
import io.kgraph.rest.server.utils.VertexLongIdLongValueParser;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.kafka.common.serialization.FloatSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.ClassPathResource;
import org.springframework.http.HttpEntity;
import org.springframework.http.MediaType;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.EntityExchangeResult;
import org.springframework.test.web.reactive.server.FluxExchangeResult;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.util.MultiValueMap;

@RunWith(value=SpringRunner.class)
@AutoConfigureWebTestClient(timeout="36000")
@SpringBootTest(webEnvironment=SpringBootTest.WebEnvironment.RANDOM_PORT, classes={KafkaGraphsApplication.class})
public class GraphIntegrationTest {
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1){

        public void start() throws IOException, InterruptedException {
            super.start();
            System.setProperty("spring.embedded.kafka.brokers", this.bootstrapServers());
            System.setProperty("spring.embedded.zookeeper.connect", this.zKConnectString());
        }
    };
    @Autowired
    private WebTestClient webTestClient;

    @Test
    public void testConnectedComponents() {
        int i;
        ((WebTestClient.RequestBodySpec)this.webTestClient.post().uri("/import", new Object[0])).syncBody(this.generateCCBody()).exchange().expectStatus().isOk().expectBody(Void.class);
        GroupEdgesBySourceRequest prepareRequest = new GroupEdgesBySourceRequest();
        prepareRequest.setAlgorithm(GraphAlgorithmType.wcc);
        prepareRequest.setInitialVerticesTopic("initial-cc-vertices");
        prepareRequest.setInitialEdgesTopic("initial-cc-edges");
        prepareRequest.setVerticesTopic("new-cc-vertices");
        prepareRequest.setEdgesGroupedBySourceTopic("new-cc-edges");
        prepareRequest.setAsync(false);
        ((WebTestClient.RequestBodySpec)this.webTestClient.post().uri("/prepare", new Object[0])).contentType(MediaType.APPLICATION_JSON).syncBody((Object)prepareRequest).exchange().expectStatus().isOk().expectBody(Void.class);
        GraphAlgorithmCreateRequest createRequest = new GraphAlgorithmCreateRequest();
        createRequest.setAlgorithm(GraphAlgorithmType.wcc);
        createRequest.setVerticesTopic("new-cc-vertices");
        createRequest.setEdgesGroupedBySourceTopic("new-cc-edges");
        EntityExchangeResult createResponse = ((WebTestClient.RequestBodySpec)this.webTestClient.post().uri("/pregel", new Object[0])).contentType(MediaType.APPLICATION_JSON).syncBody((Object)createRequest).exchange().expectStatus().isOk().expectBody(GraphAlgorithmId.class).returnResult();
        String id = ((GraphAlgorithmId)createResponse.getResponseBody()).getId();
        GraphAlgorithmRunRequest runRequest = new GraphAlgorithmRunRequest();
        EntityExchangeResult runResponse = ((WebTestClient.RequestBodySpec)this.webTestClient.post().uri("/pregel/{id}", new Object[]{id})).contentType(MediaType.APPLICATION_JSON).syncBody((Object)runRequest).exchange().expectStatus().isOk().expectBody(GraphAlgorithmStatus.class).returnResult();
        GraphAlgorithmState.State state = GraphAlgorithmState.State.RUNNING;
        while (state == GraphAlgorithmState.State.RUNNING) {
            EntityExchangeResult statusResponse = this.webTestClient.get().uri("/pregel/{id}", new Object[]{id}).exchange().expectStatus().isOk().expectBody(GraphAlgorithmStatus.class).returnResult();
            state = ((GraphAlgorithmStatus)statusResponse.getResponseBody()).getState();
        }
        FluxExchangeResult result = this.webTestClient.get().uri("/pregel/{id}/result", new Object[]{id}).accept(new MediaType[]{MediaType.TEXT_EVENT_STREAM}).exchange().expectStatus().isOk().returnResult(KeyValue.class);
        Map map = (Map)result.getResponseBody().collectMap(KeyValue::getKey, KeyValue::getValue).block();
        for (i = 0; i < 10; ++i) {
            Assert.assertEquals((Object)"0", map.get(String.valueOf(i)));
        }
        for (i = 10; i < 21; ++i) {
            Assert.assertEquals((Object)"10", map.get(String.valueOf(i)));
        }
    }

    private MultiValueMap<String, HttpEntity<?>> generateCCBody() {
        MultipartBodyBuilder builder = new MultipartBodyBuilder();
        builder.part("verticesTopic", (Object)"initial-cc-vertices");
        builder.part("edgesTopic", (Object)"initial-cc-edges");
        builder.part("vertexFile", (Object)new ClassPathResource("vertices_simple.txt"));
        builder.part("edgeFile", (Object)new ClassPathResource("edges_simple.txt"));
        builder.part("vertexParser", (Object)VertexLongIdLongValueParser.class.getName());
        builder.part("edgeParser", (Object)EdgeLongIdLongValueParser.class.getName());
        builder.part("keySerializer", (Object)LongSerializer.class.getName());
        builder.part("vertexValueSerializer", (Object)LongSerializer.class.getName());
        builder.part("edgeValueSerializer", (Object)LongSerializer.class.getName());
        builder.part("numPartitions", (Object)"50");
        builder.part("replicationFactor", (Object)"1");
        return builder.build();
    }

    @Test
    public void testSvdpp() {
        ((WebTestClient.RequestBodySpec)this.webTestClient.post().uri("/import", new Object[0])).syncBody(this.generateSvdppBody()).exchange().expectStatus().isOk().expectBody(Void.class);
        GroupEdgesBySourceRequest prepareRequest = new GroupEdgesBySourceRequest();
        prepareRequest.setAlgorithm(GraphAlgorithmType.svdpp);
        prepareRequest.setInitialEdgesTopic("initial-svdpp-edges");
        prepareRequest.setVerticesTopic("new-svdpp-vertices");
        prepareRequest.setEdgesGroupedBySourceTopic("new-svdpp-edges");
        prepareRequest.setAsync(false);
        ((WebTestClient.RequestBodySpec)this.webTestClient.post().uri("/prepare", new Object[0])).contentType(MediaType.APPLICATION_JSON).syncBody((Object)prepareRequest).exchange().expectStatus().isOk().expectBody(Void.class);
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put("random.seed", "0");
        configs.put("iterations", "3");
        GraphAlgorithmCreateRequest createRequest = new GraphAlgorithmCreateRequest();
        createRequest.setConfigs(configs);
        createRequest.setAlgorithm(GraphAlgorithmType.svdpp);
        createRequest.setVerticesTopic("new-svdpp-vertices");
        createRequest.setEdgesGroupedBySourceTopic("new-svdpp-edges");
        EntityExchangeResult createResponse = ((WebTestClient.RequestBodySpec)this.webTestClient.post().uri("/pregel", new Object[0])).contentType(MediaType.APPLICATION_JSON).syncBody((Object)createRequest).exchange().expectStatus().isOk().expectBody(GraphAlgorithmId.class).returnResult();
        String id = ((GraphAlgorithmId)createResponse.getResponseBody()).getId();
        GraphAlgorithmRunRequest runRequest = new GraphAlgorithmRunRequest();
        EntityExchangeResult runResponse = ((WebTestClient.RequestBodySpec)this.webTestClient.post().uri("/pregel/{id}", new Object[]{id})).contentType(MediaType.APPLICATION_JSON).syncBody((Object)runRequest).exchange().expectStatus().isOk().expectBody(GraphAlgorithmStatus.class).returnResult();
        GraphAlgorithmState.State state = GraphAlgorithmState.State.RUNNING;
        while (state == GraphAlgorithmState.State.RUNNING) {
            EntityExchangeResult statusResponse = this.webTestClient.get().uri("/pregel/{id}", new Object[]{id}).exchange().expectStatus().isOk().expectBody(GraphAlgorithmStatus.class).returnResult();
            state = ((GraphAlgorithmStatus)statusResponse.getResponseBody()).getState();
        }
        FluxExchangeResult result = this.webTestClient.get().uri("/pregel/{id}/result", new Object[]{id}).accept(new MediaType[]{MediaType.TEXT_EVENT_STREAM}).exchange().expectStatus().isOk().returnResult(KeyValue.class);
        NavigableMap map = (NavigableMap)result.getResponseBody().collectMap(kv -> new CfLongId(kv.getKey()), KeyValue::getValue, TreeMap::new).block();
        Assert.assertEquals((Object)"(1, 0)=(0.11611404, [0.006397, 0.008010])", (Object)map.firstEntry().toString());
        Assert.assertEquals((Object)"(20, 1)=(0.6374174, [0.007310, 0.002405])", (Object)map.lastEntry().toString());
    }

    private MultiValueMap<String, HttpEntity<?>> generateSvdppBody() {
        MultipartBodyBuilder builder = new MultipartBodyBuilder();
        builder.part("edgesTopic", (Object)"initial-svdpp-edges");
        builder.part("edgeFile", (Object)new ClassPathResource("ratings_simple.txt"));
        builder.part("edgeParser", (Object)EdgeCfLongIdFloatValueParser.class.getName());
        builder.part("edgeValueSerializer", (Object)FloatSerializer.class.getName());
        builder.part("numPartitions", (Object)"50");
        builder.part("replicationFactor", (Object)"1");
        return builder.build();
    }
}

