package pl.allegro.tech.hermes.infrastructure.zookeeper;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
import pl.allegro.tech.hermes.common.kafka.KafkaTopicName;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;

/* loaded from: input_file:pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperSubscriptionOffsetChangeIndicator.class */
/**
 * {@link SubscriptionOffsetChangeIndicator} that keeps requested subscription offsets as
 * ZooKeeper nodes.
 *
 * <p>Each offset is stored under the path produced by {@link ZookeeperPaths#offsetPath} and its
 * payload is the decimal offset value encoded as UTF-8 text. Every ZooKeeper failure is rethrown
 * wrapped in an {@link InternalProcessingException}.
 */
public class ZookeeperSubscriptionOffsetChangeIndicator implements SubscriptionOffsetChangeIndicator {
    private static final Logger logger = LoggerFactory.getLogger(ZookeeperSubscriptionOffsetChangeIndicator.class);
    private final CuratorFramework zookeeper;
    private final ZookeeperPaths paths;
    private final SubscriptionRepository subscriptionRepository;

    /**
     * Creates the indicator.
     *
     * @param curatorFramework client used for all ZooKeeper reads and writes
     * @param zookeeperPaths builder of the node paths used by this class
     * @param subscriptionRepository used to verify that a subscription exists before its offsets
     *     are written or read
     */
    public ZookeeperSubscriptionOffsetChangeIndicator(CuratorFramework curatorFramework, ZookeeperPaths zookeeperPaths, SubscriptionRepository subscriptionRepository) {
        this.zookeeper = curatorFramework;
        this.paths = zookeeperPaths;
        this.subscriptionRepository = subscriptionRepository;
    }

    /**
     * Stores the given offset for a subscription, creating the node (and its parents) when it
     * does not exist yet and overwriting its data otherwise.
     *
     * @param topicName topic the subscription belongs to
     * @param str subscription name (passed to
     *     {@link SubscriptionRepository#ensureSubscriptionExists})
     * @param str2 path component forwarded to {@link ZookeeperPaths#offsetPath}; presumably the
     *     Kafka brokers cluster name - TODO confirm
     * @param partitionOffset Kafka topic, partition and offset value to store
     * @throws InternalProcessingException if any ZooKeeper operation fails
     */
    @Override // pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator
    public void setSubscriptionOffset(TopicName topicName, String str, String str2, PartitionOffset partitionOffset) {
        this.subscriptionRepository.ensureSubscriptionExists(topicName, str);
        String offsetPath = this.paths.offsetPath(topicName, str, partitionOffset.getTopic(), str2, partitionOffset.getPartition());
        try {
            byte[] payload = String.valueOf(partitionOffset.getOffset()).getBytes(StandardCharsets.UTF_8);
            // NOTE(review): the existence check and the write are two separate ZooKeeper calls,
            // so a concurrent create/delete of the same node between them makes the write fail
            // (surfacing as InternalProcessingException) instead of being retried.
            boolean nodeMissing = this.zookeeper.checkExists().forPath(offsetPath) == null;
            if (nodeMissing) {
                this.zookeeper.create().creatingParentsIfNeeded().forPath(offsetPath, payload);
            } else {
                this.zookeeper.setData().forPath(offsetPath, payload);
            }
        } catch (Exception e) {
            throw new InternalProcessingException(e);
        }
    }

    /**
     * Reads every stored offset of a subscription, across all Kafka topics listed under the
     * subscription's "subscribed Kafka topics" node.
     *
     * @param topicName topic the subscription belongs to
     * @param str subscription name
     * @param str2 path component forwarded to {@link ZookeeperPaths}; presumably the Kafka
     *     brokers cluster name - TODO confirm
     * @return offsets found for all Kafka topics and partitions; empty when there are none
     * @throws InternalProcessingException if any ZooKeeper operation fails
     */
    @Override // pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator
    public PartitionOffsets getSubscriptionOffsets(TopicName topicName, String str, String str2) {
        this.subscriptionRepository.ensureSubscriptionExists(topicName, str);
        String kafkaTopicsPath = this.paths.subscribedKafkaTopicsPath(topicName, str);
        PartitionOffsets allOffsets = new PartitionOffsets();
        for (String kafkaTopicNode : getZookeeperChildrenForPath(kafkaTopicsPath)) {
            KafkaTopicName kafkaTopicName = KafkaTopicName.valueOf(kafkaTopicNode);
            allOffsets.addAll(getOffsetsForKafkaTopic(topicName, kafkaTopicName, str, str2));
        }
        return allOffsets;
    }

