package io.confluent.connect.jdbc.source;

import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.util.ConnectionProvider;
import io.confluent.connect.jdbc.util.QuoteMethod;
import io.confluent.connect.jdbc.util.TableId;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/jdbc/source/TableMonitorThread.class */
public class TableMonitorThread extends Thread {
    private static final Logger log = LoggerFactory.getLogger(TableMonitorThread.class);
    private final DatabaseDialect dialect;
    private final ConnectionProvider connectionProvider;
    private final ConnectorContext context;
    private final long startupMs;
    private final long pollMs;
    private final Set<String> whitelist;
    private final Set<String> blacklist;
    private final Time time;
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private final AtomicReference<List<TableId>> tables = new AtomicReference<>();

    public TableMonitorThread(DatabaseDialect databaseDialect, ConnectionProvider connectionProvider, ConnectorContext connectorContext, long j, long j2, Set<String> set, Set<String> set2, Time time) {
        this.dialect = databaseDialect;
        this.connectionProvider = connectionProvider;
        this.context = connectorContext;
        this.startupMs = j;
        this.pollMs = j2;
        this.whitelist = set;
        this.blacklist = set2;
        this.time = time;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        log.info("Starting thread to monitor tables.");
        while (this.shutdownLatch.getCount() > 0) {
            try {
                if (updateTables()) {
                    this.context.requestTaskReconfiguration();
                }
                try {
                    log.debug("Waiting {} ms to check for changed.", Long.valueOf(this.pollMs));
                } catch (InterruptedException e) {
                    log.error("Unexpected InterruptedException, ignoring: ", e);
                }
                if (this.shutdownLatch.await(this.pollMs, TimeUnit.MILLISECONDS)) {
                    return;
                }
            } catch (Exception e2) {
                throw fail(e2);
            }
        }
    }

    public List<TableId> tables() {
        awaitTablesReady(this.startupMs);
        List<TableId> list = this.tables.get();
        if (list == null) {
            log.info("No Tables snapshot available");
            return null;
        }
        Map map = (Map) ((Map) list.stream().collect(Collectors.groupingBy((v0) -> {
            return v0.tableName();
        }))).entrySet().stream().filter(entry -> {
            return ((List) entry.getValue()).size() > 1;
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        if (list.isEmpty()) {
            log.info("Based on the supplied filtering rules, there are no matching tables to read from");
        } else {
            log.debug("Based on the supplied filtering rules, the tables available to read from include: {}", this.dialect.expressionBuilder().appendList().delimitedBy(",").of(list));
        }
        if (map.isEmpty()) {
            return list;
        }
        throw fail(new ConnectException(("The connector uses the unqualified table name as the topic name and has detected duplicate unqualified table names. This could lead to mixed data types in the topic and downstream processing errors. To prevent such processing errors, the JDBC Source connector fails to start when it detects duplicate table name configurations. Update the connector's " + (this.whitelist != null ? "'table.whitelist'" : this.blacklist != null ? "'table.blacklist'" : "'table.whitelist' or 'table.blacklist'") + " config to include exactly one table in each of the tables listed below.\n\t") + map.values()));
    }

    private void awaitTablesReady(long j) {
        try {
            this.time.waitObject(this.tables, () -> {
                return Boolean.valueOf(this.tables.get() != null);
            }, this.time.milliseconds() + j);
        } catch (InterruptedException | TimeoutException e) {
            log.warn("Timed out or interrupted while awaiting for tables being read.");
        }
    }

    public void shutdown() {
        log.info("Shutting down thread monitoring tables.");
        this.shutdownLatch.countDown();
    }

    private boolean updateTables() {
        try {
            List<TableId> tableIds = this.dialect.tableIds(this.connectionProvider.getConnection());
            log.debug("Got the following tables: {}", tableIds);
            ArrayList arrayList = new ArrayList(tableIds.size());
            if (this.whitelist != null) {
                for (TableId tableId : tableIds) {
                    String expressionBuilder = this.dialect.expressionBuilder().append(tableId, QuoteMethod.NEVER).toString();
                    String expressionBuilder2 = this.dialect.expressionBuilder().append(tableId, QuoteMethod.ALWAYS).toString();
                    if (this.whitelist.contains(expressionBuilder) || this.whitelist.contains(expressionBuilder2) || this.whitelist.contains(tableId.tableName())) {
                        arrayList.add(tableId);
                    }
                }
            } else if (this.blacklist != null) {
                for (TableId tableId2 : tableIds) {
                    String expressionBuilder3 = this.dialect.expressionBuilder().append(tableId2, QuoteMethod.NEVER).toString();
                    String expressionBuilder4 = this.dialect.expressionBuilder().append(tableId2, QuoteMethod.ALWAYS).toString();
                    if (!this.blacklist.contains(expressionBuilder3) && !this.blacklist.contains(expressionBuilder4) && !this.blacklist.contains(tableId2.tableName())) {
                        arrayList.add(tableId2);
                    }
                }
            } else {
                arrayList.addAll(tableIds);
            }
            List<TableId> andSet = this.tables.getAndSet(arrayList);
            synchronized (this.tables) {
                this.tables.notifyAll();
            }
            return !Objects.equals(andSet, arrayList);
        } catch (SQLException e) {
            log.error("Error while trying to get updated table list, ignoring and waiting for next table poll interval", e);
            this.connectionProvider.close();
            return false;
        }
    }

    private RuntimeException fail(Throwable th) {
        log.error("Encountered an unrecoverable error while reading tables from the database", th);
        ConnectException connectException = new ConnectException("Encountered an unrecoverable error while reading tables from the database", th);
        this.context.raiseError(connectException);
        this.shutdownLatch.countDown();
        return connectException;
    }
}
