package io.streamnative.pulsar.handlers.kop;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig;
import io.streamnative.pulsar.handlers.kop.offset.OffsetMetadata;
import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils;
import io.streamnative.pulsar.handlers.kop.utils.TopicNameUtils;
import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.protocol.ProtocolHandler;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.admin.Lookup;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.class */
public class KafkaProtocolHandler implements ProtocolHandler {
    private static final Logger log = LoggerFactory.getLogger(KafkaProtocolHandler.class);
    public static final String PROTOCOL_NAME = "kafka";
    public static final String SSL_PREFIX = "SSL://";
    public static final String PLAINTEXT_PREFIX = "PLAINTEXT://";
    public static final String LISTENER_DEL = ",";
    public static final String TLS_HANDLER = "tls";
    public static final String LISTENER_PATTERN = "^(PLAINTEXT?|SSL)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*:([0-9]+)";
    public static final int DEFAULT_PORT = 9092;
    private KafkaServiceConfiguration kafkaConfig;
    private BrokerService brokerService;
    private GroupCoordinator groupCoordinator;
    private String bindAddress;

    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler$ListenerType.class */
    public enum ListenerType {
        PLAINTEXT,
        SSL
    }

    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler$OffsetAndTopicListener.class */
    public static class OffsetAndTopicListener implements NamespaceBundleOwnershipListener {
        final BrokerService service;
        final NamespaceName kafkaMetaNs;
        final NamespaceName kafkaTopicNs;
        final GroupCoordinator groupCoordinator;

        public OffsetAndTopicListener(BrokerService brokerService, KafkaServiceConfiguration kafkaServiceConfiguration, GroupCoordinator groupCoordinator) {
            this.service = brokerService;
            this.kafkaMetaNs = NamespaceName.get(kafkaServiceConfiguration.getKafkaMetadataTenant(), kafkaServiceConfiguration.getKafkaMetadataNamespace());
            this.groupCoordinator = groupCoordinator;
            this.kafkaTopicNs = NamespaceName.get(kafkaServiceConfiguration.getKafkaTenant(), kafkaServiceConfiguration.getKafkaNamespace());
        }

        public void onLoad(NamespaceBundle namespaceBundle) {
            this.service.pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(namespaceBundle).whenComplete((list, th) -> {
                if (th != null) {
                    KafkaProtocolHandler.log.error("Failed to get owned topic list for OffsetAndTopicListener when triggering on-loading bundle {}.", namespaceBundle, th);
                    return;
                }
                KafkaProtocolHandler.log.info("get owned topic list when onLoad bundle {}, topic size {} ", namespaceBundle, Integer.valueOf(list.size()));
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    String str = (String) it.next();
                    TopicName topicName = TopicName.get(str);
                    if ("__consumer_offsets".equals(TopicNameUtils.getKafkaTopicNameFromPulsarTopicname(topicName))) {
                        Preconditions.checkState(topicName.isPartitioned(), "OffsetTopic should be partitioned in onLoad, but get " + topicName);
                        if (KafkaProtocolHandler.log.isDebugEnabled()) {
                            KafkaProtocolHandler.log.debug("New offset partition load:  {}, broker: {}", topicName, this.service.pulsar().getBrokerServiceUrl());
                        }
                        this.groupCoordinator.handleGroupImmigration(topicName.getPartitionIndex());
                    }
                    KafkaTopicManager.removeTopicManagerCache(topicName.toString());
                    try {
                        CompletableFuture<InetSocketAddress> completableFuture = new CompletableFuture<>();
                        this.service.pulsar().getClient().getLookup().getBroker(TopicName.get(str)).whenComplete((pair, th) -> {
                            if (th != null) {
                                KafkaProtocolHandler.log.warn("cloud not get broker", th);
                                completableFuture.complete(null);
                            }
                            Preconditions.checkState(((InetSocketAddress) pair.getLeft()).equals(pair.getRight()));
                            completableFuture.complete(pair.getLeft());
                        });
                        KafkaTopicManager.LOOKUP_CACHE.put(str, completableFuture);
                    } catch (PulsarServerException e) {
                        KafkaProtocolHandler.log.error("onLoad PulsarServerException ", e);
                    }
                }
            });
        }

