package io.streamnative.pulsar.handlers.kop.utils;

import com.google.common.collect.Sets;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.common.internals.Topic;
import org.apache.pulsar.client.admin.Clusters;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/utils/MetadataUtils.class */
public class MetadataUtils {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) MetadataUtils.class);
    public static final int MAX_COMPACTION_THRESHOLD = 104857600;

    public static String constructOffsetsTopicBaseName(KafkaServiceConfiguration kafkaServiceConfiguration) {
        return kafkaServiceConfiguration.getKafkaMetadataTenant() + "/" + kafkaServiceConfiguration.getKafkaMetadataNamespace() + "/" + Topic.GROUP_METADATA_TOPIC_NAME;
    }

    public static String constructTxnLogTopicBaseName(KafkaServiceConfiguration kafkaServiceConfiguration) {
        return kafkaServiceConfiguration.getKafkaMetadataTenant() + "/" + kafkaServiceConfiguration.getKafkaMetadataNamespace() + "/" + Topic.TRANSACTION_STATE_TOPIC_NAME;
    }

    public static void createOffsetMetadataIfMissing(PulsarAdmin pulsarAdmin, ClusterData clusterData, KafkaServiceConfiguration kafkaServiceConfiguration) throws PulsarAdminException {
        createKafkaMetadataIfMissing(pulsarAdmin, clusterData, kafkaServiceConfiguration, new KopTopic(constructOffsetsTopicBaseName(kafkaServiceConfiguration)), kafkaServiceConfiguration.getOffsetsTopicNumPartitions());
    }

    public static void createTxnMetadataIfMissing(PulsarAdmin pulsarAdmin, ClusterData clusterData, KafkaServiceConfiguration kafkaServiceConfiguration) throws PulsarAdminException {
        createKafkaMetadataIfMissing(pulsarAdmin, clusterData, kafkaServiceConfiguration, new KopTopic(constructTxnLogTopicBaseName(kafkaServiceConfiguration)), kafkaServiceConfiguration.getTxnLogTopicNumPartitions());
    }

    private static void createKafkaMetadataIfMissing(PulsarAdmin pulsarAdmin, ClusterData clusterData, KafkaServiceConfiguration kafkaServiceConfiguration, KopTopic kopTopic, int i) throws PulsarAdminException {
        String clusterName = kafkaServiceConfiguration.getClusterName();
        String kafkaMetadataTenant = kafkaServiceConfiguration.getKafkaMetadataTenant();
        String str = kafkaMetadataTenant + "/" + kafkaServiceConfiguration.getKafkaMetadataNamespace();
        try {
            try {
                Clusters clusters = pulsarAdmin.clusters();
                if (clusters.getClusters().contains(clusterName)) {
                    log.info("Cluster {} found: {}", clusterName, clusters.getCluster(clusterName));
                } else {
                    try {
                        pulsarAdmin.clusters().createCluster(clusterName, clusterData);
                    } catch (PulsarAdminException e) {
                        if (!(e instanceof PulsarAdminException.ConflictException)) {
                            throw e;
                        }
                        log.info("Attempted to create cluster {} however it was created concurrently.", clusterName);
                    }
                }
                Tenants tenants = pulsarAdmin.tenants();
                if (tenants.getTenants().contains(kafkaMetadataTenant)) {
                    TenantInfo tenantInfo = tenants.getTenantInfo(kafkaMetadataTenant);
                    Set allowedClusters = tenantInfo.getAllowedClusters();
                    if (!allowedClusters.contains(clusterName)) {
                        log.info("Tenant: {} exists but cluster: {} is not in the allowedClusters list, updating it ...", kafkaMetadataTenant, clusterName);
                        allowedClusters.add(clusterName);
                        tenants.updateTenant(kafkaMetadataTenant, tenantInfo);
                    }
                } else {
                    log.info("Tenant: {} does not exist, creating it ...", kafkaMetadataTenant);
                    tenants.createTenant(kafkaMetadataTenant, TenantInfo.builder().adminRoles(kafkaServiceConfiguration.getSuperUserRoles()).allowedClusters(Collections.singleton(clusterName)).build());
                }
                Namespaces namespaces = pulsarAdmin.namespaces();
                if (namespaces.getNamespaces(kafkaMetadataTenant).contains(str)) {
                    List namespaceReplicationClusters = namespaces.getNamespaceReplicationClusters(str);
                    if (!namespaceReplicationClusters.contains(clusterName)) {
                        log.info("Namespace: {} exists but cluster: {} is not in the replicationClusters list,updating it ...", str, clusterName);
                        HashSet newHashSet = Sets.newHashSet(namespaceReplicationClusters);
                        newHashSet.add(clusterName);
                        namespaces.setNamespaceReplicationClusters(str, newHashSet);
                    }
                } else {
                    log.info("Namespaces: {} does not exist in tenant: {}, creating it ...", str, kafkaMetadataTenant);
                    HashSet newHashSet2 = Sets.newHashSet(new String[]{clusterName});
                    namespaces.createNamespace(str, newHashSet2);
                    namespaces.setNamespaceReplicationClusters(str, newHashSet2);
                }
                int offsetsRetentionMinutes = (int) kafkaServiceConfiguration.getOffsetsRetentionMinutes();
                RetentionPolicies retention = namespaces.getRetention(str);
                if (retention == null || retention.getRetentionTimeInMinutes() != offsetsRetentionMinutes) {
                    namespaces.setRetention(str, new RetentionPolicies((int) kafkaServiceConfiguration.getOffsetsRetentionMinutes(), -1));
                }
                Long compactionThreshold = namespaces.getCompactionThreshold(str);
                if (compactionThreshold != null && compactionThreshold.longValue() != 104857600) {
                    namespaces.setCompactionThreshold(str, 104857600L);
                }
                int offsetsMessageTTL = kafkaServiceConfiguration.getOffsetsMessageTTL();
                Integer namespaceMessageTTL = namespaces.getNamespaceMessageTTL(str);
                if (namespaceMessageTTL == null || namespaceMessageTTL.intValue() != offsetsMessageTTL) {
                    namespaces.setNamespaceMessageTTL(str, offsetsMessageTTL);
                }
                createTopicIfNotExist(pulsarAdmin, kopTopic.getFullName(), i);
                log.info("Current state of kafka metadata, cluster: {} exists: {}, tenant: {} exists: {}, namespace: {} exists: {}, topic: {} exists: {}", clusterName, true, kafkaMetadataTenant, true, str, true, kopTopic.getOriginalName(), true);
            } catch (Throwable th) {
                log.info("Current state of kafka metadata, cluster: {} exists: {}, tenant: {} exists: {}, namespace: {} exists: {}, topic: {} exists: {}", clusterName, false, kafkaMetadataTenant, false, str, false, kopTopic.getOriginalName(), false);
                throw th;
            }
        } catch (PulsarAdminException e2) {
            if (!(e2 instanceof PulsarAdminException.ConflictException)) {
                log.error("Failed to successfully initialize Kafka Metadata {}", str, e2);
                throw e2;
            }
            log.info("Resources concurrent creating and cause e: ", e2);
            log.info("Current state of kafka metadata, cluster: {} exists: {}, tenant: {} exists: {}, namespace: {} exists: {}, topic: {} exists: {}", clusterName, false, kafkaMetadataTenant, false, str, false, kopTopic.getOriginalName(), false);
        }
    }

    public static void createKafkaNamespaceIfMissing(PulsarAdmin pulsarAdmin, ClusterData clusterData, KafkaServiceConfiguration kafkaServiceConfiguration) throws PulsarAdminException {
        String clusterName = kafkaServiceConfiguration.getClusterName();
        String kafkaTenant = kafkaServiceConfiguration.getKafkaTenant();
        String str = kafkaTenant + "/" + kafkaServiceConfiguration.getKafkaNamespace();
        try {
            try {
                Clusters clusters = pulsarAdmin.clusters();
                if (clusters.getClusters().contains(clusterName)) {
                    log.info("Cluster {} found: {}", clusterName, clusters.getCluster(clusterName));
                } else {
                    try {
                        pulsarAdmin.clusters().createCluster(clusterName, clusterData);
                    } catch (PulsarAdminException e) {
                        if (!(e instanceof PulsarAdminException.ConflictException)) {
                            throw e;
                        }
                        log.info("Attempted to create cluster {} however it was created concurrently.", clusterName);
                    }
                }
                Tenants tenants = pulsarAdmin.tenants();
                if (tenants.getTenants().contains(kafkaTenant)) {
                    TenantInfo tenantInfo = tenants.getTenantInfo(kafkaTenant);
                    Set allowedClusters = tenantInfo.getAllowedClusters();
                    if (!allowedClusters.contains(clusterName)) {
                        log.info("Tenant: {} exists but cluster: {} is not in the allowedClusters list, updating it ...", kafkaTenant, clusterName);
                        allowedClusters.add(clusterName);
                        tenants.updateTenant(kafkaTenant, tenantInfo);
                    }
                } else {
                    log.info("Tenant: {} does not exist, creating it ...", kafkaTenant);
                    tenants.createTenant(kafkaTenant, TenantInfo.builder().adminRoles(kafkaServiceConfiguration.getSuperUserRoles()).allowedClusters(Collections.singleton(clusterName)).build());
                }
                Namespaces namespaces = pulsarAdmin.namespaces();
                if (namespaces.getNamespaces(kafkaTenant).contains(str)) {
                    List namespaceReplicationClusters = namespaces.getNamespaceReplicationClusters(str);
                    if (!namespaceReplicationClusters.contains(clusterName)) {
                        log.info("Namespace: {} exists but cluster: {} is not in the replicationClusters list,updating it ...", str, clusterName);
                        HashSet newHashSet = Sets.newHashSet(namespaceReplicationClusters);
                        newHashSet.add(clusterName);
                        namespaces.setNamespaceReplicationClusters(str, newHashSet);
                    }
                } else {
                    log.info("Namespaces: {} does not exist in tenant: {}, creating it ...", str, kafkaTenant);
                    HashSet newHashSet2 = Sets.newHashSet(new String[]{clusterName});
                    namespaces.createNamespace(str, newHashSet2);
                    namespaces.setNamespaceReplicationClusters(str, newHashSet2);
                }
                log.info("Current state of kafka metadata, cluster: {} exists: {}, tenant: {} exists: {}, namespace: {} exists: {}", clusterName, true, kafkaTenant, true, str, true);
            } catch (PulsarAdminException e2) {
                if (!(e2 instanceof PulsarAdminException.ConflictException)) {
                    log.error("Failed to successfully initialize Kafka Metadata {}", str, e2);
                    throw e2;
                }
                log.info("Resources concurrent creating and cause e: ", e2);
                log.info("Current state of kafka metadata, cluster: {} exists: {}, tenant: {} exists: {}, namespace: {} exists: {}", clusterName, false, kafkaTenant, false, str, false);
            }
        } catch (Throwable th) {
            log.info("Current state of kafka metadata, cluster: {} exists: {}, tenant: {} exists: {}, namespace: {} exists: {}", clusterName, false, kafkaTenant, false, str, false);
            throw th;
        }
    }

    private static void createTopicIfNotExist(PulsarAdmin pulsarAdmin, String str, int i) throws PulsarAdminException {
        try {
            pulsarAdmin.topics().createPartitionedTopic(str, i);
        } catch (PulsarAdminException.ConflictException e) {
            log.info("Resources concurrent creating for topic : {}, caused by : {}", str, e.getMessage());
        }
        try {
            pulsarAdmin.topics().createMissedPartitions(str);
        } catch (PulsarAdminException e2) {
        }
    }
}
