package org.springframework.xd.dirt.integration.kafka;

import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.admin.AdminUtils;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.DefaultPartitioner;
import kafka.producer.ProducerConfig;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.DefaultEncoder;
import kafka.utils.VerifiableProperties;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.springframework.context.Lifecycle;
import org.springframework.http.MediaType;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.kafka.support.ProducerConfiguration;
import org.springframework.integration.kafka.support.ProducerMetadata;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.xd.dirt.integration.bus.AbstractBusPropertiesAccessor;
import org.springframework.xd.dirt.integration.bus.Binding;
import org.springframework.xd.dirt.integration.bus.BusProperties;
import org.springframework.xd.dirt.integration.bus.EmbeddedHeadersMessageConverter;
import org.springframework.xd.dirt.integration.bus.MessageBusSupport;
import org.springframework.xd.dirt.integration.bus.serializer.MultiTypeCodec;

/* loaded from: input_file:org/springframework/xd/dirt/integration/kafka/KafkaMessageBus.class */
public class KafkaMessageBus extends MessageBusSupport {
    /** Consumer group id passed to the connector created by {@code bindConsumer}. */
    private static final String POINT_TO_POINT_SEMANTICS_CONSUMER_GROUP = "springXD";
    /** Broker list; used as the value of the producer's {@code metadata.broker.list} property. */
    private String brokers;
    /** Names of the message headers embedded into the payload when sending. */
    private String[] headersToMap;
    /** ZooKeeper address; used for consumer connectors and for topic creation. */
    private String zkAddress;
    /** Header name included in {@link #STANDARD_HEADERS}. */
    private static final String XD_REPLY_CHANNEL = "xdReplyChannel";
    /** Headers that are always mapped; the constructor appends any caller-supplied header names to these. */
    private static final String[] STANDARD_HEADERS = {"correlationId", "sequenceSize", "sequenceNumber", "contentType", "originalContentType", XD_REPLY_CHANNEL};
    /** Properties accepted when validating a consumer whose name does not start with {@code "queue:"}. */
    private static final Set<Object> SUPPORTED_CONSUMER_PROPERTIES = new MessageBusSupport.SetBuilder().add(BusProperties.PARTITION_INDEX).add(BusProperties.CONCURRENCY).build();
    /** Properties accepted when validating a consumer whose name starts with {@code "queue:"} (built without adding any entries). */
    private static final Set<Object> SUPPORTED_NAMED_CONSUMER_PROPERTIES = new MessageBusSupport.SetBuilder().build();
    /** Properties accepted when validating a producer whose name starts with {@code "queue:"}. */
    private static final Set<Object> SUPPORTED_NAMED_PRODUCER_PROPERTIES = PRODUCER_STANDARD_PROPERTIES;
    /** Properties accepted when validating a producer whose name does not start with {@code "queue:"}. */
    private static final Set<Object> SUPPORTED_PRODUCER_PROPERTIES = new MessageBusSupport.SetBuilder().addAll(PRODUCER_PARTITIONING_PROPERTIES).addAll(PRODUCER_STANDARD_PROPERTIES).add(BusProperties.DIRECT_BINDING_ALLOWED).build();
    /**
     * ZooKeeper serializer that stores values as the UTF-8 bytes of a {@link String}.
     * Values handed to {@code serialize} must be strings; an unsupported-encoding failure
     * is rethrown as a {@link ZkMarshallingError}.
     */
    public static final ZkSerializer utf8Serializer = new ZkSerializer() {
        public byte[] serialize(Object obj) throws ZkMarshallingError {
            String text = (String) obj;
            try {
                return text.getBytes("UTF-8");
            } catch (UnsupportedEncodingException ex) {
                throw new ZkMarshallingError(ex);
            }
        }

        public Object deserialize(byte[] bArr) throws ZkMarshallingError {
            try {
                String decoded = new String(bArr, "UTF-8");
                return decoded;
            } catch (UnsupportedEncodingException ex) {
                throw new ZkMarshallingError(ex);
            }
        }
    };
    /** Embeds headers into outbound payloads and extracts them from inbound payloads. */
    private final EmbeddedHeadersMessageConverter embeddedHeadersMessageConverter = new EmbeddedHeadersMessageConverter();
    /** Runs the loops that read from Kafka streams and forward each message to a channel. */
    private ExecutorService executor = Executors.newCachedThreadPool();
    /** Replication factor passed to topic creation; changeable through {@code setReplicationFactor}. */
    private int replicationFactor = 1;
    /** Partition count used when the {@code NEXT_MODULE_COUNT} bus property is 0 for a non-partitioned module. */
    private int numOfKafkaPartitionsForCountEqualsZero = 10;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/xd/dirt/integration/kafka/KafkaMessageBus$KafkaPropertiesAccessor.class */
    /**
     * Bus properties accessor that additionally works out how many Kafka partitions
     * a topic should be created with.
     */
    public class KafkaPropertiesAccessor extends AbstractBusPropertiesAccessor {
        public KafkaPropertiesAccessor(Properties properties) {
            super(properties);
        }

