package org.springframework.cloud.stream.binder.kafka;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.admin.AdminUtils;
import kafka.api.OffsetRequest;
import kafka.serializer.DefaultDecoder;
import kafka.utils.VerifiableProperties;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.stream.binder.AbstractBinderPropertiesAccessor;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter;
import org.springframework.cloud.stream.binder.MessageChannelBinderSupport;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.integration.channel.FixedSubscriberChannel;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.kafka.core.ConnectionFactory;
import org.springframework.integration.kafka.core.DefaultConnectionFactory;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.core.ZookeeperConfiguration;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.listener.Acknowledgment;
import org.springframework.integration.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.integration.kafka.listener.KafkaTopicOffsetManager;
import org.springframework.integration.kafka.listener.OffsetManager;
import org.springframework.integration.kafka.support.ProducerConfiguration;
import org.springframework.integration.kafka.support.ProducerFactoryBean;
import org.springframework.integration.kafka.support.ProducerMetadata;
import org.springframework.integration.kafka.support.ZookeeperConnect;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.class */
public class KafkaMessageChannelBinder extends MessageChannelBinderSupport {
    /** Number of attempts for topic metadata verification; equals the max-attempts value set on the default retry policy in {@code afterPropertiesSet()}. */
    public static final int METADATA_VERIFICATION_RETRY_ATTEMPTS = 10;
    /** Back-off multiplier; equals the multiplier set on the default back-off policy in {@code afterPropertiesSet()}. */
    public static final double METADATA_VERIFICATION_RETRY_BACKOFF_MULTIPLIER = 2.0d;
    /** Initial back-off interval; equals the value set on the default back-off policy in {@code afterPropertiesSet()}. */
    public static final int METADATA_VERIFICATION_RETRY_INITIAL_INTERVAL = 100;
    /** Maximum back-off interval; also used as the initial value of {@code offsetStoreBatchTime}. */
    public static final int METADATA_VERIFICATION_MAX_INTERVAL = 1000;
    /** Name of the consumer property holding the maximum fetch size. */
    public static final String FETCH_SIZE = "fetchSize";
    // NOTE(review): QUEUE_SIZE has the same value as FETCH_SIZE, so the listener container's
    // queue size and max fetch size are both read from the "fetchSize" property. This looks
    // like a copy/paste slip; confirm before changing, because the constant is public.
    public static final String QUEUE_SIZE = "fetchSize";
    /** Name of the producer property holding the number of required acks. */
    public static final String REQUIRED_ACKS = "requiredAcks";
    /** Name of the consumer property that toggles automatic offset commits. */
    public static final String AUTO_COMMIT_ENABLED = "autoCommitEnabled";
    /** Initial value of {@code defaultCompressionCodec}. */
    private static final String DEFAULT_COMPRESSION_CODEC = "none";
    /** Same value as the initial {@code defaultRequiredAcks}. */
    private static final int DEFAULT_REQUIRED_ACKS = 1;
    /** Same value as the initial {@code defaultAutoCommitEnabled}. */
    private static final boolean DEFAULT_AUTO_COMMIT_ENABLED = true;
    /** Retry strategy used by {@code ensureTopicCreated}; a default is built in {@code afterPropertiesSet()} when none was injected. */
    private RetryOperations retryOperations;
    /** Consumer id shared by all consumers bound through {@code bindConsumer}. */
    private static final String POINT_TO_POINT_SEMANTICS_CONSUMER_GROUP = "springXD";
    /** ZooKeeper connection settings used for the connection factory and the offset manager. */
    private final ZookeeperConnect zookeeperConnect;
    /** Broker list handed to the {@code ProducerFactoryBean} in {@code bindProducer}. */
    private String brokers;
    /** Header names passed to {@code embedHeaders} when sending in {@code embeddedHeaders} mode. */
    private String[] headersToMap;
    /** ZooKeeper address used to open the {@code ZkClient} in {@code ensureTopicCreated}. */
    private String zkAddress;
    /** Kafka connection factory, created in {@code afterPropertiesSet()}. */
    private ConnectionFactory connectionFactory;
    /** Serializer used for both keys and payloads of the producers created by this binder. */
    public static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer();
    /**
     * {@link ZkSerializer} that converts between {@link String} values and their UTF-8 bytes.
     * Used when opening the {@link ZkClient} in {@code ensureTopicCreated}.
     */
    public static final ZkSerializer utf8Serializer = new ZkSerializer() {

        /** Encodes the given object, which must be a {@link String}, as UTF-8 bytes. */
        @Override
        public byte[] serialize(Object data) throws ZkMarshallingError {
            final String text = (String) data;
            try {
                return text.getBytes("UTF-8");
            } catch (UnsupportedEncodingException ex) {
                throw new ZkMarshallingError(ex);
            }
        }

        /** Decodes the given UTF-8 bytes into a {@link String}. */
        @Override
        public Object deserialize(byte[] bytes) throws ZkMarshallingError {
            try {
                return new String(bytes, "UTF-8");
            } catch (UnsupportedEncodingException ex) {
                throw new ZkMarshallingError(ex);
            }
        }
    };
    /** Name of the producer property selecting the compression codec. */
    public static final String COMPRESSION_CODEC = "compressionCodec";
    /** Producer properties related to compression. */
    protected static final Set<Object> PRODUCER_COMPRESSION_PROPERTIES = new HashSet(Arrays.asList(COMPRESSION_CODEC));
    /** Kafka-specific consumer properties. */
    private static final Set<Object> KAFKA_CONSUMER_PROPERTIES = new MessageChannelBinderSupport.SetBuilder().add("minPartitionCount").build();
    /** Properties accepted by {@code createKafkaConsumer} for names that do not start with {@code "queue:"}. */
    private static final Set<Object> SUPPORTED_CONSUMER_PROPERTIES = new MessageChannelBinderSupport.SetBuilder().addAll(CONSUMER_STANDARD_PROPERTIES).addAll(KAFKA_CONSUMER_PROPERTIES).add("partitionIndex").add("count").add("concurrency").add("fetchSize").build();
    /** Kafka-specific producer properties. */
    private static final Set<Object> KAFKA_PRODUCER_PROPERTIES = new MessageChannelBinderSupport.SetBuilder().add("minPartitionCount").build();
    /** Properties accepted by {@code createKafkaConsumer} for names starting with {@code "queue:"}. */
    private static final Set<Object> SUPPORTED_NAMED_CONSUMER_PROPERTIES = new MessageChannelBinderSupport.SetBuilder().addAll(CONSUMER_STANDARD_PROPERTIES).build();
    /** Properties accepted by {@code bindProducer} for names starting with {@code "queue:"}. */
    private static final Set<Object> SUPPORTED_NAMED_PRODUCER_PROPERTIES = new MessageChannelBinderSupport.SetBuilder().addAll(PRODUCER_STANDARD_PROPERTIES).addAll(PRODUCER_BATCHING_BASIC_PROPERTIES).build();
    /** Properties accepted by {@code bindProducer} for names that do not start with {@code "queue:"}. */
    private static final Set<Object> SUPPORTED_PRODUCER_PROPERTIES = new MessageChannelBinderSupport.SetBuilder().addAll(PRODUCER_PARTITIONING_PROPERTIES).addAll(PRODUCER_STANDARD_PROPERTIES).add("directBindingAllowed").addAll(KAFKA_PRODUCER_PROPERTIES).addAll(PRODUCER_BATCHING_BASIC_PROPERTIES).addAll(PRODUCER_COMPRESSION_PROPERTIES).build();
    /** Converter used to embed headers into, and extract them from, the Kafka payload in {@code embeddedHeaders} mode. */
    private final EmbeddedHeadersMessageConverter embeddedHeadersMessageConverter = new EmbeddedHeadersMessageConverter();
    /** Replication factor passed to {@code ensureTopicCreated}. */
    private int defaultReplicationFactor = 1;
    /** Fallback for the {@code compressionCodec} producer property. */
    private String defaultCompressionCodec = DEFAULT_COMPRESSION_CODEC;
    /** Fallback for the {@code requiredAcks} producer property. */
    private int defaultRequiredAcks = 1;
    /** Fallback used when configuring the listener container's queue size. */
    private int defaultQueueSize = 1024;
    /** Max wait applied to the {@code ZookeeperConfiguration} in {@code afterPropertiesSet()}. */
    private int defaultMaxWait = 100;
    /** Fallback used when configuring the listener container's max fetch. */
    private int defaultFetchSize = 1048576;
    /** Fallback for the {@code minPartitionCount} property. */
    private int defaultMinPartitionCount = 1;
    /** Topic name handed to the {@code KafkaTopicOffsetManager}. */
    private String offsetStoreTopic = "SpringXdOffsets";
    /** Fallback for the {@code autoCommitEnabled} consumer property. */
    private boolean defaultAutoCommitEnabled = true;
    /** Buffer size applied to the {@code ZookeeperConfiguration} in {@code afterPropertiesSet()}. */
    private int socketBufferSize = 2097152;
    // The offsetStore* values below are applied to the KafkaTopicOffsetManager in createOffsetManager.
    private int offsetStoreSegmentSize = 262144000;
    private int offsetStoreRetentionTime = 60000;
    private int offsetStoreRequiredAcks = 1;
    private int offsetStoreMaxFetchSize = 1048576;
    private int offsetStoreBatchBytes = 200;
    private int offsetStoreBatchTime = METADATA_VERIFICATION_MAX_INTERVAL;
    // The offsetUpdate* values below are applied to the WindowingOffsetManager in createOffsetManager.
    private int offsetUpdateTimeWindow = 10000;
    private int offsetUpdateCount = 0;
    private int offsetUpdateShutdownTimeout = 2000;
    /** Payload handling mode; headers are embedded in the payload by default. */
    private Mode mode = Mode.embeddedHeaders;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder$KafkaPropertiesAccessor.class */
    /**
     * Property accessor that adds the Kafka-specific lookups used by this binder, falling back
     * to the enclosing binder's defaults where a property is not set.
     */
    public class KafkaPropertiesAccessor extends AbstractBinderPropertiesAccessor {

