package io.eventuate.local.unified.cdc.pipeline.common.factory;

import io.eventuate.common.eventuate.local.BinLogEvent;
import io.eventuate.common.jdbc.EventuateSchema;
import io.eventuate.local.common.BinlogEntryReader;
import io.eventuate.local.common.BinlogEntryToEventConverter;
import io.eventuate.local.common.CdcDataPublisher;
import io.eventuate.local.unified.cdc.pipeline.common.BinlogEntryReaderProvider;
import io.eventuate.local.unified.cdc.pipeline.common.CdcPipeline;
import io.eventuate.local.unified.cdc.pipeline.common.properties.CdcPipelineProperties;
import java.util.Objects;

/* loaded from: input_file:io/eventuate/local/unified/cdc/pipeline/common/factory/CdcPipelineFactory.class */
/**
 * Builds {@link CdcPipeline} instances for one pipeline type.
 *
 * <p>A factory is bound at construction time to a type string, a provider of binlog entry
 * readers, a publisher, and a factory of binlog-entry-to-event converters. None of these
 * collaborators is reassigned after construction. The constructor does not validate its
 * arguments; a {@code null} collaborator surfaces as a {@link NullPointerException} when the
 * method that uses it is first called.
 *
 * @param <EVENT> the event type produced by the converter and sent by the publisher
 */
public class CdcPipelineFactory<EVENT extends BinLogEvent> {
    private final String type;
    private final BinlogEntryReaderProvider binlogEntryReaderProvider;
    private final CdcDataPublisher<EVENT> cdcDataPublisher;
    private final BinlogEntryToEventConverterFactory<EVENT> binlogEntryToEventConverterFactory;

    /**
     * Creates a factory for the given pipeline type.
     *
     * @param str the pipeline type string this factory reports as supported
     * @param binlogEntryReaderProvider used by {@link #create} to look up the reader
     * @param cdcDataPublisher publisher whose {@code sendMessage} is registered as the handler
     *     callback and which is wrapped by every created pipeline
     * @param binlogEntryToEventConverterFactory produces the converter for a reader's outbox id
     */
    public CdcPipelineFactory(String str, BinlogEntryReaderProvider binlogEntryReaderProvider, CdcDataPublisher<EVENT> cdcDataPublisher, BinlogEntryToEventConverterFactory<EVENT> binlogEntryToEventConverterFactory) {
        this.type = str;
        this.binlogEntryReaderProvider = binlogEntryReaderProvider;
        this.cdcDataPublisher = cdcDataPublisher;
        this.binlogEntryToEventConverterFactory = binlogEntryToEventConverterFactory;
    }

    /**
     * Tells whether this factory handles the given pipeline type.
     *
     * @param str the pipeline type to test; {@code null} yields {@code false}
     * @return {@code true} if {@code str} equals the type given at construction
     * @throws NullPointerException if this factory was constructed with a {@code null} type
     */
    public boolean supports(String str) {
        return type.equals(str);
    }

    /**
     * Registers a binlog entry handler on the reader selected by the given properties and returns
     * a pipeline wrapping this factory's publisher.
     *
     * <p>The reader is obtained from the provider using {@code cdcPipelineProperties.getReader()}.
     * The handler is registered with the schema built from
     * {@code getEventuateDatabaseSchema()}, the source table name, a converter created for the
     * reader's outbox id, and the publisher's {@code sendMessage} as the callback.
     *
     * @param cdcPipelineProperties supplies the reader key, database schema and source table name
     * @return a new {@link CdcPipeline} constructed with this factory's publisher
     * @throws NullPointerException if the publisher is {@code null}; the check happens when the
     *     method reference is created, i.e. after the converter has been built and before the
     *     handler is registered
     */
    public CdcPipeline<EVENT> create(CdcPipelineProperties cdcPipelineProperties) {
        BinlogEntryReader binlogEntryReader =
                binlogEntryReaderProvider.getRequired(cdcPipelineProperties.getReader()).getBinlogEntryReader();

        // Arguments are evaluated left to right, so the schema, the table name and the converter
        // are produced before the bound method reference null-checks the publisher.
        binlogEntryReader.addBinlogEntryHandler(
                new EventuateSchema(cdcPipelineProperties.getEventuateDatabaseSchema()),
                cdcPipelineProperties.getSourceTableName(),
                binlogEntryToEventConverterFactory.apply(binlogEntryReader.getOutboxId()),
                cdcDataPublisher::sendMessage);

        return new CdcPipeline<>(cdcDataPublisher);
    }
}
