Apache Kafka Binder
Usage
To use Apache Kafka binder, you need to add spring-cloud-stream-binder-kafka
as a dependency to your Spring Cloud Stream application, as shown in the following example for Maven:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
Alternatively, you can also use the Spring Cloud Stream Kafka Starter, as shown in the following example for Maven:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
Overview
The following image shows a simplified diagram of how the Apache Kafka binder operates:

The Apache Kafka Binder implementation maps each destination to an Apache Kafka topic. The consumer group maps directly to the same Apache Kafka concept. Partitioning also maps directly to Apache Kafka partitions as well.
The binder currently uses the Apache Kafka kafka-clients
version 2.3.1
.
This client can communicate with older brokers (see the Kafka documentation), but certain features may not be available.
For example, with versions earlier than 0.11.x.x, native headers are not supported.
Also, 0.11.x.x does not support the autoAddPartitions
property.
Configuration Options
This section contains the configuration options used by the Apache Kafka binder.
For common configuration options and properties pertaining to the binder, see the binding properties in core documentation.
Kafka Binder Properties
- spring.cloud.stream.kafka.binder.brokers
-
A list of brokers to which the Kafka binder connects.
Default:
localhost
. - spring.cloud.stream.kafka.binder.defaultBrokerPort
-
brokers
allows hosts specified with or without port information (for example,host1,host2:port2
). This sets the default port when no port is configured in the broker list.Default:
9092
. - spring.cloud.stream.kafka.binder.configuration
-
Key/Value map of client properties (both producers and consumer) passed to all clients created by the binder. Due to the fact that these properties are used by both producers and consumers, usage should be restricted to common properties — for example, security settings. Unknown Kafka producer or consumer properties provided through this configuration are filtered out and not allowed to propagate. Properties here supersede any properties set in boot.
Default: Empty map.
- spring.cloud.stream.kafka.binder.consumerProperties
-
Key/Value map of arbitrary Kafka client consumer properties. In addition to support known Kafka consumer properties, unknown consumer properties are allowed here as well. Properties here supersede any properties set in boot and in the
configuration
property above.Default: Empty map.
- spring.cloud.stream.kafka.binder.headers
-
The list of custom headers that are transported by the binder. Only required when communicating with older applications (⇐ 1.3.x) with a
kafka-clients
version < 0.11.0.0. Newer versions support headers natively.Default: empty.
- spring.cloud.stream.kafka.binder.healthTimeout
-
The time to wait to get partition information, in seconds. Health reports as down if this timer expires.
Default: 10.
- spring.cloud.stream.kafka.binder.requiredAcks
-
The number of required acks on the broker. See the Kafka documentation for the producer
acks
property.Default:
1
. - spring.cloud.stream.kafka.binder.minPartitionCount
-
Effective only if
autoCreateTopics
orautoAddPartitions
is set. The global minimum number of partitions that the binder configures on topics on which it produces or consumes data. It can be superseded by thepartitionCount
setting of the producer or by the value ofinstanceCount * concurrency
settings of the producer (if either is larger).Default:
1
. - spring.cloud.stream.kafka.binder.producerProperties
-
Key/Value map of arbitrary Kafka client producer properties. In addition to support known Kafka producer properties, unknown producer properties are allowed here as well. Properties here supersede any properties set in boot and in the
configuration
property above.Default: Empty map.
- spring.cloud.stream.kafka.binder.replicationFactor
-
The replication factor of auto-created topics if
autoCreateTopics
is active. Can be overridden on each binding.If you are using Kafka broker versions prior to 2.4, then this value should be set to at least 1
. Starting with version 3.0.8, the binder uses-1
as the default value, which indicates that the broker 'default.replication.factor' property will be used to determine the number of replicas. Check with your Kafka broker admins to see if there is a policy in place that requires a minimum replication factor, if that’s the case then, typically, thedefault.replication.factor
will match that value and-1
should be used, unless you need a replication factor greater than the minimum.Default:
-1
. - spring.cloud.stream.kafka.binder.autoCreateTopics
-
If set to
true
, the binder creates new topics automatically. If set tofalse
, the binder relies on the topics being already configured. In the latter case, if the topics do not exist, the binder fails to start.This setting is independent of the auto.create.topics.enable
setting of the broker and does not influence it. If the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.Default:
true
. - spring.cloud.stream.kafka.binder.autoAddPartitions
-
If set to
true
, the binder creates new partitions if required. If set tofalse
, the binder relies on the partition size of the topic being already configured. If the partition count of the target topic is smaller than the expected value, the binder fails to start.Default:
false
. - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
Enables transactions in the binder. See
transaction.id
in the Kafka documentation and Transactions in thespring-kafka
documentation. When transactions are enabled, individualproducer
properties are ignored and all producers use thespring.cloud.stream.kafka.binder.transaction.producer.*
properties.Default
null
(no transactions) - spring.cloud.stream.kafka.binder.transaction.producer.*
-
Global producer properties for producers in a transactional binder. See
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
and Kafka Producer Properties and the general producer properties supported by all binders.Default: See individual producer properties.
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
The bean name of a
KafkaHeaderMapper
used for mappingspring-messaging
headers to and from Kafka headers. Use this, for example, if you wish to customize the trusted packages in aBinderHeaderMapper
bean that uses JSON deserialization for the headers. If this customBinderHeaderMapper
bean is not made available to the binder using this property, then the binder will look for a header mapper bean with the namekafkaBinderHeaderMapper
that is of typeBinderHeaderMapper
before falling back to a defaultBinderHeaderMapper
created by the binder.Default: none.
- spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
-
Flag to set the binder health as
down
, when any partitions on the topic, regardless of the consumer that is receiving data from it, is found without a leader.Default:
false
. - spring.cloud.stream.kafka.binder.certificateStoreDirectory
-
When the truststore or keystore certificate location is given as a classpath URL (
classpath:…
), the binder copies the resource from the classpath location inside the JAR file to a location on the filesystem. The file will be moved to the location specified as the value for this property which must be an existing directory on the filesystem that is writable by the process running the application. If this value is not set and the certificate file is a classpath resource, then it will be moved to System’s temp directory as returned bySystem.getProperty("java.io.tmpdir")
. This is also true, if this value is present, but the directory cannot be found on the filesystem or is not writable.Default: none.
Kafka Consumer Properties
To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of spring.cloud.stream.kafka.default.consumer.<property>=<value> .
|
The following properties are available for Kafka consumers only and
must be prefixed with spring.cloud.stream.kafka.bindings.<channelName>.consumer.
.
- admin.configuration
-
Since version 2.1.1, this property is deprecated in favor of
topic.properties
, and support for it will be removed in a future version. - admin.replicas-assignment
-
Since version 2.1.1, this property is deprecated in favor of
topic.replicas-assignment
, and support for it will be removed in a future version. - admin.replication-factor
-
Since version 2.1.1, this property is deprecated in favor of
topic.replication-factor
, and support for it will be removed in a future version. - autoRebalanceEnabled
-
When
true
, topic partitions is automatically rebalanced between the members of a consumer group. Whenfalse
, each consumer is assigned a fixed set of partitions based onspring.cloud.stream.instanceCount
andspring.cloud.stream.instanceIndex
. This requires both thespring.cloud.stream.instanceCount
andspring.cloud.stream.instanceIndex
properties to be set appropriately on each launched instance. The value of thespring.cloud.stream.instanceCount
property must typically be greater than 1 in this case.Default:
true
. - ackEachRecord
-
When
autoCommitOffset
istrue
, this setting dictates whether to commit the offset after each record is processed. By default, offsets are committed after all records in the batch of records returned byconsumer.poll()
have been processed. The number of records returned by a poll can be controlled with themax.poll.records
Kafka property, which is set through the consumerconfiguration
property. Setting this totrue
may cause a degradation in performance, but doing so reduces the likelihood of redelivered records when a failure occurs. Also, see the binderrequiredAcks
property, which also affects the performance of committing offsets. This property is deprecated as of 3.1 in favor of usingackMode
. If theackMode
is not set and batch mode is not enabled,RECORD
ackMode will be used.Default:
false
. - autoCommitOffset
-
Starting with version 3.1, this property is deprecated. See
ackMode
for more details on alternatives. Whether to autocommit offsets when a message has been processed. If set tofalse
, a header with the keykafka_acknowledgment
of the typeorg.springframework.kafka.support.Acknowledgment
header is present in the inbound message. Applications may use this header for acknowledging messages. See the examples section for details. When this property is set tofalse
, Kafka binder sets the ack mode toorg.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL
and the application is responsible for acknowledging records. Also seeackEachRecord
.Default:
true
. - ackMode
-
Specify the container ack mode. This is based on the AckMode enumeration defined in Spring Kafka. If
ackEachRecord
property is set totrue
and consumer is not in batch mode, then this will use the ack mode ofRECORD
, otherwise, use the provided ack mode using this property. - autoCommitOnError
-
In pollable consumers, if set to
true
, it always auto commits on error. If not set (the default) or false, it will not auto commit in pollable consumers. Note that this property is only applicable for pollable consumers.Default: not set.
- resetOffsets
-
Whether to reset offsets on the consumer to the value provided by startOffset. Must be false if a
KafkaBindingRebalanceListener
is provided; see Using a KafkaBindingRebalanceListener. See Resetting Offsets for more information about this property.Default:
false
. - startOffset
-
The starting offset for new groups. Allowed values:
earliest
andlatest
. If the consumer group is set explicitly for the consumer 'binding' (throughspring.cloud.stream.bindings.<channelName>.group
), 'startOffset' is set toearliest
. Otherwise, it is set tolatest
for theanonymous
consumer group. See Resetting Offsets for more information about this property.Default: null (equivalent to
earliest
). - enableDlq
-
When set to true, it enables DLQ behavior for the consumer. By default, messages that result in errors are forwarded to a topic named
error.<destination>.<group>
. The DLQ topic name can be configurable by setting thedlqName
property or by defining a@Bean
of typeDlqDestinationResolver
. This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome. See Dead-Letter Topic Processing processing for more information. Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers:x-original-topic
,x-exception-message
, andx-exception-stacktrace
asbyte[]
. By default, a failed record is sent to the same partition number in the DLQ topic as the original record. See Dead-Letter Topic Partition Selection for how to change that behavior. Not allowed whendestinationIsPattern
istrue
.Default:
false
. - dlqPartitions
-
When
enableDlq
is true, and this property is not set, a dead letter topic with the same number of partitions as the primary topic(s) is created. Usually, dead-letter records are sent to the same partition in the dead-letter topic as the original record. This behavior can be changed; see Dead-Letter Topic Partition Selection. If this property is set to1
and there is noDqlPartitionFunction
bean, all dead-letter records will be written to partition0
. If this property is greater than1
, you MUST provide aDlqPartitionFunction
bean. Note that the actual partition count is affected by the binder’sminPartitionCount
property.Default:
none
- configuration
-
Map with a key/value pair containing generic Kafka consumer properties. In addition to having Kafka consumer properties, other configuration properties can be passed here. For example some properties needed by the application such as
spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar
. Thebootstrap.servers
property cannot be set here; use multi-binder support if you need to connect to multiple clusters.Default: Empty map.
- dlqName
-
The name of the DLQ topic to receive the error messages.
Default: null (If not specified, messages that result in errors are forwarded to a topic named
error.<destination>.<group>
). - dlqProducerProperties
-
Using this, DLQ-specific producer properties can be set. All the properties available through kafka producer properties can be set through this property. When native decoding is enabled on the consumer (i.e., useNativeDecoding: true) , the application must provide corresponding key/value serializers for DLQ. This must be provided in the form of
dlqProducerProperties.configuration.key.serializer
anddlqProducerProperties.configuration.value.serializer
.Default: Default Kafka producer properties.
- standardHeaders
-
Indicates which standard headers are populated by the inbound channel adapter. Allowed values:
none
,id
,timestamp
, orboth
. Useful if using native deserialization and the first component to receive a message needs anid
(such as an aggregator that is configured to use a JDBC message store).Default:
none
- converterBeanName
-
The name of a bean that implements
RecordMessageConverter
. Used in the inbound channel adapter to replace the defaultMessagingMessageConverter
.Default:
null
- idleEventInterval
-
The interval, in milliseconds, between events indicating that no messages have recently been received. Use an
ApplicationListener<ListenerContainerIdleEvent>
to receive these events. See Example: Pausing and Resuming the Consumer for a usage example.Default:
30000
- destinationIsPattern
-
When true, the destination is treated as a regular expression
Pattern
used to match topic names by the broker. When true, topics are not provisioned, andenableDlq
is not allowed, because the binder does not know the topic names during the provisioning phase. Note, the time taken to detect new topics that match the pattern is controlled by the consumer propertymetadata.max.age.ms
, which (at the time of writing) defaults to 300,000ms (5 minutes). This can be configured using theconfiguration
property above.Default:
false
- topic.properties
-
A
Map
of Kafka topic properties used when provisioning new topics — for example,spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0
Default: none.
- topic.replicas-assignment
-
A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and the value being the assignments. Used when provisioning new topics. See the
NewTopic
Javadocs in thekafka-clients
jar.Default: none.
- topic.replication-factor
-
The replication factor to use when provisioning topics. Overrides the binder-wide setting. Ignored if
replicas-assignments
is present.Default: none (the binder-wide default of -1 is used).
- pollTimeout
-
Timeout used for polling in pollable consumers.
Default: 5 seconds.
- transactionManager
-
Bean name of a
KafkaAwareTransactionManager
used to override the binder’s transaction manager for this binding. Usually needed if you want to synchronize another transaction with the Kafka transaction, using theChainedKafkaTransactionManaager
. To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.Default: none.
- txCommitRecovered
-
When using a transactional binder, the offset of a recovered record (e.g. when retries are exhausted and the record is sent to a dead letter topic) will be committed via a new transaction, by default. Setting this property to
false
suppresses committing the offset of recovered record.Default: true.
- commonErrorHandlerBeanName
-
CommonErrorHandler
bean name to use per consumer binding. When present, this user providedCommonErrorHandler
takes precedence over any other error handlers defined by the binder. This is a handy way to express error handlers, if the application does not want to use aListenerContainerCustomizer
and then check the destination/group combination to set an error handler.Default: none.
Resetting Offsets
When an application starts, the initial position in each assigned partition depends on two properties startOffset
and resetOffsets
.
If resetOffsets
is false
, normal Kafka consumer auto.offset.reset
semantics apply.
i.e. If there is no committed offset for a partition for the binding’s consumer group, the position is earliest
or latest
.
By default, bindings with an explicit group
use earliest
, and anonymous bindings (with no group
) use latest
.
These defaults can be overridden by setting the startOffset
binding property.
There will be no committed offset(s) the first time the binding is started with a particular group
.
The other condition where no committed offset exists is if the offset has been expired.
With modern brokers (since 2.1), and default broker properties, the offsets are expired 7 days after the last member leaves the group.
See the offsets.retention.minutes
broker property for more information.
When resetOffsets
is true
, the binder applies similar semantics to those that apply when there is no committed offset on the broker, as if this binding has never consumed from the topic; i.e. any current committed offset is ignored.
Following are two use cases when this might be used.
-
Consuming from a compacted topic containing key/value pairs. Set
resetOffsets
totrue
andstartOffset
toearliest
; the binding will perform aseekToBeginning
on all newly assigned partitions. -
Consuming from a topic containing events, where you are only interested in events that occur while this binding is running. Set
resetOffsets
totrue
andstartOffset
tolatest
; the binding will perform aseekToEnd
on all newly assigned partitions.
If a rebalance occurs after the initial assignment, the seeks will only be performed on any newly assigned partitions that were not assigned during the initial assignment. |
For more control over topic offsets, see Using a KafkaBindingRebalanceListener; when a listener is provided, resetOffsets
should not be set to true
, otherwise, that will cause an error.
Consuming Batches
Starting with version 3.0, when spring.cloud.stream.binding.<name>.consumer.batch-mode
is set to true
, all of the records received by polling the Kafka Consumer
will be presented as a List<?>
to the listener method.
Otherwise, the method will be called with one record at a time.
The size of the batch is controlled by Kafka consumer properties max.poll.records
, fetch.min.bytes
, fetch.max.wait.ms
; refer to the Kafka documentation for more information.
Bear in mind that batch mode is not supported with @StreamListener
- it only works with the newer functional programming model.
Retry within the binder is not supported when using batch mode, so maxAttempts will be overridden to 1.
You can configure a SeekToCurrentBatchErrorHandler (using a ListenerContainerCustomizer ) to achieve similar functionality to retry in the binder.
You can also use a manual AckMode and call Ackowledgment.nack(index, sleep) to commit the offsets for a partial batch and have the remaining records redelivered.
Refer to the Spring for Apache Kafka documentation for more information about these techniques.
|
Kafka Producer Properties
To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of spring.cloud.stream.kafka.default.producer.<property>=<value> .
|
The following properties are available for Kafka producers only and
must be prefixed with spring.cloud.stream.kafka.bindings.<channelName>.producer.
.
- admin.configuration
-
Since version 2.1.1, this property is deprecated in favor of
topic.properties
, and support for it will be removed in a future version. - admin.replicas-assignment
-
Since version 2.1.1, this property is deprecated in favor of
topic.replicas-assignment
, and support for it will be removed in a future version. - admin.replication-factor
-
Since version 2.1.1, this property is deprecated in favor of
topic.replication-factor
, and support for it will be removed in a future version. - bufferSize
-
Upper limit, in bytes, of how much data the Kafka producer attempts to batch before sending.
Default:
16384
. - sync
-
Whether the producer is synchronous.
Default:
false
. - sendTimeoutExpression
-
A SpEL expression evaluated against the outgoing message used to evaluate the time to wait for ack when synchronous publish is enabled — for example,
headers['mySendTimeout']
. The value of the timeout is in milliseconds. With versions before 3.0, the payload could not be used unless native encoding was being used because, by the time this expression was evaluated, the payload was already in the form of abyte[]
. Now, the expression is evaluated before the payload is converted.Default:
none
. - batchTimeout
-
How long the producer waits to allow more messages to accumulate in the same batch before sending the messages. (Normally, the producer does not wait at all and simply sends all the messages that accumulated while the previous send was in progress.) A non-zero value may increase throughput at the expense of latency.
Default:
0
. - messageKeyExpression
-
A SpEL expression evaluated against the outgoing message used to populate the key of the produced Kafka message — for example,
headers['myKey']
. With versions before 3.0, the payload could not be used unless native encoding was being used because, by the time this expression was evaluated, the payload was already in the form of abyte[]
. Now, the expression is evaluated before the payload is converted. In the case of a regular processor (Function<String, String>
orFunction<Message<?>, Message<?>
), if the produced key needs to be same as the incoming key from the topic, this property can be set as below.spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']
There is an important caveat to keep in mind for reactive functions. In that case, it is up to the application to manually copy the headers from the incoming messages to outbound messages. You can set the header, e.g.myKey
and useheaders['myKey']
as suggested above or, for convenience, simply set theKafkaHeaders.MESSAGE_KEY
header, and you do not need to set this property at all.Default:
none
. - headerPatterns
-
A comma-delimited list of simple patterns to match Spring messaging headers to be mapped to the Kafka
Headers
in theProducerRecord
. Patterns can begin or end with the wildcard character (asterisk). Patterns can be negated by prefixing with!
. Matching stops after the first match (positive or negative). For example!ask,as*
will passash
but notask
.id
andtimestamp
are never mapped.Default:
*
(all headers - except theid
andtimestamp
) - configuration
-
Map with a key/value pair containing generic Kafka producer properties. The
bootstrap.servers
property cannot be set here; use multi-binder support if you need to connect to multiple clusters.Default: Empty map.
- topic.properties
-
A
Map
of Kafka topic properties used when provisioning new topics — for example,spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0
- topic.replicas-assignment
-
A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and the value being the assignments. Used when provisioning new topics. See the
NewTopic
Javadocs in thekafka-clients
jar.Default: none.
- topic.replication-factor
-
The replication factor to use when provisioning topics. Overrides the binder-wide setting. Ignored if
replicas-assignments
is present.Default: none (the binder-wide default of -1 is used).
- useTopicHeader
-
Set to
true
to override the default binding destination (topic name) with the value of theKafkaHeaders.TOPIC
message header in the outbound message. If the header is not present, the default binding destination is used.Default:
false
. - recordMetadataChannel
-
The bean name of a
MessageChannel
to which successful send results should be sent; the bean must exist in the application context. The message sent to the channel is the sent message (after conversion, if any) with an additional headerKafkaHeaders.RECORD_METADATA
. The header contains aRecordMetadata
object provided by the Kafka client; it includes the partition and offset where the record was written in the topic.ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)
Failed sends go the producer error channel (if configured); see Error Channels.
Default: null.
The Kafka binder uses the partitionCount setting of the producer as a hint to create a topic with the given partition count (in conjunction with the minPartitionCount , the maximum of the two being the value being used).
Exercise caution when configuring both minPartitionCount for a binder and partitionCount for an application, as the larger value is used.
If a topic already exists with a smaller partition count and autoAddPartitions is disabled (the default), the binder fails to start.
If a topic already exists with a smaller partition count and autoAddPartitions is enabled, new partitions are added.
If a topic already exists with a larger number of partitions than the maximum of (minPartitionCount or partitionCount ), the existing partition count is used.
|
- compression
-
Set the
compression.type
producer property. Supported values arenone
,gzip
,snappy
,lz4
andzstd
. If you override thekafka-clients
jar to 2.1.0 (or later), as discussed in the Spring for Apache Kafka documentation, and wish to usezstd
compression, usespring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd
.Default:
none
. - transactionManager
-
Bean name of a
KafkaAwareTransactionManager
used to override the binder’s transaction manager for this binding. Usually needed if you want to synchronize another transaction with the Kafka transaction, using theChainedKafkaTransactionManaager
. To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.Default: none.
- closeTimeout
-
Timeout in number of seconds to wait for when closing the producer.
Default:
30
- allowNonTransactional
-
Normally, all output bindings associated with a transactional binder will publish in a new transaction, if one is not already in process. This property allows you to override that behavior. If set to true, records published to this output binding will not be run in a transaction, unless one is already in process.
Default:
false
Usage examples
In this section, we show the use of the preceding properties for specific scenarios.
Example: Setting ackMode
to MANUAL
and Relying on Manual Acknowledgement
This example illustrates how one may manually acknowledge offsets in a consumer application.
This example requires that spring.cloud.stream.kafka.bindings.input.consumer.ackMode
be set to MANUAL
.
Use the corresponding input channel name for your example.
@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {
public static void main(String[] args) {
SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
}
@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}
Example: Security Configuration
Apache Kafka 0.9 supports secure connections between client and brokers.
To take advantage of this feature, follow the guidelines in the Apache Kafka Documentation as well as the Kafka 0.9 security guidelines from the Confluent documentation.
Use the spring.cloud.stream.kafka.binder.configuration
option to set security properties for all clients created by the binder.
For example, to set security.protocol
to SASL_SSL
, set the following property:
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
All the other security properties can be set in a similar manner.
When using Kerberos, follow the instructions in the reference documentation for creating and referencing the JAAS configuration.
Spring Cloud Stream supports passing JAAS configuration information to the application by using a JAAS configuration file and using Spring Boot properties.
The JAAS and (optionally) krb5 file locations can be set for Spring Cloud Stream applications by using system properties. The following example shows how to launch a Spring Cloud Stream application with SASL and Kerberos by using a JAAS configuration file:
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
As an alternative to having a JAAS configuration file, Spring Cloud Stream provides a mechanism for setting up the JAAS configuration for Spring Cloud Stream applications by using Spring Boot properties.
The following properties can be used to configure the login context of the Kafka client:
- spring.cloud.stream.kafka.binder.jaas.loginModule
-
The login module name. Not necessary to be set in normal cases.
Default:
com.sun.security.auth.module.Krb5LoginModule
. - spring.cloud.stream.kafka.binder.jaas.controlFlag
-
The control flag of the login module.
Default:
required
. - spring.cloud.stream.kafka.binder.jaas.options
-
Map with a key/value pair containing the login module options.
Default: Empty map.
The following example shows how to launch a Spring Cloud Stream application with SASL and Kerberos by using Spring Boot configuration properties:
java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.autoCreateTopics=false \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM
The preceding example represents the equivalent of the following JAAS file:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_client.keytab"
principal="kafka-client-1@EXAMPLE.COM";
};
If the topics required already exist on the broker or will be created by an administrator, autocreation can be turned off and only client JAAS properties need to be sent.
Do not mix JAAS configuration files and Spring Boot properties in the same application.
If the -Djava.security.auth.login.config system property is already present, Spring Cloud Stream ignores the Spring Boot properties.
|
Be careful when using the autoCreateTopics and autoAddPartitions with Kerberos.
Usually, applications may use principals that do not have administrative rights in Kafka and Zookeeper.
Consequently, relying on Spring Cloud Stream to create/modify topics may fail.
In secure environments, we strongly recommend creating topics and managing ACLs administratively by using Kafka tooling.
|
When connecting to multiple clusters in which each one requires separate JAAS configuration, then set the JAAS configuration using the property sasl.jaas.config
.
When this property is present in the applicaiton, it takes precedence over the other strategies mentioned above.
See this KIP-85 for more details.
For example, if you have two clusters in your application with separate JAAS configuration, then the following is a template that you can use:
spring.cloud.stream:
binders:
kafka1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
kafka2:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9093
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1-secret\";"
kafka.binder:
configuration:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
Note that both the Kafka clusters, and the sasl.jaas.config
values for each of them are different in the above configuration.
See this sample application for more details on how to setup and run such an application.
Example: Pausing and Resuming the Consumer
If you wish to suspend consumption but not cause a partition rebalance, you can pause and resume the consumer.
This is facilitated by adding the Consumer
as a parameter to your @StreamListener
.
To resume, you need an ApplicationListener
for ListenerContainerIdleEvent
instances.
The frequency at which events are published is controlled by the idleEventInterval
property.
Since the consumer is not thread-safe, you must call these methods on the calling thread.
The following simple application shows how to pause and resume:
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
}
@Bean
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
return event -> {
System.out.println(event);
if (event.getConsumer().paused().size() > 0) {
event.getConsumer().resume(event.getConsumer().paused());
}
};
}
}
Transactional Binder
Enable transactions by setting spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
to a non-empty value, e.g. tx-
.
When used in a processor application, the consumer starts the transaction; any records sent on the consumer thread participate in the same transaction.
When the listener exits normally, the listener container will send the offset to the transaction and commit it.
A common producer factory is used for all producer bindings configured using spring.cloud.stream.kafka.binder.transaction.producer.*
properties; individual binding Kafka producer properties are ignored.
Normal binder retries (and dead lettering) are not supported with transactions because the retries will run in the original transaction, which may be rolled back and any published records will be rolled back too.
When retries are enabled (the common property maxAttempts is greater than zero) the retry properties are used to configure a DefaultAfterRollbackProcessor to enable retries at the container level.
Similarly, instead of publishing dead-letter records within the transaction, this functionality is moved to the listener container, again via the DefaultAfterRollbackProcessor which runs after the main transaction has rolled back.
|
If you wish to use transactions in a source application, or from some arbitrary thread for producer-only transaction (e.g. @Scheduled
method), you must get a reference to the transactional producer factory and define a KafkaTransactionManager
bean using it.
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
@Value("${unique.tx.id.per.instance}") String txId) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
tm.setTransactionId(txId)
return tm;
}
Notice that we get a reference to the binder using the BinderFactory
; use null
in the first argument when there is only one binder configured.
If more than one binder is configured, use the binder name to get the reference.
Once we have a reference to the binder, we can obtain a reference to the ProducerFactory
and create a transaction manager.
Then you would use normal Spring transaction support, e.g. TransactionTemplate
or @Transactional
, for example:
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
If you wish to synchronize producer-only transactions with those from some other transaction manager, use a ChainedTransactionManager
.
If you deploy multiple instances of your application, each instance needs a unique transactionIdPrefix .
|
Error Channels
Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination and can also be configured to send async producer send failures to an error channel. See this section on error handling for more information.
The payload of the ErrorMessage
for a send failure is a KafkaSendFailureException
with properties:
-
failedMessage
: The Spring MessagingMessage<?>
that failed to be sent. -
record
: The rawProducerRecord
that was created from thefailedMessage
There is no automatic handling of producer exceptions (such as sending to a Dead-Letter queue). You can consume these exceptions with your own Spring Integration flow.
Kafka Metrics
Kafka binder module exposes the following metrics:
spring.cloud.stream.binder.kafka.offset
: This metric indicates how many messages have not been yet consumed from a given binder’s topic by a given consumer group.
The metrics provided are based on the Micrometer library.
The binder creates the KafkaBinderMetrics
bean if Micrometer is on the classpath and no other such beans provided by the application.
The metric contains the consumer group information, topic and the actual lag in committed offset from the latest offset on the topic.
This metric is particularly useful for providing auto-scaling feedback to a PaaS platform.
You can exclude KafkaBinderMetrics
from creating the necessary infrastructure like consumers and then reporting the metrics by providing the following component in the application.
@Component
class NoOpBindingMeters {
NoOpBindingMeters(MeterRegistry registry) {
registry.config().meterFilter(
MeterFilter.denyNameStartsWith(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME));
}
}
More details on how to suppress meters selectively can be found here.
Tombstone Records (null record values)
When using compacted topics, a record with a null
value (also called a tombstone record) represents the deletion of a key.
To receive such messages in a @StreamListener
method, the parameter must be marked as not required to receive a null
value argument.
@StreamListener(Sink.INPUT)
public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key,
@Payload(required = false) Customer customer) {
// customer is null if a tombstone record
...
}
Using a KafkaBindingRebalanceListener
Applications may wish to seek topics/partitions to arbitrary offsets when the partitions are initially assigned, or perform other operations on the consumer.
Starting with version 2.1, if you provide a single KafkaBindingRebalanceListener
bean in the application context, it will be wired into all Kafka consumer bindings.
public interface KafkaBindingRebalanceListener {
/**
* Invoked by the container before any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
}
/**
* Invoked by the container after any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}
/**
* Invoked when partitions are initially assigned or after a rebalance.
* Applications might only want to perform seek operations on an initial assignment.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
boolean initial) {
}
}
You cannot set the resetOffsets
consumer property to true
when you provide a rebalance listener.
Customizing Consumer and Producer configuration
If you want advanced customization of consumer and producer configuration that is used for creating ConsumerFactory
and ProducerFactory
in Kafka,
you can implement the following customizers.
-
ConsumerConfigCustomizer
-
ProducerConfigCustomizer
Both of these interfaces provide a way to configure the config map used for consumer and producer properties.
For example, if you want to gain access to a bean that is defined at the application level, you can inject that in the implementation of the configure
method.
When the binder discovers that these customizers are available as beans, it will invoke the configure
method right before creating the consumer and producer factories.
Both of these interfaces also provide access to both the binding and destination names so that they can be accessed while customizing producer and consumer properties.
Customizing AdminClient Configuration
As with consumer and producer config customization above, applications can also customize the configuration for admin clients by providing an AdminClientConfigCustomizer
.
AdminClientConfigCustomizer’s configure method provides access to the admin client properties, using which you can define further customization.
Binder’s Kafka topic provisioner gives the highest precedence for the properties given through this customizer.
Here is an example of providing this customizer bean.
@Bean
public AdminClientConfigCustomizer adminClientConfigCustomizer() {
return props -> {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
};
}
Dead-Letter Topic Processing
Dead-Letter Topic Partition Selection
By default, records are published to the Dead-Letter topic using the same partition as the original record. This means the Dead-Letter topic must have at least as many partitions as the original record.
To change this behavior, add a DlqPartitionFunction
implementation as a @Bean
to the application context.
Only one such bean can be present.
The function is provided with the consumer group, the failed ConsumerRecord
and the exception.
For example, if you always want to route to partition 0, you might use:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
If you set a consumer binding’s dlqPartitions property to 1 (and the binder’s minPartitionCount is equal to 1 ), there is no need to supply a DlqPartitionFunction ; the framework will always use partition 0.
If you set a consumer binding’s dlqPartitions property to a value greater than 1 (or the binder’s minPartitionCount is greater than 1 ), you must provide a DlqPartitionFunction bean, even if the partition count is the same as the original topic’s.
|
It is also possible to define a custom name for the DLQ topic.
In order to do so, create an implementation of DlqDestinationResolver
as a @Bean
to the application context.
When the binder detects such a bean, that takes precedence, otherwise it will use the dlqName
property.
If neither of these are found, it will default to error.<destination>.<group>
.
Here is an example of DlqDestinationResolver
as a @Bean
.
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
One important thing to keep in mind when providing an implementation for DlqDestinationResolver
is that the provisioner in the binder will not auto create topics for the application.
This is because there is no way for the binder to infer the names of all the DLQ topics the implementation might send to.
Therefore, if you provide DLQ names using this strategy, it is the application’s responsibility to ensure that those topics are created beforehand.
Handling Records in a Dead-Letter Topic
Because the framework cannot anticipate how users would want to dispose of dead-lettered messages, it does not provide any standard mechanism to handle them. If the reason for the dead-lettering is transient, you may wish to route the messages back to the original topic. However, if the problem is a permanent issue, that could cause an infinite loop. The sample Spring Boot application within this topic is an example of how to route those messages back to the original topic, but it moves them to a “parking lot” topic after three attempts. The application is another spring-cloud-stream application that reads from the dead-letter topic. It exits when no messages are received for 5 seconds.
The examples assume the original destination is so8400out
and the consumer group is so8400
.
There are a couple of strategies to consider:
-
Consider running the rerouting only when the main application is not running. Otherwise, the retries for transient errors are used up very quickly.
-
Alternatively, use a two-stage approach: Use this application to route to a third topic and another to route from there back to the main topic.
The following code listings show the sample application:
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private MessageChannel parkingLot;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Message<?> reRoute(Message<?> failed) {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else if (retries.intValue() < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
parkingLot.send(MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
return null;
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, exiting");
return;
}
}
}
public interface TwoOutputProcessor extends Processor {
@Output("parkingLot")
MessageChannel parkingLot();
}
}
Partitioning with the Kafka Binder
Apache Kafka supports topic partitioning natively.
Sometimes it is advantageous to send data to specific partitions — for example, when you want to strictly order message processing (all messages for a particular customer should go to the same partition).
The following example shows how to configure the producer and consumer side:
@SpringBootApplication
@EnableBinding(Source.class)
public class KafkaPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"foo1", "bar1", "qux1",
"foo2", "bar2", "qux2",
"foo3", "bar3", "qux3",
"foo4", "bar4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
.web(false)
.run(args);
}
@InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
public Message<?> generate() {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
}
}
spring:
cloud:
stream:
bindings:
output:
destination: partitioned.topic
producer:
partition-key-expression: headers['partitionKey']
partition-count: 12
The topic must be provisioned to have enough partitions to achieve the desired concurrency for all consumer groups.
The above configuration supports up to 12 consumer instances (6 if their concurrency is 2, 4 if their concurrency is 3, and so on).
It is generally best to “over-provision” the partitions to allow for future increases in consumers or concurrency.
|
The preceding configuration uses the default partitioning (key.hashCode() % partitionCount ).
This may or may not provide a suitably balanced algorithm, depending on the key values.
You can override this default by using the partitionSelectorExpression or partitionSelectorClass properties.
|
Since partitions are natively handled by Kafka, no special configuration is needed on the consumer side. Kafka allocates partitions across the instances.
The following Spring Boot application listens to a Kafka stream and prints (to the console) the partition ID to which each message goes:
@SpringBootApplication
@EnableBinding(Sink.class)
public class KafkaPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@StreamListener(Sink.INPUT)
public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(in + " received from partition " + partition);
}
}
spring:
cloud:
stream:
bindings:
input:
destination: partitioned.topic
group: myGroup
You can add instances as needed.
Kafka rebalances the partition allocations.
If the instance count (or instance count * concurrency
) exceeds the number of partitions, some consumers are idle.