package org.springframework.xd.dirt.stream;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.xd.dirt.server.admin.deployment.DeploymentHandler;
import org.springframework.xd.dirt.zookeeper.Paths;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnection;
import org.springframework.xd.dirt.zookeeper.ZooKeeperUtils;
import org.springframework.xd.module.ModuleDefinition;

/* loaded from: input_file:org/springframework/xd/dirt/stream/StreamDeployer.class */
public class StreamDeployer extends AbstractInstancePersistingDeployer<StreamDefinition, Stream> {
    private static final Logger logger = LoggerFactory.getLogger(StreamDeployer.class);
    private static final TypeReference<List<ModuleDefinition>> MODULE_DEFINITIONS_LIST = new TypeReference<List<ModuleDefinition>>() { // from class: org.springframework.xd.dirt.stream.StreamDeployer.1
    };
    private final ObjectWriter objectWriter;
    private static final String DEFINITION_KEY = "definition";
    private static final String MODULE_DEFINITIONS_KEY = "moduleDefinitions";
    private final ZooKeeperConnection zkConnection;
    private final XDParser parser;

    public StreamDeployer(ZooKeeperConnection zooKeeperConnection, StreamDefinitionRepository streamDefinitionRepository, StreamRepository streamRepository, XDParser xDParser, DeploymentHandler deploymentHandler) {
        super(zooKeeperConnection, streamDefinitionRepository, streamRepository, xDParser, deploymentHandler, ParsingContext.stream);
        this.objectWriter = new ObjectMapper().writerWithType(MODULE_DEFINITIONS_LIST);
        this.zkConnection = zooKeeperConnection;
        this.parser = xDParser;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.xd.dirt.stream.AbstractInstancePersistingDeployer
    public Stream makeInstance(StreamDefinition streamDefinition) {
        return new Stream(streamDefinition);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.xd.dirt.stream.AbstractDeployer
    public StreamDefinition createDefinition(String str, String str2) {
        return new StreamDefinition(str, str2);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.xd.dirt.stream.AbstractDeployer
    public String getDeploymentPath(StreamDefinition streamDefinition) {
        return Paths.build(Paths.STREAM_DEPLOYMENTS, streamDefinition.getName());
    }

    @PostConstruct
    private void updateModuleDefinitions() {
        if (this.zkConnection.getClient() != null) {
            try {
                CuratorFramework client = this.zkConnection.getClient();
                if (client.checkExists().forPath(Paths.STREAMS) != null) {
                    Iterator it = findAll().iterator();
                    while (it.hasNext()) {
                        setModuleDefinitions(client, (StreamDefinition) it.next());
                    }
                }
            } catch (Exception e) {
                logger.error("Exception migrating stream definitions. This migration is done when the existing stream definitions that don't have module definitions set.", e);
            }
        }
    }

    private void setModuleDefinitions(CuratorFramework curatorFramework, StreamDefinition streamDefinition) {
        String name = streamDefinition.getName();
        String build = Paths.build(Paths.STREAMS, name);
        try {
            byte[] bArr = (byte[]) curatorFramework.getData().forPath(build);
            if (bArr != null) {
                Map<String, String> bytesToMap = ZooKeeperUtils.bytesToMap(bArr);
                if (bytesToMap.get(MODULE_DEFINITIONS_KEY) == null) {
                    List<ModuleDefinition> createModuleDefinitions = createModuleDefinitions(this.parser.parse(name, streamDefinition.getDefinition(), this.definitionKind));
                    if (!createModuleDefinitions.isEmpty()) {
                        bytesToMap.put(DEFINITION_KEY, streamDefinition.getDefinition());
                        try {
                            bytesToMap.put(MODULE_DEFINITIONS_KEY, this.objectWriter.writeValueAsString(createModuleDefinitions));
                            (curatorFramework.checkExists().forPath(build) == null ? curatorFramework.create() : curatorFramework.setData()).forPath(build, ZooKeeperUtils.mapToBytes(bytesToMap));
                        } catch (JsonProcessingException e) {
                            logger.error("Exception writing module definitions " + createModuleDefinitions + " for the stream " + name, e);
                        }
                    }
                }
            }
        } catch (Exception e2) {
            logger.error("Exception when updating module definitions for the stream " + name, e2);
        }
    }
}
