/*
 * Decompiled with CFR 0.152.
 */
package io.kgraph.rest.server.graph;

import io.kgraph.GraphAlgorithmState;
import io.kgraph.rest.server.KafkaGraphsApplication;
import io.kgraph.rest.server.graph.GraphAlgorithmCreateRequest;
import io.kgraph.rest.server.graph.GraphAlgorithmId;
import io.kgraph.rest.server.graph.GraphAlgorithmRunRequest;
import io.kgraph.rest.server.graph.GraphAlgorithmStatus;
import io.kgraph.rest.server.graph.GraphAlgorithmType;
import io.kgraph.rest.server.graph.GroupEdgesBySourceRequest;
import java.io.IOException;
import java.util.Map;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.ClassPathResource;
import org.springframework.http.HttpEntity;
import org.springframework.http.MediaType;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.EntityExchangeResult;
import org.springframework.test.web.reactive.server.FluxExchangeResult;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.util.MultiValueMap;

@RunWith(value=SpringRunner.class)
@AutoConfigureWebTestClient(timeout="36000")
@SpringBootTest(webEnvironment=SpringBootTest.WebEnvironment.RANDOM_PORT, classes={KafkaGraphsApplication.class})
public class GraphIntegrationTest {
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1){

        public void start() throws IOException, InterruptedException {
            super.start();
            System.setProperty("spring.embedded.kafka.brokers", this.bootstrapServers());
            System.setProperty("spring.embedded.zookeeper.connect", this.zKConnectString());
        }
    };
    @Autowired
    private WebTestClient webTestClient;

    @Test
    public void testConnectedComponents() {
        int i;
        ((WebTestClient.RequestBodySpec)this.webTestClient.post().uri("/import", new Object[0])).syncBody(this.generateBody()).exchange().expectStatus().isOk().expectBody(Void.class);
        GroupEdgesBySourceRequest prepareRequest = new GroupEdgesBySourceRequest();
        prepareRequest.setInitialVerticesTopic("initial-vertices");
        prepareRequest.setInitialEdgesTopic("initial-edges");
        prepareRequest.setVerticesTopic("new-vertices");
        prepareRequest.setEdgesGroupedBySourceTopic("new-edges");
        prepareRequest.setAsync(false);
        ((WebTestClient.RequestBodySpec)this.webTestClient.post().uri("/prepare", new Object[0])).contentType(MediaType.APPLICATION_JSON).syncBody((Object)prepareRequest).exchange().expectStatus().isOk().expectBody(Void.class);
        GraphAlgorithmCreateRequest createRequest = new GraphAlgorithmCreateRequest();
        createRequest.setAlgorithm(GraphAlgorithmType.wcc);
        createRequest.setVerticesTopic("new-vertices");
        createRequest.setEdgesGroupedBySourceTopic("new-edges");
        EntityExchangeResult createResponse = ((WebTestClient.RequestBodySpec)this.webTestClient.post().uri("/pregel", new Object[0])).contentType(MediaType.APPLICATION_JSON).syncBody((Object)createRequest).exchange().expectStatus().isOk().expectBody(GraphAlgorithmId.class).returnResult();
        String id = ((GraphAlgorithmId)createResponse.getResponseBody()).getId();
        GraphAlgorithmRunRequest runRequest = new GraphAlgorithmRunRequest();
        EntityExchangeResult runResponse = ((WebTestClient.RequestBodySpec)this.webTestClient.post().uri("/pregel/{id}", new Object[]{id})).contentType(MediaType.APPLICATION_JSON).syncBody((Object)runRequest).exchange().expectStatus().isOk().expectBody(GraphAlgorithmStatus.class).returnResult();
        GraphAlgorithmState.State state = GraphAlgorithmState.State.RUNNING;
        while (state != GraphAlgorithmState.State.COMPLETED) {
            EntityExchangeResult statusResponse = this.webTestClient.get().uri("/pregel/{id}", new Object[]{id}).exchange().expectStatus().isOk().expectBody(GraphAlgorithmStatus.class).returnResult();
            state = ((GraphAlgorithmStatus)statusResponse.getResponseBody()).getState();
        }
        FluxExchangeResult result = this.webTestClient.get().uri("/pregel/{id}/result", new Object[]{id}).accept(new MediaType[]{MediaType.TEXT_EVENT_STREAM}).exchange().expectStatus().isOk().returnResult(String.class);
        Map map = (Map)result.getResponseBody().collectMap(s -> s.split(" ")[0], s -> s.split(" ")[1]).block();
        for (i = 0; i < 10; ++i) {
            Assert.assertEquals((Object)"0", map.get(String.valueOf(i)));
        }
        for (i = 10; i < 21; ++i) {
            Assert.assertEquals((Object)"10", map.get(String.valueOf(i)));
        }
    }

    private MultiValueMap<String, HttpEntity<?>> generateBody() {
        MultipartBodyBuilder builder = new MultipartBodyBuilder();
        builder.part("verticesFile", (Object)new ClassPathResource("vertices_simple.txt"));
        builder.part("edgesFile", (Object)new ClassPathResource("edges_simple.txt"));
        builder.part("verticesTopic", (Object)"initial-vertices");
        builder.part("edgesTopic", (Object)"initial-edges");
        builder.part("useDouble", (Object)"false");
        builder.part("numPartitions", (Object)"50");
        builder.part("replicationFactor", (Object)"1");
        return builder.build();
    }
}

