/*
 * Decompiled with CFR 0.152.
 */
package io.kgraph.rest.server.graph;

import io.kgraph.GraphAlgorithmState;
import io.kgraph.GraphSerialized;
import io.kgraph.library.BreadthFirstSearch;
import io.kgraph.library.ConnectedComponents;
import io.kgraph.library.LabelPropagation;
import io.kgraph.library.LocalClusteringCoefficient;
import io.kgraph.library.MultipleSourceShortestPaths;
import io.kgraph.library.PageRank;
import io.kgraph.library.SingleSourceShortestPaths;
import io.kgraph.pregel.PregelGraphAlgorithm;
import io.kgraph.rest.server.KafkaGraphsProperties;
import io.kgraph.rest.server.graph.GraphAlgorithmCreateRequest;
import io.kgraph.rest.server.graph.GraphAlgorithmHandler;
import io.kgraph.rest.server.graph.GraphAlgorithmId;
import io.kgraph.rest.server.graph.GraphAlgorithmRunRequest;
import io.kgraph.rest.server.graph.GraphAlgorithmStatus;
import io.kgraph.rest.server.graph.GroupEdgesBySourceRequest;
import io.kgraph.tools.importer.GraphImporter;
import io.kgraph.utils.ClientUtils;
import io.kgraph.utils.GraphUtils;
import io.kgraph.utils.KryoSerde;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.nodes.GroupMember;
import org.apache.curator.utils.ZKPaths;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.reactive.context.ReactiveWebServerInitializedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.http.codec.multipart.FormFieldPart;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.ResponseStatusException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/*
 * Exception performing whole class analysis ignored.
 */