        /**
         * @param properties the binding properties to read from
         */
        public KafkaPropertiesAccessor(Properties properties) {
            super(properties);
        }

        /**
         * Computes how many partitions a producer-side topic needs: the larger of the configured
         * minimum partition count and (next module count x next module concurrency).
         *
         * @return the number of partitions to request
         * @throws IllegalArgumentException if the next module count is zero
         */
        public int getNumberOfKafkaPartitionsForProducer() {
            final int downstreamModules = getNextModuleCount();
            if (downstreamModules == 0) {
                throw new IllegalArgumentException("Module count cannot be zero");
            }
            final int configuredMinimum = getMinPartitionCount(KafkaMessageChannelBinder.this.defaultMinPartitionCount);
            final int downstreamConcurrency = getProperty("nextModuleConcurrency", KafkaMessageChannelBinder.this.defaultConcurrency);
            return Math.max(configuredMinimum, downstreamModules * downstreamConcurrency);
        }

        /**
         * Computes how many partitions a consumer-side topic needs: the larger of the configured
         * minimum partition count and (module count x concurrency).
         *
         * @return the number of partitions to request
         * @throws IllegalArgumentException if the module count is zero
         */
        public int getNumberOfKafkaPartitionsForConsumer() {
            final int consumerConcurrency = getConcurrency(KafkaMessageChannelBinder.this.defaultConcurrency);
            final int configuredMinimum = getMinPartitionCount(KafkaMessageChannelBinder.this.defaultMinPartitionCount);
            final int modules = getCount();
            if (modules == 0) {
                throw new IllegalArgumentException("Module count cannot be zero");
            }
            return Math.max(configuredMinimum, modules * consumerConcurrency);
        }

