package org.springframework.cloud.stream.binder.kafka;

import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.assertj.core.api.Assertions;
import org.eclipse.jetty.server.Server;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.Spy;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/Kafka_0_10_1_BinderTests.class */
/**
 * Kafka binder tests that run against an embedded Kafka broker.
 *
 * <p>The class plugs a {@link Kafka10TestBinder} and a {@link Kafka10AdminUtilsOperation} into the
 * test hooks inherited from {@link Kafka_0_10_2_BinderTests}, and adds an Avro round-trip test that
 * starts a Confluent Schema Registry on port 8082.
 */
public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests {

    /** Embedded broker shared by every test in this class. */
    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);

    /** URL that the Avro (de)serializers use to reach the schema registry started by the test. */
    private static final String SCHEMA_REGISTRY_URL = "http://localhost:8082";

    /** How long to wait for the schema registry to report that it is running. */
    private static final long SCHEMA_REGISTRY_START_TIMEOUT_MS = 5000L;

    /** Pause between two "is the schema registry running yet" checks. */
    private static final long SCHEMA_REGISTRY_POLL_INTERVAL_MS = 50L;

    /** Simple name of the binder class exercised by these tests. */
    private static final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();

    /** Lazily created binder, see {@link #m1getBinder()}. */
    private Kafka10TestBinder binder;

    private final Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();

    /**
     * Pauses for half a second so that a bind/unbind operation can settle.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    protected void binderBindUnbindLatency() throws InterruptedException {
        Thread.sleep(500L);
    }

    /**
     * Returns the binder used by the tests, creating it on first use with the
     * {@code dlqTestHeader} header configured.
     *
     * <p>NOTE(review): the decompiler reported that this method was originally named
     * {@code getBinder}, was {@code protected}, and was merged with a bridge method. The
     * decompiled name and modifier are kept here; confirm against the original sources.
     *
     * @return the shared {@link Kafka10TestBinder} instance
     */
    public Kafka10TestBinder m1getBinder() {
        if (this.binder == null) {
            KafkaBinderConfigurationProperties configuration = createConfigurationProperties();
            configuration.setHeaders(new String[]{"dlqTestHeader"});
            this.binder = new Kafka10TestBinder(configuration);
        }
        return this.binder;
    }

    /**
     * Builds binder configuration that points at the embedded broker and its ZooKeeper.
     *
     * @return a fresh configuration object
     */
    protected KafkaBinderConfigurationProperties createConfigurationProperties() {
        KafkaBinderConfigurationProperties configuration = new KafkaBinderConfigurationProperties();
        List<String> brokers = new ArrayList<>();
        for (BrokerAddress brokerAddress : embeddedKafka.getBrokerAddresses()) {
            brokers.add(brokerAddress.toString());
        }
        configuration.setBrokers(brokers.toArray(new String[0]));
        configuration.setZkNodes(new String[]{embeddedKafka.getZookeeperConnectionString()});
        return configuration;
    }

    /**
     * Returns the number of partitions of a topic, as reported by a Kafka consumer.
     *
     * @param str the topic name
     * @return the number of partitions
     */
    protected int partitionSize(String str) {
        Consumer<byte[], byte[]> consumer = consumerFactory().createConsumer();
        try {
            return consumer.partitionsFor(str).size();
        }
        finally {
            // The consumer is created only for this lookup; close it so it does not leak.
            consumer.close();
        }
    }

    /**
     * Creates a {@link ZkUtils} from the ZooKeeper settings of the given configuration.
     *
     * @param kafkaBinderConfigurationProperties source of the connection string and timeouts
     * @return a new {@link ZkUtils}; the caller is responsible for closing it
     */
    protected ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
        ZkClient zkClient = new ZkClient(
                kafkaBinderConfigurationProperties.getZkConnectionString(),
                kafkaBinderConfigurationProperties.getZkSessionTimeout(),
                kafkaBinderConfigurationProperties.getZkConnectionTimeout(),
                ZKStringSerializer$.MODULE$);
        return new ZkUtils(zkClient, (ZkConnection) null, false);
    }

    /**
     * Creates a topic through {@link Kafka10AdminUtilsOperation}.
     *
     * @param zkUtils the ZooKeeper handle to use
     * @param str the topic name
     * @param i forwarded unchanged; presumably the partition count (confirm against
     *     {@code Kafka10AdminUtilsOperation})
     * @param i2 forwarded unchanged; presumably the replication factor (confirm as above)
     * @param properties topic-level configuration; {@code null} is treated as empty
     */
    protected void invokeCreateTopic(ZkUtils zkUtils, String str, int i, int i2, Properties properties) {
        // The supplied configuration used to be dropped in favour of an empty Properties object.
        Properties topicConfig = (properties != null) ? properties : new Properties();
        this.adminUtilsOperation.invokeCreateTopic(zkUtils, str, i, i2, topicConfig);
    }

    /**
     * Returns the partition count of a topic as reported by {@link Kafka10AdminUtilsOperation}.
     *
     * @param str the topic name
     * @param zkUtils the ZooKeeper handle to use
     * @return the number of partitions
     */
    protected int invokePartitionSize(String str, ZkUtils zkUtils) {
        return this.adminUtilsOperation.partitionSize(str, zkUtils);
    }

    /**
     * @return the message header name under which the Kafka offset is expected
     */
    public String getKafkaOffsetHeaderKey() {
        return "kafka_offset";
    }

    /**
     * Creates a new binder for the given configuration. Unlike {@link #m1getBinder()} the
     * result is not cached.
     *
     * @param kafkaBinderConfigurationProperties configuration for the new binder
     * @return a new {@link Kafka10TestBinder}
     */
    protected Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
        return new Kafka10TestBinder(kafkaBinderConfigurationProperties);
    }

    /**
     * Applies the optional {@code KAFKA_TIMEOUT_MULTIPLIER} environment variable to the
     * inherited {@code timeoutMultiplier} before each test.
     */
    @Before
    public void init() {
        String multiplier = System.getenv("KAFKA_TIMEOUT_MULTIPLIER");
        if (multiplier != null) {
            this.timeoutMultiplier = Double.parseDouble(multiplier);
        }
    }

    /**
     * @return always {@code false}
     */
    protected boolean usesExplicitRouting() {
        return false;
    }

    /**
     * @return the simple class name of {@link KafkaMessageChannelBinder}
     */
    protected String getClassUnderTestName() {
        return CLASS_UNDER_TEST_NAME;
    }

    /**
     * Not supported by the Kafka tests.
     *
     * @param str ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public Spy spyOn(String str) {
        throw new UnsupportedOperationException("'spyOn' is not used by Kafka tests");
    }

    /**
     * Builds a byte-array consumer factory for the embedded broker with auto-commit disabled
     * and the fixed group id {@code TEST-CONSUMER-GROUP}.
     */
    private ConsumerFactory<byte[], byte[]> consumerFactory() {
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put("bootstrap.servers", createConfigurationProperties().getKafkaConnectionString());
        consumerConfig.put("enable.auto.commit", false);
        consumerConfig.put("group.id", "TEST-CONSUMER-GROUP");
        return new DefaultKafkaConsumerFactory<>(consumerConfig, new ByteArrayDeserializer(),
                new ByteArrayDeserializer());
    }

    /**
     * Sends a {@code User1} through a producer binding that uses the Confluent Avro serializer
     * and checks that a message arrives on a consumer binding that uses the Avro deserializer.
     *
     * <p>The schema registry, the ZooKeeper handle and both bindings are released in a
     * {@code finally} block so that a failing assertion does not leave port 8082 occupied.
     *
     * @throws Exception if any of the infrastructure fails to start or stop
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"}) // Binder and the property holders are used raw here
    public void testCustomAvroSerialization() throws Exception {
        KafkaBinderConfigurationProperties configuration = createConfigurationProperties();
        ZkUtils zkUtils = getZkUtils(configuration);
        Server schemaRegistry = null;
        Binding<?> producerBinding = null;
        Binding<?> consumerBinding = null;
        try {
            schemaRegistry = createSchemaRegistry(configuration);
            schemaRegistry.start();
            awaitRunning(schemaRegistry);

            User1 outboundUser = new User1();
            String userName = "foo-name" + UUID.randomUUID().toString();
            String favoriteColor = "foo-color" + UUID.randomUUID().toString();
            outboundUser.setName(userName);
            outboundUser.setFavoriteColor(favoriteColor);
            Message<?> message = MessageBuilder.withPayload(outboundUser).build();

            DirectChannel outputChannel = new DirectChannel();
            String topic = "existing" + System.currentTimeMillis();
            invokeCreateTopic(zkUtils, topic, 6, 1, new Properties());
            configuration.setAutoAddPartitions(true);
            Binder avroBinder = getBinder(configuration);
            QueueChannel inputChannel = new QueueChannel();

            ExtendedProducerProperties producerProperties = createProducerProperties();
            KafkaProducerProperties producerExtension =
                    (KafkaProducerProperties) producerProperties.getExtension();
            producerExtension.getConfiguration()
                    .put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
            producerExtension.getConfiguration().put("schema.registry.url", SCHEMA_REGISTRY_URL);
            producerProperties.setUseNativeEncoding(true);
            producerBinding = avroBinder.bindProducer(topic, outputChannel, producerProperties);

            ExtendedConsumerProperties consumerProperties = createConsumerProperties();
            KafkaConsumerProperties consumerExtension =
                    (KafkaConsumerProperties) consumerProperties.getExtension();
            consumerExtension.setAutoRebalanceEnabled(false);
            consumerExtension.getConfiguration()
                    .put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
            consumerExtension.getConfiguration().put("schema.registry.url", SCHEMA_REGISTRY_URL);
            consumerBinding = avroBinder.bindConsumer(topic, "test", inputChannel, consumerProperties);

            binderBindUnbindLatency();
            outputChannel.send(message);

            Assertions.assertThat(receive(inputChannel)).isNotNull();
            // NOTE(review): the checks below inspect the message that was SENT, not the one that
            // was received, so they do not verify the deserialized payload. They are left as-is
            // because the type of the received payload cannot be confirmed from this file.
            Assert.assertTrue(message.getPayload() instanceof User1);
            User1 sentUser = (User1) message.getPayload();
            Assertions.assertThat(sentUser.getName()).isEqualTo(userName);
            Assertions.assertThat(sentUser.getFavoriteColor()).isEqualTo(favoriteColor);
        }
        finally {
            releaseResources(producerBinding, consumerBinding, schemaRegistry, zkUtils);
        }
    }

    /** Builds (but does not start) a schema registry server listening on port 8082. */
    private static Server createSchemaRegistry(KafkaBinderConfigurationProperties configuration)
            throws Exception {
        Map<String, Object> registryConfig = new HashMap<>();
        registryConfig.put("kafkastore.connection.url", configuration.getZkConnectionString());
        registryConfig.put("listeners", "http://0.0.0.0:8082");
        registryConfig.put("port", "8082");
        registryConfig.put("kafkastore.topic", "_schemas");
        return new SchemaRegistryRestApplication(new SchemaRegistryConfig(registryConfig)).createServer();
    }

    /** Waits until the server reports running, failing the test once the start timeout elapses. */
    private static void awaitRunning(Server server) throws InterruptedException {
        long deadline = System.currentTimeMillis() + SCHEMA_REGISTRY_START_TIMEOUT_MS;
        while (!server.isRunning()) {
            if (System.currentTimeMillis() > deadline) {
                Assertions.fail("Kafka Schema Registry Server failed to start");
            }
            // Yield between checks instead of spinning a core at 100%.
            Thread.sleep(SCHEMA_REGISTRY_POLL_INTERVAL_MS);
        }
    }

    /** Releases test resources in order; each step runs even if an earlier one throws. */
    private static void releaseResources(Binding<?> producerBinding, Binding<?> consumerBinding,
            Server schemaRegistry, ZkUtils zkUtils) throws Exception {
        try {
            if (producerBinding != null) {
                producerBinding.unbind();
            }
        }
        finally {
            try {
                if (consumerBinding != null) {
                    consumerBinding.unbind();
                }
            }
            finally {
                try {
                    if (schemaRegistry != null) {
                        schemaRegistry.stop();
                    }
                }
                finally {
                    zkUtils.close();
                }
            }
        }
    }
}
