package io.eventuate.local.postgres.wal;

import io.eventuate.common.jdbc.EventuateSchema;
import io.eventuate.common.jdbc.SchemaAndTable;
import io.eventuate.common.json.mapper.JSonMapper;
import io.eventuate.local.common.BinlogEntry;
import io.eventuate.local.common.BinlogEntryHandler;
import io.eventuate.local.common.CdcProcessingStatusService;
import io.eventuate.local.common.OffsetProcessor;
import io.eventuate.local.db.log.common.DbLogClient;
import io.micrometer.core.instrument.MeterRegistry;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;

/* loaded from: input_file:io/eventuate/local/postgres/wal/PostgresWalClient.class */
public class PostgresWalClient extends DbLogClient {
    private PostgresWalBinlogEntryExtractor postgresWalBinlogEntryExtractor;
    private int walIntervalInMilliseconds;
    private int connectionTimeoutInMilliseconds;
    private int maxAttemptsForBinlogConnection;
    private Connection connection;
    private PGReplicationStream stream;
    private int replicationStatusIntervalInMilliseconds;
    private String replicationSlotName;
    private final PostgresWalCdcProcessingStatusService postgresWalCdcProcessingStatusService;
    private OffsetProcessor<LogSequenceNumber> offsetProcessor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/eventuate/local/postgres/wal/PostgresWalClient$BinlogEntryWithSchemaAndTable.class */
    public static class BinlogEntryWithSchemaAndTable {
        private BinlogEntry binlogEntry;
        private SchemaAndTable schemaAndTable;

        public BinlogEntryWithSchemaAndTable(BinlogEntry binlogEntry, SchemaAndTable schemaAndTable) {
            this.binlogEntry = binlogEntry;
            this.schemaAndTable = schemaAndTable;
        }

        public BinlogEntry getBinlogEntry() {
            return this.binlogEntry;
        }

        public SchemaAndTable getSchemaAndTable() {
            return this.schemaAndTable;
        }

        public static BinlogEntryWithSchemaAndTable make(PostgresWalBinlogEntryExtractor postgresWalBinlogEntryExtractor, PostgresWalChange postgresWalChange) {
            return new BinlogEntryWithSchemaAndTable(postgresWalBinlogEntryExtractor.extract(postgresWalChange), new SchemaAndTable(postgresWalChange.getSchema(), postgresWalChange.getTable()));
        }
    }

    public PostgresWalClient(MeterRegistry meterRegistry, String str, String str2, String str3, int i, int i2, int i3, int i4, String str4, DataSource dataSource, String str5, long j, int i5, int i6, String str6, long j2, EventuateSchema eventuateSchema, Long l) {
        super(meterRegistry, str2, str3, str, dataSource, str5, j, i5, i6, eventuateSchema, l);
        this.walIntervalInMilliseconds = i;
        this.connectionTimeoutInMilliseconds = i2;
        this.maxAttemptsForBinlogConnection = i3;
        this.replicationStatusIntervalInMilliseconds = i4;
        this.replicationSlotName = str4;
        this.postgresWalBinlogEntryExtractor = new PostgresWalBinlogEntryExtractor();
        this.postgresWalCdcProcessingStatusService = new PostgresWalCdcProcessingStatusService(dataSource, str6, j2);
    }

    public CdcProcessingStatusService getCdcProcessingStatusService() {
        return this.postgresWalCdcProcessingStatusService;
    }

    public void start() {
        this.logger.info("Starting PostgresWalClient");
        super.start();
        this.stopCountDownLatch = new CountDownLatch(1);
        this.running.set(true);
        connectWithRetriesOnFail();
        this.logger.info("PostgresWalClient finished processing");
    }

    private void connectWithRetriesOnFail() {
        int i = 1;
        while (this.running.get()) {
            try {
                this.logger.info("trying to connect to postgres wal");
                connectAndRun();
                break;
            } catch (SQLException e) {
                onDisconnected();
                this.logger.error("connection to postgres wal failed");
                if (i == this.maxAttemptsForBinlogConnection) {
                    handleProcessingFailException(e);
                }
                try {
                    Thread.sleep(this.connectionTimeoutInMilliseconds);
                } catch (InterruptedException e2) {
                    handleProcessingFailException(e);
                }
                i++;
            } catch (Exception e3) {
                handleProcessingFailException(e3);
                i++;
            }
        }
        this.stopCountDownLatch.countDown();
    }

