package io.eventuate.local.unified.cdc.pipeline;

import io.eventuate.coordination.leadership.LeaderSelectorFactory;
import io.eventuate.local.common.BinlogEntryReader;
import io.eventuate.local.common.BinlogEntryReaderLeadership;
import io.eventuate.local.mysql.binlog.MySqlBinaryLogClient;
import io.eventuate.local.unified.cdc.pipeline.common.BinlogEntryReaderProvider;
import io.eventuate.local.unified.cdc.pipeline.common.CdcPipeline;
import io.eventuate.local.unified.cdc.pipeline.common.DefaultSourceTableNameResolver;
import io.eventuate.local.unified.cdc.pipeline.common.factory.CdcPipelineFactory;
import io.eventuate.local.unified.cdc.pipeline.common.factory.CdcPipelineReaderFactory;
import io.eventuate.local.unified.cdc.pipeline.common.properties.CdcPipelineProperties;
import io.eventuate.local.unified.cdc.pipeline.common.properties.CdcPipelineReaderProperties;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;

/* loaded from: input_file:io/eventuate/local/unified/cdc/pipeline/UnifiedCdcConfigurator.class */
public class UnifiedCdcConfigurator {

    @Value("${eventuate.cdc.service.dry.run:#{false}}")
    private boolean dryRunOption;

    @Autowired
    private Collection<CdcPipelineFactory> cdcPipelineFactories;

    @Autowired
    private Collection<CdcPipelineReaderFactory> cdcPipelineReaderFactories;

    @Autowired
    @Qualifier("defaultCdcPipelineFactory")
    private CdcPipelineFactory defaultCdcPipelineFactory;

    @Autowired
    @Qualifier("defaultCdcPipelineReaderFactory")
    private CdcPipelineReaderFactory defaultCdcPipelineReaderFactory;

    @Autowired
    private CdcPipelineProperties defaultCdcPipelineProperties;

    @Autowired
    private CdcPipelineReaderProperties defaultCdcPipelineReaderProperties;

    @Autowired
    private BinlogEntryReaderProvider binlogEntryReaderProvider;

    @Autowired
    private DefaultSourceTableNameResolver defaultSourceTableNameResolver;

    @Autowired
    private LeaderSelectorFactory leaderSelectorFactory;

    @Autowired
    private CdcMessageCleanerConfigurator cdcMessageCleanerConfigurator;

    @Autowired
    private PipelineConfigPropertiesProvider pipelineConfigPropertiesProvider;
    private Logger logger = LoggerFactory.getLogger(getClass());
    private List<CdcPipeline> cdcPipelines = new ArrayList();
    private Map<String, CdcPipelineProperties> pipelineProperties = new HashMap();
    private Map<String, CdcPipelineReaderProperties> pipelineReaderProperties = new HashMap();

    @PostConstruct
    public void initialize() {
        this.logger.info("Starting unified cdc pipelines");
        this.pipelineConfigPropertiesProvider.pipelineReaderProperties().ifPresentOrElse(map -> {
            map.forEach(this::createCdcPipelineReader);
        }, () -> {
            createStartSaveCdcDefaultPipelineReader(this.defaultCdcPipelineReaderProperties);
        });
        if (this.dryRunOption) {
            dryRun();
        } else {
            start();
            this.cdcMessageCleanerConfigurator.startMessageCleaners(this.pipelineProperties, this.pipelineReaderProperties);
        }
    }

    @PreDestroy
    public void stop() {
        this.binlogEntryReaderProvider.stop();
        this.cdcPipelines.forEach((v0) -> {
            v0.stop();
        });
        this.cdcMessageCleanerConfigurator.stopMessageCleaners();
    }

    private void start() {
        this.pipelineConfigPropertiesProvider.pipelineProperties().ifPresentOrElse(map -> {
            map.forEach(this::createStartSaveCdcPipeline);
        }, () -> {
            createStartSaveCdcDefaultPipeline(this.defaultCdcPipelineProperties);
        });
        this.binlogEntryReaderProvider.start();
        this.logger.info("Unified cdc pipelines are started");
    }

