package io.streamnative.pulsar.handlers.kop.utils;

import com.google.common.collect.Sets;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.client.admin.Clusters;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/utils/MetadataUtils.class */
public class MetadataUtils {
    private static final Logger log = LoggerFactory.getLogger(MetadataUtils.class);

    public static String constructOffsetsTopicBaseName(KafkaServiceConfiguration kafkaServiceConfiguration) {
        return kafkaServiceConfiguration.getKafkaMetadataTenant() + "/" + kafkaServiceConfiguration.getKafkaMetadataNamespace() + "/__consumer_offsets";
    }

    public static void createKafkaMetadataIfMissing(PulsarAdmin pulsarAdmin, KafkaServiceConfiguration kafkaServiceConfiguration) throws PulsarServerException, PulsarAdminException {
        String clusterName = kafkaServiceConfiguration.getClusterName();
        String kafkaMetadataTenant = kafkaServiceConfiguration.getKafkaMetadataTenant();
        String str = kafkaMetadataTenant + "/" + kafkaServiceConfiguration.getKafkaMetadataNamespace();
        String constructOffsetsTopicBaseName = constructOffsetsTopicBaseName(kafkaServiceConfiguration);
        try {
            try {
                Clusters clusters = pulsarAdmin.clusters();
                if (!clusters.getClusters().contains(clusterName)) {
                    throw new PulsarServerException.NotFoundException("Configured cluster does not exist");
                }
                log.info("Cluster {} found: {}", clusterName, clusters.getCluster(clusterName));
                Tenants tenants = pulsarAdmin.tenants();
                if (tenants.getTenants().contains(kafkaMetadataTenant)) {
                    TenantInfo tenantInfo = tenants.getTenantInfo(kafkaMetadataTenant);
                    Set allowedClusters = tenantInfo.getAllowedClusters();
                    if (!allowedClusters.contains(clusterName)) {
                        log.info("Tenant: {} exists but cluster: {} is not in the allowedClusters list, updating it ...", kafkaMetadataTenant, clusterName);
                        allowedClusters.add(clusterName);
                        tenants.updateTenant(kafkaMetadataTenant, tenantInfo);
                    }
                } else {
                    log.info("Tenant: {} does not exist, creating it ...", kafkaMetadataTenant);
                    tenants.createTenant(kafkaMetadataTenant, new TenantInfo(Sets.newHashSet(kafkaServiceConfiguration.getSuperUserRoles()), Sets.newHashSet(new String[]{clusterName})));
                }
                Namespaces namespaces = pulsarAdmin.namespaces();
                if (namespaces.getNamespaces(kafkaMetadataTenant).contains(str)) {
                    List namespaceReplicationClusters = namespaces.getNamespaceReplicationClusters(str);
                    if (!namespaceReplicationClusters.contains(clusterName)) {
                        log.info("Namespace: {} exists but cluster: {} is not in the replicationClusters list,updating it ...", str, clusterName);
                        HashSet newHashSet = Sets.newHashSet(namespaceReplicationClusters);
                        newHashSet.add(clusterName);
                        namespaces.setNamespaceReplicationClusters(str, newHashSet);
                    }
                } else {
                    log.info("Namespaces: {} does not exist in tenant: {}, creating it ...", str, kafkaMetadataTenant);
                    HashSet newHashSet2 = Sets.newHashSet(new String[]{kafkaServiceConfiguration.getClusterName()});
                    namespaces.createNamespace(str, newHashSet2);
                    namespaces.setNamespaceReplicationClusters(str, newHashSet2);
                    namespaces.setRetention(str, new RetentionPolicies(-1, -1));
                }
                Topics topics = pulsarAdmin.topics();
                PartitionedTopicMetadata partitionedTopicMetadata = topics.getPartitionedTopicMetadata(constructOffsetsTopicBaseName);
                HashSet hashSet = new HashSet(kafkaServiceConfiguration.getOffsetsTopicNumPartitions());
                for (int i = 0; i < kafkaServiceConfiguration.getOffsetsTopicNumPartitions(); i++) {
                    hashSet.add(constructOffsetsTopicBaseName + "-partition-" + i);
                }
                if (partitionedTopicMetadata.partitions <= 0) {
                    log.info("Kafka group metadata topic {} doesn't exist. Creating it ...", constructOffsetsTopicBaseName);
                    topics.createPartitionedTopic(constructOffsetsTopicBaseName, kafkaServiceConfiguration.getOffsetsTopicNumPartitions());
                    Iterator it = hashSet.iterator();
                    while (it.hasNext()) {
                        topics.createNonPartitionedTopic((String) it.next());
                    }
                    log.info("Successfully created group metadata topic {} with {} partitions.", constructOffsetsTopicBaseName, Integer.valueOf(kafkaServiceConfiguration.getOffsetsTopicNumPartitions()));
                } else {
                    hashSet.removeAll((Collection) topics.getList(str).stream().filter(str2 -> {
                        return str2.startsWith(constructOffsetsTopicBaseName + "-partition-");
                    }).collect(Collectors.toList()));
                    if (!hashSet.isEmpty()) {
                        log.info("Identified missing offset topic partitions: {}", hashSet);
                        Iterator it2 = hashSet.iterator();
                        while (it2.hasNext()) {
                            topics.createNonPartitionedTopic((String) it2.next());
                        }
                    }
                }
                log.info("Current state of kafka metadata, cluster: {} exists: {}, tenant: {} exists: {}, namespace: {} exists: {}, topic: {} exists: {}", new Object[]{clusterName, true, kafkaMetadataTenant, true, str, true, constructOffsetsTopicBaseName, true});
            } catch (PulsarAdminException e) {
                if (!(e instanceof PulsarAdminException.ConflictException)) {
                    log.error("Failed to successfully initialize Kafka Metadata {}", str, e);
                    throw e;
                }
                log.info("Resources concurrent creating and cause e: ", e);
                log.info("Current state of kafka metadata, cluster: {} exists: {}, tenant: {} exists: {}, namespace: {} exists: {}, topic: {} exists: {}", new Object[]{clusterName, false, kafkaMetadataTenant, false, str, false, constructOffsetsTopicBaseName, false});
            }
        } catch (Throwable th) {
            log.info("Current state of kafka metadata, cluster: {} exists: {}, tenant: {} exists: {}, namespace: {} exists: {}, topic: {} exists: {}", new Object[]{clusterName, false, kafkaMetadataTenant, false, str, false, constructOffsetsTopicBaseName, false});
            throw th;
        }
    }
}
