package io.eventuate.local.common;

import io.eventuate.common.eventuate.local.BinLogEvent;
import io.eventuate.common.jdbc.EventuateSchema;
import io.eventuate.common.jdbc.SchemaAndTable;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/eventuate/local/common/BinlogEntryReader.class */
public abstract class BinlogEntryReader {
    protected MeterRegistry meterRegistry;
    protected CountDownLatch stopCountDownLatch;
    protected String dataSourceUrl;
    protected DataSource dataSource;
    protected String readerName;
    protected Long outboxId;
    protected CommonCdcMetrics commonCdcMetrics;
    protected Logger logger = LoggerFactory.getLogger(getClass());
    protected List<BinlogEntryHandler> binlogEntryHandlers = new CopyOnWriteArrayList();
    protected AtomicBoolean running = new AtomicBoolean(false);
    protected volatile Optional<String> processingError = Optional.empty();
    private volatile long lastEventTime = 0;
    protected Optional<Runnable> restartCallback = Optional.empty();

    public BinlogEntryReader(MeterRegistry meterRegistry, String str, DataSource dataSource, String str2, Long l) {
        this.meterRegistry = meterRegistry;
        this.dataSourceUrl = str;
        this.dataSource = dataSource;
        this.readerName = str2;
        this.outboxId = l;
        this.commonCdcMetrics = new CommonCdcMetrics(meterRegistry, str2);
    }

    public abstract CdcProcessingStatusService getCdcProcessingStatusService();

    public Optional<String> getProcessingError() {
        return this.processingError;
    }

    public String getReaderName() {
        return this.readerName;
    }

    public Long getOutboxId() {
        return this.outboxId;
    }

    public long getLastEventTime() {
        return this.lastEventTime;
    }

    public <EVENT extends BinLogEvent> BinlogEntryHandler addBinlogEntryHandler(EventuateSchema eventuateSchema, String str, BinlogEntryToEventConverter<EVENT> binlogEntryToEventConverter, Function<EVENT, CompletableFuture<?>> function) {
        this.logger.info("Adding binlog entry handler for schema = {}, table = {}", eventuateSchema.getEventuateDatabaseSchema(), str);
        if (eventuateSchema.isEmpty()) {
            throw new IllegalArgumentException("The eventuate schema cannot be empty for the cdc processor.");
        }
        BinlogEntryHandler binlogEntryHandler = new BinlogEntryHandler(new SchemaAndTable(eventuateSchema.getEventuateDatabaseSchema(), str), binlogEntryToEventConverter, function);
        this.binlogEntryHandlers.add(binlogEntryHandler);
        this.logger.info("Added binlog entry handler for schema = {}, table = {}", eventuateSchema.getEventuateDatabaseSchema(), str);
        return binlogEntryHandler;
    }

    public void start() {
        this.commonCdcMetrics.setLeader(true);
    }

    public void stop() {
        stop(true);
    }

    public void stop(boolean z) {
        if (this.running.compareAndSet(true, false)) {
            try {
                this.stopCountDownLatch.await();
            } catch (InterruptedException e) {
                this.logger.error(e.getMessage(), e);
            }
            if (z) {
                this.binlogEntryHandlers.clear();
            }
            stopMetrics();
        }
    }

    public void setRestartCallback(Runnable runnable) {
        this.restartCallback = Optional.of(runnable);
    }

    protected void stopMetrics() {
        this.commonCdcMetrics.setLeader(false);
    }

    protected void onEventReceived() {
        this.commonCdcMetrics.onMessageProcessed();
        onActivity();
    }

    protected void onActivity() {
        this.lastEventTime = System.currentTimeMillis();
    }

    protected void handleProcessingFailException(Throwable th) {
        this.logger.error("Stopping due to exception", th);
        this.processingError = Optional.of(th.getMessage());
        this.stopCountDownLatch.countDown();
        throw new RuntimeException(th);
    }
}