        public void unLoad(NamespaceBundle namespaceBundle) {
            this.service.pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(namespaceBundle).whenComplete((list, th) -> {
                if (th != null) {
                    KafkaProtocolHandler.log.error("Failed to get owned topic list for OffsetAndTopicListener when triggering un-loading bundle {}.", namespaceBundle, th);
                    return;
                }
                KafkaProtocolHandler.log.info("get owned topic list when unLoad bundle {}, topic size {} ", namespaceBundle, Integer.valueOf(list.size()));
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    TopicName topicName = TopicName.get((String) it.next());
                    if ("__consumer_offsets".equals(TopicNameUtils.getKafkaTopicNameFromPulsarTopicname(topicName))) {
                        Preconditions.checkState(topicName.isPartitioned(), "OffsetTopic should be partitioned in unLoad, but get " + topicName);
                        if (KafkaProtocolHandler.log.isDebugEnabled()) {
                            KafkaProtocolHandler.log.debug("Offset partition unload:  {}, broker: {}", topicName, this.service.pulsar().getBrokerServiceUrl());
                        }
                        this.groupCoordinator.handleGroupEmigration(topicName.getPartitionIndex());
                    }
                    KafkaTopicManager.removeTopicManagerCache(topicName.toString());
                }
            });
        }

        public boolean test(NamespaceBundle namespaceBundle) {
            return namespaceBundle.getNamespaceObject().equals(this.kafkaMetaNs) || namespaceBundle.getNamespaceObject().equals(this.kafkaTopicNs);
        }
    }

    public String protocolName() {
        return PROTOCOL_NAME;
    }

    public boolean accept(String str) {
        return PROTOCOL_NAME.equals(str.toLowerCase());
    }

    public void initialize(ServiceConfiguration serviceConfiguration) throws Exception {
        if (serviceConfiguration instanceof KafkaServiceConfiguration) {
            this.kafkaConfig = (KafkaServiceConfiguration) serviceConfiguration;
        } else {
            this.kafkaConfig = ConfigurationUtils.create(serviceConfiguration.getProperties(), (Class<? extends PulsarConfiguration>) KafkaServiceConfiguration.class);
            this.kafkaConfig.setAdvertisedAddress(serviceConfiguration.getAdvertisedAddress());
            this.kafkaConfig.setBindAddress(serviceConfiguration.getBindAddress());
        }
        this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(this.kafkaConfig.getBindAddress());
        KopTopic.initialize(this.kafkaConfig.getKafkaTenant() + "/" + this.kafkaConfig.getKafkaNamespace());
    }

    public String getProtocolDataToAdvertise() {
        String listenersFromConfig = getListenersFromConfig(this.kafkaConfig);
        if (log.isDebugEnabled()) {
            log.debug("Get configured listeners {}", listenersFromConfig);
        }
        return listenersFromConfig;
    }

    public void start(BrokerService brokerService) {
        this.brokerService = brokerService;
        log.info("Starting KafkaProtocolHandler, kop version is: '{}'", KopVersion.getVersion());
        log.info("Git Revision {}", KopVersion.getGitSha());
        log.info("Built by {} on {} at {}", new Object[]{KopVersion.getBuildUser(), KopVersion.getBuildHost(), KopVersion.getBuildTime()});
        if (this.kafkaConfig.isEnableGroupCoordinator()) {
            try {
                initGroupCoordinator(this.brokerService);
                startGroupCoordinator();
                this.brokerService.pulsar().getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener[]{new OffsetAndTopicListener(this.brokerService, this.kafkaConfig, this.groupCoordinator)});
            } catch (Exception e) {
                log.error("initGroupCoordinator failed with", e);
            }
        }
    }

    public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
        Preconditions.checkState(this.kafkaConfig != null);
        Preconditions.checkState(this.brokerService != null);
        if (this.kafkaConfig.isEnableGroupCoordinator()) {
            Preconditions.checkState(this.groupCoordinator != null);
        }
        try {
            String[] split = getListenersFromConfig(this.kafkaConfig).split(LISTENER_DEL);
            ImmutableMap.Builder builder = ImmutableMap.builder();
            for (String str : split) {
                if (str.startsWith(PLAINTEXT_PREFIX)) {
                    builder.put(new InetSocketAddress(this.brokerService.pulsar().getBindAddress(), getListenerPort(str)), new KafkaChannelInitializer(this.brokerService.pulsar(), this.kafkaConfig, this.groupCoordinator, false));
                } else if (str.startsWith(SSL_PREFIX)) {
                    builder.put(new InetSocketAddress(this.brokerService.pulsar().getBindAddress(), getListenerPort(str)), new KafkaChannelInitializer(this.brokerService.pulsar(), this.kafkaConfig, this.groupCoordinator, true));
                } else {
                    log.error("Kafka listener {} not supported. supports {} and {}", new Object[]{str, PLAINTEXT_PREFIX, SSL_PREFIX});
                }
            }
            return builder.build();
        } catch (Exception e) {
            log.error("KafkaProtocolHandler newChannelInitializers failed with ", e);
            return null;
        }
    }

    public void close() {
        if (this.groupCoordinator != null) {
            this.groupCoordinator.shutdown();
        }
        KafkaTopicManager.LOOKUP_CACHE.clear();
    }

    public void initGroupCoordinator(BrokerService brokerService) throws Exception {
        GroupConfig groupConfig = new GroupConfig(this.kafkaConfig.getGroupMinSessionTimeoutMs(), this.kafkaConfig.getGroupMaxSessionTimeoutMs(), this.kafkaConfig.getGroupInitialRebalanceDelayMs());
        OffsetConfig build = OffsetConfig.builder().offsetsTopicName(this.kafkaConfig.getKafkaMetadataTenant() + "/" + this.kafkaConfig.getKafkaMetadataNamespace() + "/__consumer_offsets").offsetsTopicNumPartitions(this.kafkaConfig.getOffsetsTopicNumPartitions()).offsetsTopicCompressionType(CompressionType.valueOf(this.kafkaConfig.getOffsetsTopicCompressionCodec())).maxMetadataSize(this.kafkaConfig.getOffsetMetadataMaxSize()).offsetsRetentionCheckIntervalMs(this.kafkaConfig.getOffsetsRetentionCheckIntervalMs()).offsetsRetentionMs(TimeUnit.MINUTES.toMillis(this.kafkaConfig.getOffsetsRetentionMinutes())).build();
        MetadataUtils.createKafkaMetadataIfMissing(brokerService.pulsar().getAdminClient(), this.kafkaConfig);
        this.groupCoordinator = GroupCoordinator.of(brokerService.pulsar().getClient(), groupConfig, build, this.kafkaConfig, SystemTimer.builder().executorName("group-coordinator-timer").build(), Time.SYSTEM);
        loadOffsetTopics(this.groupCoordinator);
    }

    public void startGroupCoordinator() throws Exception {
        if (this.groupCoordinator != null) {
            this.groupCoordinator.startup(true);
        } else {
            log.error("Failed to start group coordinator. Need init it first.");
        }
    }

    private void loadOffsetTopics(GroupCoordinator groupCoordinator) throws Exception {
        Lookup lookups = this.brokerService.pulsar().getAdminClient().lookups();
        String brokerServiceUrl = this.brokerService.pulsar().getBrokerServiceUrl();
        String constructOffsetsTopicBaseName = MetadataUtils.constructOffsetsTopicBaseName(this.kafkaConfig);
        int offsetsTopicNumPartitions = this.kafkaConfig.getOffsetsTopicNumPartitions();
        HashMap hashMap = new HashMap();
        for (int i = 0; i < offsetsTopicNumPartitions; i++) {
            String lookupTopic = lookups.lookupTopic(constructOffsetsTopicBaseName + "-partition-" + i);
            hashMap.putIfAbsent(lookupTopic, new ArrayList());
            ((List) hashMap.get(lookupTopic)).add(Integer.valueOf(i));
        }
        hashMap.entrySet().stream().forEach(entry -> {
            log.info("Discovered broker: {} owns offset topic partitions: {} ", entry.getKey(), entry.getValue());
        });
        List list = (List) hashMap.get(brokerServiceUrl);
        if (null == list || list.isEmpty()) {
            log.info("Current broker: {} does not own any of the offset topic partitions", brokerServiceUrl);
        } else {
            FutureUtil.waitForAll((List) list.stream().map(num -> {
                return groupCoordinator.handleGroupImmigration(num.intValue());
            }).collect(Collectors.toList())).get();
        }
    }

    public static int getListenerPort(String str) {
        Preconditions.checkState(str.matches(LISTENER_PATTERN), "listener not match patten");
        return Integer.parseInt(str.substring(str.lastIndexOf(58) + 1));
    }

    public static int getListenerPort(String str, ListenerType listenerType) {
        for (String str2 : str.split(LISTENER_DEL)) {
            if (listenerType == ListenerType.PLAINTEXT && str2.startsWith(PLAINTEXT_PREFIX)) {
                return getListenerPort(str2);
            }
            if (listenerType == ListenerType.SSL && str2.startsWith(SSL_PREFIX)) {
                return getListenerPort(str2);
            }
        }
        log.info("KafkaProtocolHandler listeners {} not contains type {}", str, listenerType);
        return -1;
    }

    public static String getKopBrokerUrl(String str, Boolean bool) {
        for (String str2 : str.split(LISTENER_DEL)) {
            if (bool.booleanValue() && str2.startsWith(SSL_PREFIX)) {
                return str2;
            }
            if (!bool.booleanValue() && str2.startsWith(PLAINTEXT_PREFIX)) {
                return str2;
            }
        }
        log.info("listener {} not contains a valid SSL or PLAINTEXT address", str);
        return null;
    }

    public static String getListenersFromConfig(KafkaServiceConfiguration kafkaServiceConfiguration) {
        String listeners = kafkaServiceConfiguration.getListeners();
        String advertisedAddress = PulsarService.advertisedAddress(kafkaServiceConfiguration);
        return (listeners == null || listeners.isEmpty()) ? PLAINTEXT_PREFIX + advertisedAddress + ':' + DEFAULT_PORT : checkAndFillUpListeners(listeners, advertisedAddress);
    }

    public static String checkAndFillUpListeners(String str, String str2) {
        String[] split = str.split(LISTENER_DEL);
        Preconditions.checkState(split.length >= 1, "Empty listener returned, should have at least 1 listener");
        String str3 = OffsetMetadata.NO_METADATA;
        for (String str4 : split) {
            Preconditions.checkState(str4.matches(LISTENER_PATTERN), "Listener in wrong format: " + str4);
            boolean z = false;
            int indexOf = str4.indexOf("//") + 2;
            int lastIndexOf = str4.lastIndexOf(":");
            String substring = str4.substring(0, indexOf);
            String substring2 = str4.substring(indexOf, lastIndexOf);
            String substring3 = str4.substring(lastIndexOf + 1);
            Preconditions.checkState(substring.equals(SSL_PREFIX) || substring.equals(PLAINTEXT_PREFIX), "Not expected Listener type: " + substring);
            if (substring2.isEmpty()) {
                substring2 = str2;
                z = true;
            } else {
                Preconditions.checkState(substring2.equals(str2), "HostName: " + substring2 + " not equals advertisedAddress: " + str2);
            }
            int parseInt = Integer.parseInt(substring3);
            Preconditions.checkState(parseInt >= 1 && parseInt <= 65536, "Not a valid port: " + parseInt);
            String str5 = z ? substring + substring2 + ":" + substring3 : str4;
            str3 = str3.isEmpty() ? str5 : str3 + LISTENER_DEL + str5;
        }
        return str3;
    }

    public KafkaServiceConfiguration getKafkaConfig() {
        return this.kafkaConfig;
    }

    public BrokerService getBrokerService() {
        return this.brokerService;
    }

    public GroupCoordinator getGroupCoordinator() {
        return this.groupCoordinator;
    }

    public String getBindAddress() {
        return this.bindAddress;
    }
}