    /**
     * Tells whether none of the given partitions still has an offset node in ZooKeeper.
     *
     * <p>Evaluation stops at the first partition whose node still exists; that path is logged
     * as a leftover.
     *
     * @param topicName topic the subscription belongs to
     * @param str subscription name
     * @param str2 path component forwarded to {@link ZookeeperPaths#offsetPath}; presumably the
     *     Kafka brokers cluster name - TODO confirm
     * @param kafkaTopic Kafka topic whose partitions are checked
     * @param list partition numbers to check
     * @return {@code true} when no offset node exists for any listed partition (also for an
     *     empty list), {@code false} otherwise
     * @throws InternalProcessingException if a ZooKeeper existence check fails
     */
    @Override // pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator
    public boolean areOffsetsMoved(TopicName topicName, String str, String str2, KafkaTopic kafkaTopic, List<Integer> list) {
        return list.stream().allMatch(partition -> offsetDoesNotExist(topicName, str, str2, partition, kafkaTopic));
    }

    /**
     * Deletes the offset node of a single partition, together with any children it has.
     *
     * @param topicName topic the subscription belongs to
     * @param str subscription name
     * @param str2 path component forwarded to {@link ZookeeperPaths#offsetPath}; presumably the
     *     Kafka brokers cluster name - TODO confirm
     * @param kafkaTopicName Kafka topic the partition belongs to
     * @param i partition number
     * @throws InternalProcessingException if the delete fails
     */
    @Override // pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator
    public void removeOffset(TopicName topicName, String str, String str2, KafkaTopicName kafkaTopicName, int i) {
        String offsetPath = this.paths.offsetPath(topicName, str, kafkaTopicName, str2, i);
        try {
            this.zookeeper.delete().guaranteed().deletingChildrenIfNeeded().forPath(offsetPath);
        } catch (Exception e) {
            throw new InternalProcessingException(e);
        }
    }

    /** Returns {@code true} when the partition has no offset node; logs the path when it does. */
    private boolean offsetDoesNotExist(TopicName topicName, String subscriptionName, String pathSegment, int partition, KafkaTopic kafkaTopic) {
        String offsetPath = this.paths.offsetPath(topicName, subscriptionName, kafkaTopic.name(), pathSegment, partition);
        try {
            boolean missing = this.zookeeper.checkExists().forPath(offsetPath) == null;
            if (!missing) {
                logger.info("Leftover on path {}", offsetPath);
            }
            return missing;
        } catch (Exception e) {
            throw new InternalProcessingException(e);
        }
    }

    /** Collects the stored offset of every partition node found under one Kafka topic. */
    private PartitionOffsets getOffsetsForKafkaTopic(TopicName topicName, KafkaTopicName kafkaTopicName, String subscriptionName, String pathSegment) {
        String offsetsPath = this.paths.offsetsPath(topicName, subscriptionName, kafkaTopicName, pathSegment);
        PartitionOffsets offsets = new PartitionOffsets();
        // Child node names are the partition numbers.
        for (String partitionNode : getZookeeperChildrenForPath(offsetsPath)) {
            int partition = Integer.parseInt(partitionNode);
            long offset = getOffsetForPartition(topicName, kafkaTopicName, subscriptionName, pathSegment, partition);
            offsets.add(new PartitionOffset(kafkaTopicName, offset, partition));
        }
        return offsets;
    }

    /** Lists the child node names of {@code path}, wrapping any failure. */
    private List<String> getZookeeperChildrenForPath(String path) {
        try {
            return this.zookeeper.getChildren().forPath(path);
        } catch (Exception e) {
            throw new InternalProcessingException(e);
        }
    }

    /**
     * Reads one partition's node and parses its UTF-8 payload as a decimal {@code long}.
     * Read and parse failures alike are wrapped in {@link InternalProcessingException}.
     */
    private long getOffsetForPartition(TopicName topicName, KafkaTopicName kafkaTopicName, String subscriptionName, String pathSegment, int partition) {
        try {
            String offsetPath = this.paths.offsetPath(topicName, subscriptionName, kafkaTopicName, pathSegment, partition);
            byte[] payload = this.zookeeper.getData().forPath(offsetPath);
            return Long.parseLong(new String(payload, StandardCharsets.UTF_8));
        } catch (Exception e) {
            throw new InternalProcessingException(e);
        }
    }
}
