package io.streamnative.pulsar.handlers.kop;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
import io.streamnative.pulsar.handlers.kop.offset.OffsetMetadata;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.pulsar.ZookeeperSessionExpiredHandlers;
import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.ManagedLedgerClientFactory;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.web.WebService;
import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaService.class */
public class KafkaService extends PulsarService {
    private static final Logger log = LoggerFactory.getLogger(KafkaService.class);
    private final KafkaServiceConfiguration kafkaConfig;
    private GroupCoordinator groupCoordinator;

    public KafkaService(KafkaServiceConfiguration kafkaServiceConfiguration) {
        super(kafkaServiceConfiguration);
        this.kafkaConfig = kafkaServiceConfiguration;
    }

    public Map<String, String> getProtocolDataToAdvertise() {
        return ImmutableMap.builder().put(KafkaProtocolHandler.PROTOCOL_NAME, this.kafkaConfig.getListeners()).build();
    }

    public void start() throws PulsarServerException {
        ReentrantLock mutex = getMutex();
        mutex.lock();
        try {
            try {
                log.info("Starting Pulsar Broker service powered by Pulsar version: '{}'", getBrokerVersion() != null ? getBrokerVersion() : "unknown");
                if (getState() != PulsarService.State.Init) {
                    throw new PulsarServerException("Cannot start the service once it was stopped");
                }
                if (this.kafkaConfig.getListeners() == null || this.kafkaConfig.getListeners().isEmpty()) {
                    throw new IllegalArgumentException("Kafka Listeners should be provided through brokerConf.listeners");
                }
                if (this.kafkaConfig.getAdvertisedAddress() != null && !this.kafkaConfig.getListeners().contains(this.kafkaConfig.getAdvertisedAddress())) {
                    String str = "Error config: advertisedAddress - " + this.kafkaConfig.getAdvertisedAddress() + " and listeners - " + this.kafkaConfig.getListeners() + " not match.";
                    log.error(str);
                    throw new IllegalArgumentException(str);
                }
                setOrderedExecutor(OrderedExecutor.newBuilder().numThreads(8).name("pulsar-ordered").build());
                KafkaProtocolHandler kafkaProtocolHandler = new KafkaProtocolHandler();
                kafkaProtocolHandler.initialize(this.kafkaConfig);
                setLocalZooKeeperConnectionProvider(new LocalZooKeeperConnectionService(getZooKeeperClientFactory(), this.kafkaConfig.getZookeeperServers(), this.kafkaConfig.getZooKeeperSessionTimeoutMillis()));
                getLocalZooKeeperConnectionProvider().start(ZookeeperSessionExpiredHandlers.shutdownWhenZookeeperSessionExpired(getShutdownService()));
                startZkCacheService();
                BookKeeperClientFactory newBookKeeperClientFactory = newBookKeeperClientFactory();
                setBkClientFactory(newBookKeeperClientFactory);
                setManagedLedgerClientFactory(new ManagedLedgerClientFactory(this.kafkaConfig, getZkClient(), newBookKeeperClientFactory));
                setBrokerService(new BrokerService(this));
                getLoadManager().set(LoadManager.create(this));
                setDefaultOffloader(createManagedLedgerOffloader(OffloadPolicies.create(this.kafkaConfig.getProperties())));
                getBrokerService().start();
                WebService webService = new WebService(this);
                setWebService(webService);
                HashMap newHashMap = Maps.newHashMap();
                newHashMap.put("pulsar", this);
                HashMap newHashMap2 = Maps.newHashMap();
                newHashMap2.put("statusFilePath", this.kafkaConfig.getStatusFilePath());
                newHashMap2.put("isReadyProbe", new Supplier<Boolean>() { // from class: io.streamnative.pulsar.handlers.kop.KafkaService.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.function.Supplier
                    public Boolean get() {
                        return Boolean.valueOf(KafkaService.this.getState() == PulsarService.State.Started);
                    }
                });
                webService.addRestResources("/", VipStatus.class.getPackage().getName(), false, newHashMap2);
                webService.addRestResources("/", "org.apache.pulsar.broker.web", false, newHashMap);
                webService.addRestResources("/admin", "org.apache.pulsar.broker.admin.v1", true, newHashMap);
                webService.addRestResources("/admin/v2", "org.apache.pulsar.broker.admin.v2", true, newHashMap);
                webService.addRestResources("/admin/v3", "org.apache.pulsar.broker.admin.v3", true, newHashMap);
                webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true, newHashMap);
                webService.addServlet("/metrics", new ServletHolder(new PrometheusMetricsServlet(this, this.kafkaConfig.isExposeTopicLevelMetricsInPrometheus(), this.kafkaConfig.isExposeConsumerLevelMetricsInPrometheus())), false, newHashMap);
                if (log.isDebugEnabled()) {
                    log.debug("Attempting to add static directory");
                }
                webService.addStaticResources("/static", "/static");
                setSchemaRegistryService(SchemaRegistryService.create((SchemaStorage) null, new HashSet()));
                webService.start();
                setWebServiceAddress(webAddress(this.kafkaConfig));
                setWebServiceAddressTls(webAddressTls(this.kafkaConfig));
                setBrokerServiceUrl(this.kafkaConfig.getBrokerServicePort().isPresent() ? brokerUrl(advertisedAddress(this.kafkaConfig), ((Integer) getBrokerListenPort().get()).intValue()) : null);
                setBrokerServiceUrlTls(brokerUrlTls(this.kafkaConfig));
                startNamespaceService();
                startLeaderElectionService();
                getNsService().registerBootstrapNamespaces();
                setMetricsGenerator(new MetricsGenerator(this));
                startLoadManagementService();
                acquireSLANamespace();
                String str2 = "bootstrap service " + (this.kafkaConfig.getWebServicePort().isPresent() ? "port = " + this.kafkaConfig.getWebServicePort().get() : OffsetMetadata.NO_METADATA) + (this.kafkaConfig.getWebServicePortTls().isPresent() ? "tls-port = " + this.kafkaConfig.getWebServicePortTls() : OffsetMetadata.NO_METADATA) + "kafka listener url= " + this.kafkaConfig.getListeners();
                kafkaProtocolHandler.start(getBrokerService());
                getBrokerService().startProtocolHandlers(ImmutableMap.builder().put(KafkaProtocolHandler.PROTOCOL_NAME, kafkaProtocolHandler.newChannelInitializers()).build());
                this.groupCoordinator = kafkaProtocolHandler.getGroupCoordinator();
                setState(PulsarService.State.Started);
                log.info("Kafka messaging service is ready, {}, cluster={}, configs={}", new Object[]{str2, this.kafkaConfig.getClusterName(), ReflectionToStringBuilder.toString(this.kafkaConfig)});
                mutex.unlock();
            } catch (Exception e) {
                log.error(e.getMessage(), e);
                throw new PulsarServerException(e);
            }
        } catch (Throwable th) {
            mutex.unlock();
            throw th;
        }
    }

    public void close() throws PulsarServerException {
        if (this.groupCoordinator != null) {
            this.groupCoordinator.shutdown();
        }
        super.close();
    }

    public KafkaServiceConfiguration getKafkaConfig() {
        return this.kafkaConfig;
    }

    public GroupCoordinator getGroupCoordinator() {
        return this.groupCoordinator;
    }

    public void setGroupCoordinator(GroupCoordinator groupCoordinator) {
        this.groupCoordinator = groupCoordinator;
    }
}
