1.1.0.M1
Copyright © 2013-2016 Pivotal Software, Inc.
Table of Contents
This guide describes the Apache Kafka implementation of the Spring Cloud Stream Binder. It contains information about its design, usage and configuration options, as well as information on how the Stream Cloud Stream concepts map into Apache Kafka specific constructs.
For using the Apache Kafka binder, you just need to add it to your Spring Cloud Stream application, using the following Maven coordinates:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
Alternatively, you can also use the Spring Cloud Stream Kafka Starter.
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>
A simplified diagram of how the Apache Kafka binder operates can be seen below.
The Apache Kafka Binder implementation maps each destination to an Apache Kafka topic. The consumer group maps directly to the same Apache Kafka concept. Partitioning also maps directly to Apache Kafka partitions as well.
This section contains the configuration options used by the Apache Kafka binder.
For common configuration options and properties pertaining to binder, refer to the core docs.
A list of brokers to which the Kafka binder will connect.
Default: localhost
.
brokers
allows hosts specified with or without port information (e.g., host1,host2:port2
).
This sets the default port when no port is configured in the broker list.
Default: 9092
.
A list of ZooKeeper nodes to which the Kafka binder can connect.
Default: localhost
.
zkNodes
allows hosts specified with or without port information (e.g., host1,host2:port2
).
This sets the default port when no port is configured in the node list.
Default: 2181
.
Key/Value map of client properties (both producers and consumer) passed to all clients created by the binder. Due to the fact that these properties will be used by both producers and consumers, usage should be restricted to common properties, especially security settings.
Default: Empty map.
The list of custom headers that will be transported by the binder.
Default: empty.
The frequency, in milliseconds, with which offsets are saved.
Ignored if 0
.
Default: 10000
.
The frequency, in number of updates, which which consumed offsets are persisted.
Ignored if 0
.
Mutually exclusive with offsetUpdateTimeWindow
.
Default: 0
.
The number of required acks on the broker.
Default: 1
.
Effective only if autoCreateTopics
or autoAddPartitions
is set.
The global minimum number of partitions that the binder will configure on topics on which it produces/consumes data.
It can be superseded by the partitionCount
setting of the producer or by the value of instanceCount
* concurrency
settings of the producer (if either is larger).
Default: 1
.
The replication factor of auto-created topics if autoCreateTopics
is active.
Default: 1
.
If set to true
, the binder will create new topics automatically.
If set to false
, the binder will rely on the topics being already configured.
In the latter case, if the topics do not exist, the binder will fail to start.
Of note, this setting is independent of the auto.topic.create.enable
setting of the broker and it does not influence it: if the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.
Default: true
.
If set to true
, the binder will create add new partitions if required.
If set to false
, the binder will rely on the partition size of the topic being already configured.
If the partition count of the target topic is smaller than the expected value, the binder will fail to start.
Default: false
.
Size (in bytes) of the socket buffer to be used by the Kafka consumers.
Default: 2097152
.
The following properties are available for Kafka consumers only and
must be prefixed with spring.cloud.stream.kafka.bindings.<channelName>.consumer.
.
Whether to autocommit offsets when a message has been processed.
If set to false
, an Acknowledgment
header will be available in the message headers for late acknowledgment.
Default: true
.
Effective only if autoCommitOffset
is set to true
.
If set to false
it suppresses auto-commits for messages that result in errors, and will commit only for successful messages, allows a stream to automatically replay from the last successfully processed message, in case of persistent failures.
If set to true
, it will always auto-commit (if auto-commit is enabled).
If not set (default), it effectively has the same value as enableDlq
, auto-committing erroneous messages if they are sent to a DLQ, and not committing them otherwise.
Default: not set.
The interval between connection recovery attempts, in milliseconds.
Default: 5000
.
Whether to reset offsets on the consumer to the value provided by startOffset
.
Default: false
.
The starting offset for new groups, or when resetOffsets
is true
.
Allowed values: earliest
, latest
.
Default: null (equivalent to earliest
).
When set to true, it will send enable DLQ behavior for the consumer.
Messages that result in errors will be forwarded to a topic named error.<destination>.<group>
.
This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome.
Default: false
.
Map with a key/value pair containing generic Kafka consumer properties.
Default: Empty map.
The following properties are available for Kafka producers only and
must be prefixed with spring.cloud.stream.kafka.bindings.<channelName>.producer.
.
Upper limit, in bytes, of how much data the Kafka producer will attempt to batch before sending.
Default: 16384
.
Whether the producer is synchronous.
Default: false
.
How long the producer will wait before sending in order to allow more messages to accumulate in the same batch. (Normally the producer does not wait at all, and simply sends all the messages that accumulated while the previous send was in progress.) A non-zero value may increase throughput at the expense of latency.
Default: 0
.
Map with a key/value pair containing generic Kafka producer properties.
Default: Empty map.
![]() | Note |
---|---|
The Kafka binder will use the |
In this section, we illustrate the use of the above properties for specific scenarios.
Apache Kafka 0.9 supports secure connections between client and brokers.
To take advantage of this feature, follow the guidelines in the Apache Kafka Documentation as well as the Kafka 0.9 security guidelines from the Confluent documentation.
Use the spring.cloud.stream.kafka.binder.configuration
option to set security properties for all clients created by the binder.
For example, for setting security.protocol
to SASL_SSL
, set:
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
All the other security properties can be set in a similar manner.
When using Kerberos, follow the instructions in the reference documentation for creating and referencing the JAAS configuration. At the time of this release, the JAAS, and (optionally) krb5 file locations must be set for Spring Cloud Stream applications by using system properties. Here is an example of launching a Spring Cloud Stream application with SASL and Kerberos.
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \\ --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \\ --spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \\ --spring.cloud.stream.bindings.input.destination=stream.ticktock \\ --spring.cloud.stream.kafka.binder.clientConfiguration.security.protocol=SASL_PLAINTEXT
![]() | Note |
---|---|
Exercise caution when using the |
To build the source you will need to install JDK 1.7.
The build uses the Maven wrapper so you don’t have to install a specific version of Maven. To enable the tests, you should have Kafka server 0.9 or above running before building. See below for more information on running the servers.
The main build command is
$ ./mvnw clean install
You can also add '-DskipTests' if you like, to avoid running the tests.
![]() | Note |
---|---|
You can also install Maven (>=3.3.3) yourself and run the |
![]() | Note |
---|---|
Be aware that you might need to increase the amount of memory
available to Maven by setting a |
The projects that require middleware generally include a
docker-compose.yml
, so consider using
Docker Compose to run the middeware servers
in Docker containers.
If you don’t have an IDE preference we would recommend that you use Spring Tools Suite or Eclipse when working with the code. We use the m2eclipe eclipse plugin for maven support. Other IDEs and tools should also work without issue.
We recommend the m2eclipe eclipse plugin when working with eclipse. If you don’t already have m2eclipse installed it is available from the "eclipse marketplace".
Unfortunately m2e does not yet support Maven 3.3, so once the projects
are imported into Eclipse you will also need to tell m2eclipse to use
the .settings.xml
file for the projects. If you do not do this you
may see many different errors related to the POMs in the
projects. Open your Eclipse preferences, expand the Maven
preferences, and select User Settings. In the User Settings field
click Browse and navigate to the Spring Cloud project you imported
selecting the .settings.xml
file in that project. Click Apply and
then OK to save the preference changes.
![]() | Note |
---|---|
Alternatively you can copy the repository settings from |
If you prefer not to use m2eclipse you can generate eclipse project metadata using the following command:
$ ./mvnw eclipse:eclipse
The generated eclipse projects can be imported by selecting import existing projects
from the file
menu.
[[contributing]
== Contributing
Spring Cloud is released under the non-restrictive Apache 2.0 license, and follows a very standard Github development process, using Github tracker for issues and merging pull requests into master. If you want to contribute even something trivial please do not hesitate, but follow the guidelines below.
Before we accept a non-trivial patch or pull request we will need you to sign the contributor’s agreement. Signing the contributor’s agreement does not grant anyone commit rights to the main repository, but it does mean that we can accept your contributions, and you will get an author credit if we do. Active contributors might be asked to join the core team, and given the ability to merge pull requests.
None of these is essential for a pull request, but they will all help. They can also be added after the original pull request but before a merge.
eclipse-code-formatter.xml
file from the
Spring
Cloud Build project. If using IntelliJ, you can use the
Eclipse Code Formatter
Plugin to import the same file..java
files to have a simple Javadoc class comment with at least an
@author
tag identifying you, and preferably at least a paragraph on what the class is
for..java
files (copy from existing files
in the project)@author
to the .java files that you modify substantially (more
than cosmetic changes).Fixes gh-XXXX
at the end of the commit
message (where XXXX is the issue number).