package io.confluent.connect.replicator;

import io.confluent.connect.replicator.KafkaConfigs;
import io.confluent.connect.replicator.offsets.ConsumerOffsetsTopicCommitter;
import io.confluent.connect.replicator.offsets.ConsumerOffsetsTranslator;
import io.confluent.connect.replicator.offsets.ConsumerTimestampsCommitter;
import io.confluent.connect.replicator.schemas.SchemaTranslator;
import io.confluent.connect.replicator.util.NewReplicatorAdminClient;
import io.confluent.connect.replicator.util.ReplicatorAdminClient;
import io.confluent.connect.replicator.util.ReplicatorAdminClientWithZk;
import io.confluent.connect.replicator.util.TopicMetadata;
import io.confluent.connect.replicator.util.TranslatorMonitor;
import io.confluent.connect.replicator.util.Utils;
import io.confluent.connect.replicator.util.Version;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import kafka.log.LogConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/replicator/ReplicatorSourceTask.class */
public class ReplicatorSourceTask extends SourceTask {
    /** Key of the record header that carries the provenance value written by this task. */
    public static final String REPLICATOR_ID_HEADER = "__replicator_id";
    // Clock used for every deadline computed by this task.
    private final Time time;
    private String taskId;
    private ReplicatorSourceTaskConfig config;
    // Converters applied to the raw key/value bytes read from the source topic.
    private Converter sourceKeyConverter;
    private Converter sourceValueConverter;
    private ReplicatorAdminClient sourceAdminClient;
    private ReplicatorAdminClient destAdminClient;
    private final TranslatorMonitor translatorMonitor;
    // Cluster IDs are only populated by setClusterIds(), i.e. when provenance headers are enabled.
    private String sourceClusterId;
    private String destClusterId;
    // Source topics for which isDestTopicExpansionNeeded() returned true during initialization.
    private final Set<String> sourceTopicsNeedingExpansion;
    private Long retryTopicExpansionDeadline;
    // Source topics recorded during initialization when topic config sync is enabled.
    private final Set<String> managedSourceTopics;
    private Long topicConfigCheckDeadline;
    // Time at which paused partitions are re-examined; null when no re-check is scheduled.
    // Also written by wakeupConsumer(), which is registered as a callback.
    private volatile Long tryResumeDeadline;
    /** Delay, in milliseconds, before paused partitions are examined again. */
    static final long RETRY_RESUME_FREQUENCY_MS = 5000;
    // NOTE(review): not referenced in the visible part of this class; confirm where it is consumed.
    static final long METADATA_MAX_AGE_MS = 30000;
    /** Value handed to the TranslatorMonitor created by the no-arg constructor. */
    static final long READY_STATE_MAX_AGE_MS = 10000;
    private volatile Consumer<byte[], byte[]> consumer;
    // Translators keyed by the source topic they handle; records of those topics are not replicated directly.
    private Map<String, Translator> translators;
    private ConsumerTimestampsCommitter timestampsCommitter;
    private ConsumerOffsetsTopicCommitter offsetTopicCommitter;
    // True while doStart() is running; wakeupConsumer() does not wake the consumer during that window.
    private volatile boolean isStarting;
    // Header converter used when copying record headers to Connect headers.
    private HeaderConverter converter;
    private List<FilterOverride> filterOverrides;
    private static final Logger log = LoggerFactory.getLogger(ReplicatorSourceTask.class);
    /** Provenance header layout: three non-empty, comma-separated fields (cluster ID, topic, timestamp). */
    public static final Pattern PROVENANCE_HEADER_PATTERN = Pattern.compile("([^,]+),([^,]+),([^,]+)");
    /** Pattern used to extract individual filter overrides from the configured override string. */
    public static final Pattern FILTER_OVERRIDE_PATTERN = Pattern.compile(ReplicatorSourceConnectorConfig.FILTER_OVERRIDE_PATTERN);

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/confluent/connect/replicator/ReplicatorSourceTask$FilterOverride.class */
    /**
     * A single provenance-filter override: a cluster-ID pattern, a topic pattern and a
     * half-open timestamp range {@code [startTsInclusive, endTsExclusive)}.
     *
     * <p>Instances are immutable. Equality is based on the pattern text and flags rather than
     * on {@link Pattern} identity, because {@code Pattern} does not override
     * {@code equals}/{@code hashCode}.
     */
    public static class FilterOverride {
        private final Pattern clusterId;
        private final Pattern topic;
        private final long startTsInclusive;
        private final long endTsExclusive;

        /**
         * Builds an override from a matcher that is positioned on a match.
         *
         * @param matcher matcher whose groups 1 and 2 are the cluster-ID and topic regular
         *                expressions and whose groups 3 and 4 are the optional start/end
         *                timestamps; a missing or non-numeric bound falls back to
         *                {@code 0} / {@link Long#MAX_VALUE} respectively
         */
        public FilterOverride(Matcher matcher) {
            this.clusterId = Pattern.compile(matcher.group(1));
            this.topic = Pattern.compile(matcher.group(2));
            this.startTsInclusive = parseBound(matcher.group(3), 0L);
            this.endTsExclusive = parseBound(matcher.group(4), Long.MAX_VALUE);
        }

        /** Parses a timestamp bound, returning {@code fallback} when the text is null or not a long. */
        private static long parseBound(String text, long fallback) {
            try {
                // Long.parseLong(null) also raises NumberFormatException, so absent groups use the fallback.
                return Long.parseLong(text);
            } catch (NumberFormatException e) {
                return fallback;
            }
        }

        /** Compares two patterns by their source text and flags (null-safe). */
        private static boolean samePattern(Pattern left, Pattern right) {
            if (left == right) {
                return true;
            }
            if (left == null || right == null) {
                return false;
            }
            return left.flags() == right.flags() && left.pattern().equals(right.pattern());
        }

        /** Returns the source text of a pattern, or null for a null pattern. */
        private static String textOf(Pattern pattern) {
            return pattern == null ? null : pattern.pattern();
        }

        public Pattern clusterId() {
            return this.clusterId;
        }

        public Pattern topic() {
            return this.topic;
        }

        public long startTsInclusive() {
            return this.startTsInclusive;
        }

        public long endTsExclusive() {
            return this.endTsExclusive;
        }

        /**
         * Tests whether this override applies to the given provenance header.
         *
         * <p>The timestamp check passes when the header has no timestamp, when the timestamp is
         * {@code -1}, or when it lies in {@code [startTsInclusive, endTsExclusive)}. The topic
         * check passes when the header has no topic or the topic pattern matches. The cluster ID
         * must always match.
         *
         * @param provenanceHeader header to test; its cluster ID must not be null
         * @return true when all three checks pass
         */
        public boolean matches(ProvenanceHeader provenanceHeader) {
            Long ts = provenanceHeader.ts();
            boolean tsAccepted = ts == null
                    || ts == -1L
                    || (ts >= this.startTsInclusive && ts < this.endTsExclusive);
            if (!tsAccepted) {
                return false;
            }
            String headerTopic = provenanceHeader.topic();
            if (headerTopic != null && !this.topic.matcher(headerTopic).matches()) {
                return false;
            }
            return this.clusterId.matcher(provenanceHeader.clusterId()).matches();
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            FilterOverride other = (FilterOverride) obj;
            return this.startTsInclusive == other.startTsInclusive
                    && this.endTsExclusive == other.endTsExclusive
                    && samePattern(this.clusterId, other.clusterId)
                    && samePattern(this.topic, other.topic);
        }

