package io.debezium.connector.postgresql.connection;

import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.postgresql.PostgresType;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.ServerInfo;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.nio.charset.Charset;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.Driver;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.util.PSQLState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/postgresql/connection/PostgresConnection.class */
public class PostgresConnection extends JdbcConnection {
    private static final int MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT = 900;
    private final TypeRegistry typeRegistry;
    private final Charset databaseCharset;
    private static Logger LOGGER = LoggerFactory.getLogger(PostgresConnection.class);
    private static final String URL_PATTERN = "jdbc:postgresql://${" + JdbcConfiguration.HOSTNAME + "}:${" + JdbcConfiguration.PORT + "}/${" + JdbcConfiguration.DATABASE + "}";
    protected static final JdbcConnection.ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory(URL_PATTERN, Driver.class.getName(), PostgresConnection.class.getClassLoader(), new Field[0]);
    private static final Duration PAUSE_BETWEEN_REPLICATION_SLOT_RETRIEVAL_ATTEMPTS = Duration.ofSeconds(2);

    public PostgresConnection(Configuration configuration, boolean z) {
        super(configuration, FACTORY, PostgresConnection::validateServerVersion, PostgresConnection::defaultSettings);
        this.typeRegistry = z ? new TypeRegistry(this) : null;
        this.databaseCharset = determineDatabaseCharset();
    }

    public PostgresConnection(Configuration configuration) {
        this(configuration, false);
    }

    public String connectionString() {
        return connectionString(URL_PATTERN);
    }

    public ServerInfo.ReplicaIdentity readReplicaIdentityInfo(TableId tableId) throws SQLException {
        String schema = (tableId.schema() == null || tableId.schema().length() <= 0) ? "public" : tableId.schema();
        StringBuilder sb = new StringBuilder();
        prepareQuery("SELECT relreplident FROM pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_namespace n ON c.relnamespace=n.oid WHERE n.nspname=? and c.relname=?", preparedStatement -> {
            preparedStatement.setString(1, schema);
            preparedStatement.setString(2, tableId.table());
        }, resultSet -> {
            if (resultSet.next()) {
                sb.append(resultSet.getString(1));
            } else {
                LOGGER.warn("Cannot determine REPLICA IDENTITY information for table '{}'", tableId);
            }
        });
        return ServerInfo.ReplicaIdentity.parseFromDB(sb.toString());
    }

    public SlotState getReplicationSlotState(String str, String str2) throws SQLException {
        try {
            ServerInfo.ReplicationSlot readReplicationSlotInfo = readReplicationSlotInfo(str, str2);
            if (readReplicationSlotInfo.equals(ServerInfo.ReplicationSlot.INVALID)) {
                return null;
            }
            return readReplicationSlotInfo.asSlotState();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ConnectException("Interrupted while waiting for valid replication slot info", e);
        }
    }

