/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.auth;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.PortForwarder;
import org.apache.pulsar.broker.auth.SameThreadOrderedSafeExecutor;
import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PowerMockIgnore(value={"org.slf4j.*", "com.sun.org.apache.xerces.*"})
public abstract class MockedPulsarServiceBaseTest
extends TestRetrySupport {
    /** Placeholder constant exposed to subclasses; not referenced elsewhere in this class. */
    protected final String DUMMY_VALUE = "DUMMY_VALUE";
    /** Its value is one of the server lists for which {@link #mockZooKeeperClientFactory} hands out the global mock ZooKeeper. */
    protected final String GLOBAL_DUMMY_VALUE = "GLOBAL_DUMMY_VALUE";
    /** Broker configuration; replaced with {@link #getDefaultConf()} by {@link #resetConfig()}. */
    protected ServiceConfiguration conf;
    /** Spied broker created by {@link #startBroker()}; set back to {@code null} by {@link #stopBroker()}. */
    protected PulsarService pulsar;
    /** Spied admin client, rebuilt each time {@link #startBroker()} runs. */
    protected PulsarAdmin admin;
    /** Client created by {@link #internalSetup()} or {@link #internalSetupForStatsTest()}. */
    protected PulsarClient pulsarClient;
    /** Port forwarder created by {@link #internalSetup()} only when both {@link #isTcpLookup} and {@link #enableBrokerGateway} are set. */
    protected PortForwarder brokerGateway;
    /** Opt-in switch for creating {@link #brokerGateway}; off by default. */
    protected boolean enableBrokerGateway = false;
    /** Web service address of the running broker, or {@code null} when the broker reports none. */
    protected URL brokerUrl;
    /** TLS web service address of the running broker, or {@code null} when the broker reports none. */
    protected URL brokerUrlTls;
    /** URL handed to {@link #newPulsarClient(String, int)} by {@link #internalSetup()}. */
    protected URI lookupUrl;
    /** Mock ZooKeeper backing the local metadata store. */
    protected MockZooKeeper mockZooKeeper;
    /** Mock ZooKeeper backing the configuration metadata store. */
    protected MockZooKeeper mockZooKeeperGlobal;
    /** Mock BookKeeper returned by the mock BookKeeper client factory. */
    protected NonClosableMockBookKeeper mockBookKeeper;
    /** When true, the setup methods use the broker service URL instead of the web service URL for lookups. */
    protected boolean isTcpLookup = false;
    /** Cluster name used by {@link #getDefaultConf()} and {@link #createDefaultTenantInfo()}. */
    protected static final String configClusterName = "test";
    /** Executor returned from the spied broker's {@code getOrderedExecutor()}. */
    private SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor;
    /** Single-threaded executor passed to the mock BookKeeper. */
    private OrderedExecutor bkExecutor;
    /** When true, {@link #setupBrokerMocks(PulsarService)} also stubs a non-empty broker-interceptor configuration. */
    protected boolean enableBrokerInterceptor = false;
    /**
     * ZooKeeper client factory that never opens a connection: it returns the global mock when the
     * requested server list matches the configured configuration-store servers (or the global dummy
     * value, compared case-insensitively), and the local mock otherwise.
     */
    protected ZooKeeperClientFactory mockZooKeeperClientFactory = new ZooKeeperClientFactory(){

        @Override
        public CompletableFuture<ZooKeeper> create(String serverList, ZooKeeperClientFactory.SessionType sessionType, int zkSessionTimeoutMillis) {
            // A null server list always maps to the local mock.
            boolean wantsGlobal = serverList != null
                    && (serverList.equalsIgnoreCase(conf.getConfigurationStoreServers())
                            || serverList.equalsIgnoreCase(GLOBAL_DUMMY_VALUE));
            ZooKeeper selected = wantsGlobal ? mockZooKeeperGlobal : mockZooKeeper;
            return CompletableFuture.completedFuture(selected);
        }
    };
    /**
     * BookKeeper client factory that ignores all of its arguments and always returns the shared
     * {@link #mockBookKeeper}; closing the factory does nothing.
     */
    private final BookKeeperClientFactory mockBookKeeperClientFactory = new BookKeeperClientFactory(){

        // Every argument is ignored; the shared mock is returned as-is.
        public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient, EventLoopGroup eventLoopGroup, Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass, Map<String, Object> properties) {
            return MockedPulsarServiceBaseTest.this.mockBookKeeper;
        }

        // Same as above; the stats logger is ignored as well.
        public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient, EventLoopGroup eventLoopGroup, Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass, Map<String, Object> properties, StatsLogger statsLogger) {
            return MockedPulsarServiceBaseTest.this.mockBookKeeper;
        }

        // Intentionally empty: the mock BookKeeper is shut down by internalCleanup(), not by the factory.
        public void close() {
        }
    };
    /** Class-wide logger. */
    private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);

    /** Creates the test base with a fresh default broker configuration. */
    public MockedPulsarServiceBaseTest() {
        resetConfig();
    }

    /**
     * @return the current broker instance, or {@code null} when no broker is running
     */
    protected PulsarService getPulsar() {
        return pulsar;
    }

    /** Discards the current configuration and replaces it with a new default one. */
    protected final void resetConfig() {
        conf = getDefaultConf();
    }

    /**
     * Starts the mocked broker and creates the default client.
     *
     * <p>The lookup URL is the web service URL unless {@link #isTcpLookup} is set, in which case the
     * broker service URL is used and, if {@link #enableBrokerGateway} is also set, a port forwarder
     * towards the broker listen port on 127.0.0.1 is created.
     *
     * @throws Exception if the broker or the client cannot be started
     */
    protected final void internalSetup() throws Exception {
        incrementSetupNumber();
        init();

        lookupUrl = new URI(brokerUrl.toString());
        if (isTcpLookup) {
            lookupUrl = new URI(pulsar.getBrokerServiceUrl());
            if (enableBrokerGateway) {
                InetSocketAddress gatewaySide = new InetSocketAddress(lookupUrl.getHost(), lookupUrl.getPort());
                int brokerPort = (Integer) pulsar.getBrokerListenPort().get();
                InetSocketAddress brokerSide = new InetSocketAddress("127.0.0.1", brokerPort);
                brokerGateway = new PortForwarder(gatewaySide, brokerSide);
            }
        }

        // Stats interval 0 is passed for the default client.
        pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
    }

    /**
     * Same as {@link #internalSetup()}, but installs the given configuration first.
     *
     * @param serviceConfiguration configuration to use for the broker
     * @throws Exception if the broker or the client cannot be started
     */
    protected final void internalSetup(ServiceConfiguration serviceConfiguration) throws Exception {
        conf = serviceConfiguration;
        internalSetup();
    }

    /**
     * Builds a client for the given service URL, letting subclasses customize the builder first.
     *
     * @param url service URL the client connects to
     * @param intervalInSecs stats interval, in seconds
     * @return the newly built client
     * @throws PulsarClientException if the client cannot be built
     */
    protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
        ClientBuilder builder = PulsarClient.builder()
                .serviceUrl(url)
                .statsInterval(intervalInSecs, TimeUnit.SECONDS);
        customizeNewPulsarClientBuilder(builder);
        return createNewPulsarClient(builder);
    }

    /**
     * Extension hook called by {@link #newPulsarClient(String, int)} before the client is built.
     *
     * @param clientBuilder builder that subclasses may adjust
     */
    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
        // No customization by default.
    }

    /**
     * Turns a configured builder into a client; overridable so subclasses can wrap the result.
     *
     * @param clientBuilder fully configured builder
     * @return the built client
     * @throws PulsarClientException if building fails
     */
    protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws PulsarClientException {
        PulsarClient built = clientBuilder.build();
        return built;
    }

    /**
     * Shuts down the current client, if any, and replaces it with one built from the given builder.
     *
     * @param clientBuilder builder for the replacement client
     * @return the replacement client, which is also stored in {@link #pulsarClient}
     * @throws PulsarClientException if shutting down or building fails
     */
    protected PulsarClient replacePulsarClient(ClientBuilder clientBuilder) throws PulsarClientException {
        PulsarClient previous = pulsarClient;
        if (previous != null) {
            previous.shutdown();
        }
        PulsarClient replacement = createNewPulsarClient(clientBuilder);
        pulsarClient = replacement;
        return replacement;
    }

    /**
     * Variant of the setup used by stats tests: starts the broker and creates a client whose stats
     * interval is one second.
     *
     * @throws Exception if the broker or the client cannot be started
     */
    protected final void internalSetupForStatsTest() throws Exception {
        init();
        // The web service URL is always read first, exactly as the TCP branch expects it to exist.
        String serviceUrl = brokerUrl.toString();
        if (isTcpLookup) {
            URI brokerServiceUri = new URI(pulsar.getBrokerServiceUrl());
            serviceUrl = brokerServiceUri.toString();
        }
        pulsarClient = newPulsarClient(serviceUrl, 1);
    }

    /**
     * Applies the settings every mocked broker needs: zero shutdown timeout, port 0 for all four
     * listeners, "localhost" as advertised address and five executor threads.
     *
     * @throws Exception declared so that overriding subclasses may throw
     */
    protected void doInitConf() throws Exception {
        final ServiceConfiguration target = conf;
        final Optional<Integer> portZero = Optional.of(0);

        target.setBrokerShutdownTimeoutMs(0L);
        target.setBrokerServicePort(portZero);
        target.setBrokerServicePortTls(portZero);
        target.setAdvertisedAddress("localhost");
        target.setWebServicePort(portZero);
        target.setWebServicePortTls(portZero);
        target.setNumExecutorThreadPoolSize(5);
    }

    /**
     * Prepares the configuration, creates the executors and the ZooKeeper/BookKeeper mocks, then
     * starts the broker.
     *
     * @throws Exception if any mock cannot be created or the broker fails to start
     */
    protected final void init() throws Exception {
        doInitConf();

        // Executors first: the mock BookKeeper is constructed on top of bkExecutor.
        sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor();
        bkExecutor = OrderedExecutor.newBuilder()
                .numThreads(1)
                .name("mock-pulsar-bk")
                .build();

        mockZooKeeper = createMockZooKeeper();
        mockZooKeeperGlobal = createMockZooKeeperGlobal();
        mockBookKeeper = createMockBookKeeper(bkExecutor);

        startBroker();
    }

    /**
     * Tears down everything created by the setup methods: admin client, Pulsar client, broker
     * gateway, broker, mock BookKeeper, both mock ZooKeepers and both executors. Each reference is
     * cleared after it is released so that a repeated cleanup does not touch a stale resource.
     * The configuration is reset to defaults and {@link #onCleanup()} is invoked last.
     *
     * @throws Exception if releasing any resource fails
     */
    protected final void internalCleanup() throws Exception {
        this.markCurrentSetupNumberCleaned();
        if (this.admin != null) {
            this.admin.close();
            this.admin = null;
        }
        if (this.pulsarClient != null) {
            this.pulsarClient.shutdown();
            this.pulsarClient = null;
        }
        if (this.brokerGateway != null) {
            this.brokerGateway.close();
            // Clear the reference like every other resource here; otherwise a later cleanup
            // (after a setup that did not create a gateway) would close the old forwarder again.
            this.brokerGateway = null;
        }
        if (this.pulsar != null) {
            this.stopBroker();
            this.pulsar = null;
        }
        this.resetConfig();
        if (this.mockBookKeeper != null) {
            // close()/shutdown() are no-ops on the mock; only reallyShutdown() releases it.
            this.mockBookKeeper.reallyShutdown();
            this.mockBookKeeper = null;
        }
        if (this.mockZooKeeperGlobal != null) {
            this.mockZooKeeperGlobal.shutdown();
            this.mockZooKeeperGlobal = null;
        }
        if (this.mockZooKeeper != null) {
            this.mockZooKeeper.shutdown();
            this.mockZooKeeper = null;
        }
        if (this.sameThreadOrderedSafeExecutor != null) {
            try {
                this.sameThreadOrderedSafeExecutor.shutdownNow();
                this.sameThreadOrderedSafeExecutor.awaitTermination(5L, TimeUnit.SECONDS);
            }
            catch (InterruptedException ex) {
                log.error("sameThreadOrderedSafeExecutor shutdown had error", ex);
                // Restore the interrupt flag so the test runner can observe it.
                Thread.currentThread().interrupt();
            }
            this.sameThreadOrderedSafeExecutor = null;
        }
        if (this.bkExecutor != null) {
            try {
                this.bkExecutor.shutdownNow();
                this.bkExecutor.awaitTermination(5L, TimeUnit.SECONDS);
            }
            catch (InterruptedException ex) {
                log.error("bkExecutor shutdown had error", ex);
                Thread.currentThread().interrupt();
            }
            this.bkExecutor = null;
        }
        this.onCleanup();
    }

    /** Extension hook invoked as the last step of {@link #internalCleanup()}. */
    protected void onCleanup() {
        // Nothing to clean up by default.
    }

    /**
     * Prepares the test environment; implemented by each concrete test class.
     * NOTE(review): implementations presumably delegate to one of the internalSetup variants — confirm against subclasses.
     *
     * @throws Exception if preparation fails
     */
    protected abstract void setup() throws Exception;

    /**
     * Releases the test environment; implemented by each concrete test class.
     * NOTE(review): implementations presumably delegate to internalCleanup() — confirm against subclasses.
     *
     * @throws Exception if releasing resources fails
     */
    protected abstract void cleanup() throws Exception;

    /**
     * Extension hook invoked after the standard broker mocks are installed and before the broker
     * is started.
     *
     * @param pulsar the spied broker that is about to start
     * @throws Exception declared so that overriding subclasses may throw
     */
    protected void beforePulsarStartMocks(PulsarService pulsar) throws Exception {
        // No additional mocks by default.
    }

    /**
     * Stops the running broker and starts a new one with the current configuration.
     *
     * @throws Exception if stopping or starting fails
     */
    protected void restartBroker() throws Exception {
        stopBroker();
        startBroker();
    }

    /**
     * Closes the running broker with a zero shutdown timeout and clears {@link #pulsar}.
     * A broker must be running when this is called.
     *
     * @throws Exception if closing the broker fails
     */
    protected void stopBroker() throws Exception {
        PulsarService broker = pulsar;
        log.info("Stopping Pulsar broker. brokerServiceUrl: {} webServiceAddress: {}",
                broker.getBrokerServiceUrl(), broker.getWebServiceAddress());
        // The timeout is set to zero on the broker's own configuration right before closing.
        broker.getConfiguration().setBrokerShutdownTimeoutMs(0L);
        broker.close();
        pulsar = null;
    }

    /**
     * Starts a broker from the current configuration, records its web service URLs and rebuilds
     * the spied admin client against the HTTP address (or the HTTPS address when no HTTP address
     * is available).
     *
     * @throws IllegalStateException if a broker is already running, or if the started broker
     *         reports neither an HTTP nor an HTTPS web service address
     * @throws Exception if the broker or the admin client cannot be created
     */
    protected void startBroker() throws Exception {
        if (this.pulsar != null) {
            throw new IllegalStateException("broker already started!");
        }
        this.pulsar = this.startBroker(this.conf);

        String webAddress = this.pulsar.getWebServiceAddress();
        String webAddressTls = this.pulsar.getWebServiceAddressTls();
        this.brokerUrl = webAddress != null ? new URL(webAddress) : null;
        this.brokerUrlTls = webAddressTls != null ? new URL(webAddressTls) : null;

        if (this.admin != null) {
            this.admin.close();
        }

        // Prefer plain HTTP; fail with a clear message instead of an NPE when neither is exposed.
        URL adminUrl = this.brokerUrl != null ? this.brokerUrl : this.brokerUrlTls;
        if (adminUrl == null) {
            throw new IllegalStateException("broker exposes neither a web service address nor a TLS web service address");
        }
        PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(adminUrl.toString());
        this.customizeNewPulsarAdminBuilder(pulsarAdminBuilder);
        this.admin = Mockito.spy(pulsarAdminBuilder.build());
    }

    /**
     * Extension hook called by {@link #startBroker()} before the admin client is built.
     *
     * @param pulsarAdminBuilder builder that subclasses may adjust
     */
    protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) {
        // No customization by default.
    }

    /**
     * Starts a broker for the given configuration. The default implementation delegates to
     * {@link #startBrokerWithoutAuthorization(ServiceConfiguration)}.
     *
     * @param conf configuration for the new broker
     * @return the started, spied broker
     * @throws Exception if the broker fails to start
     */
    protected PulsarService startBroker(ServiceConfiguration conf) throws Exception {
        PulsarService started = startBrokerWithoutAuthorization(conf);
        return started;
    }

    /**
     * Creates a spied broker for the given configuration, installs the standard mocks plus any
     * subclass mocks, and starts it.
     *
     * @param conf configuration for the new broker; its shutdown timeout is forced to zero
     * @return the started, spied broker
     * @throws Exception if mock installation or broker start-up fails
     */
    protected PulsarService startBrokerWithoutAuthorization(ServiceConfiguration conf) throws Exception {
        conf.setBrokerShutdownTimeoutMs(0L);

        PulsarService broker = Mockito.spy(newPulsarService(conf));
        setupBrokerMocks(broker);
        beforePulsarStartMocks(broker);
        broker.start();

        log.info("Pulsar started. brokerServiceUrl: {} webServiceAddress: {}",
                broker.getBrokerServiceUrl(), broker.getWebServiceAddress());
        return broker;
    }

    /**
     * Instantiates the (not yet spied, not yet started) broker; overridable for custom subclasses.
     *
     * @param conf configuration handed to the broker constructor
     * @return a new broker instance
     * @throws Exception declared so that overriding subclasses may throw
     */
    protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception {
        PulsarService service = new PulsarService(conf);
        return service;
    }

    /**
     * Stubs the spied broker so that it uses the in-memory ZooKeeper/BookKeeper mocks, the
     * same-thread ordered executor, a spied namespace service, a counting broker interceptor and
     * a spied compactor. When {@link #enableBrokerInterceptor} is set, the interceptor
     * configuration is stubbed as well.
     *
     * @param pulsar the spied broker to stub
     * @throws Exception if creating one of the metadata stores fails
     */
    protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
        // Client factories backed by the mocks.
        Mockito.doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
        Mockito.doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();

        // Metadata stores are created once here and returned on every call.
        Mockito.doReturn(createLocalMetadataStore()).when(pulsar).createLocalMetadataStore();
        Mockito.doReturn(createConfigurationMetadataStore()).when(pulsar).createConfigurationMetadataStore();

        Supplier<NamespaceService> spiedNamespaceService =
                () -> BrokerTestUtil.spyWithClassAndConstructorArgs(NamespaceService.class, pulsar);
        Mockito.doReturn(spiedNamespaceService).when(pulsar).getNamespaceServiceProvider();

        Mockito.doReturn(sameThreadOrderedSafeExecutor).when(pulsar).getOrderedExecutor();
        Mockito.doReturn(new CounterBrokerInterceptor()).when(pulsar).getBrokerInterceptor();

        // The real compactor is built, then wrapped in a spy.
        Mockito.doAnswer(call -> Mockito.spy(call.callRealMethod())).when(pulsar).newCompactor();

        if (enableBrokerInterceptor) {
            mockConfigBrokerInterceptors(pulsar);
        }
    }

    /**
     * @return a metadata store backed by the local mock ZooKeeper
     * @throws MetadataStoreException if the store cannot be created
     */
    protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
        ZooKeeper localZk = mockZooKeeper;
        return new ZKMetadataStore(localZk);
    }

    /**
     * @return a metadata store backed by the global mock ZooKeeper
     * @throws MetadataStoreException if the store cannot be created
     */
    protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
        ZooKeeper globalZk = mockZooKeeperGlobal;
        return new ZKMetadataStore(globalZk);
    }

    /**
     * Makes the broker report a configuration whose broker-interceptor set is non-empty.
     *
     * @param pulsarService the spied broker whose {@code getConfig()} is stubbed
     */
    private void mockConfigBrokerInterceptors(PulsarService pulsarService) {
        ServiceConfiguration spiedConf = Mockito.spy(conf);

        // Only isEmpty() is stubbed on the mock set.
        @SuppressWarnings("unchecked")
        Set<String> interceptorNames = Mockito.mock(Set.class);
        Mockito.when(interceptorNames.isEmpty()).thenReturn(false);

        Mockito.when(spiedConf.getBrokerInterceptors()).thenReturn(interceptorNames);
        Mockito.when(pulsarService.getConfig()).thenReturn(spiedConf);
    }

    /**
     * Blocks the calling thread for three seconds.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
     *         interrupted; the interrupt flag is restored first
     */
    protected void waitForZooKeeperWatchers() {
        try {
            TimeUnit.SECONDS.sleep(3L);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(interrupted);
        }
    }

    /**
     * Ensures the test cluster is registered and returns tenant info that has no admin roles and
     * allows only that cluster.
     *
     * @return tenant info restricted to the test cluster
     * @throws PulsarAdminException if listing or creating the cluster fails
     */
    protected TenantInfoImpl createDefaultTenantInfo() throws PulsarAdminException {
        boolean clusterRegistered = admin.clusters().getClusters().contains(configClusterName);
        if (!clusterRegistered) {
            admin.clusters().createCluster(configClusterName, ClusterData.builder().build());
        }

        Set<String> allowedClusters = Sets.newHashSet();
        allowedClusters.add(configClusterName);
        Set<String> adminRoles = Sets.newHashSet();
        return new TenantInfoImpl(adminRoles, allowedClusters);
    }

    /**
     * Creates a mock ZooKeeper running on a direct executor and pre-populates it with a bookie
     * registration node and a ledger layout node.
     *
     * @return the pre-populated mock ZooKeeper
     * @throws Exception if creating any of the nodes fails
     */
    public static MockZooKeeper createMockZooKeeper() throws Exception {
        ExecutorService directExecutor = MoreExecutors.newDirectExecutorService();
        MockZooKeeper mockZk = MockZooKeeper.newInstance(directExecutor);

        // Empty ACL list, kept raw as in the original call sites.
        ArrayList noAcls = new ArrayList(0);

        byte[] bookieNodeData = "".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME);
        ZkUtils.createFullPathOptimistic(mockZk, "/ledgers/available/192.168.1.1:5000", bookieNodeData, noAcls, CreateMode.PERSISTENT);

        byte[] layoutData = "1\nflat:1".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME);
        mockZk.create("/ledgers/LAYOUT", layoutData, noAcls, CreateMode.PERSISTENT);
        return mockZk;
    }

    /**
     * @return a mock ZooKeeper for the global (configuration) store, running on a direct executor
     */
    public static MockZooKeeper createMockZooKeeperGlobal() {
        ExecutorService directExecutor = MoreExecutors.newDirectExecutorService();
        return MockZooKeeper.newInstanceForGlobalZK(directExecutor);
    }

    /**
     * Creates a spied {@link NonClosableMockBookKeeper} constructed with the given executor.
     *
     * @param executor executor passed to the mock's constructor
     * @return the spied mock BookKeeper
     * @throws Exception if the spy cannot be created
     */
    public static NonClosableMockBookKeeper createMockBookKeeper(OrderedExecutor executor) throws Exception {
        NonClosableMockBookKeeper spiedBookKeeper =
                BrokerTestUtil.spyWithClassAndConstructorArgs(NonClosableMockBookKeeper.class, executor);
        return spiedBookKeeper;
    }

    /**
     * Evaluates the predicate up to {@code retryCount} times, sleeping between attempts with a
     * linearly growing delay ({@code intSleepTimeInMillis * (attempt + 1)}).
     *
     * <p>The result reflects whether the predicate actually passed. Previously the last attempt
     * reported {@code true} unconditionally, which made a failed wait indistinguishable from a
     * successful one. No sleep is performed after the final attempt.
     *
     * @param predicate condition to wait for; it is always invoked with {@code null}
     * @param retryCount maximum number of attempts; values below 1 yield {@code false} immediately
     * @param intSleepTimeInMillis base delay between attempts, in milliseconds
     * @return {@code true} as soon as the predicate passes, {@code false} if every attempt failed
     * @throws Exception if the thread is interrupted while sleeping
     */
    public static boolean retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis) throws Exception {
        for (int attempt = 0; attempt < retryCount; ++attempt) {
            if (predicate.test(null)) {
                return true;
            }
            // Back off only when another attempt will follow.
            if (attempt < retryCount - 1) {
                Thread.sleep(intSleepTimeInMillis + intSleepTimeInMillis * (long) attempt);
            }
        }
        return false;
    }

    /**
     * Reflectively assigns a field declared directly on {@code clazz}, regardless of its access
     * modifier.
     *
     * @param clazz class that declares the field
     * @param classObj instance whose field is written ({@code null} for a static field)
     * @param fieldName name of the declared field
     * @param fieldValue value to store
     * @throws Exception if the field does not exist or cannot be written
     */
    public static void setFieldValue(Class<?> clazz, Object classObj, String fieldName, Object fieldValue) throws Exception {
        Field target = clazz.getDeclaredField(fieldName);
        // Lift access checks so private fields can be written.
        target.setAccessible(true);
        target.set(classObj, fieldValue);
    }

    /**
     * Builds the baseline broker configuration used by the tests: cluster "test", advertised
     * address "localhost", port 0 for all four listeners, zero shutdown timeout and the small
     * cache/thread-pool sizes set below.
     *
     * @return a new configuration instance
     */
    protected static ServiceConfiguration getDefaultConf() {
        final ServiceConfiguration defaults = new ServiceConfiguration();
        final Optional<Integer> portZero = Optional.of(0);

        // Identity and sizing.
        defaults.setAdvertisedAddress("localhost");
        defaults.setClusterName(configClusterName);
        defaults.setManagedLedgerCacheSizeMB(8);
        defaults.setActiveConsumerFailoverDelayTimeMillis(0);
        defaults.setDefaultNumberOfNamespaceBundles(1);

        // Metadata store addresses and topic auto-creation.
        defaults.setZookeeperServers("localhost:2181");
        defaults.setConfigurationStoreServers("localhost:3181");
        defaults.setAllowAutoTopicCreationType("non-partitioned");

        // Shutdown timeout and listener ports.
        defaults.setBrokerShutdownTimeoutMs(0L);
        defaults.setBrokerServicePort(portZero);
        defaults.setBrokerServicePortTls(portZero);
        defaults.setWebServicePort(portZero);
        defaults.setWebServicePortTls(portZero);

        defaults.setBookkeeperClientExposeStatsToPrometheus(true);
        defaults.setNumExecutorThreadPoolSize(5);
        defaults.setBrokerMaxConnections(0);
        defaults.setBrokerMaxConnectionsPerIp(0);
        return defaults;
    }

    public static class NonClosableMockBookKeeper
    extends PulsarMockBookKeeper {
        public NonClosableMockBookKeeper(OrderedExecutor executor) throws Exception {
            super(executor);
        }

        public void close() {
        }

        public void shutdown() {
        }

        public void reallyShutdown() {
            super.shutdown();
        }
    }
}