        /**
         * Returns the partition count to use for the topic.
         * <p>
         * For a partitioned module this is the configured partition count. Otherwise it is
         * the {@code NEXT_MODULE_COUNT} property (defaulting to 1), except that a value of 0
         * is replaced by the bus-wide fallback.
         *
         * @return the number of Kafka partitions
         */
        public int getNumberOfKafkaPartitions() {
            MessageBusSupport.PartitioningMetadata metadata = new MessageBusSupport.PartitioningMetadata(this);
            if (!metadata.isPartitionedModule()) {
                int nextModuleCount = getProperty(BusProperties.NEXT_MODULE_COUNT, 1);
                if (nextModuleCount != 0) {
                    return nextModuleCount;
                }
                return KafkaMessageBus.this.numOfKafkaPartitionsForCountEqualsZero;
            }
            return getPartitionCount();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/xd/dirt/integration/kafka/KafkaMessageBus$ReceivingHandler.class */
    public class ReceivingHandler extends AbstractReplyProducingMessageHandler implements Lifecycle {
        private ConsumerConnector connector;

        public ReceivingHandler(ConsumerConnector consumerConnector) {
            this.connector = consumerConnector;
            setBeanFactory(KafkaMessageBus.this.getBeanFactory());
        }

        /* JADX WARN: Multi-variable type inference failed */
        protected Object handleRequestMessage(Message<?> message) {
            Message message2 = message;
            try {
                message2 = KafkaMessageBus.this.embeddedHeadersMessageConverter.extractHeaders(message);
            } catch (UnsupportedEncodingException e) {
                this.logger.error("Could not convert message", e);
            }
            return KafkaMessageBus.this.deserializePayloadIfNecessary(message2);
        }

        public void start() {
        }

        public void stop() {
            this.connector.shutdown();
        }

        public boolean isRunning() {
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/xd/dirt/integration/kafka/KafkaMessageBus$SendingHandler.class */
    /**
     * Outbound handler: picks a partition for each message, serializes the payload,
     * embeds the mapped headers into it and passes the result to the delegate handler.
     */
    public class SendingHandler extends AbstractMessageHandler {
        private final MessageHandler delegate;
        private final MessageBusSupport.PartitioningMetadata partitioningMetadata;
        private final AtomicInteger roundRobinCount;
        private final String topicName;

        private SendingHandler(MessageHandler messageHandler, String str, KafkaPropertiesAccessor kafkaPropertiesAccessor) {
            this.roundRobinCount = new AtomicInteger();
            this.delegate = messageHandler;
            this.topicName = str;
            this.partitioningMetadata = new MessageBusSupport.PartitioningMetadata(kafkaPropertiesAccessor);
            setBeanFactory(KafkaMessageBus.this.getBeanFactory());
        }

        /**
         * Sends one message to the topic.
         * <p>
         * The partition comes from the partitioning metadata for a partitioned module and
         * from a round-robin counter otherwise. The same value is set as both the
         * {@code partition} and the {@code messageKey} header.
         *
         * @param message the message to send
         * @throws Exception if serialization, header embedding or the delegate fails
         */
        protected void handleMessageInternal(Message<?> message) throws Exception {
            int partition = this.partitioningMetadata.isPartitionedModule()
                    ? KafkaMessageBus.this.determinePartition(message, this.partitioningMetadata)
                    : roundRobin();

            HashMap<String, Object> routingHeaders = new HashMap<String, Object>();
            routingHeaders.put("partition", Integer.valueOf(partition));
            routingHeaders.put("messageKey", Integer.valueOf(partition));
            routingHeaders.put("topic", this.topicName);

            @SuppressWarnings("unchecked")
            Message<byte[]> serialized = (Message<byte[]>) KafkaMessageBus.this.serializePayloadIfNecessary(message, MediaType.APPLICATION_OCTET_STREAM);
            Message<byte[]> withRouting = getMessageBuilderFactory().fromMessage(serialized).copyHeaders(routingHeaders).build();
            Message<byte[]> embedHeaders = KafkaMessageBus.this.embeddedHeadersMessageConverter.embedHeaders(withRouting, KafkaMessageBus.this.headersToMap);
            Assert.isInstanceOf(byte[].class, embedHeaders.getPayload());
            this.delegate.handleMessage(embedHeaders);
        }

        /**
         * Returns the next value of the round-robin counter, cycling through
         * {@code 1..Integer.MAX_VALUE}.
         * <p>
         * The increment and the wrap-around are done in a single compare-and-set, so
         * concurrent senders can never push the counter past {@code Integer.MAX_VALUE}
         * into negative values. A separate increment followed by a reset would allow that.
         *
         * @return a positive counter value
         */
        private int roundRobin() {
            while (true) {
                int current = this.roundRobinCount.get();
                int next = current + 1;
                // Store 0 instead of MAX_VALUE so the following call starts again at 1.
                int stored = (next == Integer.MAX_VALUE) ? 0 : next;
                if (this.roundRobinCount.compareAndSet(current, stored)) {
                    return next;
                }
            }
        }
    }

    public KafkaMessageBus(String str, String str2, MultiTypeCodec<Object> multiTypeCodec, String... strArr) {
        this.brokers = str;
        this.zkAddress = str2;
        setCodec(multiTypeCodec);
        if (strArr.length <= 0) {
            this.headersToMap = STANDARD_HEADERS;
            return;
        }
        String[] strArr2 = (String[]) Arrays.copyOfRange(STANDARD_HEADERS, 0, STANDARD_HEADERS.length + strArr.length);
        System.arraycopy(strArr, 0, strArr2, STANDARD_HEADERS.length, strArr.length);
        this.headersToMap = strArr2;
    }

    /**
     * Sets the replication factor passed to topic creation when a producer is bound.
     *
     * @param i the replication factor
     */
    public void setReplicationFactor(int i) {
        this.replicationFactor = i;
    }

    /**
     * Binds a point-to-point consumer. The connector is created in the fixed
     * {@code "springXD"} consumer group, and afterwards an existing producer is bound
     * directly if that is possible.
     */
    @Override // org.springframework.xd.dirt.integration.bus.MessageBus
    public void bindConsumer(String str, MessageChannel messageChannel, Properties properties) {
        ConsumerConnector sharedGroupConnector = createConsumerConnector(POINT_TO_POINT_SEMANTICS_CONSUMER_GROUP);
        createKafkaConsumer(str, messageChannel, properties, sharedGroupConnector);
        bindExistingProducerDirectlyIfPossible(str, messageChannel);
    }

    /**
     * Binds a publish-subscribe consumer. Each call uses a freshly generated random UUID
     * as the consumer group id.
     */
    @Override // org.springframework.xd.dirt.integration.bus.MessageBus
    public void bindPubSubConsumer(String str, MessageChannel messageChannel, Properties properties) {
        String uniqueGroupId = UUID.randomUUID().toString();
        ConsumerConnector ownGroupConnector = createConsumerConnector(uniqueGroupId);
        createKafkaConsumer(str, messageChannel, properties, ownGroupConnector);
    }

    /**
     * Binds a producer for the given name.
     * <p>
     * The properties are validated first, against the named-producer set when the name
     * starts with {@code "queue:"}. If a direct binding can be made, nothing else happens.
     * Otherwise the topic is created (name escaped), a Kafka producer is configured, and the
     * channel is subscribed to a {@link SendingHandler} that forwards to that producer.
     *
     * @param str the binding name
     * @param messageChannel the outbound channel; must be a {@link SubscribableChannel}
     * @param properties the producer properties
     */
    @Override // org.springframework.xd.dirt.integration.bus.MessageBus
    public void bindProducer(String str, MessageChannel messageChannel, Properties properties) {
        Assert.isInstanceOf(SubscribableChannel.class, messageChannel);
        KafkaPropertiesAccessor accessor = new KafkaPropertiesAccessor(properties);
        Set<Object> supported = str.startsWith("queue:") ? SUPPORTED_NAMED_PRODUCER_PROPERTIES : SUPPORTED_PRODUCER_PROPERTIES;
        validateProducerProperties(str, properties, supported);
        if (bindNewProducerDirectlyIfPossible(str, (SubscribableChannel) messageChannel, accessor)) {
            return;
        }
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Using kafka topic for outbound: " + str);
        }

        String topic = escapeTopicName(str);
        ensureTopicCreated(topic, accessor.getNumberOfKafkaPartitions(), this.replicationFactor);

        // Kafka producer settings: raw byte payloads keyed by an integer.
        Properties producerProps = new Properties();
        producerProps.put("metadata.broker.list", this.brokers);
        producerProps.put("serializer.class", DefaultEncoder.class.getName());
        producerProps.put("key.serializer.class", IntegerEncoderDecoder.class.getName());
        producerProps.put("partitioner.class", DefaultPartitioner.class.getName());
        producerProps.put("request.required.acks", "1");
        ProducerConfig producerConfig = new ProducerConfig(producerProps);

        ProducerMetadata metadata = new ProducerMetadata(topic);
        metadata.setValueEncoder(new DefaultEncoder((VerifiableProperties) null));
        metadata.setValueClassType(byte[].class);
        metadata.setKeyEncoder(new IntegerEncoderDecoder(null));
        metadata.setKeyClassType(Integer.class);

        final ProducerConfiguration producerConfiguration = new ProducerConfiguration(metadata, new Producer(producerConfig));
        AbstractMessageHandler kafkaSender = new AbstractMessageHandler() {
            protected void handleMessageInternal(Message<?> message) throws Exception {
                producerConfiguration.send(message);
            }
        };

        Assert.isInstanceOf(SubscribableChannel.class, messageChannel);
        SendingHandler sendingHandler = new SendingHandler(kafkaSender, topic, accessor);
        EventDrivenConsumer outbound = new EventDrivenConsumer((SubscribableChannel) messageChannel, sendingHandler);
        outbound.setBeanFactory(getBeanFactory());
        outbound.setBeanName("outbound." + str);
        outbound.afterPropertiesSet();

        Binding producerBinding = Binding.forProducer(str, messageChannel, outbound, accessor);
        addBinding(producerBinding);
        producerBinding.start();
    }

    /**
     * Binds a publish-subscribe producer. Delegates to {@code bindProducer} with the same
     * arguments.
     */
    @Override // org.springframework.xd.dirt.integration.bus.MessageBus
    public void bindPubSubProducer(String str, MessageChannel messageChannel, Properties properties) {
        bindProducer(str, messageChannel, properties);
    }

    /**
     * Not implemented by this bus.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.springframework.xd.dirt.integration.bus.MessageBus
    public void bindRequestor(String str, MessageChannel messageChannel, MessageChannel messageChannel2, Properties properties) {
        throw new UnsupportedOperationException("Auto-generated method stub");
    }

    /**
     * Not implemented by this bus.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.springframework.xd.dirt.integration.bus.MessageBus
    public void bindReplier(String str, MessageChannel messageChannel, MessageChannel messageChannel2, Properties properties) {
        throw new UnsupportedOperationException("Auto-generated method stub");
    }

    /**
     * Creates the topic's partition assignment in ZooKeeper, or updates it if it exists.
     * <p>
     * A short-lived ZooKeeper client is opened for the call and is always closed again,
     * including when the admin call throws.
     *
     * @param str the (already escaped) topic name
     * @param i the number of partitions
     * @param i2 the replication factor
     */
    private void ensureTopicCreated(String str, int i, int i2) {
        // NOTE(review): the two 10000 values are presumably the session and connection timeouts in ms — confirm against the ZkClient constructor.
        ZkClient zkClient = new ZkClient(this.zkAddress, 10000, 10000, utf8Serializer);
        try {
            AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, str, AdminUtils.assignReplicasToBrokers(ZkUtils.getSortedBrokerList(zkClient), i, i2, -1, -1), new Properties(), true);
        } finally {
            zkClient.close();
        }
    }

    /**
     * Wires a Kafka consumer for the given name to the module's input channel.
     * <p>
     * The properties are validated (against the named-consumer set when the name starts
     * with {@code "queue:"}), a single stream is opened on the escaped topic, and a
     * background task forwards every received payload through a {@link ReceivingHandler}
     * to {@code messageChannel}.
     * <p>
     * This method takes ownership of {@code consumerConnector}. If validation or stream
     * creation fails, the connector is shut down before the failure propagates, so it is
     * not leaked. On success the {@link ReceivingHandler} shuts it down when stopped.
     *
     * @param str the binding name
     * @param messageChannel the channel that receives the converted messages
     * @param properties the consumer properties
     * @param consumerConnector the connector to read from
     */
    private void createKafkaConsumer(String str, MessageChannel messageChannel, Properties properties, ConsumerConnector consumerConnector) {
        KafkaPropertiesAccessor kafkaPropertiesAccessor;
        KafkaStream openedStream;
        boolean streamOpened = false;
        try {
            if (str.startsWith("queue:")) {
                validateConsumerProperties(str, properties, SUPPORTED_NAMED_CONSUMER_PROPERTIES);
            } else {
                validateConsumerProperties(str, properties, SUPPORTED_CONSUMER_PROPERTIES);
            }
            kafkaPropertiesAccessor = new KafkaPropertiesAccessor(properties);
            String escapeTopicName = escapeTopicName(str);
            // Ask for exactly one stream on the topic.
            HashMap<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
            streamsPerTopic.put(escapeTopicName, 1);
            openedStream = (KafkaStream) ((List) consumerConnector.createMessageStreams(streamsPerTopic, new IntegerEncoderDecoder(), new DefaultDecoder((VerifiableProperties) null)).get(escapeTopicName)).iterator().next();
            streamOpened = true;
        } finally {
            if (!streamOpened) {
                // Nothing else holds the connector yet, so release it here.
                consumerConnector.shutdown();
            }
        }

        final KafkaStream kafkaStream = openedStream;
        final DirectChannel directChannel = new DirectChannel();
        ReceivingHandler receivingHandler = new ReceivingHandler(consumerConnector);
        receivingHandler.setOutputChannel(messageChannel);
        EventDrivenConsumer eventDrivenConsumer = new EventDrivenConsumer(directChannel, receivingHandler);
        eventDrivenConsumer.setBeanName("inbound." + str);
        Binding forConsumer = Binding.forConsumer(str, eventDrivenConsumer, messageChannel, kafkaPropertiesAccessor);
        addBinding(forConsumer);
        forConsumer.start();
        this.executor.submit(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                // Forward each received payload until the stream's iterator reports no more elements.
                ConsumerIterator it = kafkaStream.iterator();
                while (it.hasNext()) {
                    directChannel.send(MessageBuilder.withPayload((byte[]) it.next().message()).build());
                }
            }
        });
    }