        /** Returns the {@code compressionCodec} property, or {@code str} when it is not set. */
        public String getCompressionCodec(String str) {
            return getProperty(KafkaMessageChannelBinder.COMPRESSION_CODEC, str);
        }

        /** Returns the {@code requiredAcks} property, or {@code i} when it is not set. */
        public int getRequiredAcks(int i) {
            return getProperty(KafkaMessageChannelBinder.REQUIRED_ACKS, i);
        }

        /** Returns the {@code autoCommitEnabled} property, or {@code z} when it is not set. */
        public boolean getDefaultAutoCommitEnabled(boolean z) {
            return getProperty(KafkaMessageChannelBinder.AUTO_COMMIT_ENABLED, z);
        }

        /** Returns the {@code minPartitionCount} property, or {@code i} when it is not set. */
        public int getMinPartitionCount(int i) {
            return getProperty("minPartitionCount", i);
        }
    }

    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder$Mode.class */
    /**
     * Controls how message headers and payloads are mapped to and from Kafka records.
     */
    public enum Mode {
        // Payloads are sent as-is (only byte[] payloads are sent); received messages are passed through unchanged.
        raw,
        // Headers are embedded into the payload on send and extracted again on receive.
        embeddedHeaders
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder$ReceivingHandler.class */
    public class ReceivingHandler extends AbstractReplyProducingMessageHandler {

        /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder$ReceivingHandler$KafkaBinderHeaders.class */
        private final class KafkaBinderHeaders extends MessageHeaders {
            KafkaBinderHeaders(Map<String, Object> map) {
                super(map, MessageHeaders.ID_VALUE_NONE, -1L);
            }
        }

        public ReceivingHandler() {
            setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
        }

        protected Object handleRequestMessage(Message<?> message) {
            MessageValues messageValues;
            if (!Mode.embeddedHeaders.equals(KafkaMessageChannelBinder.this.mode)) {
                return message;
            }
            try {
                messageValues = KafkaMessageChannelBinder.this.embeddedHeadersMessageConverter.extractHeaders(message, true);
            } catch (Exception e) {
                this.logger.error(EmbeddedHeadersMessageConverter.decodeExceptionMessage(message), e);
                messageValues = new MessageValues(message);
            }
            MessageValues deserializePayloadIfNecessary = KafkaMessageChannelBinder.this.deserializePayloadIfNecessary(messageValues);
            return MessageBuilder.createMessage(deserializePayloadIfNecessary.getPayload(), new KafkaBinderHeaders(deserializePayloadIfNecessary));
        }