@Component
@EnableConfigurationProperties(value={KafkaGraphsProperties.class})
public class GraphAlgorithmHandler<EV>
implements ApplicationListener<ReactiveWebServerInitializedEvent> {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(GraphAlgorithmHandler.class);
    /**
     * Name of the HTTP header that carries an application id. The handlers below
     * only fan a request out to the other group members when this header is absent,
     * and the proxy methods set it on every forwarded request.
     */
    private static final String X_KGRAPH_APPID = "X-KGraph-AppId";
    /**
     * Ticker with a 100 ms initial delay and a 2 s period.
     * NOTE(review): not referenced by any method visible in this class - confirm whether it is still needed.
     */
    private static final Flux<Long> INTERVAL = Flux.interval((Duration)Duration.ofMillis(100L), (Duration)Duration.ofSeconds(2L));
    /** Application properties; this class reads the Kafka bootstrap servers from it. */
    private final KafkaGraphsProperties props;
    /** Curator client, used for the group membership and handed to every algorithm instance. */
    private final CuratorFramework curator;
    /** Local host address, resolved once in the constructor via {@link #getHostAddress()}. */
    private final String host;
    /** Web server port; assigned in {@link #onApplicationEvent} and therefore 0 until that event fires. */
    private int port;
    /** Group membership under {@code /kafka-graphs/group}; created and started in {@link #onApplicationEvent}. */
    private GroupMember group;
    /** Algorithms configured on this node, keyed by application id. */
    private final ConcurrentMap<String, PregelGraphAlgorithm<Long, ?, ?, ?>> algorithms = new ConcurrentHashMap();

    /**
     * Creates the handler and resolves the local host address once up front.
     *
     * @param props   application properties (source of the Kafka bootstrap servers)
     * @param curator Curator client used for group membership and passed to algorithms
     * @throws RuntimeException if the local host address cannot be resolved
     */
    public GraphAlgorithmHandler(KafkaGraphsProperties props, CuratorFramework curator) {
        this.host = getHostAddress();
        this.curator = curator;
        this.props = props;
    }

    /**
     * Records the port the reactive web server is listening on and joins the
     * {@code /kafka-graphs/group} membership group, advertising this node as
     * {@code host:port}.
     *
     * @param event the web-server-initialized event carrying the running server
     */
    @Override
    public void onApplicationEvent(ReactiveWebServerInitializedEvent event) {
        this.port = event.getWebServer().getPort();
        // The member id is the address other nodes use to proxy requests to this one.
        String membershipPath = ZKPaths.makePath("/kafka-graphs", "group");
        this.group = new GroupMember(this.curator, membershipPath, this.getHostAndPort());
        this.group.start();
    }

    /**
     * Imports a graph from a multipart upload into Kafka topics.
     *
     * <p>Expected parts: files {@code verticesFile} and {@code edgesFile}; form fields
     * {@code verticesTopic}, {@code edgesTopic}, {@code useDouble}, {@code numPartitions}
     * and {@code replicationFactor}.
     *
     * @param request the multipart request
     * @return an empty 200 response once the import has finished; the Mono fails with a
     *         400 {@link ResponseStatusException} for missing/invalid parts or numbers and
     *         with a 500 one (cause attached) for any other failure
     */
    public Mono<ServerResponse> importGraph(ServerRequest request) {
        return request.body(BodyExtractors.toMultipartData())
                .flatMap(parts -> this.importUploadedGraph(parts.toSingleValueMap())
                        .then(ServerResponse.ok().build()));
    }

    /** Validates the multipart fields, stores both uploads in temp files and runs the importer. */
    private Mono<Void> importUploadedGraph(Map<String, ?> fields) {
        FilePart verticesFilePart = requireFilePart(fields, "verticesFile");
        FilePart edgesFilePart = requireFilePart(fields, "edgesFile");
        String verticesTopic = requireFormField(fields, "verticesTopic");
        String edgesTopic = requireFormField(fields, "edgesTopic");
        boolean useDouble = Boolean.parseBoolean(requireFormField(fields, "useDouble"));
        final int numPartitions;
        final short replicationFactor;
        try {
            numPartitions = Integer.parseInt(requireFormField(fields, "numPartitions"));
            replicationFactor = Short.parseShort(requireFormField(fields, "replicationFactor"));
        }
        catch (NumberFormatException e) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Invalid number", (Throwable)e);
        }
        return Mono.defer(() -> {
            File verticesFile = uploadDestination(verticesFilePart);
            File edgesFile = uploadDestination(edgesFilePart);
            // transferTo() returns a Mono that must be part of the chain: the importer may
            // only start once both copies have actually completed.
            return verticesFilePart.transferTo(verticesFile)
                    .then(Mono.defer(() -> edgesFilePart.transferTo(edgesFile)))
                    .then(Mono.<Void>fromCallable(() -> {
                        GraphImporter importer = new GraphImporter(this.props.getBootstrapServers(), verticesFile, edgesFile, verticesTopic, edgesTopic, useDouble, numPartitions, replicationFactor);
                        importer.call();
                        return null;
                    }));
        }).onErrorMap(e -> !(e instanceof ResponseStatusException), e -> {
            log.error("Graph import failed", e);
            return new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Graph import failed", e);
        });
    }

    /** Returns the named part as a file part, or fails with 400 if it is absent or not a file. */
    private static FilePart requireFilePart(Map<String, ?> fields, String name) {
        Object part = fields.get(name);
        if (!(part instanceof FilePart)) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Missing or invalid file part: " + name);
        }
        return (FilePart)part;
    }

    /** Returns the value of the named form field, or fails with 400 if it is absent or not a form field. */
    private static String requireFormField(Map<String, ?> fields, String name) {
        Object part = fields.get(name);
        if (!(part instanceof FormFieldPart)) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Missing or invalid form field: " + name);
        }
        return ((FormFieldPart)part).value();
    }

    /** Maps an upload to a file inside the temp directory, using only the base name of the client-supplied filename. */
    private static File uploadDestination(FilePart part) {
        // The filename comes from the client; dropping any directory portion keeps the
        // destination inside the temp directory.
        String baseName = new File(part.filename()).getName();
        if (baseName.isEmpty() || ".".equals(baseName) || "..".equals(baseName)) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Invalid file name: " + part.filename());
        }
        return new File(ClientUtils.tempDirectory(), baseName);
    }

    /**
     * Groups the edges of an already imported graph by source vertex and repartitions
     * them into the topics named in the request, using a freshly generated application id.
     *
     * <p>Unless the request is flagged async, the calling thread waits for the job to finish
     * before the 200 response is produced.
     *
     * @param request request whose body is a {@link GroupEdgesBySourceRequest}
     * @return an empty 200 response; the Mono fails with a 500 {@link ResponseStatusException}
     *         (cause attached) if the job cannot be started or fails
     */
    public Mono<ServerResponse> prepareGraph(ServerRequest request) {
        String appId = this.generateRandomString(8);
        return request.bodyToMono(GroupEdgesBySourceRequest.class).doOnNext(input -> {
            try {
                boolean useDouble = input.isValuesOfTypeDouble();
                StreamsBuilder builder = new StreamsBuilder();
                Properties config = GraphAlgorithmHandler.streamsConfig(appId, this.props.getBootstrapServers(), useDouble ? Serdes.Double() : Serdes.Long());
                // The two branches differ only in the edge-value serde (Double vs Long).
                CompletableFuture<?> future;
                if (useDouble) {
                    future = GraphUtils.groupEdgesBySourceAndRepartition(builder, config, input.getInitialVerticesTopic(), input.getInitialEdgesTopic(), GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Double()), input.getVerticesTopic(), input.getEdgesGroupedBySourceTopic(), (int)input.getNumPartitions(), (short)input.getReplicationFactor());
                } else {
                    future = GraphUtils.groupEdgesBySourceAndRepartition(builder, config, input.getInitialVerticesTopic(), input.getInitialEdgesTopic(), GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long()), input.getVerticesTopic(), input.getEdgesGroupedBySourceTopic(), (int)input.getNumPartitions(), (short)input.getReplicationFactor());
                }
                if (!input.isAsync()) {
                    future.get();
                }
            }
            catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Interrupted while preparing graph", (Throwable)e);
            }
            catch (Exception e) {
                log.error("Failed to prepare graph (appId {})", appId, e);
                throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Failed to prepare graph", (Throwable)e);
            }
        }).then(ServerResponse.ok().build());
    }

    /**
     * Creates and configures an algorithm instance on this node and registers it under
     * its application id.
     *
     * <p>Without an {@code X-KGraph-AppId} header a new id is generated and the request is
     * forwarded to the current group members; with the header the given id is used and
     * nothing is forwarded.
     *
     * @param request request whose body is a {@link GraphAlgorithmCreateRequest}
     * @return a 200 JSON response carrying the {@link GraphAlgorithmId}
     */
    public Mono<ServerResponse> configure(ServerRequest request) {
        List<String> appIdHeaders = request.headers().header("X-KGraph-AppId");
        boolean originatedHere = appIdHeaders.isEmpty();
        String appId;
        if (originatedHere) {
            appId = this.generateRandomString(8);
        } else {
            appId = appIdHeaders.get(0);
        }

        Flux<GraphAlgorithmId> fanOut = request.bodyToMono(GraphAlgorithmCreateRequest.class)
                .doOnNext(input -> {
                    PregelGraphAlgorithm<Long, ?, ?, ?> created = this.getAlgorithm(appId, input);
                    StreamsBuilder topology = new StreamsBuilder();
                    Properties config = GraphAlgorithmHandler.streamsConfig(appId, this.props.getBootstrapServers(), new KryoSerde());
                    created.configure(topology, config);
                    this.algorithms.put(appId, created);
                })
                .flatMapMany(input -> {
                    // Only the node that first received the request forwards it to the group.
                    Set<String> targets = originatedHere ? this.group.getCurrentMembers().keySet() : Collections.emptySet();
                    return this.proxyConfigure(targets, appId, input);
                });

        Mono<ServerResponse> response = ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(Mono.just(new GraphAlgorithmId(appId)), GraphAlgorithmId.class);
        return fanOut.then(response);
    }

    /**
     * Builds the algorithm instance selected by the create request.
     *
     * <p>Every algorithm receives this node's {@code host:port}, the application id, the
     * bootstrap servers, the Curator client, the vertices / edges-grouped-by-source topics,
     * a {@link GraphSerialized} describing the serdes, and the partition / replication
     * settings from the request. Algorithm-specific arguments are read from the request
     * params via {@link #getParam}.
     *
     * @param appId application id the algorithm is created under
     * @param input create request naming the algorithm, topics and params
     * @return the newly constructed (not yet configured) algorithm
     * @throws ResponseStatusException 400 if the algorithm is not handled by the switch,
     *         a required param is missing, or a numeric param cannot be parsed
     */
    private PregelGraphAlgorithm<Long, ?, ?, ?> getAlgorithm(String appId, GraphAlgorithmCreateRequest input) {
        try {
            // NOTE(review): the declared type below and the "1.$SwitchMap$..." expression are
            // decompiler output; the case numbers index a synthetic ordinal table that is not
            // visible here, so the enum constant behind each case must be confirmed against
            // GraphAlgorithmType before this is turned back into a plain enum switch.
            LocalClusteringCoefficient algorithm;
            switch (1.$SwitchMap$io$kgraph$rest$server$graph$GraphAlgorithmType[input.getAlgorithm().ordinal()]) {
                // Breadth-first search: requires "srcVertexId"; edge values are Double or Long.
                case 1: {
                    long srcVertexId = Long.parseLong(this.getParam(input.getParams(), "srcVertexId", true));
                    if (input.isValuesOfTypeDouble()) {
                        algorithm = new BreadthFirstSearch(this.getHostAndPort(), appId, this.props.getBootstrapServers(), this.curator, input.getVerticesTopic(), input.getEdgesGroupedBySourceTopic(), GraphSerialized.with((Serde)Serdes.Long(), (Serde)Serdes.Long(), (Serde)Serdes.Double()), input.getNumPartitions(), input.getReplicationFactor(), srcVertexId);
                        break;
                    }
                    algorithm = new BreadthFirstSearch(this.getHostAndPort(), appId, this.props.getBootstrapServers(), this.curator, input.getVerticesTopic(), input.getEdgesGroupedBySourceTopic(), GraphSerialized.with((Serde)Serdes.Long(), (Serde)Serdes.Long(), (Serde)Serdes.Long()), input.getNumPartitions(), input.getReplicationFactor(), srcVertexId);
                    break;
                }
                // Connected components: no params; edge values are Double or Long.
                case 2: {
                    if (input.isValuesOfTypeDouble()) {
                        algorithm = new ConnectedComponents(this.getHostAndPort(), appId, this.props.getBootstrapServers(), this.curator, input.getVerticesTopic(), input.getEdgesGroupedBySourceTopic(), GraphSerialized.with((Serde)Serdes.Long(), (Serde)Serdes.Long(), (Serde)Serdes.Double()), input.getNumPartitions(), input.getReplicationFactor());
                        break;
                    }
                    algorithm = new ConnectedComponents(this.getHostAndPort(), appId, this.props.getBootstrapServers(), this.curator, input.getVerticesTopic(), input.getEdgesGroupedBySourceTopic(), GraphSerialized.with((Serde)Serdes.Long(), (Serde)Serdes.Long(), (Serde)Serdes.Long()), input.getNumPartitions(), input.getReplicationFactor());
                    break;
                }
                // Local clustering coefficient: no params; vertex and edge values are always Double.
                case 3: {
                    algorithm = new LocalClusteringCoefficient(this.getHostAndPort(), appId, this.props.getBootstrapServers(), this.curator, input.getVerticesTopic(), input.getEdgesGroupedBySourceTopic(), GraphSerialized.with((Serde)Serdes.Long(), (Serde)Serdes.Double(), (Serde)Serdes.Double()), input.getNumPartitions(), input.getReplicationFactor());
                    break;
                }
                // Label propagation: requires "srcVertexId"; edge values are Double or Long.
                case 4: {
                    long srcVertexId = Long.parseLong(this.getParam(input.getParams(), "srcVertexId", true));
                    if (input.isValuesOfTypeDouble()) {
                        algorithm = new LabelPropagation(this.getHostAndPort(), appId, this.props.getBootstrapServers(), this.curator, input.getVerticesTopic(), input.getEdgesGroupedBySourceTopic(), GraphSerialized.with((Serde)Serdes.Long(), (Serde)Serdes.Long(), (Serde)Serdes.Double()), input.getNumPartitions(), input.getReplicationFactor(), srcVertexId);
                        break;
                    }
                    algorithm = new LabelPropagation(this.getHostAndPort(), appId, this.props.getBootstrapServers(), this.curator, input.getVerticesTopic(), input.getEdgesGroupedBySourceTopic(), GraphSerialized.with((Serde)Serdes.Long(), (Serde)Serdes.Long(), (Serde)Serdes.Long()), input.getNumPartitions(), input.getReplicationFactor(), srcVertexId);
                    break;
                }
                // Multiple-source shortest paths: requires "landmarkVertexIds", a comma-separated
                // list of longs; vertex values go through KryoSerde.
                case 5: {
                    String[] values = this.getParam(input.getParams(), "landmarkVertexIds", true).split(",");
                    Set landmarkVertexIds = Arrays.stream(values).map(Long::parseLong).collect(Collectors.toSet());
                    algorithm = new MultipleSourceShortestPaths(this.getHostAndPort(), appId, this.props.getBootstrapServers(), this.curator, input.getVerticesTopic(), input.getEdgesGroupedBySourceTopic(), GraphSerialized.with((Serde)Serdes.Long(), (Serde)new KryoSerde(), (Serde)Serdes.Double()), input.getNumPartitions(), input.getReplicationFactor(), landmarkVertexIds);
                    break;
                }
                // PageRank: requires "tolerance" and "resetProbability"; "srcVertexId" is optional.
                case 6: {
                    double tolerance = Double.parseDouble(this.getParam(input.getParams(), "tolerance", true));
                    double resetProbability = Double.parseDouble(this.getParam(input.getParams(), "resetProbability", true));
                    String srcVertexIdStr = this.getParam(input.getParams(), "srcVertexId", false);
                    Optional optSrcVertexId = srcVertexIdStr != null ? Optional.of(Long.parseLong(srcVertexIdStr)) : Optional.empty();
                    algorithm = new PageRank(this.getHostAndPort(), appId, this.props.getBootstrapServers(), this.curator, input.getVerticesTopic(), input.getEdgesGroupedBySourceTopic(), GraphSerialized.with((Serde)Serdes.Long(), (Serde)new KryoSerde(), (Serde)Serdes.Double()), input.getNumPartitions(), input.getReplicationFactor(), tolerance, resetProbability, optSrcVertexId);
                    break;
                }
                // Single-source shortest paths: requires "srcVertexId"; vertex and edge values are Double.
                case 7: {
                    long srcVertexId = Long.parseLong(this.getParam(input.getParams(), "srcVertexId", true));
                    algorithm = new SingleSourceShortestPaths(this.getHostAndPort(), appId, this.props.getBootstrapServers(), this.curator, input.getVerticesTopic(), input.getEdgesGroupedBySourceTopic(), GraphSerialized.with((Serde)Serdes.Long(), (Serde)Serdes.Double(), (Serde)Serdes.Double()), input.getNumPartitions(), input.getReplicationFactor(), srcVertexId);
                    break;
                }
                default: {
                    throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Invalid algorithm: " + input.getAlgorithm());
                }
            }
            return algorithm;
        }
        // Covers every Long/Double parse above, including each element of landmarkVertexIds.
        catch (NumberFormatException e) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Invalid number", (Throwable)e);
        }
    }

    /**
     * Looks up a request parameter.
     *
     * @param params     parameter map from the create request; may be {@code null} when the
     *                   request carried no params at all
     * @param key        parameter name
     * @param isRequired whether a missing value is an error
     * @return the value, or {@code null} if it is absent and not required
     * @throws ResponseStatusException 400 if the value is required but absent
     */
    private String getParam(Map<String, String> params, String key, boolean isRequired) {
        // A request without any params must yield the same 400 as a single missing key,
        // not a NullPointerException.
        String value = params == null ? null : params.get(key);
        if (isRequired && value == null) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Missing param: " + key);
        }
        return value;
    }

    /**
     * Forwards a configure request to every listed group member except this node.
     *
     * @param groupMembers {@code host:port} addresses to forward to
     * @param appId        application id, sent in the {@code X-KGraph-AppId} header
     * @param input        the create request to forward as the body
     * @return the ids returned by the remote nodes
     */
    private Flux<GraphAlgorithmId> proxyConfigure(Set<String> groupMembers, String appId, GraphAlgorithmCreateRequest input) {
        return Flux.fromIterable(groupMembers)
                .filter(member -> !member.equals(this.getHostAndPort()))
                .flatMap(member -> {
                    log.debug("proxy configure to {}", member);
                    return WebClient.create("http://" + member)
                            .post()
                            .uri("/pregel")
                            .accept(MediaType.APPLICATION_JSON)
                            .header("X-KGraph-AppId", appId)
                            .body(Mono.just(input), GraphAlgorithmCreateRequest.class)
                            .retrieve()
                            .bodyToMono(GraphAlgorithmId.class);
                });
    }

    /**
     * Reports the state of the algorithm registered on this node under the {@code id}
     * path variable.
     *
     * @param request request carrying the {@code id} path variable
     * @return a 200 JSON response with the {@link GraphAlgorithmStatus}, or a Mono failing
     *         with a 404 {@link ResponseStatusException} if no such algorithm is registered
     */
    public Mono<ServerResponse> state(ServerRequest request) {
        String appId = request.pathVariable("id");
        PregelGraphAlgorithm<Long, ?, ?, ?> algorithm = this.algorithms.get(appId);
        if (algorithm == null) {
            // An unknown id used to end in a NullPointerException (500).
            return Mono.error(new ResponseStatusException(HttpStatus.NOT_FOUND, "Unknown algorithm: " + appId));
        }
        GraphAlgorithmStatus status = new GraphAlgorithmStatus(algorithm.state());
        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(Mono.just(status), GraphAlgorithmStatus.class);
    }

    /**
     * Starts the algorithm registered under the {@code id} path variable and, when the
     * request did not arrive with an {@code X-KGraph-AppId} header, forwards the run request
     * to the other group members.
     *
     * @param request request whose body is a {@link GraphAlgorithmRunRequest}
     * @return a 200 JSON response with the first status emitted by the merge of the local
     *         and proxied runs; the Mono fails with a 404 {@link ResponseStatusException}
     *         if no such algorithm is registered on this node
     */
    public Mono<ServerResponse> run(ServerRequest request) {
        List<String> appIdHeaders = request.headers().header(X_KGRAPH_APPID);
        String appId = request.pathVariable("id");
        return request.bodyToMono(GraphAlgorithmRunRequest.class).flatMapMany(input -> {
            log.debug("num iterations: {}", input.getNumIterations());
            PregelGraphAlgorithm<Long, ?, ?, ?> algorithm = this.algorithms.get(appId);
            if (algorithm == null) {
                // An unknown id used to end in a NullPointerException (500).
                throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Unknown algorithm: " + appId);
            }
            GraphAlgorithmStatus status = new GraphAlgorithmStatus(algorithm.run(input.getNumIterations()));
            Set<String> targets = appIdHeaders.isEmpty() ? this.group.getCurrentMembers().keySet() : Collections.emptySet();
            Flux<GraphAlgorithmStatus> states = this.proxyRun(targets, appId, input);
            return Mono.just(status).mergeWith(states);
        }).reduce((state1, state2) -> state1).flatMap(state -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(Mono.just(state), GraphAlgorithmStatus.class));
    }

    /**
     * Forwards a run request to every listed group member except this node.
     *
     * @param groupMembers {@code host:port} addresses to forward to
     * @param appId        application id, used in the path and the {@code X-KGraph-AppId} header
     * @param input        the run request to forward as the body
     * @return the statuses returned by the remote nodes
     */
    private Flux<GraphAlgorithmStatus> proxyRun(Set<String> groupMembers, String appId, GraphAlgorithmRunRequest input) {
        return Flux.fromIterable(groupMembers)
                .filter(member -> !member.equals(this.getHostAndPort()))
                .flatMap(member -> {
                    log.debug("proxy run to {}", member);
                    return WebClient.create("http://" + member)
                            .post()
                            .uri("/pregel/" + appId)
                            .accept(MediaType.APPLICATION_JSON)
                            .header("X-KGraph-AppId", appId)
                            .body(Mono.just(input), GraphAlgorithmRunRequest.class)
                            .retrieve()
                            .bodyToMono(GraphAlgorithmStatus.class);
                });
    }

    /**
     * Streams the result of the algorithm registered under the {@code id} path variable as
     * {@code "key value"} lines, merged with the results of the other group members when
     * the request did not arrive with an {@code X-KGraph-AppId} header.
     *
     * @param request request carrying the {@code id} path variable
     * @return a 200 {@code text/event-stream} response, or a Mono failing with a 404
     *         {@link ResponseStatusException} if no such algorithm is registered on this node
     */
    public Mono<ServerResponse> result(ServerRequest request) {
        List<String> appIdHeaders = request.headers().header(X_KGRAPH_APPID);
        String appId = request.pathVariable("id");
        PregelGraphAlgorithm<Long, ?, ?, ?> algorithm = this.algorithms.get(appId);
        if (algorithm == null) {
            // An unknown id used to end in a NullPointerException (500).
            return Mono.error(new ResponseStatusException(HttpStatus.NOT_FOUND, "Unknown algorithm: " + appId));
        }
        Flux<String> body = Flux.fromIterable(algorithm.result()).map(kv -> {
            log.trace("result: ({}, {})", kv.key, kv.value);
            return kv.key + " " + kv.value;
        });
        Set<String> targets = appIdHeaders.isEmpty() ? this.group.getCurrentMembers().keySet() : Collections.emptySet();
        body = this.proxyResult(targets, appId, body);
        return ServerResponse.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(BodyInserters.fromPublisher(body, String.class));
    }

    /**
     * Merges the local result stream with the result streams fetched from every listed
     * group member except this node.
     *
     * @param groupMembers {@code host:port} addresses to fetch from
     * @param appId        application id, used in the path and the {@code X-KGraph-AppId} header
     * @param body         the local result stream
     * @return {@code body} merged with all remote result streams
     */
    private Flux<String> proxyResult(Set<String> groupMembers, String appId, Flux<String> body) {
        Flux<String> merged = body;
        for (String member : groupMembers) {
            if (member.equals(this.getHostAndPort())) {
                continue;
            }
            log.debug("proxy result to {}", member);
            Flux<String> remote = WebClient.create("http://" + member)
                    .get()
                    .uri("/pregel/" + appId + "/result")
                    .accept(MediaType.TEXT_EVENT_STREAM)
                    .header("X-KGraph-AppId", appId)
                    .retrieve()
                    .bodyToFlux(String.class);
            merged = merged.mergeWith(remote);
        }
        return merged;
    }

    /**
     * Unregisters and closes the algorithm stored under the {@code id} path variable and,
     * when the request did not arrive with an {@code X-KGraph-AppId} header, forwards the
     * delete to the other group members.
     *
     * @param request request carrying the {@code id} path variable
     * @return a 204 response once all proxied deletes have completed, or a Mono failing with
     *         a 404 {@link ResponseStatusException} if no such algorithm is registered on this node
     */
    public Mono<ServerResponse> delete(ServerRequest request) {
        List<String> appIdHeaders = request.headers().header(X_KGRAPH_APPID);
        String appId = request.pathVariable("id");
        PregelGraphAlgorithm<Long, ?, ?, ?> algorithm = this.algorithms.remove(appId);
        if (algorithm == null) {
            // remove() returns null for an unknown id; this used to end in a NullPointerException (500).
            return Mono.error(new ResponseStatusException(HttpStatus.NOT_FOUND, "Unknown algorithm: " + appId));
        }
        algorithm.close();
        Set<String> targets = appIdHeaders.isEmpty() ? this.group.getCurrentMembers().keySet() : Collections.emptySet();
        return this.proxyDelete(targets, appId).then(ServerResponse.noContent().build());
    }

    /**
     * Forwards a delete request to every listed group member except this node.
     *
     * @param groupMembers {@code host:port} addresses to forward to
     * @param appId        application id, used in the path and the {@code X-KGraph-AppId} header
     * @return a stream that completes when all remote deletes have completed
     */
    private Flux<Void> proxyDelete(Set<String> groupMembers, String appId) {
        return Flux.fromIterable(groupMembers)
                .filter(member -> !member.equals(this.getHostAndPort()))
                .flatMap(member -> {
                    log.debug("proxy delete to {}", member);
                    return WebClient.create("http://" + member)
                            .delete()
                            .uri("/pregel/" + appId)
                            .accept(MediaType.APPLICATION_JSON)
                            .header("X-KGraph-AppId", appId)
                            .retrieve()
                            .bodyToMono(Void.class);
                });
    }

    /**
     * Builds the Kafka Streams configuration for one application.
     *
     * @param appId            used as {@code application.id} and, suffixed with
     *                         {@code -client}, as {@code client.id}
     * @param bootstrapServers value for {@code bootstrap.servers}
     * @param valueSerde       serde whose class name becomes the default value serde
     * @return a new {@link Properties} with Long keys by default, record caching set to 0
     *         bytes, two stream threads and a state directory taken from
     *         {@code ClientUtils.tempDirectory()}
     */
    public static Properties streamsConfig(String appId, String bootstrapServers, Serde<?> valueSerde) {
        Properties settings = new Properties();

        // Identity and connectivity.
        settings.put("application.id", appId);
        settings.put("client.id", appId + "-client");
        settings.put("bootstrap.servers", bootstrapServers);

        // Default serdes are configured by class name.
        String keySerdeClass = Serdes.Long().getClass().getName();
        String valueSerdeClass = valueSerde.getClass().getName();
        settings.put("default.key.serde", keySerdeClass);
        settings.put("default.value.serde", valueSerdeClass);

        // Runtime tuning.
        settings.put("cache.max.bytes.buffering", Integer.valueOf(0));
        settings.put("num.stream.threads", Integer.valueOf(2));

        String stateDir = ClientUtils.tempDirectory().getAbsolutePath();
        settings.put("state.dir", stateDir);
        return settings;
    }

    /**
     * Returns this node's address as {@code host:port}; it doubles as the node's member id
     * in the group and as the target of proxied requests.
     *
     * @return the host address and web server port joined by a colon
     */
    public String getHostAndPort() {
        StringBuilder address = new StringBuilder();
        address.append(this.host).append(":").append(this.port);
        return address.toString();
    }

    /**
     * Resolves the textual IP address of the local host.
     *
     * @return the local host address
     * @throws RuntimeException wrapping the {@link UnknownHostException} if the local host
     *         cannot be resolved
     */
    public String getHostAddress() {
        final InetAddress localHost;
        try {
            localHost = InetAddress.getLocalHost();
        }
        catch (UnknownHostException unresolved) {
            throw new RuntimeException(unresolved);
        }
        return localHost.getHostAddress();
    }

    /**
     * Returns the web server port recorded when the server-initialized event was handled.
     *
     * @return the port, or 0 if that event has not been received yet
     */
    public int getPort() {
        return port;
    }

    /**
     * Generates a random string of lowercase ASCII letters ({@code a}-{@code z}).
     *
     * @param len number of characters to generate
     * @return a string of exactly {@code len} characters
     * @throws NegativeArraySizeException if {@code len} is negative
     */
    public String generateRandomString(int len) {
        final char firstLetter = 'a';
        final int alphabetSize = 'z' - 'a' + 1;
        Random rng = new Random();
        char[] letters = new char[len];
        for (int pos = 0; pos < letters.length; pos++) {
            // nextFloat() is in [0, 1), so the offset always lands in [0, alphabetSize).
            int offset = (int)(rng.nextFloat() * (float)alphabetSize);
            letters[pos] = (char)(firstLetter + offset);
        }
        return new String(letters);
    }
}