    private void connectAndRun() throws SQLException {
        Properties properties = new Properties();
        PGProperty.USER.set(properties, this.dbUserName);
        PGProperty.PASSWORD.set(properties, this.dbPassword);
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
        PGProperty.REPLICATION.set(properties, "database");
        PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
        this.connection = DriverManager.getConnection(this.dataSourceUrl, properties);
        this.stream = ((PGConnection) this.connection.unwrap(PGConnection.class)).getReplicationAPI().replicationStream().logical().withSlotName(this.replicationSlotName).withSlotOption("include-xids", false).withSlotOption("write-in-chunks", true).withStatusInterval(this.replicationStatusIntervalInMilliseconds, TimeUnit.MILLISECONDS).start();
        this.offsetProcessor = new OffsetProcessor<>(logSequenceNumber -> {
            this.stream.setAppliedLSN(this.stream.getLastReceiveLSN());
            this.stream.setFlushedLSN(this.stream.getLastReceiveLSN());
            try {
                this.stream.forceUpdateStatus();
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }, (v1) -> {
            handleProcessingFailException(v1);
        });
        onConnected();
        this.logger.info("connection to postgres wal succeed");
        StringBuilder sb = new StringBuilder();
        while (this.running.get()) {
            ByteBuffer readPending = this.stream.readPending();
            if (readPending == null) {
                saveOffsetOfLastProcessedEvent();
                this.logger.debug("Got empty message, sleeping");
                try {
                    TimeUnit.MILLISECONDS.sleep(this.walIntervalInMilliseconds);
                } catch (InterruptedException e) {
                    handleProcessingFailException(e);
                }
            } else {
                String extractStringFromBuffer = extractStringFromBuffer(readPending);
                sb.append(extractStringFromBuffer);
                if ("]}".equals(extractStringFromBuffer)) {
                    this.dbLogMetrics.onBinlogEntryProcessed();
                    String sb2 = sb.toString();
                    sb.setLength(0);
                    this.logger.debug("Got message: {}", sb2);
                    PostgresWalMessage postgresWalMessage = (PostgresWalMessage) JSonMapper.fromJson(sb2, PostgresWalMessage.class);
                    checkMonitoringChange(postgresWalMessage);
                    LogSequenceNumber lastReceiveLSN = this.stream.getLastReceiveLSN();
                    this.logger.debug("received offset: {} == {}", lastReceiveLSN, Long.valueOf(lastReceiveLSN.asLong()));
                    List list = (List) Arrays.stream(postgresWalMessage.getChange()).filter(postgresWalChange -> {
                        return postgresWalChange.getKind().equals("insert");
                    }).map(postgresWalChange2 -> {
                        return BinlogEntryWithSchemaAndTable.make(this.postgresWalBinlogEntryExtractor, postgresWalChange2);
                    }).collect(Collectors.toList());
                    this.binlogEntryHandlers.forEach(binlogEntryHandler -> {
                        list.stream().filter(binlogEntryWithSchemaAndTable -> {
                            return binlogEntryHandler.isFor(binlogEntryWithSchemaAndTable.getSchemaAndTable());
                        }).map((v0) -> {
                            return v0.getBinlogEntry();
                        }).forEach(binlogEntry -> {
                            handleBinlogEntry(binlogEntry, binlogEntryHandler);
                        });
                    });
                    saveOffsetOfLastProcessedEvent();
                }
            }
        }
        this.stopCountDownLatch.countDown();
    }

    private void handleBinlogEntry(BinlogEntry binlogEntry, BinlogEntryHandler binlogEntryHandler) {
        LogSequenceNumber lastReceiveLSN = this.stream.getLastReceiveLSN();
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = null;
        try {
            completableFuture2 = binlogEntryHandler.publish(binlogEntry);
        } catch (Exception e) {
            handleProcessingFailException(e);
        }
        completableFuture2.whenComplete((obj, th) -> {
            if (th == null) {
                completableFuture.complete(Optional.of(lastReceiveLSN));
            } else {
                completableFuture.completeExceptionally(th);
                handleProcessingFailException(th);
            }
        });
        this.offsetProcessor.saveOffset(completableFuture);
        onEventReceived();
    }

    public void stop(boolean z) {
        this.logger.info("Stopping PostgresWalClient");
        super.stop(z);
        try {
            this.stream.close();
        } catch (Exception e) {
            this.logger.error(e.getMessage(), e);
        }
        try {
            this.connection.close();
        } catch (Exception e2) {
            this.logger.error(e2.getMessage(), e2);
        }
        this.logger.info("Stopped PostgresWalClient");
    }

    private void checkMonitoringChange(PostgresWalMessage postgresWalMessage) {
        Arrays.stream(postgresWalMessage.getChange()).filter(postgresWalChange -> {
            return this.cdcMonitoringDao.isMonitoringTableChange(postgresWalChange.getSchema(), postgresWalChange.getTable());
        }).findAny().ifPresent(postgresWalChange2 -> {
            this.dbLogMetrics.onLagMeasurementEventReceived(Long.parseLong(postgresWalChange2.getColumnvalues()[Arrays.asList(postgresWalChange2.getColumnnames()).indexOf("last_time")]));
            onEventReceived();
        });
    }

    private String extractStringFromBuffer(ByteBuffer byteBuffer) {
        int arrayOffset = byteBuffer.arrayOffset();
        byte[] array = byteBuffer.array();
        return new String(array, arrayOffset, array.length - arrayOffset);
    }

    private void saveOffsetOfLastProcessedEvent() {
        if (this.postgresWalCdcProcessingStatusService != null) {
            this.postgresWalCdcProcessingStatusService.saveEndingOffsetOfLastProcessedEvent(this.stream.getLastReceiveLSN().asLong());
        }
    }
}
