package co.cask.cdap.kafka;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.KafkaClientModule;
import co.cask.cdap.common.guice.ZKClientModule;
import co.cask.cdap.common.utils.Tasks;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.twill.common.Cancellable;
import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
import org.apache.twill.internal.kafka.client.ZKBrokerService;
import org.apache.twill.internal.utils.Networks;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
import org.apache.twill.kafka.client.BrokerService;
import org.apache.twill.kafka.client.Compression;
import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.apache.twill.kafka.client.KafkaPublisher;
import org.apache.twill.zookeeper.ZKClientService;
import org.junit.Assert;
import org.junit.rules.ExternalResource;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/kafka/KafkaTester.class */
public class KafkaTester extends ExternalResource {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTester.class);
    private static final Gson GSON = new Gson();
    private Injector injector;
    private InMemoryZKServer zkServer;
    private EmbeddedKafkaServer kafkaServer;
    private ZKClientService zkClient;
    private BrokerService brokerService;
    private KafkaClientService kafkaClient;
    private final Map<String, String> extraConfigs;
    private final Iterable<Module> extraModules;
    private final CConfiguration cConf;
    private final String[] kafkaBrokerListParams;
    private final TemporaryFolder tmpFolder;
    private final int numPartitions;

    public KafkaTester() {
        this(ImmutableMap.of(), ImmutableList.of(), 1, new String[0]);
    }

    public KafkaTester(Map<String, String> map, Iterable<Module> iterable, int i, String... strArr) {
        this.extraConfigs = map;
        this.extraModules = iterable;
        this.cConf = CConfiguration.create();
        this.kafkaBrokerListParams = strArr;
        this.tmpFolder = new TemporaryFolder();
        this.numPartitions = i;
    }

    protected void before() throws Throwable {
        int randomPort = Networks.getRandomPort();
        Preconditions.checkState(randomPort > 0, "Failed to get random port.");
        int randomPort2 = Networks.getRandomPort();
        Preconditions.checkState(randomPort2 > 0, "Failed to get random port.");
        this.tmpFolder.create();
        this.zkServer = InMemoryZKServer.builder().setDataDir(this.tmpFolder.newFolder()).setPort(randomPort2).build();
        this.zkServer.startAndWait();
        this.kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(randomPort));
        this.kafkaServer.startAndWait();
        initializeCConf(randomPort);
        this.injector = createInjector();
        this.zkClient = (ZKClientService) this.injector.getInstance(ZKClientService.class);
        this.zkClient.startAndWait();
        this.kafkaClient = (KafkaClientService) this.injector.getInstance(KafkaClientService.class);
        this.kafkaClient.startAndWait();
        this.brokerService = new ZKBrokerService(this.zkClient);
        this.brokerService.startAndWait();
        LOG.info("Waiting for Kafka server to startup...");
        waitForKafkaStartup();
        LOG.info("Started kafka server on port {}", Integer.valueOf(randomPort));
    }

    protected void after() {
        this.brokerService.stopAndWait();
        this.kafkaClient.stopAndWait();
        this.zkClient.stopAndWait();
        this.kafkaServer.stopAndWait();
        this.zkServer.stopAndWait();
    }

    private void initializeCConf(int i) throws IOException {
        this.cConf.unset("kafka.zookeeper.namespace");
        this.cConf.set("local.data.dir", this.tmpFolder.newFolder().getAbsolutePath());
        this.cConf.set("zookeeper.quorum", this.zkServer.getConnectionStr());
        for (Map.Entry<String, String> entry : this.extraConfigs.entrySet()) {
            this.cConf.set(entry.getKey(), entry.getValue());
        }
        for (String str : this.kafkaBrokerListParams) {
            this.cConf.set(str, InetAddress.getLoopbackAddress().getHostAddress() + ":" + i);
        }
    }

    private Injector createInjector() throws IOException {
        return Guice.createInjector(ImmutableList.builder().add(new ConfigModule(this.cConf)).add(new ZKClientModule()).add(new KafkaClientModule()).addAll(this.extraModules).build());
    }

    private Properties generateKafkaConfig(int i) throws IOException {
        Properties properties = new Properties();
        properties.setProperty("broker.id", "1");
        properties.setProperty("port", Integer.toString(i));
        properties.setProperty("num.network.threads", "2");
        properties.setProperty("num.io.threads", "2");
        properties.setProperty("socket.send.buffer.bytes", "1048576");
        properties.setProperty("socket.receive.buffer.bytes", "1048576");
        properties.setProperty("socket.request.max.bytes", "104857600");
        properties.setProperty("log.dir", this.tmpFolder.newFolder().getAbsolutePath());
        properties.setProperty("num.partitions", String.valueOf(this.numPartitions));
        properties.setProperty("log.flush.interval.messages", "10000");
        properties.setProperty("log.flush.interval.ms", "1000");
        properties.setProperty("log.retention.hours", "1");
        properties.setProperty("log.segment.bytes", "536870912");
        properties.setProperty("log.cleanup.interval.mins", "1");
        properties.setProperty("zookeeper.connect", this.zkServer.getConnectionStr());
        properties.setProperty("zookeeper.connection.timeout.ms", "1000000");
        return properties;
    }

    private void waitForKafkaStartup() throws Exception {
        Tasks.waitFor(true, new Callable<Boolean>() { // from class: co.cask.cdap.kafka.KafkaTester.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                try {
                    KafkaTester.this.kafkaClient.getPublisher(KafkaPublisher.Ack.LEADER_RECEIVED, Compression.NONE).prepare("kafkatester.test.topic").add(Charsets.UTF_8.encode("Test Message"), 0).send().get();
                    KafkaTester.this.getPublishedMessages("kafkatester.test.topic", (Set<Integer>) ImmutableSet.of(0), 1, 0, new Function<FetchedMessage, String>() { // from class: co.cask.cdap.kafka.KafkaTester.1.1
                        public String apply(FetchedMessage fetchedMessage) {
                            if (!Charsets.UTF_8.decode(fetchedMessage.getPayload()).toString().equalsIgnoreCase("Test Message")) {
                                return "";
                            }
                            atomicBoolean.set(true);
                            return "";
                        }
                    });
                } catch (Exception e) {
                }
                return Boolean.valueOf(atomicBoolean.get());
            }
        }, 60L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);
    }

    public Injector getInjector() {
        return this.injector;
    }

    public CConfiguration getCConf() {
        return this.cConf;
    }

    public <T> List<T> getPublishedMessages(String str, int i, Type type) throws InterruptedException {
        return getPublishedMessages(str, i, type, GSON);
    }

    public <T> List<T> getPublishedMessages(String str, int i, Type type, Gson gson) throws InterruptedException {
        return getPublishedMessages(str, i, type, gson, 0);
    }

    public <T> List<T> getPublishedMessages(String str, int i, final Type type, final Gson gson, int i2) throws InterruptedException {
        return getPublishedMessages(str, (Set<Integer>) ImmutableSet.of(0), i, i2, new Function<FetchedMessage, T>() { // from class: co.cask.cdap.kafka.KafkaTester.2
            public T apply(FetchedMessage fetchedMessage) {
                return (T) gson.fromJson(Bytes.toString(fetchedMessage.getPayload()), type);
            }
        });
    }

    public <T> List<T> getPublishedMessages(String str, Set<Integer> set, int i, int i2, final Function<FetchedMessage, T> function) throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(i);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        final ArrayList arrayList = new ArrayList(i);
        KafkaConsumer.Preparer prepare = this.kafkaClient.getConsumer().prepare();
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            prepare.add(str, it.next().intValue(), i2);
        }
        Cancellable consume = prepare.consume(new KafkaConsumer.MessageCallback() { // from class: co.cask.cdap.kafka.KafkaTester.3
            public void onReceived(Iterator<FetchedMessage> it2) {
                while (it2.hasNext()) {
                    arrayList.add(function.apply(it2.next()));
                    countDownLatch.countDown();
                }
            }

            public void finished() {
                countDownLatch2.countDown();
            }
        });
        Assert.assertTrue(String.format("Expected %d messages but found %d messages", Integer.valueOf(i), Integer.valueOf(arrayList.size())), countDownLatch.await(15L, TimeUnit.SECONDS));
        consume.cancel();
        Assert.assertTrue(countDownLatch2.await(15L, TimeUnit.SECONDS));
        return arrayList;
    }
}
