package io.eventuate.local.postgres.wal;

import io.eventuate.local.common.CdcProcessingStatus;
import io.eventuate.local.common.CdcProcessingStatusService;
import java.util.UUID;
import javax.sql.DataSource;
import org.postgresql.replication.LogSequenceNumber;
import org.springframework.jdbc.core.JdbcTemplate;

/* loaded from: input_file:io/eventuate/local/postgres/wal/PostgresWalCdcProcessingStatusService.class */
public class PostgresWalCdcProcessingStatusService implements CdcProcessingStatusService {
    private final JdbcTemplate jdbcTemplate;
    private volatile long endingOffsetOfLastProcessedEvent;
    private long currentWalPosition;
    private final String additionalSlotName;
    private final WaitUtil waitUtil;

    public PostgresWalCdcProcessingStatusService(DataSource dataSource, String str, long j) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
        this.additionalSlotName = str;
        this.waitUtil = new WaitUtil(j);
    }

    public CdcProcessingStatus getCurrentStatus() {
        checkCurrentWalOffsetAndWaitForSyncWithOffsetOfLastProcessedEvent();
        return new CdcProcessingStatus(this.endingOffsetOfLastProcessedEvent, this.currentWalPosition);
    }

    public void saveEndingOffsetOfLastProcessedEvent(long j) {
        this.endingOffsetOfLastProcessedEvent = j;
    }

    private synchronized void checkCurrentWalOffsetAndWaitForSyncWithOffsetOfLastProcessedEvent() {
        if (this.waitUtil.start()) {
            this.currentWalPosition = getCurrentWalPosition();
        } else if (this.currentWalPosition == this.endingOffsetOfLastProcessedEvent) {
            this.waitUtil.stop();
        } else {
            this.waitUtil.tick();
        }
    }

    private long getCurrentWalPosition() {
        try {
            this.jdbcTemplate.queryForList("SELECT * FROM pg_create_logical_replication_slot(?, 'test_decoding')", new Object[]{this.additionalSlotName});
            this.jdbcTemplate.execute("CREATE TABLE if not exists eventuate.replication_test (id VARCHAR(64) PRIMARY KEY)");
            this.jdbcTemplate.execute("DELETE FROM eventuate.replication_test");
            this.jdbcTemplate.update("INSERT INTO eventuate.replication_test values (?)", new Object[]{UUID.randomUUID().toString()});
            long asLong = LogSequenceNumber.valueOf((String) this.jdbcTemplate.queryForObject("SELECT lsn FROM pg_logical_slot_get_changes(?, NULL, NULL) ORDER BY lsn DESC limit 1", String.class, new Object[]{this.additionalSlotName})).asLong();
            this.jdbcTemplate.queryForList("SELECT * FROM pg_drop_replication_slot(?)", new Object[]{this.additionalSlotName});
            return asLong;
        } catch (Throwable th) {
            this.jdbcTemplate.queryForList("SELECT * FROM pg_drop_replication_slot(?)", new Object[]{this.additionalSlotName});
            throw th;
        }
    }
}
