package io.streamnative.pulsar.handlers.kop;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator;
import io.streamnative.pulsar.handlers.kop.stats.PrometheusMetricsProvider;
import io.streamnative.pulsar.handlers.kop.stats.StatsLogger;
import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils;
import io.streamnative.pulsar.handlers.kop.utils.TopicNameUtils;
import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils;
import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.protocol.ProtocolHandler;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.admin.Lookup;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.class */
public class KafkaProtocolHandler implements ProtocolHandler {
    public static final String PROTOCOL_NAME = "kafka";
    public static final String TLS_HANDLER = "tls";
    private StatsLogger rootStatsLogger;
    private PrometheusMetricsProvider statsProvider;
    private KopBrokerLookupManager kopBrokerLookupManager;
    private AdminManager adminManager = null;
    private MetadataCache<LocalBrokerData> localBrokerDataCache;
    private KafkaServiceConfiguration kafkaConfig;
    private BrokerService brokerService;
    private GroupCoordinator groupCoordinator;
    private TransactionCoordinator transactionCoordinator;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaProtocolHandler.class);
    private static final Map<PulsarService, LookupClient> LOOKUP_CLIENT_MAP = new ConcurrentHashMap();

    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler$OffsetAndTopicListener.class */
    public static class OffsetAndTopicListener implements NamespaceBundleOwnershipListener {
        final BrokerService service;
        final NamespaceName kafkaMetaNs;
        final NamespaceName kafkaTopicNs;
        final GroupCoordinator groupCoordinator;
        final String brokerUrl;

        public OffsetAndTopicListener(BrokerService brokerService, KafkaServiceConfiguration kafkaServiceConfiguration, GroupCoordinator groupCoordinator) {
            this.service = brokerService;
            this.kafkaMetaNs = NamespaceName.get(kafkaServiceConfiguration.getKafkaMetadataTenant(), kafkaServiceConfiguration.getKafkaMetadataNamespace());
            this.groupCoordinator = groupCoordinator;
            this.kafkaTopicNs = NamespaceName.get(kafkaServiceConfiguration.getKafkaTenant(), kafkaServiceConfiguration.getKafkaNamespace());
            this.brokerUrl = brokerService.pulsar().getBrokerServiceUrl();
        }

        public void onLoad(NamespaceBundle namespaceBundle) {
            if (KafkaProtocolHandler.log.isDebugEnabled()) {
                KafkaProtocolHandler.log.debug("[{}] onLoad bundle: {}", this.brokerUrl, namespaceBundle);
            }
            this.service.pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(namespaceBundle).whenComplete((list, th) -> {
                if (th != null) {
                    KafkaProtocolHandler.log.error("Failed to get owned topic list for OffsetAndTopicListener when triggering on-loading bundle {}.", namespaceBundle, th);
                    return;
                }
                KafkaProtocolHandler.log.info("get owned topic list when onLoad bundle {}, topic size {} ", namespaceBundle, Integer.valueOf(list.size()));
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    String str = (String) it.next();
                    TopicName topicName = TopicName.get(str);
                    if (Topic.GROUP_METADATA_TOPIC_NAME.equals(TopicNameUtils.getKafkaTopicNameFromPulsarTopicname(topicName))) {
                        Preconditions.checkState(topicName.isPartitioned(), "OffsetTopic should be partitioned in onLoad, but get " + topicName);
                        if (KafkaProtocolHandler.log.isDebugEnabled()) {
                            KafkaProtocolHandler.log.debug("New offset partition load:  {}, broker: {}", topicName, this.service.pulsar().getBrokerServiceUrl());
                        }
                        this.groupCoordinator.handleGroupImmigration(topicName.getPartitionIndex());
                    }
                    KopBrokerLookupManager.removeTopicManagerCache(str);
                    KafkaTopicManager.deReference(str);
                    if (!topicName.isPartitioned()) {
                        String topicName2 = topicName.getPartition(0).toString();
                        KafkaTopicManager.deReference(topicName2);
                        KopBrokerLookupManager.removeTopicManagerCache(topicName2);
                    }
                }
            });
        }

        public void unLoad(NamespaceBundle namespaceBundle) {
            if (KafkaProtocolHandler.log.isDebugEnabled()) {
                KafkaProtocolHandler.log.debug("[{}] unLoad bundle: {}", this.brokerUrl, namespaceBundle);
            }
            this.service.pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(namespaceBundle).whenComplete((list, th) -> {
                if (th != null) {
                    KafkaProtocolHandler.log.error("Failed to get owned topic list for OffsetAndTopicListener when triggering un-loading bundle {}.", namespaceBundle, th);
                    return;
                }
                KafkaProtocolHandler.log.info("get owned topic list when unLoad bundle {}, topic size {} ", namespaceBundle, Integer.valueOf(list.size()));
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    String str = (String) it.next();
                    TopicName topicName = TopicName.get(str);
                    if (Topic.GROUP_METADATA_TOPIC_NAME.equals(TopicNameUtils.getKafkaTopicNameFromPulsarTopicname(topicName))) {
                        Preconditions.checkState(topicName.isPartitioned(), "OffsetTopic should be partitioned in unLoad, but get " + topicName);
                        if (KafkaProtocolHandler.log.isDebugEnabled()) {
                            KafkaProtocolHandler.log.debug("Offset partition unload:  {}, broker: {}", topicName, this.service.pulsar().getBrokerServiceUrl());
                        }
                        this.groupCoordinator.handleGroupEmigration(topicName.getPartitionIndex());
                    }
                    KopBrokerLookupManager.removeTopicManagerCache(str);
                    KafkaTopicManager.deReference(str);
                    if (!topicName.isPartitioned()) {
                        String topicName2 = topicName.getPartition(0).toString();
                        KafkaTopicManager.deReference(topicName2);
                        KopBrokerLookupManager.removeTopicManagerCache(topicName2);
                    }
                }
            });
        }

        public boolean test(NamespaceBundle namespaceBundle) {
            return namespaceBundle.getNamespaceObject().equals(this.kafkaMetaNs) || namespaceBundle.getNamespaceObject().equals(this.kafkaTopicNs);
        }
    }

    public String protocolName() {
        return PROTOCOL_NAME;
    }

    public boolean accept(String str) {
        return PROTOCOL_NAME.equals(str.toLowerCase());
    }

    public void initialize(ServiceConfiguration serviceConfiguration) throws Exception {
        if (serviceConfiguration instanceof KafkaServiceConfiguration) {
            this.kafkaConfig = (KafkaServiceConfiguration) serviceConfiguration;
        } else {
            this.kafkaConfig = ConfigurationUtils.create(serviceConfiguration.getProperties(), (Class<? extends PulsarConfiguration>) KafkaServiceConfiguration.class);
            this.kafkaConfig.setAdvertisedAddress(serviceConfiguration.getAdvertisedAddress());
            this.kafkaConfig.setBindAddress(serviceConfiguration.getBindAddress());
        }
        KopTopic.initialize(this.kafkaConfig.getKafkaTenant() + "/" + this.kafkaConfig.getKafkaNamespace());
        for (String str : this.kafkaConfig.getKopAllowedNamespaces()) {
            String[] split = str.split("/");
            if (split.length != 2) {
                throw new IllegalArgumentException("Invalid namespace '" + str + "' in kopAllowedNamespaces config");
            }
            NamespaceName.validateNamespaceName(split[0], split[1]);
        }
        this.statsProvider = new PrometheusMetricsProvider();
        this.rootStatsLogger = this.statsProvider.getStatsLogger("");
    }

    public String getProtocolDataToAdvertise() {
        return this.kafkaConfig.getKafkaAdvertisedListeners();
    }

    public void start(BrokerService brokerService) {
        this.brokerService = brokerService;
        this.kopBrokerLookupManager = new KopBrokerLookupManager(this.brokerService.getPulsar(), false, this.kafkaConfig.getKafkaAdvertisedListeners());
        log.info("Starting KafkaProtocolHandler, kop version is: '{}'", KopVersion.getVersion());
        log.info("Git Revision {}", KopVersion.getGitSha());
        log.info("Built by {} on {} at {}", KopVersion.getBuildUser(), KopVersion.getBuildHost(), KopVersion.getBuildTime());
        this.localBrokerDataCache = this.brokerService.pulsar().getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
        ZooKeeperUtils.tryCreatePath(this.brokerService.pulsar().getZkClient(), this.kafkaConfig.getGroupIdZooKeeperPath(), new byte[0]);
        try {
            PulsarAdmin adminClient = this.brokerService.getPulsar().getAdminClient();
            this.adminManager = new AdminManager(adminClient, this.kafkaConfig);
            LOOKUP_CLIENT_MAP.put(this.brokerService.pulsar(), new LookupClient(this.brokerService.pulsar(), this.kafkaConfig));
            ClusterData build = ClusterData.builder().serviceUrl(this.brokerService.getPulsar().getWebServiceAddress()).serviceUrlTls(this.brokerService.getPulsar().getWebServiceAddressTls()).brokerServiceUrl(this.brokerService.getPulsar().getBrokerServiceUrl()).brokerServiceUrlTls(this.brokerService.getPulsar().getBrokerServiceUrlTls()).build();
            try {
                PulsarClient client = this.brokerService.getPulsar().getClient();
                try {
                    MetadataUtils.createOffsetMetadataIfMissing(adminClient, build, this.kafkaConfig);
                    startGroupCoordinator(client);
                    this.brokerService.pulsar().getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener[]{new OffsetAndTopicListener(this.brokerService, this.kafkaConfig, this.groupCoordinator)});
                    try {
                        MetadataUtils.createKafkaNamespaceIfMissing(adminClient, build, this.kafkaConfig);
                    } catch (PulsarAdminException e) {
                        log.warn("init kafka failed, need to create it manually later", e);
                    }
                    if (this.kafkaConfig.isEnableTransactionCoordinator()) {
                        try {
                            initTransactionCoordinator(adminClient, build);
                            startTransactionCoordinator();
                        } catch (Exception e2) {
                            log.error("Initialized transaction coordinator failed.", (Throwable) e2);
                            throw new IllegalStateException(e2);
                        }
                    }
                    Configuration propertiesConfiguration = new PropertiesConfiguration();
                    propertiesConfiguration.addProperty(PrometheusMetricsProvider.PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS, Integer.valueOf(this.kafkaConfig.getKopPrometheusStatsLatencyRolloverSeconds()));
                    this.statsProvider.start(propertiesConfiguration);
                    this.brokerService.pulsar().addPrometheusRawMetricsProvider(this.statsProvider);
                } catch (PulsarAdminException e3) {
                    log.error("Failed to create offset metadata", e3);
                    throw new IllegalStateException(e3);
                }
            } catch (PulsarServerException e4) {
                log.error("Failed to create builtin PulsarClient", e4);
                throw new IllegalStateException(e4);
            }
        } catch (PulsarServerException e5) {
            log.error("Failed to get pulsarAdmin", e5);
            throw new IllegalStateException(e5);
        }
    }

    public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
        Preconditions.checkState(this.kafkaConfig != null);
        Preconditions.checkState(this.brokerService != null);
        try {
            ImmutableMap.Builder builder = ImmutableMap.builder();
            Map<SecurityProtocol, EndPoint> parseListeners = EndPoint.parseListeners(this.kafkaConfig.getKafkaAdvertisedListeners());
            EndPoint.parseListeners(this.kafkaConfig.getListeners()).forEach((securityProtocol, endPoint) -> {
                EndPoint endPoint = (EndPoint) parseListeners.get(securityProtocol);
                if (endPoint == null) {
                    endPoint = endPoint;
                }
                switch (securityProtocol) {
                    case PLAINTEXT:
                    case SASL_PLAINTEXT:
                        builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(this.brokerService.getPulsar(), this.kafkaConfig, this.groupCoordinator, this.transactionCoordinator, this.adminManager, false, endPoint, this.rootStatsLogger.scope(KopServerStats.SERVER_SCOPE), this.localBrokerDataCache));
                        return;
                    case SSL:
                    case SASL_SSL:
                        builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(this.brokerService.getPulsar(), this.kafkaConfig, this.groupCoordinator, this.transactionCoordinator, this.adminManager, true, endPoint, this.rootStatsLogger.scope(KopServerStats.SERVER_SCOPE), this.localBrokerDataCache));
                        return;
                    default:
                        return;
                }
            });
            return builder.build();
        } catch (Exception e) {
            log.error("KafkaProtocolHandler newChannelInitializers failed with ", (Throwable) e);
            return null;
        }
    }

    public void close() {
        Optional.ofNullable(LOOKUP_CLIENT_MAP.remove(this.brokerService.pulsar())).ifPresent((v0) -> {
            v0.close();
        });
        this.adminManager.shutdown();
        this.groupCoordinator.shutdown();
        KafkaTopicManager.LOOKUP_CACHE.clear();
        KopBrokerLookupManager.clear();
        KafkaTopicManager.closeKafkaTopicConsumerManagers();
        KafkaTopicManager.getReferences().clear();
        KafkaTopicManager.getTopics().clear();
        this.statsProvider.stop();
    }

    public void startGroupCoordinator(PulsarClient pulsarClient) {
        this.groupCoordinator = GroupCoordinator.of((PulsarClientImpl) pulsarClient, new GroupConfig(this.kafkaConfig.getGroupMinSessionTimeoutMs(), this.kafkaConfig.getGroupMaxSessionTimeoutMs(), this.kafkaConfig.getGroupInitialRebalanceDelayMs()), OffsetConfig.builder().offsetsTopicName(this.kafkaConfig.getKafkaMetadataTenant() + "/" + this.kafkaConfig.getKafkaMetadataNamespace() + "/" + Topic.GROUP_METADATA_TOPIC_NAME).offsetsTopicNumPartitions(this.kafkaConfig.getOffsetsTopicNumPartitions()).offsetsTopicCompressionType(CompressionType.valueOf(this.kafkaConfig.getOffsetsTopicCompressionCodec())).maxMetadataSize(this.kafkaConfig.getOffsetMetadataMaxSize()).offsetsRetentionCheckIntervalMs(this.kafkaConfig.getOffsetsRetentionCheckIntervalMs()).offsetsRetentionMs(TimeUnit.MINUTES.toMillis(this.kafkaConfig.getOffsetsRetentionMinutes())).build(), SystemTimer.builder().executorName("group-coordinator-timer").build(), Time.SYSTEM);
        this.groupCoordinator.startup(true);
    }

    public void initTransactionCoordinator(PulsarAdmin pulsarAdmin, ClusterData clusterData) throws Exception {
        TransactionConfig build = TransactionConfig.builder().transactionLogNumPartitions(this.kafkaConfig.getTxnLogTopicNumPartitions()).transactionMetadataTopicName(MetadataUtils.constructTxnLogTopicBaseName(this.kafkaConfig)).build();
        MetadataUtils.createTxnMetadataIfMissing(pulsarAdmin, clusterData, this.kafkaConfig);
        this.transactionCoordinator = TransactionCoordinator.of(build, Integer.valueOf(this.kafkaConfig.getBrokerId()), this.brokerService.getPulsar().getZkClient(), this.kopBrokerLookupManager);
        loadTxnLogTopics(this.transactionCoordinator);
    }

    public void startTransactionCoordinator() throws Exception {
        if (this.transactionCoordinator != null) {
            this.transactionCoordinator.startup().get();
        } else {
            log.error("Failed to start transaction coordinator. Need init it first.");
        }
    }

    private void loadTxnLogTopics(TransactionCoordinator transactionCoordinator) throws Exception {
        Lookup lookups = this.brokerService.pulsar().getAdminClient().lookups();
        String brokerServiceUrl = this.brokerService.pulsar().getBrokerServiceUrl();
        String constructTxnLogTopicBaseName = MetadataUtils.constructTxnLogTopicBaseName(this.kafkaConfig);
        int txnLogTopicNumPartitions = this.kafkaConfig.getTxnLogTopicNumPartitions();
        HashMap hashMap = new HashMap();
        for (int i = 0; i < txnLogTopicNumPartitions; i++) {
            String lookupTopic = lookups.lookupTopic(constructTxnLogTopicBaseName + "-partition-" + i);
            hashMap.putIfAbsent(lookupTopic, new ArrayList());
            ((List) hashMap.get(lookupTopic)).add(Integer.valueOf(i));
        }
        hashMap.forEach((str, list) -> {
            log.info("Discovered broker: {} owns txn log topic partitions: {} ", str, list);
        });
        List list2 = (List) hashMap.get(brokerServiceUrl);
        if (null == list2 || list2.isEmpty()) {
            log.info("Current broker: {} does not own any of the txn log topic partitions", brokerServiceUrl);
        } else {
            FutureUtil.waitForAll((List) list2.stream().map(num -> {
                return transactionCoordinator.loadTransactionMetadata(num.intValue());
            }).collect(Collectors.toList())).get();
        }
    }

    @NonNull
    public static LookupClient getLookupClient(PulsarService pulsarService) {
        return LOOKUP_CLIENT_MAP.computeIfAbsent(pulsarService, pulsarService2 -> {
            return new LookupClient(pulsarService);
        });
    }

    public KafkaServiceConfiguration getKafkaConfig() {
        return this.kafkaConfig;
    }

    public BrokerService getBrokerService() {
        return this.brokerService;
    }

    public GroupCoordinator getGroupCoordinator() {
        return this.groupCoordinator;
    }

    public TransactionCoordinator getTransactionCoordinator() {
        return this.transactionCoordinator;
    }
}
