package pl.allegro.tech.hermes.infrastructure.zookeeper;

import com.google.common.base.Charsets;
import java.util.Iterator;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.EnsurePath;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.common.kafka.KafkaTopicName;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;

/* loaded from: input_file:pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperSubscriptionOffsetChangeIndicator.class */
/**
 * {@link SubscriptionOffsetChangeIndicator} that stores and reads subscription offsets as Zookeeper nodes
 * through a Curator client.
 *
 * <p>Layout as used by this class: the children of {@code paths.subscribedKafkaTopicsPath(..)} are Kafka topic
 * names, the children of {@code paths.offsetsPath(..)} are partition numbers, and the node at
 * {@code paths.offsetPath(..)} holds the offset as a decimal string encoded in UTF-8.
 *
 * <p>Failures raised by Zookeeper calls are rethrown as {@link InternalProcessingException} with the original
 * exception as the cause.
 */
public class ZookeeperSubscriptionOffsetChangeIndicator implements SubscriptionOffsetChangeIndicator {
    private final CuratorFramework zookeeper;
    private final ZookeeperPaths paths;
    private final SubscriptionRepository subscriptionRepository;

    /**
     * Creates the indicator.
     *
     * @param curatorFramework       Curator client used for every Zookeeper read and write
     * @param zookeeperPaths         builder of the Zookeeper paths under which offsets are kept
     * @param subscriptionRepository repository asked to confirm that a subscription exists before its offsets
     *                               are touched
     */
    public ZookeeperSubscriptionOffsetChangeIndicator(CuratorFramework curatorFramework, ZookeeperPaths zookeeperPaths, SubscriptionRepository subscriptionRepository) {
        this.zookeeper = curatorFramework;
        this.paths = zookeeperPaths;
        this.subscriptionRepository = subscriptionRepository;
    }

    /**
     * Writes the offset of a single partition for the given subscription, creating the offset node (and any
     * missing parents) first.
     *
     * @param topicName       name of the topic the subscription belongs to
     * @param str             subscription name (it is what {@code ensureSubscriptionExists} is called with)
     * @param str2            value forwarded unchanged to {@link ZookeeperPaths} when building the offset path;
     *                        NOTE(review): likely the brokers cluster name - confirm against the interface
     * @param partitionOffset Kafka topic, partition and offset to store
     * @throws InternalProcessingException if ensuring the path or writing the data fails; exceptions thrown by
     *                                     the subscription existence check are not wrapped
     */
    @Override
    public void setSubscriptionOffset(TopicName topicName, String str, String str2, PartitionOffset partitionOffset) {
        // Deliberately outside the try block: whatever the repository throws reaches the caller unwrapped.
        this.subscriptionRepository.ensureSubscriptionExists(topicName, str);
        String offsetPath = this.paths.offsetPath(topicName, str, partitionOffset.getTopic(), str2, partitionOffset.getPartition());
        try {
            // NOTE(review): EnsurePath is deprecated in newer Curator releases; kept as-is because replacing it
            // could change which kind of nodes get created.
            new EnsurePath(offsetPath).ensure(this.zookeeper.getZookeeperClient());
            this.zookeeper.setData().forPath(offsetPath, String.valueOf(partitionOffset.getOffset()).getBytes(Charsets.UTF_8));
        } catch (Exception e) {
            throw new InternalProcessingException(e);
        }
    }

    /**
     * Reads every stored partition offset of the given subscription, across all Kafka topics listed under the
     * subscription's subscribed-Kafka-topics path.
     *
     * @param topic topic the subscription belongs to
     * @param str   subscription name (it is what {@code ensureSubscriptionExists} is called with)
     * @param str2  value forwarded unchanged to {@link ZookeeperPaths} when building offset paths;
     *              NOTE(review): likely the brokers cluster name - confirm against the interface
     * @return the collected offsets; contains no entries added by this method when no Kafka topic node exists
     * @throws InternalProcessingException if listing children or reading an offset node fails, including when a
     *                                     node's content is not a parsable long
     * @throws NumberFormatException       if a partition node name is not a parsable int
     */
    @Override
    public PartitionOffsets getSubscriptionOffsets(Topic topic, String str, String str2) {
        this.subscriptionRepository.ensureSubscriptionExists(topic.getName(), str);
        String subscribedKafkaTopicsPath = this.paths.subscribedKafkaTopicsPath(topic.getName(), str);
        PartitionOffsets allOffsets = new PartitionOffsets();
        for (String kafkaTopicNode : getZookeeperChildrenForPath(subscribedKafkaTopicsPath)) {
            KafkaTopicName kafkaTopicName = KafkaTopicName.valueOf(kafkaTopicNode);
            allOffsets.addAll(getOffsetsForKafkaTopic(topic, kafkaTopicName, str, str2));
        }
        return allOffsets;
    }

    /**
     * Reads the offsets of every partition node found under the offsets path of one Kafka topic.
     * Child node names are parsed as partition numbers; the parse is not wrapped, so a malformed name
     * surfaces as {@link NumberFormatException}.
     */
    private PartitionOffsets getOffsetsForKafkaTopic(Topic topic, KafkaTopicName kafkaTopicName, String subscriptionName, String pathQualifier) {
        String offsetsPath = this.paths.offsetsPath(topic.getName(), subscriptionName, kafkaTopicName, pathQualifier);
        PartitionOffsets topicOffsets = new PartitionOffsets();
        for (String partitionNode : getZookeeperChildrenForPath(offsetsPath)) {
            int partition = Integer.parseInt(partitionNode);
            long offset = getOffsetForPartition(topic, kafkaTopicName, subscriptionName, pathQualifier, partition);
            topicOffsets.add(new PartitionOffset(kafkaTopicName, offset, partition));
        }
        return topicOffsets;
    }

    /**
     * Lists the names of the child nodes of {@code path}, wrapping any failure in
     * {@link InternalProcessingException}.
     */
    private List<String> getZookeeperChildrenForPath(String path) {
        try {
            return this.zookeeper.getChildren().forPath(path);
        } catch (Exception e) {
            throw new InternalProcessingException(e);
        }
    }

    /**
     * Reads one partition's offset node and parses its UTF-8 content as a long. Read failures and
     * unparsable content are both wrapped in {@link InternalProcessingException}.
     */
    private long getOffsetForPartition(Topic topic, KafkaTopicName kafkaTopicName, String subscriptionName, String pathQualifier, int partition) {
        try {
            String offsetPath = this.paths.offsetPath(topic.getName(), subscriptionName, kafkaTopicName, pathQualifier, partition);
            byte[] rawOffset = this.zookeeper.getData().forPath(offsetPath);
            return Long.parseLong(new String(rawOffset, Charsets.UTF_8));
        } catch (Exception e) {
            throw new InternalProcessingException(e);
        }
    }
}
