package pl.allegro.tech.hermes.common.broker;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import java.util.List;
import java.util.Set;
import javax.inject.Inject;
import javax.inject.Named;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.zk.KafkaZkClient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.network.ListenerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.common.exception.BrokerInfoNotAvailableException;
import pl.allegro.tech.hermes.common.exception.BrokerNotFoundForPartitionException;
import pl.allegro.tech.hermes.common.exception.PartitionsNotFoundForGivenTopicException;

/* loaded from: input_file:pl/allegro/tech/hermes/common/broker/ZookeeperBrokerStorage.class */
/**
 * {@link BrokerStorage} implementation that answers broker and partition queries using a
 * {@link KafkaZkClient} (partition leaders, broker endpoints) and a {@link CuratorFramework}
 * client (partition ids of a topic).
 *
 * <p>Every lookup failure is translated into the dedicated unchecked exception of the given
 * operation, with the original failure preserved as the cause.
 */
public class ZookeeperBrokerStorage implements BrokerStorage {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperBrokerStorage.class);

    /** Name of the listener whose node (host and port) is reported by {@link #readBrokerDetails}. */
    private static final String BROKER_LISTENER_NAME = "plaintext";

    /** Format of the path whose children are listed to obtain the partition ids of a topic. */
    private static final String PARTITIONS = "/brokers/topics/%s/partitions";

    private final CuratorFramework curatorFramework;
    private final KafkaZkClient kafkaZkClient;

    /**
     * Creates the storage.
     *
     * @param curatorFramework client injected under the name {@code kafkaCurator}; used to list
     *                         the partitions of a topic
     * @param kafkaZkClient    client used to resolve partition leaders and broker details
     */
    @Inject
    public ZookeeperBrokerStorage(@Named("kafkaCurator") CuratorFramework curatorFramework, KafkaZkClient kafkaZkClient) {
        this.curatorFramework = curatorFramework;
        this.kafkaZkClient = kafkaZkClient;
    }

    /**
     * Reads the id of the broker that leads the given partition.
     *
     * @param topicAndPartition topic and partition to look up
     * @return id of the leader broker
     * @throws BrokerNotFoundForPartitionException if the lookup fails for any reason; the
     *                                             original exception is attached as the cause
     */
    @Override
    public int readLeaderForPartition(TopicAndPartition topicAndPartition) {
        try {
            TopicPartition partition = new TopicPartition(topicAndPartition.topic(), topicAndPartition.partition());
            // Cast retained from the original: the value returned by get() is treated as an Integer.
            return (Integer) this.kafkaZkClient.getLeaderForPartition(partition).get();
        } catch (Exception e) {
            throw new BrokerNotFoundForPartitionException(topicAndPartition.topic(), topicAndPartition.partition(), e);
        }
    }

    /**
     * Groups the given partitions by the id of their leader broker.
     *
     * <p>This is a best-effort operation: a partition whose leader cannot be resolved is logged
     * at WARN level and left out of the result instead of failing the whole call.
     *
     * @param set partitions to resolve
     * @return multimap from leader broker id to the partitions it leads
     */
    @Override
    public Multimap<Integer, TopicAndPartition> readLeadersForPartitions(Set<TopicAndPartition> set) {
        Multimap<Integer, TopicAndPartition> partitionsByLeader = ArrayListMultimap.create();
        for (TopicAndPartition topicAndPartition : set) {
            try {
                partitionsByLeader.put(readLeaderForPartition(topicAndPartition), topicAndPartition);
            } catch (BrokerNotFoundForPartitionException e) {
                LOGGER.warn("Broker not found", e);
            }
        }
        return partitionsByLeader;
    }

    /**
     * Reads the host and port of the broker with the given id, taken from the node registered
     * for the {@value #BROKER_LISTENER_NAME} listener.
     *
     * @param num broker id
     * @return host and port of the broker
     * @throws BrokerInfoNotAvailableException if the lookup fails for any reason; the original
     *                                         exception is attached as the cause
     */
    @Override
    public BrokerDetails readBrokerDetails(Integer num) {
        try {
            Broker broker = (Broker) this.kafkaZkClient.getBroker(num).get();
            // Resolve the listener node once and read both host and port from it.
            Node node = (Node) broker.getNode(ListenerName.normalised(BROKER_LISTENER_NAME)).get();
            return new BrokerDetails(node.host(), node.port());
        } catch (Exception e) {
            throw new BrokerInfoNotAvailableException(num, e);
        }
    }

    /**
     * Reads the partition ids of the given topic by listing the children of the topic's
     * partitions path and parsing each child name as an integer.
     *
     * @param str topic name, substituted into the partitions path
     * @return partition ids sorted in ascending (natural) order
     * @throws PartitionsNotFoundForGivenTopicException if listing or parsing fails for any
     *                                                  reason; the original exception is attached
     *                                                  as the cause
     */
    @Override
    public List<Integer> readPartitionsIds(String str) {
        try {
            List<String> partitionNames = this.curatorFramework.getChildren().forPath(String.format(PARTITIONS, str));
            // Lists.transform is a lazy view; sortedCopy materializes it inside this try block,
            // so a non-numeric child name is reported through the catch below as well.
            return Ordering.<Integer>natural().sortedCopy(Lists.transform(partitionNames, Ints.stringConverter()));
        } catch (Exception e) {
            throw new PartitionsNotFoundForGivenTopicException(str, e);
        }
    }
}