    private ServerInfo.ReplicationSlot fetchReplicationSlotInfo(String str, String str2) throws SQLException {
        String database = database();
        return queryForSlot(str, database, str2, resultSet -> {
            Long parseRestartLsn;
            if (!resultSet.next()) {
                LOGGER.debug("No replication slot '{}' is present for plugin '{}' and database '{}'", new Object[]{str, str2, database});
                return ServerInfo.ReplicationSlot.INVALID;
            }
            boolean z = resultSet.getBoolean("active");
            Long parseConfirmedFlushLsn = parseConfirmedFlushLsn(str, str2, database, resultSet);
            if (parseConfirmedFlushLsn == null || (parseRestartLsn = parseRestartLsn(str, str2, database, resultSet)) == null) {
                return null;
            }
            return new ServerInfo.ReplicationSlot(z, parseConfirmedFlushLsn, parseRestartLsn, Long.valueOf(resultSet.getLong("catalog_xmin")));
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ServerInfo.ReplicationSlot readReplicationSlotInfo(String str, String str2) throws SQLException, InterruptedException {
        String database = database();
        Metronome parker = Metronome.parker(PAUSE_BETWEEN_REPLICATION_SLOT_RETRIEVAL_ATTEMPTS, Clock.SYSTEM);
        for (int i = 1; i <= MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT; i++) {
            ServerInfo.ReplicationSlot fetchReplicationSlotInfo = fetchReplicationSlotInfo(str, str2);
            if (fetchReplicationSlotInfo != null) {
                LOGGER.info("Obtained valid replication slot {}", fetchReplicationSlotInfo);
                return fetchReplicationSlotInfo;
            }
            LOGGER.warn("Cannot obtain valid replication slot '{}' for plugin '{}' and database '{}' [during attempt {} out of {}, concurrent tx probably blocks taking snapshot.", new Object[]{str, str2, database, Integer.valueOf(i), Integer.valueOf(MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT)});
            parker.pause();
        }
        throw new ConnectException("Unable to obtain valid replication slot. Make sure there are no long-running transactions running in parallel as they may hinder the allocation of the replication slot when starting this connector");
    }

    protected ServerInfo.ReplicationSlot queryForSlot(String str, String str2, String str3, JdbcConnection.ResultSetMapper<ServerInfo.ReplicationSlot> resultSetMapper) throws SQLException {
        return (ServerInfo.ReplicationSlot) prepareQueryAndMap("select * from pg_replication_slots where slot_name = ? and database = ? and plugin = ?", preparedStatement -> {
            preparedStatement.setString(1, str);
            preparedStatement.setString(2, str2);
            preparedStatement.setString(3, str3);
        }, resultSetMapper);
    }

    private Long parseConfirmedFlushLsn(String str, String str2, String str3, ResultSet resultSet) {
        Long tryParseLsn;
        try {
            tryParseLsn = tryParseLsn(str, str2, str3, resultSet, "confirmed_flush_lsn");
        } catch (SQLException e) {
            LOGGER.info("unable to find confirmed_flushed_lsn, falling back to restart_lsn");
            try {
                tryParseLsn = tryParseLsn(str, str2, str3, resultSet, "restart_lsn");
            } catch (SQLException e2) {
                throw new ConnectException("Neither confirmed_flush_lsn nor restart_lsn could be found");
            }
        }
        return tryParseLsn;
    }

    private Long parseRestartLsn(String str, String str2, String str3, ResultSet resultSet) {
        try {
            return tryParseLsn(str, str2, str3, resultSet, "restart_lsn");
        } catch (SQLException e) {
            throw new ConnectException("restart_lsn could be found");
        }
    }

    private Long tryParseLsn(String str, String str2, String str3, ResultSet resultSet, String str4) throws ConnectException, SQLException {
        String string = resultSet.getString(str4);
        if (string == null) {
            return null;
        }
        try {
            Long valueOf = Long.valueOf(LogSequenceNumber.valueOf(string).asLong());
            if (valueOf.longValue() == LogSequenceNumber.INVALID_LSN.asLong()) {
                throw new ConnectException("Invalid LSN returned from database");
            }
            return valueOf;
        } catch (Exception e) {
            throw new ConnectException("Value " + str4 + " in the pg_replication_slots table for slot = '" + str + "', plugin = '" + str2 + "', database = '" + str3 + "' is not valid. This is an abnormal situation and the database status should be checked.");
        }
    }

    public boolean dropReplicationSlot(String str) {
        for (int i = 0; i < 3; i++) {
            try {
                execute(new String[]{"select pg_drop_replication_slot('" + str + "')"});
                return true;
            } catch (SQLException e) {
                if (!PSQLState.OBJECT_IN_USE.getState().equals(e.getSQLState())) {
                    if (PSQLState.UNDEFINED_OBJECT.getState().equals(e.getSQLState())) {
                        LOGGER.debug("Replication slot {} has already been dropped", str);
                        return false;
                    }
                    LOGGER.error("Unexpected error while attempting to drop replication slot", e);
                    return false;
                }
                if (i >= 2) {
                    LOGGER.warn("Cannot drop replication slot '{}' because it's still in use", str);
                    return false;
                }
                LOGGER.debug("Cannot drop replication slot '{}' because it's still in use", str);
                try {
                    Metronome.parker(Duration.ofSeconds(1L), Clock.system()).pause();
                } catch (InterruptedException e2) {
                }
            }
        }
        return false;
    }

    public boolean dropPublication(String str) {
        try {
            LOGGER.debug("Dropping publication '{}'", str);
            execute(new String[]{"DROP PUBLICATION " + str});
            return true;
        } catch (SQLException e) {
            if (PSQLState.UNDEFINED_OBJECT.getState().equals(e.getSQLState())) {
                LOGGER.debug("Publication {} has already been dropped", str);
                return false;
            }
            LOGGER.error("Unexpected error while attempting to drop publication", e);
            return false;
        }
    }

    public synchronized void close() {
        try {
            super.close();
        } catch (SQLException e) {
            LOGGER.error("Unexpected error while closing Postgres connection", e);
        }
    }

    public Long currentTransactionId() throws SQLException {
        AtomicLong atomicLong = new AtomicLong(0L);
        query("select * from txid_current()", resultSet -> {
            if (resultSet.next()) {
                atomicLong.compareAndSet(0L, resultSet.getLong(1));
            }
        });
        long j = atomicLong.get();
        if (j > 0) {
            return Long.valueOf(j);
        }
        return null;
    }

    public long currentXLogLocation() throws SQLException {
        AtomicLong atomicLong = new AtomicLong(0L);
        query(connection().getMetaData().getDatabaseMajorVersion() >= 10 ? "select * from pg_current_wal_lsn()" : "select * from pg_current_xlog_location()", resultSet -> {
            if (!resultSet.next()) {
                throw new IllegalStateException("there should always be a valid xlog position");
            }
            atomicLong.compareAndSet(0L, LogSequenceNumber.valueOf(resultSet.getString(1)).asLong());
        });
        return atomicLong.get();
    }

    public ServerInfo serverInfo() throws SQLException {
        ServerInfo serverInfo = new ServerInfo();
        query("SELECT version(), current_user, current_database()", resultSet -> {
            if (resultSet.next()) {
                serverInfo.withServer(resultSet.getString(1)).withUsername(resultSet.getString(2)).withDatabase(resultSet.getString(3));
            }
        });
        String username = serverInfo.username();
        if (username != null) {
            query("SELECT oid, rolname, rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin, rolreplication FROM pg_roles WHERE pg_has_role('" + username + "', oid, 'member')", resultSet2 -> {
                while (resultSet2.next()) {
                    serverInfo.addRole(resultSet2.getString(2), "superuser: " + resultSet2.getBoolean(3) + ", replication: " + resultSet2.getBoolean(8) + ", inherit: " + resultSet2.getBoolean(4) + ", create role: " + resultSet2.getBoolean(5) + ", create db: " + resultSet2.getBoolean(6) + ", can log in: " + resultSet2.getBoolean(7));
                }
            });
        }
        return serverInfo;
    }

    public Charset getDatabaseCharset() {
        return this.databaseCharset;
    }

    private Charset determineDatabaseCharset() {
        try {
            return Charset.forName(connection().getEncoding().name());
        } catch (SQLException e) {
            throw new RuntimeException("Couldn't obtain encoding for database " + database(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void defaultSettings(Configuration.Builder builder) {
        builder.with("assumeMinServerVersion", "9.4");
    }

    private static void validateServerVersion(Statement statement) throws SQLException {
        DatabaseMetaData metaData = statement.getConnection().getMetaData();
        int databaseMajorVersion = metaData.getDatabaseMajorVersion();
        int databaseMinorVersion = metaData.getDatabaseMinorVersion();
        if (databaseMajorVersion < 9 || (databaseMajorVersion == 9 && databaseMinorVersion < 4)) {
            throw new SQLException("Cannot connect to a version of Postgres lower than 9.4");
        }
    }

    protected int resolveNativeType(String str) {
        return getTypeRegistry().get(str).getRootType().getOid();
    }

    protected int resolveJdbcType(int i, int i2) {
        return getTypeRegistry().get(i2).getRootType().getJdbcId();
    }

    protected Optional<ColumnEditor> readTableColumn(ResultSet resultSet, TableId tableId, Tables.ColumnNameFilter columnNameFilter) throws SQLException {
        String string = resultSet.getString(4);
        if (columnNameFilter != null && !columnNameFilter.matches(tableId.catalog(), tableId.schema(), tableId.table(), string)) {
            return Optional.empty();
        }
        ColumnEditor name = Column.editor().name(string);
        name.type(resultSet.getString(6));
        name.length(resultSet.getInt(7));
        if (resultSet.getObject(9) != null) {
            name.scale(Integer.valueOf(resultSet.getInt(9)));
        }
        name.optional(isNullable(resultSet.getInt(11)));
        name.position(resultSet.getInt(17));
        name.autoIncremented("YES".equalsIgnoreCase(resultSet.getString(23)));
        String str = null;
        try {
            str = resultSet.getString(24);
        } catch (SQLException e) {
        }
        name.generated("YES".equalsIgnoreCase(str));
        PostgresType postgresType = getTypeRegistry().get(name.typeName());
        name.nativeType(postgresType.getRootType().getOid());
        name.jdbcType(postgresType.getRootType().getJdbcId());
        if (2001 == postgresType.getJdbcId()) {
            name.length(postgresType.getDefaultLength());
            name.scale(Integer.valueOf(postgresType.getDefaultScale()));
        }
        return Optional.of(name);
    }

    public TypeRegistry getTypeRegistry() {
        Objects.requireNonNull(this.typeRegistry, "Connection does not provide type registry");
        return this.typeRegistry;
    }
}
