Quick Tour
Prerequisites: You must install and run Apache Kafka. Then you must put the spring-kafka JAR and all of its dependencies on your class path. The easiest way to do that is to declare a dependency in your build tool.
If you are not using Spring Boot, declare the spring-kafka jar as a dependency in your project.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.9</version>
</dependency>
compile 'org.springframework.kafka:spring-kafka:2.6.9'
| When using Spring Boot, (and you haven’t used start.spring.io to create your project), omit the version and Boot will automatically bring in the correct version that is compatible with your Boot version: |
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
compile 'org.springframework.kafka:spring-kafka'
However, the quickest way to get started is to use start.spring.io (or the wizards in Spring Tool Suits and Intellij IDEA) and create a project, selecting 'Spring for Apache Kafka' as a dependency.
Compatibility
This quick tour works with the following versions:
-
Apache Kafka Clients 2.6.1
-
Spring Framework 5.3.x
-
Minimum Java version: 8
Getting Started
The simplest way to get started is to use start.spring.io (or the wizards in Spring Tool Suits and Intellij IDEA) and create a project, selecting 'Spring for Apache Kafka' as a dependency. Refer to the Spring Boot documentation for more information about its opinionated auto configuration of the infrastructure beans.
Here is a minimal consumer application.
Spring Boot Consumer App
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@KafkaListener(id = "myId", topics = "topic1")
public void listen(String in) {
System.out.println(in);
}
}
spring.kafka.consumer.auto-offset-reset=earliest
The NewTopic bean causes the topic to be created on the broker; it is not needed if the topic already exists.
Spring Boot Producer App
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("topic1", "test");
};
}
}
With Java Configuration (No Spring Boot)
Spring for Apache Kafka is designed to be used in a Spring Application Context.
For example, if you create the listener container yourself outside of a Spring context, not all functions will work unless you satisfy all of the …Aware interfaces that the container implements.
|
Here is an example of an application that does not use Spring Boot.
@Autowired
private Listener listener;
@Autowired
private KafkaTemplate<Integer, String> template;
@Test
public void testSimple() throws Exception {
template.send("annotated1", 0, "foo");
template.flush();
assertTrue(this.listener.latch1.await(10, TimeUnit.SECONDS));
}
@Configuration
@EnableKafka
public class Config {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProps());
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
return props;
}
@Bean
public Listener listener() {
return new Listener();
}
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(senderProps());
}
private Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
...
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
}
public class Listener {
private final CountDownLatch latch1 = new CountDownLatch(1);
@KafkaListener(id = "foo", topics = "annotated1")
public void listen1(String foo) {
this.latch1.countDown();
}
}
As you can see, you have to define several infrastructure beans when not using Spring Boot.