package io.eventuate.local.unified.cdc.pipeline.common.health;

import io.eventuate.local.common.BinlogEntryReader;
import io.eventuate.local.db.log.common.DbLogClient;
import io.eventuate.local.mysql.binlog.MySqlBinaryLogClient;
import io.eventuate.local.unified.cdc.pipeline.common.BinlogEntryReaderProvider;
import org.springframework.beans.factory.annotation.Value;

/* loaded from: input_file:io/eventuate/local/unified/cdc/pipeline/common/health/BinlogEntryReaderHealthCheck.class */
public class BinlogEntryReaderHealthCheck extends AbstractHealthCheck {

    @Value("${eventuatelocal.cdc.max.event.interval.to.assume.reader.healthy:#{60000}}")
    private long maxEventIntervalToAssumeReaderHealthy;
    private BinlogEntryReaderProvider binlogEntryReaderProvider;

    public BinlogEntryReaderHealthCheck(BinlogEntryReaderProvider binlogEntryReaderProvider) {
        this.binlogEntryReaderProvider = binlogEntryReaderProvider;
    }

    @Override // io.eventuate.local.unified.cdc.pipeline.common.health.AbstractHealthCheck
    protected void determineHealth(HealthBuilder healthBuilder) {
        this.binlogEntryReaderProvider.getAll().forEach(binlogEntryReaderLeadership -> {
            BinlogEntryReader binlogEntryReader = binlogEntryReaderLeadership.getBinlogEntryReader();
            binlogEntryReader.getProcessingError().ifPresent(str -> {
                healthBuilder.addError(String.format("%s got error during processing: %s", binlogEntryReader.getReaderName(), str));
            });
            if (binlogEntryReader instanceof MySqlBinaryLogClient) {
                checkMySqlBinlogReaderHealth((MySqlBinaryLogClient) binlogEntryReader, healthBuilder);
            }
            if (!binlogEntryReaderLeadership.isLeader()) {
                healthBuilder.addDetail(String.format("%s is not the leader", binlogEntryReader.getReaderName()));
                return;
            }
            checkBinlogEntryReaderHealth(binlogEntryReader, healthBuilder);
            if (binlogEntryReader instanceof DbLogClient) {
                checkDbLogReaderHealth((DbLogClient) binlogEntryReader, healthBuilder);
            }
        });
    }

    private void checkDbLogReaderHealth(DbLogClient dbLogClient, HealthBuilder healthBuilder) {
        if (dbLogClient.isConnected()) {
            healthBuilder.addDetail(String.format("Reader with id %s is connected", dbLogClient.getReaderName()));
        } else {
            healthBuilder.addError(String.format("Reader with id %s disconnected", dbLogClient.getReaderName()));
        }
    }

    private void checkBinlogEntryReaderHealth(BinlogEntryReader binlogEntryReader, HealthBuilder healthBuilder) {
        long currentTimeMillis = System.currentTimeMillis() - binlogEntryReader.getLastEventTime();
        if (currentTimeMillis > this.maxEventIntervalToAssumeReaderHealthy) {
            healthBuilder.addError(String.format("Reader with id %s has not received message for %s milliseconds", binlogEntryReader.getReaderName(), Long.valueOf(currentTimeMillis)));
        } else {
            healthBuilder.addDetail(String.format("Reader with id %s received message %s milliseconds ago", binlogEntryReader.getReaderName(), Long.valueOf(currentTimeMillis)));
        }
    }

    private void checkMySqlBinlogReaderHealth(MySqlBinaryLogClient mySqlBinaryLogClient, HealthBuilder healthBuilder) {
        mySqlBinaryLogClient.getPublishingException().ifPresent(exc -> {
            healthBuilder.addError(String.format("Reader with id %s failed to publish event, exception: %s", mySqlBinaryLogClient.getReaderName(), exc.getMessage()));
        });
    }
}