        protected boolean shouldCopyRequestHeaders() {
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder$SendingHandler.class */
    public class SendingHandler extends AbstractMessageHandler {
        private final MessageChannelBinderSupport.PartitioningMetadata partitioningMetadata;
        private final AtomicInteger roundRobinCount;
        private final String topicName;
        private final int numberOfKafkaPartitions;
        private final ProducerConfiguration<byte[], byte[]> producerConfiguration;

        private SendingHandler(String str, KafkaPropertiesAccessor kafkaPropertiesAccessor, int i, ProducerConfiguration<byte[], byte[]> producerConfiguration) {
            this.roundRobinCount = new AtomicInteger();
            this.topicName = str;
            this.numberOfKafkaPartitions = i;
            this.partitioningMetadata = new MessageChannelBinderSupport.PartitioningMetadata(kafkaPropertiesAccessor, i);
            setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
            this.producerConfiguration = producerConfiguration;
        }

        protected void handleMessageInternal(Message<?> message) throws Exception {
            int determinePartition = this.partitioningMetadata.isPartitionedModule() ? KafkaMessageChannelBinder.this.determinePartition(message, this.partitioningMetadata) : roundRobin() % this.numberOfKafkaPartitions;
            if (Mode.embeddedHeaders.equals(KafkaMessageChannelBinder.this.mode)) {
                this.producerConfiguration.send(this.topicName, Integer.valueOf(determinePartition), (Object) null, KafkaMessageChannelBinder.this.embeddedHeadersMessageConverter.embedHeaders(KafkaMessageChannelBinder.this.serializePayloadIfNecessary(message), KafkaMessageChannelBinder.this.headersToMap));
                return;
            }
            if (Mode.raw.equals(KafkaMessageChannelBinder.this.mode)) {
                Object obj = message.getHeaders().get("contentType");
                if (obj != null && !obj.equals("application/octet-stream")) {
                    this.logger.error("Raw mode supports only application/octet-stream content type" + message.getPayload().getClass());
                }
                if (message.getPayload() instanceof byte[]) {
                    this.producerConfiguration.send(this.topicName, Integer.valueOf(determinePartition), (Object) null, (byte[]) message.getPayload());
                } else {
                    this.logger.error("Raw mode supports only byte[] payloads but value sent was of type " + message.getPayload().getClass());
                }
            }
        }

        private int roundRobin() {
            int incrementAndGet = this.roundRobinCount.incrementAndGet();
            if (incrementAndGet == Integer.MAX_VALUE) {
                this.roundRobinCount.set(0);
            }
            return incrementAndGet;
        }
    }

    public KafkaMessageChannelBinder(ZookeeperConnect zookeeperConnect, String str, String str2, String... strArr) {
        this.zookeeperConnect = zookeeperConnect;
        this.brokers = str;
        this.zkAddress = str2;
        if (strArr.length <= 0) {
            this.headersToMap = BinderHeaders.STANDARD_HEADERS;
            return;
        }
        String[] strArr2 = (String[]) Arrays.copyOfRange(BinderHeaders.STANDARD_HEADERS, 0, BinderHeaders.STANDARD_HEADERS.length + strArr.length);
        System.arraycopy(strArr, 0, strArr2, BinderHeaders.STANDARD_HEADERS.length, strArr.length);
        this.headersToMap = strArr2;
    }

    /** Sets the topic name handed to the {@code KafkaTopicOffsetManager}. */
    public void setOffsetStoreTopic(String str) {
        this.offsetStoreTopic = str;
    }

    /** Sets the segment size applied to the {@code KafkaTopicOffsetManager}. */
    public void setOffsetStoreSegmentSize(int i) {
        this.offsetStoreSegmentSize = i;
    }

    /** Sets the retention time applied to the {@code KafkaTopicOffsetManager}. */
    public void setOffsetStoreRetentionTime(int i) {
        this.offsetStoreRetentionTime = i;
    }

    /** Sets the buffer size applied to the {@code ZookeeperConfiguration} in {@code afterPropertiesSet()}. */
    public void setSocketBufferSize(int i) {
        this.socketBufferSize = i;
    }

    /** Sets the required acks applied to the {@code KafkaTopicOffsetManager}. */
    public void setOffsetStoreRequiredAcks(int i) {
        this.offsetStoreRequiredAcks = i;
    }

    /** Sets the value passed to {@code setMaxSize} on the {@code KafkaTopicOffsetManager}. */
    public void setOffsetStoreMaxFetchSize(int i) {
        this.offsetStoreMaxFetchSize = i;
    }

    /** Sets the timespan applied to the {@code WindowingOffsetManager}. */
    public void setOffsetUpdateTimeWindow(int i) {
        this.offsetUpdateTimeWindow = i;
    }

    /** Sets the count applied to the {@code WindowingOffsetManager}. */
    public void setOffsetUpdateCount(int i) {
        this.offsetUpdateCount = i;
    }

    /** Sets the shutdown timeout applied to the {@code WindowingOffsetManager}. */
    public void setOffsetUpdateShutdownTimeout(int i) {
        this.offsetUpdateShutdownTimeout = i;
    }

    /** Sets the batch bytes applied to the {@code KafkaTopicOffsetManager}. */
    public void setOffsetStoreBatchBytes(int i) {
        this.offsetStoreBatchBytes = i;
    }

    /** Sets the value passed to {@code setMaxQueueBufferingTime} on the {@code KafkaTopicOffsetManager}. */
    public void setOffsetStoreBatchTime(int i) {
        this.offsetStoreBatchTime = i;
    }

    /** Returns the connection factory; it is {@code null} until {@code afterPropertiesSet()} has run. */
    public ConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    /** Sets the retry strategy used when creating topics; when left unset a default is built in {@code afterPropertiesSet()}. */
    public void setRetryOperations(RetryOperations retryOperations) {
        this.retryOperations = retryOperations;
    }

    /**
     * Creates the Kafka connection factory from the ZooKeeper settings and, unless a retry
     * strategy was injected, installs a default one before delegating to the superclass.
     *
     * @throws Exception if the connection factory or the superclass initialization fails
     */
    public void afterPropertiesSet() throws Exception {
        final ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(this.zookeeperConnect);
        zkConfig.setBufferSize(this.socketBufferSize);
        zkConfig.setMaxWait(this.defaultMaxWait);
        final DefaultConnectionFactory factory = new DefaultConnectionFactory(zkConfig);
        factory.afterPropertiesSet();
        this.connectionFactory = factory;

        if (this.retryOperations == null) {
            this.retryOperations = buildDefaultMetadataRetryTemplate();
        }
        super.afterPropertiesSet();
    }

    /** Builds the default retry template: bounded attempts with exponential back-off. */
    private static RetryTemplate buildDefaultMetadataRetryTemplate() {
        final RetryTemplate template = new RetryTemplate();

        final SimpleRetryPolicy attempts = new SimpleRetryPolicy();
        attempts.setMaxAttempts(METADATA_VERIFICATION_RETRY_ATTEMPTS);
        template.setRetryPolicy(attempts);

        final ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
        backOff.setInitialInterval(METADATA_VERIFICATION_RETRY_INITIAL_INTERVAL);
        backOff.setMultiplier(METADATA_VERIFICATION_RETRY_BACKOFF_MULTIPLIER);
        backOff.setMaxInterval(METADATA_VERIFICATION_MAX_INTERVAL);
        template.setBackOffPolicy(backOff);

        return template;
    }

    /**
     * Escapes a name so that it only contains ASCII letters, digits, {@code '.'}, {@code '-'}
     * and {@code '_'}. Every other byte of the UTF-8 encoding is written as an underscore
     * followed by its two-digit uppercase hexadecimal value (so {@code '_'} itself becomes
     * {@code "_5F"}).
     *
     * @param str the name to escape
     * @return the escaped name
     */
    public static String escapeTopicName(String str) {
        final StringBuilder escaped = new StringBuilder(str.length());
        final byte[] utf8;
        try {
            utf8 = str.getBytes("UTF-8");
        } catch (UnsupportedEncodingException ex) {
            throw new AssertionError(ex);
        }
        for (final byte current : utf8) {
            final boolean lowerCase = current >= 'a' && current <= 'z';
            final boolean upperCase = current >= 'A' && current <= 'Z';
            final boolean digit = current >= '0' && current <= '9';
            final boolean allowedPunctuation = current == '.' || current == '-';
            if (lowerCase || upperCase || digit || allowedPunctuation) {
                escaped.append((char) current);
            } else {
                escaped.append(String.format("_%02X", Byte.valueOf(current)));
            }
        }
        return escaped.toString();
    }

    /** Sets the replication factor passed to {@code ensureTopicCreated} when binding producers and consumers. */
    public void setDefaultReplicationFactor(int i) {
        this.defaultReplicationFactor = i;
    }

    /** Sets the fallback for the {@code compressionCodec} producer property. */
    public void setDefaultCompressionCodec(String str) {
        this.defaultCompressionCodec = str;
    }

    /** Sets the fallback for the {@code requiredAcks} producer property. */
    public void setDefaultRequiredAcks(int i) {
        this.defaultRequiredAcks = i;
    }

    /** Sets the fallback for the {@code autoCommitEnabled} consumer property. */
    public void setDefaultAutoCommitEnabled(boolean z) {
        this.defaultAutoCommitEnabled = z;
    }

    /** Sets the fallback used when configuring the listener container's queue size. */
    public void setDefaultQueueSize(int i) {
        this.defaultQueueSize = i;
    }

    /** Sets the fallback used when configuring the listener container's max fetch. */
    public void setDefaultFetchSize(int i) {
        this.defaultFetchSize = i;
    }

    /** Sets the fallback for the {@code minPartitionCount} property. */
    public void setDefaultMinPartitionCount(int i) {
        this.defaultMinPartitionCount = i;
    }

    /** Sets the max wait applied to the {@code ZookeeperConfiguration} in {@code afterPropertiesSet()}. */
    public void setDefaultMaxWait(int i) {
        this.defaultMaxWait = i;
    }

    /** Sets how headers and payloads are mapped to Kafka records. */
    public void setMode(Mode mode) {
        this.mode = mode;
    }

    /**
     * Binds a point-to-point consumer: all such consumers share one fixed consumer id and use
     * {@code OffsetRequest.EarliestTime()} as the reference timestamp. Afterwards an existing
     * producer is bound directly if possible.
     */
    public void bindConsumer(String str, MessageChannel messageChannel, Properties properties) {
        createKafkaConsumer(str, messageChannel, properties, POINT_TO_POINT_SEMANTICS_CONSUMER_GROUP, OffsetRequest.EarliestTime());
        bindExistingProducerDirectlyIfPossible(str, messageChannel);
    }

    /**
     * Binds a publish-subscribe consumer: each binding gets a random consumer id and uses
     * {@code OffsetRequest.LatestTime()} as the reference timestamp.
     */
    public void bindPubSubConsumer(String str, MessageChannel messageChannel, Properties properties) {
        createKafkaConsumer(str, messageChannel, properties, UUID.randomUUID().toString(), OffsetRequest.LatestTime());
    }

    /**
     * Binds the given channel as a producer for the named destination. Unless a direct binding
     * is possible, the topic is created (or verified) and messages arriving on the channel are
     * forwarded to Kafka through a {@link SendingHandler}.
     *
     * @param str the destination name; names starting with {@code "queue:"} accept a reduced property set
     * @param messageChannel the channel to bind; must be a {@link SubscribableChannel}
     * @param properties the producer properties
     * @throws RuntimeException wrapping any exception raised while creating or starting the binding
     */
    public void bindProducer(String str, MessageChannel messageChannel, Properties properties) {
        Assert.isInstanceOf(SubscribableChannel.class, messageChannel);
        final SubscribableChannel outbound = (SubscribableChannel) messageChannel;
        final KafkaPropertiesAccessor accessor = new KafkaPropertiesAccessor(properties);
        final Set<Object> supportedProperties = str.startsWith("queue:")
                ? SUPPORTED_NAMED_PRODUCER_PROPERTIES
                : SUPPORTED_PRODUCER_PROPERTIES;
        validateProducerProperties(str, properties, supportedProperties);
        if (bindNewProducerDirectlyIfPossible(str, outbound, accessor)) {
            return;
        }
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Using kafka topic for outbound: " + str);
        }

        final String topic = escapeTopicName(str);
        final Collection<Partition> partitions = ensureTopicCreated(topic, accessor.getNumberOfKafkaPartitionsForProducer(), this.defaultReplicationFactor);

        // Keys and payloads are both sent as raw bytes.
        final ProducerMetadata metadata = new ProducerMetadata(topic, byte[].class, byte[].class, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER);
        metadata.setCompressionType(ProducerMetadata.CompressionType.valueOf(accessor.getCompressionCodec(this.defaultCompressionCodec)));
        metadata.setBatchBytes(accessor.getBatchSize(this.defaultBatchSize));

        final Properties producerProperties = new Properties();
        producerProperties.put("acks", String.valueOf(accessor.getRequiredAcks(this.defaultRequiredAcks)));
        producerProperties.put("linger.ms", String.valueOf(accessor.getBatchTimeout(this.defaultBatchTimeout)));

        try {
            final ProducerFactoryBean producerFactory = new ProducerFactoryBean(metadata, this.brokers, producerProperties);
            final ProducerConfiguration producerConfiguration = new ProducerConfiguration(metadata, producerFactory.getObject());
            final SendingHandler sender = new SendingHandler(topic, accessor, partitions.size(), producerConfiguration);
            final EventDrivenConsumer endpoint = new EventDrivenConsumer(outbound, sender);
            endpoint.setBeanFactory(getBeanFactory());
            endpoint.setBeanName("outbound." + str);
            endpoint.afterPropertiesSet();
            final Binding binding = Binding.forProducer(str, messageChannel, endpoint, accessor);
            addBinding(binding);
            binding.start();
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    /** Binds a publish-subscribe producer; handled exactly like {@code bindProducer}. */
    public void bindPubSubProducer(String str, MessageChannel messageChannel, Properties properties) {
        bindProducer(str, messageChannel, properties);
    }

    /**
     * Not supported by this binder.
     *
     * @throws UnsupportedOperationException always
     */
    public void bindRequestor(String str, MessageChannel messageChannel, MessageChannel messageChannel2, Properties properties) {
        throw new UnsupportedOperationException("requestor binding is not supported by this binder");
    }

    /**
     * Not supported by this binder.
     *
     * @throws UnsupportedOperationException always
     */
    public void bindReplier(String str, MessageChannel messageChannel, MessageChannel messageChannel2, Properties properties) {
        throw new UnsupportedOperationException("replier binding is not supported by this binder");
    }

    /**
     * Creates or updates the partition assignment of the topic in ZooKeeper and then waits,
     * using the configured retry strategy, until the connection factory reports at least the
     * expected number of partitions and can resolve their leaders.
     *
     * @param str the (already escaped) topic name
     * @param i the minimum number of partitions expected
     * @param i2 the replication factor
     * @return the partitions of the topic as reported by the connection factory
     * @throws BinderException if the topic metadata cannot be verified
     */
    private Collection<Partition> ensureTopicCreated(final String str, final int i, int i2) {
        final ZkClient zkClient = new ZkClient(this.zkAddress, 10000, 10000, utf8Serializer);
        try {
            final Properties topicConfig = new Properties();
            final scala.collection.Map replicaAssignment = AdminUtils.assignReplicasToBrokers(ZkUtils.getSortedBrokerList(zkClient), i, i2, -1, -1);
            this.retryOperations.execute(new RetryCallback<Object, RuntimeException>() {
                @Override
                public Object doWithRetry(RetryContext retryContext) throws RuntimeException {
                    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, str, replicaAssignment, topicConfig, true);
                    return null;
                }
            });
            try {
                return this.retryOperations.execute(new RetryCallback<Collection<Partition>, Exception>() {
                    // Must be named doWithRetry to implement RetryCallback (the decompiler had renamed it).
                    @Override
                    public Collection<Partition> doWithRetry(RetryContext retryContext) throws Exception {
                        KafkaMessageChannelBinder.this.connectionFactory.refreshMetadata(Collections.singleton(str));
                        Collection<Partition> partitions = KafkaMessageChannelBinder.this.connectionFactory.getPartitions(str);
                        if (partitions.size() < i) {
                            // Throwing makes the retry strategy try again.
                            throw new IllegalStateException("The number of expected partitions was: " + i + ", but " + partitions.size() + " have been found instead");
                        }
                        // Resolve the leaders as part of the verification; the result is not needed here.
                        KafkaMessageChannelBinder.this.connectionFactory.getLeaders(partitions);
                        return partitions;
                    }
                });
            } catch (Exception e) {
                this.logger.error("Cannot initialize Binder", e);
                throw new BinderException("Cannot initialize binder:", e);
            }
        } finally {
            // Close the ZooKeeper client on both the success and the failure path.
            zkClient.close();
        }
    }

    /**
     * Creates and starts a consumer binding: validates the properties, makes sure the topic
     * exists, selects the partitions this module instance listens to, and wires
     * listener container -> channel adapter -> bridge channel -> {@link ReceivingHandler} -> target channel.
     *
     * @param str the destination name; names starting with {@code "queue:"} accept a reduced property set
     * @param messageChannel the channel that receives the converted messages
     * @param properties the consumer properties
     * @param str2 the consumer id handed to the offset manager
     * @param j the reference timestamp handed to the offset manager
     */
    private void createKafkaConsumer(String str, MessageChannel messageChannel, Properties properties, String str2, long j) {
        Collection<Partition> arrayList;
        if (str.startsWith("queue:")) {
            validateConsumerProperties(str, properties, SUPPORTED_NAMED_CONSUMER_PROPERTIES);
        } else {
            validateConsumerProperties(str, properties, SUPPORTED_CONSUMER_PROPERTIES);
        }
        KafkaPropertiesAccessor kafkaPropertiesAccessor = new KafkaPropertiesAccessor(properties);
        int concurrency = kafkaPropertiesAccessor.getConcurrency(this.defaultConcurrency);
        Collection<Partition> ensureTopicCreated = ensureTopicCreated(escapeTopicName(str), kafkaPropertiesAccessor.getNumberOfKafkaPartitionsForConsumer(), this.defaultReplicationFactor);
        // defaultDecoder decodes payloads and defaultDecoder2 decodes keys (see the adapter setup below).
        DefaultDecoder defaultDecoder = new DefaultDecoder((VerifiableProperties) null);
        DefaultDecoder defaultDecoder2 = new DefaultDecoder((VerifiableProperties) null);
        int count = kafkaPropertiesAccessor.getCount();
        if (count == 1) {
            // A single module instance listens to every partition.
            arrayList = ensureTopicCreated;
        } else {
            // Several instances: each one takes the partitions whose id modulo the instance count
            // matches its own index (explicit partitionIndex, or sequence - 1 when none is set).
            arrayList = new ArrayList();
            for (Partition partition : ensureTopicCreated) {
                if (kafkaPropertiesAccessor.getPartitionIndex() == -1) {
                    int sequence = kafkaPropertiesAccessor.getSequence();
                    // getNumberOfKafkaPartitionsForConsumer() above already rejects a zero count.
                    if (count == 0) {
                        throw new IllegalArgumentException("The Kafka transport does not support 0-count modules");
                    }
                    if (partition.getId() % count == sequence - 1) {
                        arrayList.add(partition);
                    }
                } else if (partition.getId() % count == kafkaPropertiesAccessor.getPartitionIndex()) {
                    arrayList.add(partition);
                }
            }
        }
        ReceivingHandler receivingHandler = new ReceivingHandler();
        receivingHandler.setOutputChannel(messageChannel);
        FixedSubscriberChannel fixedSubscriberChannel = new FixedSubscriberChannel(receivingHandler);
        fixedSubscriberChannel.setBeanName("bridge." + str);
        final KafkaMessageListenerContainer createMessageListenerContainer = createMessageListenerContainer(kafkaPropertiesAccessor, str2, concurrency, arrayList, j);
        final KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter(createMessageListenerContainer);
        kafkaMessageDrivenChannelAdapter.setBeanFactory(getBeanFactory());
        kafkaMessageDrivenChannelAdapter.setKeyDecoder(defaultDecoder2);
        kafkaMessageDrivenChannelAdapter.setPayloadDecoder(defaultDecoder);
        kafkaMessageDrivenChannelAdapter.setOutputChannel(fixedSubscriberChannel);
        kafkaMessageDrivenChannelAdapter.setAutoCommitOffset(kafkaPropertiesAccessor.getDefaultAutoCommitEnabled(this.defaultAutoCommitEnabled));
        kafkaMessageDrivenChannelAdapter.afterPropertiesSet();
        // The adapter is started here, before the binding itself is registered and started below.
        kafkaMessageDrivenChannelAdapter.start();
        EventDrivenConsumer eventDrivenConsumer = new EventDrivenConsumer(fixedSubscriberChannel, receivingHandler) { // from class: org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.4
            // Stopping the binding also stops the adapter and disposes the offset manager.
            protected void doStop() {
                kafkaMessageDrivenChannelAdapter.stop();
                if (createMessageListenerContainer.getOffsetManager() instanceof DisposableBean) {
                    try {
                        // NOTE(review): destroy() is invoked on the OffsetManager reference without a
                        // DisposableBean cast; the cast may have been lost in decompilation. Verify.
                        createMessageListenerContainer.getOffsetManager().destroy();
                    } catch (Exception e) {
                        this.logger.error("Error while closing the offset manager", e);
                    }
                }
                super.doStop();
            }
        };
        eventDrivenConsumer.setBeanName("inbound." + str);
        Binding forConsumer = Binding.forConsumer(str, eventDrivenConsumer, messageChannel, kafkaPropertiesAccessor);
        addBinding(forConsumer);
        forConsumer.start();
    }

    /**
     * Creates a listener container for a whole topic.
     *
     * @param properties the consumer properties
     * @param str the consumer id handed to the offset manager
     * @param i the requested concurrency
     * @param str2 the topic to listen to
     * @param j the reference timestamp handed to the offset manager
     * @return the configured listener container
     */
    public KafkaMessageListenerContainer createMessageListenerContainer(Properties properties, String str, int i, String str2, long j) {
        // Topic-based variant: no explicit partition list is passed on.
        return createMessageListenerContainer(new KafkaPropertiesAccessor(properties), str, i, str2, null, j);
    }

    /**
     * Creates a listener container for an explicit list of partitions.
     *
     * @param kafkaPropertiesAccessor the consumer properties
     * @param str the consumer id handed to the offset manager
     * @param i the requested concurrency
     * @param collection the partitions to listen to
     * @param j the reference timestamp handed to the offset manager
     * @return the configured listener container
     */
    private KafkaMessageListenerContainer createMessageListenerContainer(KafkaPropertiesAccessor kafkaPropertiesAccessor, String str, int i, Collection<Partition> collection, long j) {
        // Partition-based variant: no topic name is passed on.
        return createMessageListenerContainer(kafkaPropertiesAccessor, str, i, null, collection, j);
    }

    /**
     * Creates and configures a listener container for either a topic or an explicit list of
     * partitions; exactly one of the two must be provided.
     *
     * @param kafkaPropertiesAccessor the consumer properties
     * @param str the consumer id handed to the offset manager
     * @param i the requested concurrency
     * @param str2 the topic to listen to, or {@code null}/blank when partitions are given
     * @param collection the partitions to listen to, or {@code null}/empty when a topic is given
     * @param j the reference timestamp handed to the offset manager
     * @return the configured listener container
     * @throws IllegalArgumentException if both or neither of topic and partitions are provided
     */
    private KafkaMessageListenerContainer createMessageListenerContainer(KafkaPropertiesAccessor kafkaPropertiesAccessor, String str, int i, String str2, Collection<Partition> collection, long j) {
        final boolean topicProvided = StringUtils.hasText(str2);
        final boolean partitionsProvided = !CollectionUtils.isEmpty(collection);
        Assert.isTrue(topicProvided ^ partitionsProvided, "Exactly one of topic or a list of listened partitions must be provided");

        final KafkaMessageListenerContainer kafkaMessageListenerContainer;
        final int effectiveConcurrency;
        // Branch on the same condition the assertion checked, so a blank (non-null) topic name
        // combined with a partition list correctly takes the partition path.
        if (topicProvided) {
            kafkaMessageListenerContainer = new KafkaMessageListenerContainer(this.connectionFactory, new String[]{str2});
            // There is no partition list on this path (it may be null), so the requested
            // concurrency is used as-is instead of dereferencing the collection.
            effectiveConcurrency = i;
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Listening to topic " + str2);
            }
        } else {
            kafkaMessageListenerContainer = new KafkaMessageListenerContainer(this.connectionFactory, (Partition[]) collection.toArray(new Partition[collection.size()]));
            // With fewer partitions than requested threads, reduce the concurrency accordingly.
            effectiveConcurrency = Math.min(i, collection.size());
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Listening to partitions " + collection);
            }
        }
        kafkaMessageListenerContainer.setConcurrency(effectiveConcurrency);
        kafkaMessageListenerContainer.setOffsetManager(createOffsetManager(str, j));
        // NOTE(review): queue size and max fetch are both read from the "fetchSize" property
        // (only the fallback differs). This looks unintended; kept as-is pending confirmation.
        kafkaMessageListenerContainer.setQueueSize(kafkaPropertiesAccessor.getProperty("fetchSize", this.defaultQueueSize));
        kafkaMessageListenerContainer.setMaxFetch(kafkaPropertiesAccessor.getProperty("fetchSize", this.defaultFetchSize));
        return kafkaMessageListenerContainer;
    }

