package co.cask.cdap.data2.metadata.publisher;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.KafkaClientModule;
import co.cask.cdap.common.guice.LocationRuntimeModule;
import co.cask.cdap.common.guice.ZKClientModule;
import co.cask.cdap.common.namespace.guice.NamespaceClientRuntimeModule;
import co.cask.cdap.data.runtime.DataSetsModules;
import co.cask.cdap.data.runtime.SystemDatasetRuntimeModule;
import co.cask.cdap.data2.metadata.service.BusinessMetadataStore;
import co.cask.cdap.data2.metadata.service.DefaultBusinessMetadataStore;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.codec.NamespacedIdCodec;
import co.cask.cdap.proto.metadata.MetadataChangeRecord;
import co.cask.tephra.runtime.TransactionInMemoryModule;
import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.util.Modules;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.twill.common.Cancellable;
import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
import org.apache.twill.internal.utils.Networks;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.apache.twill.zookeeper.ZKClientService;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:co/cask/cdap/data2/metadata/publisher/MetadataKafkaTestBase.class */
public class MetadataKafkaTestBase {

    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Id.NamespacedId.class, new NamespacedIdCodec()).create();
    private static InMemoryZKServer zkServer;
    private static EmbeddedKafkaServer kafkaServer;
    private static ZKClientService zkClient;
    protected static KafkaClientService kafkaClient;
    protected static CConfiguration cConf;
    protected static Injector injector;

    @BeforeClass
    public static void setup() throws IOException {
        int randomPort = Networks.getRandomPort();
        Preconditions.checkState(randomPort > 0, "Failed to get random port.");
        zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).setPort(Networks.getRandomPort()).build();
        zkServer.startAndWait();
        kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(randomPort));
        kafkaServer.startAndWait();
        cConf = CConfiguration.create();
        cConf.set("metadata.updates.publish.enabled", "true");
        cConf.set("metadata.updates.kafka.broker.list", InetAddress.getLoopbackAddress().getHostAddress() + ":" + randomPort);
        cConf.unset("kafka.zookeeper.namespace");
        cConf.set("zookeeper.quorum", zkServer.getConnectionStr());
        injector = Guice.createInjector(new Module[]{new ConfigModule(cConf), new LocationRuntimeModule().getInMemoryModules(), new ZKClientModule(), new KafkaClientModule(), new TransactionInMemoryModule(), new SystemDatasetRuntimeModule().getInMemoryModules(), Modules.override(new Module[]{new DataSetsModules().getInMemoryModules(true)}).with(new Module[]{new AbstractModule() { // from class: co.cask.cdap.data2.metadata.publisher.MetadataKafkaTestBase.1
            protected void configure() {
                bind(BusinessMetadataStore.class).to(DefaultBusinessMetadataStore.class);
            }
        }}), new NamespaceClientRuntimeModule().getInMemoryModules()});
        zkClient = (ZKClientService) injector.getInstance(ZKClientService.class);
        zkClient.startAndWait();
        kafkaClient = (KafkaClientService) injector.getInstance(KafkaClientService.class);
        kafkaClient.startAndWait();
    }

    @AfterClass
    public static void teardown() {
        kafkaClient.stopAndWait();
        zkClient.stopAndWait();
        kafkaServer.stopAndWait();
        zkServer.stopAndWait();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<MetadataChangeRecord> getPublishedMetadataChanges(int i) throws InterruptedException {
        return getPublishedMetadataChanges(i, 0);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<MetadataChangeRecord> getPublishedMetadataChanges(int i, int i2) throws InterruptedException {
        String str = cConf.get("metadata.updates.kafka.topic");
        final CountDownLatch countDownLatch = new CountDownLatch(i);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        final ArrayList arrayList = new ArrayList(i);
        Cancellable consume = kafkaClient.getConsumer().prepare().addFromBeginning(str, 0).consume(new KafkaConsumer.MessageCallback() { // from class: co.cask.cdap.data2.metadata.publisher.MetadataKafkaTestBase.2
            public void onReceived(Iterator<FetchedMessage> it) {
                while (it.hasNext()) {
                    arrayList.add((MetadataChangeRecord) MetadataKafkaTestBase.GSON.fromJson(Bytes.toString(it.next().getPayload()), MetadataChangeRecord.class));
                    countDownLatch.countDown();
                }
            }

            public void finished() {
                countDownLatch2.countDown();
            }
        });
        Assert.assertTrue(countDownLatch.await(15L, TimeUnit.SECONDS));
        consume.cancel();
        Assert.assertTrue(countDownLatch2.await(15L, TimeUnit.SECONDS));
        return arrayList;
    }

    private static Properties generateKafkaConfig(int i) throws IOException {
        Properties properties = new Properties();
        properties.setProperty("broker.id", "1");
        properties.setProperty("port", Integer.toString(i));
        properties.setProperty("num.network.threads", "2");
        properties.setProperty("num.io.threads", "2");
        properties.setProperty("socket.send.buffer.bytes", "1048576");
        properties.setProperty("socket.receive.buffer.bytes", "1048576");
        properties.setProperty("socket.request.max.bytes", "104857600");
        properties.setProperty("log.dir", TMP_FOLDER.newFolder().getAbsolutePath());
        properties.setProperty("num.partitions", "1");
        properties.setProperty("log.flush.interval.messages", "10000");
        properties.setProperty("log.flush.interval.ms", "1000");
        properties.setProperty("log.retention.hours", "1");
        properties.setProperty("log.segment.bytes", "536870912");
        properties.setProperty("log.cleanup.interval.mins", "1");
        properties.setProperty("zookeeper.connect", zkServer.getConnectionStr());
        properties.setProperty("zookeeper.connection.timeout.ms", "1000000");
        return properties;
    }
}