    /**
     * Creates a Kafka consumer connector for the given consumer group.
     *
     * @param str the consumer group id
     * @param strArr additional consumer properties as alternating key, value entries; these
     *        are applied after the defaults and so may override them
     * @return the new connector
     * @throws IllegalArgumentException if {@code strArr} has an odd number of entries
     */
    ConsumerConnector createConsumerConnector(String str, String... strArr) {
        Properties consumerProps = new Properties();
        consumerProps.put("zookeeper.connect", this.zkAddress);
        consumerProps.put("group.id", str);
        consumerProps.put("rebalance.backoff.ms", "2000");
        consumerProps.put("rebalance.max.retries", "2000");

        Assert.isTrue(strArr.length % 2 == 0, "keyValues must be an even number of key/value pairs");
        int index = 0;
        while (index < strArr.length) {
            String key = strArr[index];
            String value = strArr[index + 1];
            consumerProps.put(key, value);
            index += 2;
        }

        ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
        return Consumer.createJavaConsumerConnector(consumerConfig);
    }

    /**
     * Escapes a binding name so that it contains only ASCII letters, digits,
     * {@code '.'}, {@code '-'} and {@code '_'}.
     * <p>
     * The name is encoded as UTF-8. Bytes that are ASCII letters, digits, {@code '.'} or
     * {@code '-'} are kept; every other byte, including {@code '_'} itself, is written as
     * {@code '_'} followed by its two-digit upper-case hexadecimal value.
     *
     * @param str the name to escape
     * @return the escaped name
     */
    static String escapeTopicName(String str) {
        StringBuilder escaped = new StringBuilder(str.length());
        byte[] utf8;
        try {
            utf8 = str.getBytes("UTF-8");
        } catch (UnsupportedEncodingException ex) {
            throw new AssertionError(ex);
        }
        for (int i = 0; i < utf8.length; i++) {
            byte b = utf8[i];
            boolean lowerCase = b >= 'a' && b <= 'z';
            boolean upperCase = b >= 'A' && b <= 'Z';
            boolean digit = b >= '0' && b <= '9';
            if (lowerCase || upperCase || digit || b == '.' || b == '-') {
                escaped.append((char) b);
            } else {
                escaped.append(String.format("_%02X", Byte.valueOf(b)));
            }
        }
        return escaped.toString();
    }
}
