public class KafkaMessageBus extends MessageBusSupport
| Stream definition | Kafka topic | Kafka partitions | Notes |
|---|---|---|---|
| foo = "http | log" | foo.0 | 1 partition | 1 producer, 1 consumer |
| foo = "http | log", log.count=x | foo.0 | x partitions | 1 producer, x consumers with static group 'springXD', achieves queue semantics |
| foo = "http | log", log.count=x + XD partitioning | still 1 topic 'foo.0' | x partitions + use key computed by XD | 1 producer, x consumers with static group 'springXD', achieves queue semantics |
| foo = "http | log", log.count=x, concurrency=y | foo.0 | x*y partitions | 1 producer, x XD consumers, each with y threads |
| foo = "http | log", log.count=0, x actual log containers | foo.0 | 10(configurable) partitions | 1 producer, x XD consumers. Can't know the number of partitions beforehand, so decide a number that better be greater than number of containers |
MessageBusSupport.DirectHandler, MessageBusSupport.PartitioningMetadata, MessageBusSupport.SetBuilder, MessageBusSupport.SharedChannelProvider<T extends org.springframework.messaging.MessageChannel>| Modifier and Type | Field and Description |
|---|---|
static java.lang.String |
COMPRESSION_CODEC |
protected static java.util.Set<java.lang.Object> |
PRODUCER_COMPRESSION_PROPERTIES |
static java.lang.String |
REQUIRED_ACKS |
static org.I0Itec.zkclient.serialize.ZkSerializer |
utf8Serializer
Used when writing directly to ZK.
|
CONSUMER_RETRY_PROPERTIES, defaultBackOffInitialInterval, defaultBackOffMaxInterval, defaultBackOffMultiplier, defaultBatchBufferLimit, defaultBatchingEnabled, defaultBatchSize, defaultBatchTimeout, defaultCompress, defaultConcurrency, defaultMaxAttempts, directChannelProvider, evaluationContext, JOB_CHANNEL_TYPE_PREFIX, logger, MEDIATYPES_MEDIATYPE_ALL, ORIGINAL_CONTENT_TYPE_HEADER, P2P_NAMED_CHANNEL_TYPE_PREFIX, PARTITION_HEADER, PRODUCER_BATCHING_ADVANCED_PROPERTIES, PRODUCER_BATCHING_BASIC_PROPERTIES, PRODUCER_PARTITIONING_PROPERTIES, PRODUCER_STANDARD_PROPERTIES, PUBSUB_NAMED_CHANNEL_TYPE_PREFIX| Constructor and Description |
|---|
KafkaMessageBus(java.lang.String brokers,
java.lang.String zkAddress,
MultiTypeCodec<java.lang.Object> codec,
java.lang.String... headersToMap) |
| Modifier and Type | Method and Description |
|---|---|
void |
bindConsumer(java.lang.String name,
org.springframework.messaging.MessageChannel moduleInputChannel,
java.util.Properties properties)
Bind a message consumer on a p2p channel
|
void |
bindProducer(java.lang.String name,
org.springframework.messaging.MessageChannel moduleOutputChannel,
java.util.Properties properties)
Bind a message producer on a p2p channel.
|
void |
bindPubSubConsumer(java.lang.String name,
org.springframework.messaging.MessageChannel inputChannel,
java.util.Properties properties)
Bind a message consumer on a pub/sub channel
|
void |
bindPubSubProducer(java.lang.String name,
org.springframework.messaging.MessageChannel outputChannel,
java.util.Properties properties)
Bind a message producer on a pub/sub channel.
|
void |
bindReplier(java.lang.String name,
org.springframework.messaging.MessageChannel requests,
org.springframework.messaging.MessageChannel replies,
java.util.Properties properties)
Bind a consumer that handles requests from a requestor and asynchronously sends replies.
|
void |
bindRequestor(java.lang.String name,
org.springframework.messaging.MessageChannel requests,
org.springframework.messaging.MessageChannel replies,
java.util.Properties properties)
Bind a producer that expects async replies.
|
void |
setDefaultCompressionCodec(java.lang.String defaultCompressionCodec) |
void |
setDefaultReplicationFactor(int defaultReplicationFactor) |
void |
setDefaultRequiredAcks(int defaultRequiredAcks) |
addBinding, afterPropertiesSet, bindDynamicProducer, bindDynamicPubSubProducer, bindExistingProducerDirectlyIfPossible, bindNewProducerDirectlyIfPossible, buildPartitionRoutingExpression, buildRetryTemplateIfRetryEnabled, deleteBinding, deleteBindings, deserializePayloadIfNecessary, determinePartition, doBindDynamicProducer, doBindDynamicPubSubProducer, getApplicationContext, getBeanFactory, getIdGenerator, isNamedChannel, onInit, serializePayloadIfNecessary, setApplicationContext, setCodec, setDefaultBackOffInitialInterval, setDefaultBackOffMaxInterval, setDefaultBackOffMultiplier, setDefaultBatchBufferLimit, setDefaultBatchingEnabled, setDefaultBatchSize, setDefaultBatchTimeout, setDefaultCompress, setDefaultConcurrency, setDefaultMaxAttempts, setIntegrationEvaluationContext, setPartitionSelector, stopBindings, unbindConsumer, unbindConsumers, unbindProducer, unbindProducers, validateConsumerProperties, validateProducerPropertiespublic static final java.lang.String COMPRESSION_CODEC
public static final java.lang.String REQUIRED_ACKS
public static final org.I0Itec.zkclient.serialize.ZkSerializer utf8Serializer
protected static final java.util.Set<java.lang.Object> PRODUCER_COMPRESSION_PROPERTIES
public KafkaMessageBus(java.lang.String brokers,
java.lang.String zkAddress,
MultiTypeCodec<java.lang.Object> codec,
java.lang.String... headersToMap)
public void setDefaultReplicationFactor(int defaultReplicationFactor)
public void setDefaultCompressionCodec(java.lang.String defaultCompressionCodec)
public void setDefaultRequiredAcks(int defaultRequiredAcks)
public void bindConsumer(java.lang.String name,
org.springframework.messaging.MessageChannel moduleInputChannel,
java.util.Properties properties)
MessageBusname - the logical identity of the message sourcemoduleInputChannel - the channel bound as a consumerproperties - arbitrary String key/value pairs that will be used in the bindingpublic void bindPubSubConsumer(java.lang.String name,
org.springframework.messaging.MessageChannel inputChannel,
java.util.Properties properties)
MessageBusname - the logical identity of the message sourceinputChannel - the channel bound as a pub/sub consumerproperties - arbitrary String key/value pairs that will be used in the bindingpublic void bindProducer(java.lang.String name,
org.springframework.messaging.MessageChannel moduleOutputChannel,
java.util.Properties properties)
MessageBusname - the logical identity of the message targetmoduleOutputChannel - the channel bound as a producerproperties - arbitrary String key/value pairs that will be used in the bindingpublic void bindPubSubProducer(java.lang.String name,
org.springframework.messaging.MessageChannel outputChannel,
java.util.Properties properties)
MessageBusname - the logical identity of the message targetoutputChannel - the channel bound as a producerproperties - arbitrary String key/value pairs that will be used in the bindingpublic void bindRequestor(java.lang.String name,
org.springframework.messaging.MessageChannel requests,
org.springframework.messaging.MessageChannel replies,
java.util.Properties properties)
MessageBusname - The name of the requestor.requests - The request channel - sends requests.replies - The reply channel - receives replies.properties - arbitrary String key/value pairs that will be used in the binding.public void bindReplier(java.lang.String name,
org.springframework.messaging.MessageChannel requests,
org.springframework.messaging.MessageChannel replies,
java.util.Properties properties)
MessageBusname - The name of the requestor for which this replier will handle requests.requests - The request channel - receives requests.replies - The reply channel - sends replies.properties - arbitrary String key/value pairs that will be used in the binding.