        public int hashCode() {
            // Hash the pattern text so that equal overrides produce equal hash codes.
            return Objects.hash(textOf(this.clusterId), textOf(this.topic), Long.valueOf(this.startTsInclusive), Long.valueOf(this.endTsExclusive));
        }

        public String toString() {
            return this.clusterId + "," + this.topic + "," + this.startTsInclusive + "-" + this.endTsExclusive;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/confluent/connect/replicator/ReplicatorSourceTask$ProvenanceHeader.class */
    /**
     * Immutable value object describing one provenance header: the cluster ID, the topic, a
     * timestamp, and a flag telling whether the raw header could be parsed.
     */
    public static class ProvenanceHeader {
        private final String clusterId;
        private final String topic;
        private final Long ts;
        private final boolean valid;

        /**
         * @param str   cluster ID
         * @param str2  topic name
         * @param l     timestamp, may be null
         * @param z     whether the header was parsed successfully
         */
        public ProvenanceHeader(String str, String str2, Long l, boolean z) {
            this.valid = z;
            this.ts = l;
            this.topic = str2;
            this.clusterId = str;
        }

        public String clusterId() {
            return clusterId;
        }

        public String topic() {
            return topic;
        }

        public Long ts() {
            return ts;
        }

        public boolean isValid() {
            return valid;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (obj.getClass() != getClass()) {
                return false;
            }
            ProvenanceHeader that = (ProvenanceHeader) obj;
            if (valid != that.valid) {
                return false;
            }
            if (!Objects.equals(clusterId, that.clusterId)) {
                return false;
            }
            if (!Objects.equals(topic, that.topic)) {
                return false;
            }
            return Objects.equals(ts, that.ts);
        }

        public int hashCode() {
            return Objects.hash(clusterId, topic, ts, valid);
        }

        /** Renders the header as {@code clusterId,topic,ts}; the valid flag is not included. */
        public String toString() {
            return String.join(",", clusterId, topic, String.valueOf(ts));
        }
    }

    /**
     * Creates an unconfigured task that uses the system clock and a fresh
     * {@link TranslatorMonitor} built with {@link #READY_STATE_MAX_AGE_MS}; every other
     * collaborator is left null and is assigned later in {@link #start(Map)}.
     */
    public ReplicatorSourceTask() {
        this(null, null, null, Time.SYSTEM, null, null, null, null, new TranslatorMonitor(READY_STATE_MAX_AGE_MS), null, null);
    }

    /**
     * Package-private constructor with injected collaborators and no timestamps committer;
     * delegates to the 12-argument overload with a null {@link ConsumerTimestampsCommitter}.
     */
    ReplicatorSourceTask(ReplicatorSourceTaskConfig replicatorSourceTaskConfig, SourceTaskContext sourceTaskContext, String str, Time time, Consumer<byte[], byte[]> consumer, ConsumerOffsetsTranslator consumerOffsetsTranslator, ReplicatorAdminClient replicatorAdminClient, ReplicatorAdminClient replicatorAdminClient2, TranslatorMonitor translatorMonitor, Converter converter, Converter converter2) {
        this(replicatorSourceTaskConfig, sourceTaskContext, str, time, consumer, consumerOffsetsTranslator, replicatorAdminClient, replicatorAdminClient2, translatorMonitor, converter, converter2, null);
    }

    /**
     * Package-private constructor with injected collaborators including a timestamps
     * committer. The offsets-topic committer is left null, which is expressed here by
     * delegating to the 13-argument overload rather than repeating its assignments.
     */
    ReplicatorSourceTask(ReplicatorSourceTaskConfig replicatorSourceTaskConfig, SourceTaskContext sourceTaskContext, String str, Time time, Consumer<byte[], byte[]> consumer, ConsumerOffsetsTranslator consumerOffsetsTranslator, ReplicatorAdminClient replicatorAdminClient, ReplicatorAdminClient replicatorAdminClient2, TranslatorMonitor translatorMonitor, Converter converter, Converter converter2, ConsumerTimestampsCommitter consumerTimestampsCommitter) {
        this(replicatorSourceTaskConfig, sourceTaskContext, str, time, consumer, consumerOffsetsTranslator, replicatorAdminClient, replicatorAdminClient2, translatorMonitor, converter, converter2, consumerTimestampsCommitter, (ConsumerOffsetsTopicCommitter) null);
    }

    /**
     * Package-private constructor that accepts every collaborator explicitly.
     *
     * <p>Any argument may be null. A non-null {@code consumerOffsetsTranslator} is registered
     * in the translator map under its own topic name.
     */
    ReplicatorSourceTask(ReplicatorSourceTaskConfig replicatorSourceTaskConfig, SourceTaskContext sourceTaskContext, String str, Time time, Consumer<byte[], byte[]> consumer, ConsumerOffsetsTranslator consumerOffsetsTranslator, ReplicatorAdminClient replicatorAdminClient, ReplicatorAdminClient replicatorAdminClient2, TranslatorMonitor translatorMonitor, Converter converter, Converter converter2, ConsumerTimestampsCommitter consumerTimestampsCommitter, ConsumerOffsetsTopicCommitter consumerOffsetsTopicCommitter) {
        // Injected collaborators.
        this.time = time;
        this.translatorMonitor = translatorMonitor;
        this.config = replicatorSourceTaskConfig;
        this.context = sourceTaskContext;
        this.taskId = str;
        this.consumer = consumer;
        this.sourceAdminClient = replicatorAdminClient;
        this.destAdminClient = replicatorAdminClient2;
        this.sourceKeyConverter = converter;
        this.sourceValueConverter = converter2;
        this.timestampsCommitter = consumerTimestampsCommitter;
        this.offsetTopicCommitter = consumerOffsetsTopicCommitter;

        // Internal state starts out empty / unscheduled.
        this.isStarting = false;
        this.retryTopicExpansionDeadline = null;
        this.tryResumeDeadline = null;
        this.sourceTopicsNeedingExpansion = new HashSet<>();
        this.managedSourceTopics = new HashSet<>();
        this.filterOverrides = new ArrayList<>();
        this.translators = new HashMap<>();
        if (consumerOffsetsTranslator != null) {
            this.translators.put(consumerOffsetsTranslator.topic(), consumerOffsetsTranslator);
        }
    }

    /** Returns the version string reported by {@link Version#getVersion()}. */
    public String version() {
        return Version.getVersion();
    }

    /**
     * Configures and starts the task from the supplied properties.
     *
     * <p>Builds the task configuration, the converters, both admin clients and the source
     * consumer, registers the translators, optionally creates the offset committers, and
     * finally assigns and initializes the configured partitions through {@code doStart}.
     *
     * @param map task properties
     * @throws ConnectException when a {@link ConfigException} is raised while starting
     */
    public synchronized void start(Map<String, String> map) {
        try {
            this.config = taskConfig(map);
            this.taskId = this.config.getTaskId();
            this.sourceKeyConverter = this.config.getSourceKeyConverter();
            this.sourceValueConverter = this.config.getSourceValueConverter();

            // Each admin-client factory receives "true" when no ZooKeeper connect string is configured.
            String srcZkConnect = this.config.getString(ReplicatorSourceConnectorConfig.SRC_ZOOKEEPER_CONNECT_CONFIG);
            String dstZkConnect = this.config.getString(ReplicatorSourceConnectorConfig.DST_ZOOKEEPER_CONNECT_CONFIG);
            boolean srcZkMissing = srcZkConnect == null || srcZkConnect.isEmpty();
            boolean dstZkMissing = dstZkConnect == null || dstZkConnect.isEmpty();
            this.destAdminClient = destAdminClient(dstZkMissing);
            this.sourceAdminClient = srcAdminClient(srcZkMissing);

            if (this.config.isProvenanceHeaderEnabled()) {
                setClusterIds();
                String overrides = this.config.getString(ReplicatorSourceConnectorConfig.PROVENANCE_HEADER_FILTER_OVERRIDES_CONFIG);
                parseFilterOverrides(overrides, this.filterOverrides);
            }

            this.consumer = buildSourceConsumer(this.config);

            ConsumerOffsetsTranslator offsetsTranslator = new ConsumerOffsetsTranslator((Map<String, String>) this.config.originalsStrings(), this.taskId, this.time, this.config.getOffsetTranslatorBatchPeriodMs(), this.config.getOffsetTranslatorBatchSize());
            this.translators.put(offsetsTranslator.topic(), offsetsTranslator);

            if (this.config.isOffsetTimestampsCommitEnabled()) {
                // Prefer an explicitly configured source consumer group; otherwise use the configured name.
                String configuredGroup = map.get("src.consumer.group.id");
                String groupId = configuredGroup != null ? configuredGroup : this.config.getName();
                this.timestampsCommitter = new ConsumerTimestampsCommitter(groupId, this.config.originalsWithPrefix(KafkaConfigs.KafkaCluster.SOURCE.prefix()), this.sourceAdminClient);
            }
            if (this.config.isOffsetTopicCommitEnabled()) {
                this.offsetTopicCommitter = new ConsumerOffsetsTopicCommitter(this.consumer);
            }
            if (this.config.getSchemaRegistryTopic() != null) {
                log.info("Registering schema translator for topic {}", this.config.getSchemaRegistryTopic());
                this.translators.put(this.config.getSchemaRegistryTopic(), new SchemaTranslator(this.config, this.time));
            }

            this.converter = this.config.getSourceHeaderConverter();
            List assignedPartitions = this.config.getPartitionAssignment().partitions();
            doStart(assignedPartitions);
            log.info("Started kafka replicator task {} replicating partitions {}", this.taskId, assignedPartitions);
        } catch (ConfigException configError) {
            throw new ConnectException("Failed to start Kafka replicator task due to configuration error", configError);
        }
    }

    /**
     * Looks up the source and destination cluster IDs through the admin clients and stores
     * them in {@code sourceClusterId} / {@code destClusterId}.
     *
     * <p>A lookup failure is logged at debug level. If the corresponding ID is still null
     * afterwards, a {@link ConnectException} is thrown, and the lookup failure (when there was
     * one) is attached as its cause so the root problem is not lost.
     *
     * @throws ConnectException when either cluster ID cannot be determined
     */
    private void setClusterIds() {
        Exception sourceFailure = null;
        try {
            this.sourceClusterId = this.sourceAdminClient.clusterId();
        } catch (Exception e) {
            sourceFailure = e;
            log.debug("Failed to obtain source cluster ID", e);
        }
        if (this.sourceClusterId == null) {
            throw clusterIdUnavailable("Failed to obtain source cluster ID, please restart the source Kafka cluster", sourceFailure);
        }

        Exception destFailure = null;
        try {
            this.destClusterId = this.destAdminClient.clusterId();
        } catch (Exception e) {
            destFailure = e;
            log.debug("Failed to obtain destination cluster ID", e);
        }
        if (this.destClusterId == null) {
            throw clusterIdUnavailable("Failed to obtain destination cluster ID, please restart the destination Kafka cluster", destFailure);
        }

        if (this.sourceClusterId.equals(this.destClusterId)) {
            log.warn("The source and destination cluster IDs match. This is normal when replicating to different topics in the same cluster. Otherwise, check your source and destination cluster properties.");
        }
        log.info("Source cluster ID: {}", this.sourceClusterId);
        log.info("Destination cluster ID: {}", this.destClusterId);
    }

    /** Builds the exception for a missing cluster ID, chaining the lookup failure when one occurred. */
    private static ConnectException clusterIdUnavailable(String message, Exception cause) {
        return cause == null ? new ConnectException(message) : new ConnectException(message, cause);
    }

    /**
     * Appends one {@link FilterOverride} to {@code list} for every occurrence of
     * {@link #FILTER_OVERRIDE_PATTERN} found in {@code str}.
     *
     * @param str  configured override text; null or empty leaves {@code list} untouched
     * @param list destination list that receives the parsed overrides
     */
    protected static void parseFilterOverrides(String str, List<FilterOverride> list) {
        if (str != null && !str.isEmpty()) {
            for (Matcher occurrences = FILTER_OVERRIDE_PATTERN.matcher(str); occurrences.find(); ) {
                list.add(new FilterOverride(occurrences));
            }
        }
    }

    /**
     * Assigns the given partitions to the consumer, registers interest in the related
     * destination topics/translators, and positions or pauses each partition.
     *
     * <p>{@code isStarting} is true for the duration of the call (and always reset afterwards)
     * so that {@code wakeupConsumer()} does not wake the consumer while startup is in progress.
     *
     * @param collection source partitions assigned to this task
     */
    synchronized void doStart(Collection<TopicPartition> collection) {
        this.isStarting = true;
        try {
            this.consumer.assign(collection);
            watchTopicsForAssignedPartitions(collection);
            initializeAssignedPartitions(collection);
        } finally {
            this.isStarting = false;
        }
    }

    /**
     * Polls the source consumer once and converts the fetched records into source records
     * for the destination topic.
     *
     * <p>Before polling, pending housekeeping runs (offset commit check, topic expansion
     * retry, topic config update, resuming paused partitions). Partitions whose topic has a
     * registered translator are handed to that translator instead of being replicated. When
     * provenance headers are enabled, records rejected by {@code shouldFilterRecord} are
     * counted and skipped.
     *
     * <p>{@code translateCollectedRecords()} is invoked on every exit path of this method.
     * NOTE(review): the nested try/catch with the repeated call looks like an expanded
     * try/finally — confirm before restructuring.
     *
     * @return the records to produce; an empty list after a wakeup or an out-of-range offset
     * @throws InterruptedException declared by the task contract
     */
    public synchronized List<SourceRecord> poll() throws InterruptedException {
        try {
            try {
                if (this.offsetTopicCommitter != null) {
                    this.offsetTopicCommitter.checkCommit();
                }
                retryTopicExpansionIfNeeded();
                updateTopicConfigsIfNeeded();
                maybeResumePartitions();
                // Gather every pending deadline: one per translator plus the three task-level ones.
                // The task-level deadlines may be null; presumably Utils.nextDeadline tolerates
                // nulls and returns the earliest deadline — TODO confirm.
                List list = (List) this.translators.values().stream().map((v0) -> {
                    return v0.nextDeadline();
                }).collect(Collectors.toList());
                list.add(this.retryTopicExpansionDeadline);
                list.add(this.topicConfigCheckDeadline);
                list.add(this.tryResumeDeadline);
                long milliseconds = this.time.milliseconds();
                // Poll timeout is the time left until the next deadline, never negative.
                long max = Math.max(0L, Utils.nextDeadline((Long[]) list.toArray(new Long[list.size()])) - milliseconds);
                log.debug("Polling for records, waiting at most {} ms", Long.valueOf(max));
                ConsumerRecords<byte[], byte[]> poll = this.consumer.poll(max);
                if (log.isDebugEnabled()) {
                    Set<String> set = topicNamesFor(poll);
                    log.debug("Read {} records from {} topics: {}", new Object[]{Integer.valueOf(poll.count()), Integer.valueOf(set.size()), set});
                }
                boolean topicPreservePartitions = this.config.getTopicPreservePartitions();
                ArrayList arrayList = new ArrayList(poll.count());
                // Number of records dropped because of their provenance headers.
                int i = 0;
                for (TopicPartition topicPartition : poll.partitions()) {
                    // Partitions of translator topics are collected by the translator and not replicated.
                    if (!skipRecordReplication(topicPartition, poll)) {
                        String str = topicPartition.topic();
                        String destTopic = toDestTopic(str);
                        int partition = topicPartition.partition();
                        Map<String, ?> connectPartition = Utils.toConnectPartition(str, partition);
                        for (ConsumerRecord consumerRecord : poll.records(topicPartition)) {
                            if (this.config.isProvenanceHeaderEnabled() && shouldFilterRecord(consumerRecord, this.destClusterId, destTopic, this.filterOverrides)) {
                                i++;
                                // A filtered record is still reported to the offsets-topic committer, when one is configured.
                                if (this.offsetTopicCommitter != null) {
                                    this.offsetTopicCommitter.commitRecord(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
                                }
                            } else {
                                Map<String, Object> connectOffset = Utils.toConnectOffset(consumerRecord.offset());
                                SchemaAndValue connectData = this.sourceKeyConverter.toConnectData(str, (byte[]) consumerRecord.key());
                                SchemaAndValue connectData2 = this.sourceValueConverter.toConnectData(str, (byte[]) consumerRecord.value());
                                // The destination partition is only pinned when partitions are preserved.
                                // The time captured before the poll is used as the provenance timestamp.
                                arrayList.add(new SourceRecord(connectPartition, connectOffset, destTopic, topicPreservePartitions ? Integer.valueOf(partition) : null, connectData.schema(), connectData.value(), connectData2.schema(), connectData2.value(), timestampFromRecord(consumerRecord), toConnectHeaders(this.sourceClusterId, str, consumerRecord, this.converter, this.config.isProvenanceHeaderEnabled(), milliseconds)));
                            }
                        }
                    }
                }
                if (log.isDebugEnabled()) {
                    Logger logger = log;
                    Object[] objArr = new Object[3];
                    objArr[0] = Integer.valueOf(arrayList.size());
                    // NOTE(review): the fallback constant is presumably an empty string reused by the
                    // decompiler — confirm its value.
                    objArr[1] = this.config.isProvenanceHeaderEnabled() ? " and adding provenance headers" : ReplicatorSourceConnectorConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT;
                    objArr[2] = Integer.valueOf(i);
                    logger.debug("Replicating {} records{}, filtering {} records due to provenance headers", objArr);
                }
                translateCollectedRecords();
                return arrayList;
            } catch (OffsetOutOfRangeException e) {
                // Restart the affected partitions from their beginning and return nothing this round.
                Map offsetOutOfRangePartitions = e.offsetOutOfRangePartitions();
                log.warn("Consumer from source cluster detected out of range partitions: {}", offsetOutOfRangePartitions);
                this.consumer.seekToBeginning(offsetOutOfRangePartitions.keySet());
                List<SourceRecord> emptyList = Collections.emptyList();
                translateCollectedRecords();
                return emptyList;
            } catch (WakeupException e2) {
                // The consumer was woken up (see wakeupConsumer()); return nothing this round.
                log.debug("Kafka replicator task {} woken up", this.taskId);
                List<SourceRecord> emptyList2 = Collections.emptyList();
                translateCollectedRecords();
                return emptyList2;
            }
        } catch (Throwable th) {
            // Any other failure still flushes the translators before propagating.
            translateCollectedRecords();
            throw th;
        }
    }

    /**
     * Lets every translator whose destination is ready translate the records it has
     * collected, forwarding the translated records to the offsets-topic committer when one is
     * configured.
     *
     * <p>For a translator whose destination is not ready, every assigned partition of the
     * translator's topic is paused (after an optional seek to the beginning) and a resume
     * check is scheduled {@link #RETRY_RESUME_FREQUENCY_MS} from now.
     */
    private void translateCollectedRecords() {
        for (Translator translator : this.translators.values()) {
            if (translator.isDestinationReady()) {
                List<ConsumerRecord<byte[], byte[]>> translated = translator.translateCollectedRecords();
                if (this.offsetTopicCommitter != null) {
                    this.offsetTopicCommitter.commitRecords(translated);
                }
                continue;
            }
            for (TopicPartition assigned : this.consumer.assignment()) {
                if (!translator.topic().equals(assigned.topic())) {
                    continue;
                }
                if (translator.seekToBeginningOnPause()) {
                    log.debug("Seeking to beginning of paused partition {} since destination is not ready yet", assigned);
                    this.consumer.seekToBeginning(Collections.singleton(assigned));
                }
                log.info("Pausing source partition {} since destination is not ready yet", assigned);
                this.consumer.pause(Collections.singleton(assigned));
                long retryAt = this.time.milliseconds() + RETRY_RESUME_FREQUENCY_MS;
                this.tryResumeDeadline = Long.valueOf(retryAt);
            }
        }
    }

    /**
     * Hands the records of {@code topicPartition} to the translator registered for its topic,
     * if there is one.
     *
     * @return true when a translator collected the records (so they must not be replicated),
     *         false when no translator is registered for the topic
     */
    private boolean skipRecordReplication(TopicPartition topicPartition, ConsumerRecords<byte[], byte[]> consumerRecords) {
        Translator owner = this.translators.get(topicPartition.topic());
        boolean handledByTranslator = owner != null;
        if (handledByTranslator) {
            owner.collect(consumerRecords.records(topicPartition));
        }
        return handledByTranslator;
    }

    /**
     * Returns the distinct topic names of the partitions present in {@code consumerRecords};
     * an empty set when the batch is empty.
     */
    protected static Set<String> topicNamesFor(ConsumerRecords<byte[], byte[]> consumerRecords) {
        if (consumerRecords.isEmpty()) {
            return Collections.emptySet();
        }
        return consumerRecords.partitions()
                .stream()
                .map(TopicPartition::topic)
                .collect(Collectors.toSet());
    }

    /**
     * Extracts the timestamp to use for the replicated record.
     *
     * @return the record timestamp when it is non-negative, or null when it is {@code -1}
     * @throws CorruptRecordException for any other negative timestamp
     */
    protected static Long timestampFromRecord(ConsumerRecord<byte[], byte[]> consumerRecord) {
        if (consumerRecord.timestamp() >= 0) {
            return Long.valueOf(consumerRecord.timestamp());
        }
        if (consumerRecord.timestamp() == -1) {
            return null;
        }
        throw new CorruptRecordException(String.format("Invalid Record timestamp: %d", Long.valueOf(consumerRecord.timestamp())));
    }

    /**
     * Decides whether a record must be dropped because of its provenance headers.
     *
     * <p>A provenance header is a filter candidate when it cannot be parsed, or when it names
     * the given cluster ID and topic. The first candidate that matches a filter override makes
     * the method return false immediately; otherwise the record is filtered if at least one
     * candidate was seen.
     *
     * @param consumerRecord record whose {@link #REPLICATOR_ID_HEADER} headers are inspected
     * @param str            cluster ID compared against each header's cluster ID
     * @param str2           topic name compared against each header's topic
     * @param list           configured filter overrides
     * @return true when the record should not be replicated
     */
    protected static boolean shouldFilterRecord(ConsumerRecord<byte[], byte[]> consumerRecord, String str, String str2, List<FilterOverride> list) {
        boolean candidateSeen = false;
        for (Header rawHeader : consumerRecord.headers().headers(REPLICATOR_ID_HEADER)) {
            ProvenanceHeader provenance = parseProvenanceHeader(rawHeader.value(), consumerRecord);
            boolean candidate = !provenance.isValid()
                    || (str.equals(provenance.clusterId()) && str2.equals(provenance.topic()));
            if (!candidate) {
                continue;
            }
            if (matchesFilterOverride(provenance, list)) {
                return false;
            }
            log.trace("Found candidate filtered header {}", provenance);
            candidateSeen = true;
        }
        log.trace("No candidate filtered headers matched an override");
        return candidateSeen;
    }

    /**
     * Tells whether any override in {@code list} matches {@code provenanceHeader}. Overrides
     * are tried in list order and evaluation stops at the first match, which is logged at
     * trace level.
     */
    protected static boolean matchesFilterOverride(ProvenanceHeader provenanceHeader, List<FilterOverride> list) {
        FilterOverride firstMatch = list.stream()
                .filter(candidate -> candidate.matches(provenanceHeader))
                .findFirst()
                .orElse(null);
        if (firstMatch == null) {
            return false;
        }
        log.trace("Candidate filtered header {} matches override {}", provenanceHeader, firstMatch);
        return true;
    }

    /**
     * Decodes a provenance header value (UTF-8 text) into a {@link ProvenanceHeader}.
     *
     * <p>When the text matches {@link #PROVENANCE_HEADER_PATTERN}, the result is valid and
     * carries the parsed cluster ID and topic; a non-numeric timestamp field falls back to the
     * record's timestamp. Otherwise the result is marked invalid and carries the whole text as
     * cluster ID together with the record's topic and timestamp.
     *
     * @param bArr           raw header value
     * @param consumerRecord record the header belongs to, used for fallback values
     */
    protected static ProvenanceHeader parseProvenanceHeader(byte[] bArr, ConsumerRecord<byte[], byte[]> consumerRecord) {
        String text = new String(bArr, StandardCharsets.UTF_8);
        Matcher fields = PROVENANCE_HEADER_PATTERN.matcher(text);
        if (fields.matches()) {
            String cluster = fields.group(1);
            String topicName = fields.group(2);
            long ts;
            try {
                ts = Long.parseLong(fields.group(3));
            } catch (NumberFormatException notANumber) {
                ts = consumerRecord.timestamp();
            }
            return new ProvenanceHeader(cluster, topicName, Long.valueOf(ts), true);
        }
        return new ProvenanceHeader(text, consumerRecord.topic(), Long.valueOf(consumerRecord.timestamp()), false);
    }

    /**
     * Encodes a provenance header value as the UTF-8 bytes of {@code str,str2,l}.
     * A null argument is rendered as the text {@code null}.
     *
     * @param str  cluster ID
     * @param str2 topic name
     * @param l    timestamp
     */
    protected static byte[] formatProvenanceHeader(String str, String str2, Long l) {
        String encoded = String.join(",", str, str2, String.valueOf(l));
        return encoded.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Copies the headers of a source record into Connect headers and, when provenance is
     * enabled, appends a fresh provenance header for this hop.
     *
     * <p>When provenance is enabled, an existing valid provenance header that already names
     * the same cluster ID and topic is not copied: it would duplicate the header appended at
     * the end, which carries the current timestamp.
     *
     * @param str             source cluster ID written into the new provenance header
     * @param str2            source topic, used for header conversion and the provenance header
     * @param consumerRecord  record whose headers are copied
     * @param headerConverter converter applied to every copied header value
     * @param z               whether provenance headers are enabled
     * @param j               timestamp written into the new provenance header
     * @return the headers for the replicated record
     */
    protected static ConnectHeaders toConnectHeaders(String str, String str2, ConsumerRecord<byte[], byte[]> consumerRecord, HeaderConverter headerConverter, boolean z, long j) {
        Headers headers = consumerRecord.headers();
        ConnectHeaders connectHeaders = new ConnectHeaders();
        if (headers != null) {
            for (Header header : headers) {
                if (z && REPLICATOR_ID_HEADER.equals(header.key())) {
                    ProvenanceHeader existing = parseProvenanceHeader(header.value(), consumerRecord);
                    if (existing.isValid() && str.equals(existing.clusterId()) && str2.equals(existing.topic())) {
                        // Superseded by the provenance header appended below.
                        continue;
                    }
                }
                connectHeaders.add(header.key(), headerConverter.toConnectHeader(str2, header.key(), header.value()));
            }
        }
        if (z) {
            connectHeaders.add(REPLICATOR_ID_HEADER, formatProvenanceHeader(str, str2, Long.valueOf(j)), Schema.BYTES_SCHEMA);
        }
        return connectHeaders;
    }

    /**
     * Resumes paused partitions once the resume deadline has passed.
     *
     * <p>Nothing happens when no deadline is scheduled or it lies in the future. Otherwise
     * each paused partition is resumed when its translator reports the destination as ready,
     * or, failing that, when the corresponding destination partition exists. The deadline is
     * cleared when nothing remains paused and pushed back by
     * {@link #RETRY_RESUME_FREQUENCY_MS} otherwise.
     */
    private void maybeResumePartitions() {
        long now = this.time.milliseconds();
        if (this.tryResumeDeadline == null) {
            return;
        }
        if (this.tryResumeDeadline.longValue() > now) {
            log.debug("Resuming at {} (in {} ms)", this.tryResumeDeadline, Long.valueOf(this.tryResumeDeadline.longValue() - now));
            return;
        }

        for (TopicPartition pausedPartition : this.consumer.paused()) {
            Translator translator = this.translators.get(pausedPartition.topic());
            if (translator != null && translator.isDestinationReady()) {
                log.debug("Resuming paused partition {} since destination is now ready", pausedPartition);
                this.consumer.resume(Collections.singleton(pausedPartition));
                continue;
            }
            // No translator, or translator not ready: fall back to checking the destination partition.
            TopicPartition destPartition = toDestPartition(pausedPartition);
            if (this.destAdminClient.partitionExists(destPartition)) {
                log.debug("Resuming paused partition {} since partition {} now exists in the destination cluster", pausedPartition, destPartition);
                this.consumer.resume(Collections.singleton(pausedPartition));
            }
        }

        if (!this.consumer.paused().isEmpty()) {
            this.tryResumeDeadline = Long.valueOf(this.time.milliseconds() + RETRY_RESUME_FREQUENCY_MS);
            log.debug("Setting resume deadline to {} (in {} ms)", this.tryResumeDeadline, Long.valueOf(RETRY_RESUME_FREQUENCY_MS));
        } else {
            this.tryResumeDeadline = null;
        }
    }

    /**
     * Registers callbacks for everything the assigned partitions depend on.
     *
     * <p>Partitions of translator topics contribute their translator to the translator
     * monitor; all other partitions contribute their destination topic name to the
     * destination admin client. Both callbacks end up in {@code wakeupConsumer()}; the
     * translator callback only does so while no resume deadline is scheduled.
     */
    private void watchTopicsForAssignedPartitions(Collection<TopicPartition> collection) {
        ArrayList<Translator> watchedTranslators = new ArrayList<>();
        HashSet<String> watchedDestTopics = new HashSet<>();
        for (TopicPartition assigned : collection) {
            Translator translator = this.translators.get(assigned.topic());
            if (translator == null) {
                watchedDestTopics.add(toDestTopic(assigned.topic()));
            } else {
                watchedTranslators.add(translator);
            }
        }
        this.translatorMonitor.setInterestedTranslators(watchedTranslators, () -> {
            if (this.tryResumeDeadline == null) {
                wakeupConsumer();
            }
        });
        this.destAdminClient.setInterestedTopics(watchedDestTopics, this::wakeupConsumer);
    }

    /**
     * Makes the resume deadline due immediately and, unless the task is still starting,
     * wakes the consumer.
     */
    private void wakeupConsumer() {
        this.tryResumeDeadline = Long.valueOf(this.time.milliseconds());
        if (!this.isStarting) {
            this.consumer.wakeup();
        }
    }

    /**
     * Positions every assigned partition and pauses those whose destination is not ready.
     *
     * <p>For a translator topic, readiness comes from the translator. For any other topic the
     * destination counts as ready when partitions are not preserved or the destination
     * partition already exists. While handling partition 0 of such a topic, the topic is also
     * recorded for expansion and/or config sync as applicable.
     */
    private void initializeAssignedPartitions(Collection<TopicPartition> collection) {
        for (TopicPartition sourcePartition : collection) {
            Translator translator = this.translators.get(sourcePartition.topic());
            if (translator != null) {
                pauseOrSeekSourcePartition(translator.isDestinationReady(), "translator for " + translator.topic(), sourcePartition, translator.seekToBeginningOnPause());
                continue;
            }

            TopicPartition destPartition = toDestPartition(sourcePartition);
            // Per-topic bookkeeping is done once, when partition 0 is seen.
            if (sourcePartition.partition() == 0) {
                String sourceTopic = sourcePartition.topic();
                if (isDestTopicExpansionNeeded(sourceTopic)) {
                    this.sourceTopicsNeedingExpansion.add(sourceTopic);
                }
                if (this.config.getTopicConfigSync()) {
                    this.managedSourceTopics.add(sourceTopic);
                }
            }
            boolean destinationReady = !this.config.getTopicPreservePartitions() || this.destAdminClient.partitionExists(destPartition);
            pauseOrSeekSourcePartition(destinationReady, "partition " + destPartition, sourcePartition, false);
        }
    }

    /**
     * Positions a source partition and pauses it when its destination is not ready.
     *
     * <p>When the destination is ready the partition is only positioned through
     * {@code seekSourcePartition}. Otherwise it is positioned (at the beginning if {@code z2} is
     * set, else through {@code seekSourcePartition}), paused, and the resume deadline is pushed
     * {@code RETRY_RESUME_FREQUENCY_MS} into the future.
     *
     * @param z whether the destination is ready
     * @param str description of the destination, used only in the log message
     * @param topicPartition the source partition to position
     * @param z2 whether to seek to the beginning when the destination is not ready
     */
    private void pauseOrSeekSourcePartition(boolean z, String str, TopicPartition topicPartition, boolean z2) {
        if (z) {
            seekSourcePartition(topicPartition);
            return;
        }
        Set<TopicPartition> partition = Collections.singleton(topicPartition);
        if (z2) {
            this.consumer.seekToBeginning(partition);
            log.info("Seeking to the beginning and pausing source partition {} since destination {} is not ready yet", topicPartition, str);
        } else {
            // The partition is positioned from stored offsets here, so do not claim a
            // seek to the beginning in the log.
            seekSourcePartition(topicPartition);
            log.info("Pausing source partition {} since destination {} is not ready yet", topicPartition, str);
        }
        this.consumer.pause(partition);
        this.tryResumeDeadline = Long.valueOf(this.time.milliseconds() + RETRY_RESUME_FREQUENCY_MS);
    }

    /**
     * Positions the consumer for a source partition according to the configured offset start.
     *
     * <p>With {@code CONNECT} the offset from the Connect offset storage reader is preferred and
     * the consumer's committed offset is the fallback; with {@code CONSUMER} the preference is
     * reversed. If neither is available the partition is sought to its beginning.
     *
     * @param topicPartition the source partition to position
     * @throws IllegalArgumentException if the configured offset start is neither of the above
     */
    private void seekSourcePartition(TopicPartition topicPartition) {
        Map<String, Object> connectOffset = this.context.offsetStorageReader().offset(Utils.toConnectPartition(topicPartition));
        OffsetAndMetadata consumerOffset = this.consumer.committed(topicPartition);
        boolean hasConnectOffset = connectOffset != null;
        boolean hasConsumerOffset = consumerOffset != null;
        switch (this.config.getOffsetStart()) {
            case CONNECT:
                if (hasConnectOffset) {
                    seekToConnectOffset(topicPartition, connectOffset);
                } else if (hasConsumerOffset) {
                    seekToConsumerOffset(topicPartition, consumerOffset);
                } else {
                    seekToBeginning(topicPartition);
                }
                break;
            case CONSUMER:
                if (hasConsumerOffset) {
                    seekToConsumerOffset(topicPartition, consumerOffset);
                } else if (hasConnectOffset) {
                    seekToConnectOffset(topicPartition, connectOffset);
                } else {
                    seekToBeginning(topicPartition);
                }
                break;
            default:
                throw new IllegalArgumentException("Unsupported offset start type");
        }
    }

    /**
     * Seeks the consumer to one past the offset stored under {@code Utils.OFFSET} in the given
     * Connect offset map.
     *
     * @param topicPartition the source partition to seek
     * @param map the Connect offset map; its {@code Utils.OFFSET} entry is cast to {@code Long}
     */
    private void seekToConnectOffset(TopicPartition topicPartition, Map<String, Object> map) {
        Long storedOffset = (Long) map.get(Utils.OFFSET);
        long lastOffset = storedOffset.longValue();
        log.debug("Seeking to offset {} committed by Connect for source partition {}", Long.valueOf(lastOffset), topicPartition);
        // The stored value is advanced by one before seeking.
        long nextOffset = lastOffset + 1;
        this.consumer.seek(topicPartition, nextOffset);
    }

    /**
     * Logs that the consumer's committed offset will be used for the given source partition.
     *
     * <p>NOTE(review): no explicit seek is issued here; presumably the consumer starts from its
     * committed offset on its own. Confirm against the consumer's assignment handling.
     *
     * @param topicPartition the source partition being positioned
     * @param offsetAndMetadata the committed offset reported by the consumer
     */
    private void seekToConsumerOffset(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        long committedOffset = offsetAndMetadata.offset();
        log.debug("Using consumer committed offset {} for source partition {}", Long.valueOf(committedOffset), topicPartition);
    }

    /**
     * Seeks the consumer to the beginning of the given source partition.
     *
     * @param topicPartition the source partition to rewind
     */
    private void seekToBeginning(TopicPartition topicPartition) {
        log.debug("Seeking to the beginning of source partition {}", topicPartition);
        Set<TopicPartition> target = Collections.singleton(topicPartition);
        this.consumer.seekToBeginning(target);
    }

    /**
     * Maps a source partition to the destination partition with the renamed topic and the same
     * partition number.
     *
     * @param topicPartition the source partition
     * @return the corresponding destination partition
     */
    private TopicPartition toDestPartition(TopicPartition topicPartition) {
        String destTopic = toDestTopic(topicPartition.topic());
        int partitionNumber = topicPartition.partition();
        return new TopicPartition(destTopic, partitionNumber);
    }

    /**
     * Derives the destination topic name for a source topic by applying the configured topic
     * rename format through {@code Utils.renameTopic}.
     *
     * @param str the source topic name
     * @return the destination topic name
     */
    private String toDestTopic(String str) {
        final String destTopic = Utils.renameTopic(this.config.getTopicRenameFormat(), str);
        return destTopic;
    }

    /**
     * Retries creation or expansion of destination topics that are still pending.
     *
     * <p>Clears the retry deadline when nothing is pending. Otherwise, once the deadline is
     * unset or has passed, attempts every pending source topic, drops the ones that succeeded
     * from {@code sourceTopicsNeedingExpansion}, and schedules the next attempt after the
     * configured topic-create backoff.
     */
    private void retryTopicExpansionIfNeeded() {
        if (this.sourceTopicsNeedingExpansion.isEmpty()) {
            this.retryTopicExpansionDeadline = null;
            return;
        }
        boolean notYetDue = this.retryTopicExpansionDeadline != null
                && this.retryTopicExpansionDeadline.longValue() >= this.time.milliseconds();
        if (notYetDue) {
            return;
        }
        // Collect successes separately so the pending set is not modified while iterating it.
        Set<String> completedTopics = new HashSet<>();
        for (String sourceTopic : this.sourceTopicsNeedingExpansion) {
            if (maybeCreateOrExpandDestTopic(sourceTopic)) {
                completedTopics.add(sourceTopic);
            }
        }
        this.sourceTopicsNeedingExpansion.removeAll(completedTopics);
        this.retryTopicExpansionDeadline = Long.valueOf(this.time.milliseconds() + this.config.getTopicCreateBackoffMs());
    }

    /**
     * Sets the message timestamp type entry of the given topic properties to the value from the
     * task configuration, mutating {@code properties} in place.
     *
     * @param properties the topic properties to modify
     */
    private void overrideTimestampType(Properties properties) {
        String timestampTypeKey = LogConfig.MessageTimestampTypeProp();
        String configuredTimestampType = this.config.getTopicTimestampType();
        properties.setProperty(timestampTypeKey, configuredTimestampType);
    }

    /**
     * Periodically brings destination topic configurations in line with their source topics.
     *
     * <p>Clears the check deadline when no source topics are managed. Otherwise, once the
     * deadline is unset or reached, each managed source topic whose destination topic exists has
     * its source configuration (with the timestamp type overridden) compared with the
     * destination configuration, and the destination is updated when they differ. Failures are
     * logged and retried on the next check. If the thread is interrupted, its interrupt status
     * is restored and the remaining topics are left for the next check.
     */
    private void updateTopicConfigsIfNeeded() {
        if (this.managedSourceTopics.isEmpty()) {
            this.topicConfigCheckDeadline = null;
            return;
        }
        if (this.topicConfigCheckDeadline != null && this.time.milliseconds() < this.topicConfigCheckDeadline.longValue()) {
            return;
        }
        log.debug("Verifying topic configuration for topics {} for replicator task {}", this.managedSourceTopics, this.taskId);
        for (String sourceTopic : this.managedSourceTopics) {
            String destTopic = toDestTopic(sourceTopic);
            if (!this.destAdminClient.topicExists(destTopic)) {
                continue;
            }
            try {
                Properties properties = this.sourceAdminClient.topicConfig(sourceTopic);
                overrideTimestampType(properties);
                if (!properties.equals(this.destAdminClient.topicConfig(destTopic))) {
                    log.info("Updating configuration of topic {} with properties {}", destTopic, properties);
                    this.destAdminClient.changeTopicConfig(destTopic, properties);
                }
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the interrupt status; restore it so the
                // caller can observe the interruption, and stop issuing further blocking calls.
                Thread.currentThread().interrupt();
                log.warn("Failed topic configuration check for topic {} on task {}. Will retry later.", destTopic, this.taskId, e);
                break;
            } catch (RuntimeException | ExecutionException e) {
                log.warn("Failed topic configuration check for topic {} on task {}. Will retry later.", destTopic, this.taskId, e);
            }
        }
        this.topicConfigCheckDeadline = Long.valueOf(this.time.milliseconds() + this.config.getTopicConfigSyncIntervalMs());
    }

    /**
     * Decides whether the destination topic for a source topic must be created or expanded.
     *
     * @param str the source topic name
     * @return {@code false} when both topic auto-creation and partition preservation are
     *     disabled, or when the consumer returns no partition list for the source topic;
     *     otherwise the auto-create setting when the destination topic has no metadata, or
     *     whether partition preservation is enabled and the source has more partitions than the
     *     destination
     */
    private boolean isDestTopicExpansionNeeded(String str) {
        boolean autoCreate = this.config.getTopicAutoCreate();
        boolean preservePartitions = this.config.getTopicPreservePartitions();
        if (!autoCreate && !preservePartitions) {
            return false;
        }
        List<PartitionInfo> sourcePartitions = this.consumer.partitionsFor(str);
        if (sourcePartitions == null) {
            return false;
        }
        int sourcePartitionCount = sourcePartitions.size();
        TopicMetadata destMetadata = this.destAdminClient.topicMetadata(toDestTopic(str));
        if (destMetadata == null) {
            return autoCreate;
        }
        return preservePartitions && sourcePartitionCount > destMetadata.numPartitions();
    }

    /**
     * Creates the destination topic for a source topic, or adds partitions to it, as configured.
     *
     * <p>If the destination topic already has metadata, or auto-creation is disabled, the only
     * possible action is adding partitions: that happens when partition preservation is enabled,
     * the destination exists, and the source has more partitions. Otherwise the topic is created
     * with the source partition count, the replication factor of the first source partition, and
     * the source topic configuration with the timestamp type overridden.
     *
     * @param str the source topic name
     * @return {@code true} when nothing further needs to be done for this topic; {@code false}
     *     when the attempt could not be completed and should be retried later
     */
    private boolean maybeCreateOrExpandDestTopic(String str) {
        String destTopic = toDestTopic(str);
        try {
            List<PartitionInfo> sourcePartitions = this.consumer.partitionsFor(str);
            if (sourcePartitions == null) {
                // Report missing source metadata explicitly rather than through a caught
                // NullPointerException; the outcome (retry later) is unchanged.
                log.warn("Unable to create or expand topic {} in the destination cluster because no partition metadata is available for source topic {}. Will try again later.", destTopic, str);
                return false;
            }
            int sourcePartitionCount = sourcePartitions.size();
            TopicMetadata destMetadata = this.destAdminClient.topicMetadata(destTopic);
            if (!this.config.getTopicAutoCreate() || destMetadata != null) {
                boolean expand = this.config.getTopicPreservePartitions()
                        && destMetadata != null
                        && sourcePartitionCount > destMetadata.numPartitions();
                if (expand) {
                    log.info("Increasing number of partitions of topic {} from {} to {} in the destination cluster", destTopic, Integer.valueOf(destMetadata.numPartitions()), Integer.valueOf(sourcePartitionCount));
                    this.destAdminClient.addPartitions(destTopic, sourcePartitionCount);
                }
                return true;
            }
            if (sourcePartitions.isEmpty()) {
                // The replication factor is read from the first source partition, so an empty
                // list cannot be used to create the topic.
                log.warn("Unable to create topic {} in the destination cluster because source topic {} reports no partitions. Will try again later.", destTopic, str);
                return false;
            }
            Properties properties = this.sourceAdminClient.topicConfig(str);
            overrideTimestampType(properties);
            int replicationFactor = sourcePartitions.get(0).replicas().length;
            int aliveBrokers = this.destAdminClient.aliveBrokers();
            if (replicationFactor > aliveBrokers) {
                log.warn("Unable to create topic {} in the destination cluster because the source replication factor {} is greater than the number of active brokers {}", destTopic, Integer.valueOf(replicationFactor), Integer.valueOf(aliveBrokers));
                return false;
            }
            log.info("Creating topic {} in destination cluster with {} partitions and replication factor {}", destTopic, Integer.valueOf(sourcePartitionCount), Integer.valueOf(replicationFactor));
            return this.destAdminClient.createTopic(destTopic, sourcePartitionCount, replicationFactor, properties);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt status; restore it so the
            // caller can observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("Failed to create or expand topic {} in the destination cluster. Will try again later.", destTopic, e);
            return false;
        } catch (RuntimeException | ExecutionException e) {
            log.warn("Failed to create or expand topic {} in the destination cluster. Will try again later.", destTopic, e);
            return false;
        }
    }

    /**
     * Forwards a committed record to the timestamps committer and then to the offsets topic
     * committer, skipping whichever is not configured.
     *
     * @param sourceRecord the record that has been committed
     * @throws InterruptedException if a committer is interrupted
     */
    public void commitRecord(SourceRecord sourceRecord) throws InterruptedException {
        if (Objects.nonNull(this.timestampsCommitter)) {
            this.timestampsCommitter.commitRecord(sourceRecord);
        }
        if (Objects.nonNull(this.offsetTopicCommitter)) {
            this.offsetTopicCommitter.commitRecord(sourceRecord);
        }
    }

    /**
     * Triggers a commit on the timestamps committer and then on the offsets topic committer,
     * skipping whichever is not configured.
     *
     * @throws InterruptedException if a committer is interrupted
     */
    public void commit() throws InterruptedException {
        if (Objects.nonNull(this.timestampsCommitter)) {
            this.timestampsCommitter.commit();
        }
        if (Objects.nonNull(this.offsetTopicCommitter)) {
            this.offsetTopicCommitter.commit();
        }
    }

    /**
     * Shuts the task down.
     *
     * <p>Does nothing beyond logging when no consumer exists. Otherwise wakes the consumer and
     * then, while holding this task's monitor, closes the source admin client, the destination
     * admin client and the consumer through {@code Utils.closeQuietly}, stops the timestamps
     * committer and closes the translator monitor when they are present.
     */
    public void stop() {
        log.info("Closing kafka replicator task {}", this.taskId);
        if (this.consumer == null) {
            return;
        }
        // Wake the consumer before taking the monitor.
        this.consumer.wakeup();
        synchronized (this) {
            // Handed to closeQuietly for each resource; it is not read again in this method.
            AtomicReference firstFailure = new AtomicReference();
            Utils.closeQuietly(this.sourceAdminClient, "source admin client", firstFailure);
            Utils.closeQuietly(this.destAdminClient, "destination admin client", firstFailure);
            Utils.closeQuietly(this.consumer, "consumer", firstFailure);
            if (this.timestampsCommitter != null) {
                this.timestampsCommitter.stop();
            }
            if (this.translatorMonitor != null) {
                this.translatorMonitor.close();
            }
        }
    }

    /**
     * Returns the existing consumer if one is already set, otherwise builds a byte-array
     * consumer for the source cluster.
     *
     * <p>The consumer properties start from the task's source consumer configs. {@code group.id}
     * defaults to the configured name and {@code client.id} to the task id when absent;
     * {@code enable.auto.commit} is always set to {@code false} and {@code auto.offset.reset} to
     * {@code none}.
     *
     * @param replicatorSourceTaskConfig the task configuration supplying the consumer settings
     * @return the consumer to read the source cluster with
     */
    private Consumer<byte[], byte[]> buildSourceConsumer(ReplicatorSourceTaskConfig replicatorSourceTaskConfig) {
        if (this.consumer != null) {
            return this.consumer;
        }
        Map<String, Object> consumerProps = new HashMap<>(replicatorSourceTaskConfig.getSourceConsumerConfigs());
        if (!consumerProps.containsKey("group.id")) {
            consumerProps.put("group.id", replicatorSourceTaskConfig.getName());
        }
        if (!consumerProps.containsKey("client.id")) {
            consumerProps.put("client.id", this.taskId);
        }
        // These two settings are forced regardless of what the supplied configs contain.
        consumerProps.put("enable.auto.commit", Boolean.FALSE);
        consumerProps.put("auto.offset.reset", "none");
        log.debug("Initializing Replicator Task Connector in Group");
        return new KafkaConsumer<>(consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }

    /**
     * Returns the already-set task configuration, or builds a new one from the given properties
     * when none is set.
     *
     * @param map the raw task properties
     * @return the task configuration to use
     */
    private ReplicatorSourceTaskConfig taskConfig(Map<String, String> map) {
        if (this.config == null) {
            return new ReplicatorSourceTaskConfig(map);
        }
        return this.config;
    }

    /**
     * Returns the already-set source admin client, or builds a new one.
     *
     * @param z when {@code true} a {@code NewReplicatorAdminClient} is built from the source
     *     admin client config; otherwise a {@code ReplicatorAdminClientWithZk} is built from
     *     {@code config.buildSrcZkUtils()}
     * @return the admin client for the source cluster
     */
    private ReplicatorAdminClient srcAdminClient(boolean z) {
        if (this.sourceAdminClient != null) {
            return this.sourceAdminClient;
        }
        if (z) {
            return new NewReplicatorAdminClient(this.config.srcAdminClientConfig(), this.time, METADATA_MAX_AGE_MS, this.taskId);
        }
        return new ReplicatorAdminClientWithZk(this.config.buildSrcZkUtils(), this.time, METADATA_MAX_AGE_MS);
    }

    /**
     * Returns the already-set destination admin client, or builds a new one.
     *
     * @param z when {@code true} a {@code NewReplicatorAdminClient} is built from the
     *     destination admin client config; otherwise a {@code ReplicatorAdminClientWithZk} is
     *     built from {@code config.buildDestZkUtils()}
     * @return the admin client for the destination cluster
     */
    private ReplicatorAdminClient destAdminClient(boolean z) {
        if (this.destAdminClient != null) {
            return this.destAdminClient;
        }
        if (z) {
            return new NewReplicatorAdminClient(this.config.dstAdminClientConfig(), this.time, METADATA_MAX_AGE_MS, this.taskId);
        }
        return new ReplicatorAdminClientWithZk(this.config.buildDestZkUtils(), this.time, METADATA_MAX_AGE_MS);
    }
}