    /**
     * Builds the offset manager for a consumer: a {@link KafkaTopicOffsetManager} configured
     * from the offset-store settings, wrapped in a {@code WindowingOffsetManager} configured
     * from the offset-update settings.
     *
     * @param str the consumer id
     * @param j the reference timestamp
     * @return the initialized offset manager
     * @throws RuntimeException wrapping any exception raised during initialization
     */
    private OffsetManager createOffsetManager(String str, long j) {
        try {
            // Topic-backed store.
            final KafkaTopicOffsetManager topicStore = new KafkaTopicOffsetManager(this.zookeeperConnect, this.offsetStoreTopic, Collections.emptyMap());
            topicStore.setConsumerId(str);
            topicStore.setReferenceTimestamp(j);
            topicStore.setSegmentSize(this.offsetStoreSegmentSize);
            topicStore.setRetentionTime(this.offsetStoreRetentionTime);
            topicStore.setRequiredAcks(this.offsetStoreRequiredAcks);
            topicStore.setMaxSize(this.offsetStoreMaxFetchSize);
            topicStore.setBatchBytes(this.offsetStoreBatchBytes);
            topicStore.setMaxQueueBufferingTime(this.offsetStoreBatchTime);
            topicStore.afterPropertiesSet();

            // Windowing decorator around the store.
            final WindowingOffsetManager windowed = new WindowingOffsetManager(topicStore);
            windowed.setTimespan(this.offsetUpdateTimeWindow);
            windowed.setCount(this.offsetUpdateCount);
            windowed.setShutdownTimeout(this.offsetUpdateShutdownTimeout);
            windowed.afterPropertiesSet();
            return windowed;
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Manually acknowledges the Kafka messages the given headers belong to, in list order.
     *
     * @param linkedList the headers of the messages to acknowledge; each must carry a
     *        {@code kafka_acknowledgment} header
     * @throws IllegalArgumentException if one of the headers has no acknowledgment
     */
    public void doManualAck(LinkedList<MessageHeaders> linkedList) {
        for (MessageHeaders headers : linkedList) {
            final Acknowledgment ack = (Acknowledgment) headers.get("kafka_acknowledgment");
            Assert.notNull(ack, "Acknowledgement shouldn't be null when acknowledging kafka message manually.");
            ack.acknowledge();
        }
    }
}
