package io.confluent.connect.replicator.offsets;

import io.confluent.connect.replicator.util.NewReplicatorAdminClient;
import io.confluent.connect.replicator.util.ReplicatorAdminClient;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/replicator/offsets/ConsumerTimestampsWriter.class */
/**
 * Produces {@link TimestampAndDelta} records, keyed by {@link GroupTopicPartition}, to the topic
 * named by {@link ConsumerTimestampsWriterDefaults#CONSUMER_TIMESTAMPS_TOPIC_NAME}.
 *
 * <p>{@link #configure(Map)} must be called before {@link #send}; it makes sure the topic exists
 * (creating it when missing) and builds the underlying producer. The writer closes the admin
 * client only when it created that client itself; a client injected through
 * {@link #ConsumerTimestampsWriter(ReplicatorAdminClient)} is left open for its owner to close.
 */
public class ConsumerTimestampsWriter implements Configurable, AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(ConsumerTimestampsWriter.class);

    /** Value used for the {@code segment.bytes} topic setting (100 MiB). */
    private static final long TIMESTAMPS_TOPIC_SEGMENT_BYTES = 104857600L;

    private Producer<GroupTopicPartition, TimestampAndDelta> producer;
    private ReplicatorAdminClient adminClient;
    /** True only when {@link #adminClient} was created by this instance and must be closed by it. */
    private boolean closeAdminClient = false;
    private Set<String> whitelistTopics;
    private Pattern topicPattern;
    private Set<String> blacklistTopics;

    /** Creates a writer that builds its own admin client during {@link #configure(Map)}. */
    public ConsumerTimestampsWriter() {
    }

    /**
     * Creates a writer that uses the supplied admin client.
     *
     * @param replicatorAdminClient admin client to use; it is never closed by this writer
     */
    public ConsumerTimestampsWriter(ReplicatorAdminClient replicatorAdminClient) {
        this.adminClient = replicatorAdminClient;
    }

    /**
     * Reads the writer configuration, creates the timestamps topic if it does not exist yet and
     * builds the producer.
     *
     * <p>If any step fails, an admin client created by this call is closed before the exception
     * propagates, so a failed configuration does not leak it.
     *
     * @param map configuration properties; also used as the base of the producer configuration
     * @throws RuntimeException if the configuration is invalid, the topic cannot be created or the
     *     producer cannot be constructed
     */
    public void configure(Map<String, ?> map) {
        if (this.adminClient == null) {
            this.adminClient = new NewReplicatorAdminClient(map, Time.SYSTEM, 0L, (String) null);
            this.closeAdminClient = true;
        }
        try {
            this.adminClient.setInterestedTopics(
                    Collections.singleton(ConsumerTimestampsWriterDefaults.CONSUMER_TIMESTAMPS_TOPIC_NAME), null);
            ConsumerTimestampsWriterConfig config = ConsumerTimestampsWriterConfig.getConfig(map);
            Integer numPartitions = config.getInt(ConsumerTimestampsWriterConfig.TIMESTAMPS_TOPIC_NUM_PARTITIONS_CONFIG);
            Short replicationFactor =
                    config.getShort(ConsumerTimestampsWriterConfig.TIMESTAMPS_TOPIC_REPLICATION_FACTOR_CONFIG);
            this.whitelistTopics = config.getTopics();
            this.topicPattern = config.getTopicPattern();
            this.blacklistTopics = config.getBlacklistTopics();
            if (!topicExists()) {
                createTopicWithFallback(numPartitions, replicationFactor);
            }
            this.producer = createProducer(map);
        } catch (RuntimeException e) {
            releaseOwnedAdminClient(e);
            throw e;
        }
    }

    /**
     * Creates the timestamps topic with the configured replication factor and, when the root cause
     * of a failure is an {@link InvalidReplicationFactorException}, retries once with a replication
     * factor of 1. Any other failure, or a failure of the retry, is logged and rethrown.
     */
    private void createTopicWithFallback(Integer numPartitions, Short replicationFactor) {
        try {
            createTopic(Optional.of(numPartitions), Optional.of(replicationFactor));
            return;
        } catch (RuntimeException e) {
            if (!(rootCauseOf(e) instanceof InvalidReplicationFactorException)) {
                log.error("Failed to create topic __consumer_timestamps in the source cluster", e);
                throw e;
            }
            log.warn("Failed to create topic __consumer_timestamps with replication factor {}. Attempting again with replication factor 1", replicationFactor);
        }
        try {
            createTopic(Optional.of(numPartitions), Optional.of((short) 1));
        } catch (RuntimeException e) {
            log.error("Failed to create topic __consumer_timestamps in the source cluster", e);
            throw e;
        }
    }

    /** Follows the {@code getCause()} chain of {@code throwable} to its last element. */
    private static Throwable rootCauseOf(Throwable throwable) {
        Throwable root = throwable;
        while (root.getCause() != null) {
            root = root.getCause();
        }
        return root;
    }

    /**
     * Closes and forgets the admin client after a failed {@link #configure(Map)}, but only when
     * this instance created it. A failure to close is attached to {@code failure} as a suppressed
     * exception so it cannot mask the original error.
     */
    private void releaseOwnedAdminClient(RuntimeException failure) {
        if (!this.closeAdminClient || this.adminClient == null) {
            return;
        }
        try {
            this.adminClient.close();
        } catch (RuntimeException closeFailure) {
            failure.addSuppressed(closeFailure);
        } finally {
            // Reset so that a later configure() call starts from a fresh admin client.
            this.adminClient = null;
            this.closeAdminClient = false;
        }
    }

    /**
     * Asks the admin client whether the timestamps topic exists.
     *
     * @return the admin client's answer for the timestamps topic
     */
    protected boolean topicExists() {
        return this.adminClient.topicExists(ConsumerTimestampsWriterDefaults.CONSUMER_TIMESTAMPS_TOPIC_NAME);
    }

    /**
     * Creates the timestamps topic with {@code cleanup.policy=compact} and a 100 MiB
     * {@code segment.bytes}.
     *
     * @param optional number of partitions, passed through to the admin client
     * @param optional2 replication factor, passed through to the admin client
     * @return the value returned by the admin client's {@code createTopic}
     * @throws RuntimeException if the admin client fails; non-runtime exceptions are wrapped
     */
    protected boolean createTopic(Optional<Integer> optional, Optional<Short> optional2) {
        try {
            Properties topicProps = new Properties();
            topicProps.put("cleanup.policy", "compact");
            topicProps.put("segment.bytes", String.valueOf(TIMESTAMPS_TOPIC_SEGMENT_BYTES));
            return this.adminClient.createTopic(
                    ConsumerTimestampsWriterDefaults.CONSUMER_TIMESTAMPS_TOPIC_NAME, optional, optional2, topicProps);
        } catch (Exception e) {
            throw toRuntimeException(e);
        }
    }

    /** Returns the topic whitelist read from the configuration in {@link #configure(Map)}. */
    public Set<String> whitelistTopics() {
        return this.whitelistTopics;
    }

    /** Returns the topic pattern read from the configuration in {@link #configure(Map)}. */
    public Pattern topicPattern() {
        return this.topicPattern;
    }

    /** Returns the topic blacklist read from the configuration in {@link #configure(Map)}. */
    public Set<String> blacklistTopics() {
        return this.blacklistTopics;
    }

    /** Returns {@code exc} itself when it is already unchecked, otherwise wraps it. */
    private RuntimeException toRuntimeException(Exception exc) {
        return exc instanceof RuntimeException ? (RuntimeException) exc : new RuntimeException(exc);
    }

    /**
     * Builds the producer. Its configuration is layered, later layers winning: the raw
     * configuration, then {@link ConsumerTimestampsWriterDefaults#PRODUCER_CONFIG_DEFAULTS}, then
     * entries carrying the {@link ConsumerTimestampsWriterConfig#TIMESTAMPS_PRODUCER_PREFIX}
     * prefix (with the prefix stripped).
     */
    private Producer<GroupTopicPartition, TimestampAndDelta> createProducer(Map<String, ?> map) {
        Map<String, Object> producerConfig = new HashMap<>(map);
        producerConfig.putAll(ConsumerTimestampsWriterDefaults.PRODUCER_CONFIG_DEFAULTS);
        producerConfig.putAll(
                withPrefix(map, ConsumerTimestampsWriterConfig.TIMESTAMPS_PRODUCER_PREFIX, Collections.emptySet()));
        try {
            return new KafkaProducer<>(producerConfig);
        } catch (RuntimeException e) {
            // Report through the logger rather than printing the stack trace to stderr.
            log.error("Failed to create ConsumerTimestampsWriter producer", e);
            throw e;
        }
    }

    /**
     * Collects the entries of {@code map} whose key starts with {@code str}, keyed by the remainder
     * of the key after the prefix, skipping any remainder contained in {@code set}.
     */
    private static Map<String, Object> withPrefix(Map<String, ?> map, String str, Set<String> set) {
        Map<String, Object> stripped = new HashMap<>();
        for (Map.Entry<String, ?> entry : map.entrySet()) {
            String key = entry.getKey();
            if (!key.startsWith(str)) {
                continue;
            }
            String suffix = key.substring(str.length());
            if (!set.contains(suffix)) {
                stripped.put(suffix, entry.getValue());
            }
        }
        return stripped;
    }

    /**
     * Sends a record without a completion callback.
     *
     * @see #send(GroupTopicPartition, TimestampAndDelta, Callback)
     */
    public Future<RecordMetadata> send(GroupTopicPartition groupTopicPartition, TimestampAndDelta timestampAndDelta) {
        return send(groupTopicPartition, timestampAndDelta, null);
    }

    /**
     * Sends a record to the timestamps topic. Requires a prior successful {@link #configure(Map)}.
     *
     * @param groupTopicPartition record key
     * @param timestampAndDelta record value
     * @param callback callback handed to the producer; may be {@code null}
     * @return the future returned by the producer
     */
    public Future<RecordMetadata> send(GroupTopicPartition groupTopicPartition, TimestampAndDelta timestampAndDelta, Callback callback) {
        return this.producer.send(
                new ProducerRecord<>(
                        ConsumerTimestampsWriterDefaults.CONSUMER_TIMESTAMPS_TOPIC_NAME,
                        groupTopicPartition,
                        timestampAndDelta),
                callback);
    }

    /**
     * Closes the producer, if one was created, and then the admin client when this instance owns
     * it. {@link KafkaException}s raised while closing are logged rather than propagated. The
     * admin client is closed even when closing the producer fails.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        try {
            // The producer is null when configure() was never called or did not complete.
            if (this.producer != null) {
                this.producer.close();
            }
        } catch (KafkaException e) {
            log.error("Failed to stop ConsumerTimestampsWriter producer", e);
        } finally {
            try {
                if (this.closeAdminClient && this.adminClient != null) {
                    this.adminClient.close();
                }
            } catch (KafkaException e2) {
                log.error("Failed to stop ConsumerTimestampsWriter admin client", e2);
            }
        }
    }
}