    private void dryRun() {
        this.logger.warn("Unified cdc pipelines are not started, 'dry run' option is used");
        List list = (List) this.binlogEntryReaderProvider.getAll().stream().map((v0) -> {
            return v0.getBinlogEntryReader();
        }).filter(binlogEntryReader -> {
            return binlogEntryReader instanceof MySqlBinaryLogClient;
        }).map(binlogEntryReader2 -> {
            return (MySqlBinaryLogClient) binlogEntryReader2;
        }).collect(Collectors.toList());
        list.forEach(mySqlBinaryLogClient -> {
            this.logger.info((String) mySqlBinaryLogClient.getMigrationInfo().map(migrationInfo -> {
                return String.format("MySqlBinaryLogClient '%s' received '%s' from the debezium storage, migration should be performed", mySqlBinaryLogClient.getReaderName(), migrationInfo.getBinlogFileOffset());
            }).orElse(String.format("MySqlBinaryLogClient '%s' did not receive offset from the debezium storage, migration should not be performed", mySqlBinaryLogClient.getReaderName())));
        });
        if (list.isEmpty()) {
            this.logger.info("There is no mysql binlog readers, migration information is unavailable.");
        }
        this.logger.warn("'dry run' option is used, application will be stopped.");
        System.exit(0);
    }

    private void createStartSaveCdcPipeline(String str, CdcPipelineProperties cdcPipelineProperties) {
        if (cdcPipelineProperties.getSourceTableName() == null) {
            cdcPipelineProperties.setSourceTableName(this.defaultSourceTableNameResolver.resolve(cdcPipelineProperties.getType()));
        }
        this.pipelineProperties.put(str.toLowerCase(), cdcPipelineProperties);
        CdcPipeline<?> createCdcPipeline = createCdcPipeline(cdcPipelineProperties);
        createCdcPipeline.start();
        this.cdcPipelines.add(createCdcPipeline);
    }

    private void createStartSaveCdcDefaultPipeline(CdcPipelineProperties cdcPipelineProperties) {
        cdcPipelineProperties.validate();
        CdcPipeline create = this.defaultCdcPipelineFactory.create(cdcPipelineProperties);
        create.start();
        this.cdcPipelines.add(create);
    }

    private void createStartSaveCdcDefaultPipelineReader(CdcPipelineReaderProperties cdcPipelineReaderProperties) {
        cdcPipelineReaderProperties.validate();
        this.binlogEntryReaderProvider.add(cdcPipelineReaderProperties.getReaderName(), new BinlogEntryReaderLeadership(cdcPipelineReaderProperties.getLeadershipLockPath(), this.leaderSelectorFactory, this.defaultCdcPipelineReaderFactory.create(cdcPipelineReaderProperties)));
    }

    private CdcPipeline<?> createCdcPipeline(CdcPipelineProperties cdcPipelineProperties) {
        return findCdcPipelineFactory(cdcPipelineProperties.getType()).create(cdcPipelineProperties);
    }

    private void createCdcPipelineReader(String str, CdcPipelineReaderProperties cdcPipelineReaderProperties) {
        CdcPipelineReaderFactory<? extends CdcPipelineReaderProperties, BinlogEntryReader> findCdcPipelineReaderFactory = findCdcPipelineReaderFactory(cdcPipelineReaderProperties.getType());
        this.pipelineReaderProperties.put(str.toLowerCase(), cdcPipelineReaderProperties);
        this.binlogEntryReaderProvider.add(str, new BinlogEntryReaderLeadership(cdcPipelineReaderProperties.getLeadershipLockPath(), this.leaderSelectorFactory, findCdcPipelineReaderFactory.create(cdcPipelineReaderProperties)));
    }

    private CdcPipelineFactory<?> findCdcPipelineFactory(String str) {
        return this.cdcPipelineFactories.stream().filter(cdcPipelineFactory -> {
            return cdcPipelineFactory.supports(str);
        }).findAny().orElseThrow(() -> {
            return new RuntimeException(String.format("pipeline factory not found for type %s", str));
        });
    }

    private CdcPipelineReaderFactory<? extends CdcPipelineReaderProperties, BinlogEntryReader> findCdcPipelineReaderFactory(String str) {
        return this.cdcPipelineReaderFactories.stream().filter(cdcPipelineReaderFactory -> {
            return cdcPipelineReaderFactory.supports(str);
        }).findAny().orElseThrow(() -> {
            return new RuntimeException(String.format("reader factory not found for type %s", str));
        });
    }
}
