Reactive Kafka Binder

The Kafka binder in Spring Cloud Stream provides a dedicated reactive binder based on the Reactor Kafka project. This reactive Kafka binder enables full end-to-end reactive capabilities, such as backpressure and reactive streams semantics, in applications based on Apache Kafka. When your Spring Cloud Stream Kafka application is written using reactive types (Flux, Mono, and so on), we recommend using this reactive Kafka binder instead of the regular message channel based Kafka binder.

Maven Coordinates

Following are the Maven coordinates for the reactive Kafka binder.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka-reactive</artifactId>
</dependency>

Basic Example using the Reactive Kafka Binder

In this section, we show some basic code snippets for writing a reactive Kafka application using the reactive binder, along with the details around them.

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
    return s -> s.map(String::toUpperCase);
}
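
To try this function against actual Kafka topics, you can point its input and output bindings at destinations in configuration. Here is a minimal sketch; the topic names uppercase-in-topic and uppercase-out-topic are placeholders for illustration.

spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase-in-topic
spring.cloud.stream.bindings.uppercase-out-0.destination=uppercase-out-topic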

You can use the above uppercase function with both the message channel based Kafka binder (spring-cloud-stream-binder-kafka) and the reactive Kafka binder (spring-cloud-stream-binder-kafka-reactive), the topic of discussion in this section. When using this function with the regular Kafka binder, although you are using reactive types in the application (that is, in the uppercase function), you only get reactive streams within the execution of your function. Outside the function's execution context, there are no reactive benefits, since the underlying binder is not based on the reactive stack. Therefore, although this might look like a full end-to-end reactive stack, such an application is only partially reactive.

Now assume that you are using the proper reactive binder for Kafka - spring-cloud-stream-binder-kafka-reactive - with the above function. This binder implementation gives the full reactive benefits, all the way from consumption at the top end of the chain to publishing at the bottom end, because the underlying binder is built on top of Reactor Kafka's core APIs. On the consumer side, it makes use of KafkaReceiver, the reactive implementation of a Kafka consumer. Similarly, on the producer side, it uses the KafkaSender API, the reactive implementation of a Kafka producer. Since the foundations of the reactive Kafka binder are built upon a proper reactive Kafka API, applications get the full benefits of using reactive technologies. Capabilities such as automatic backpressure are built in for the application when using this reactive Kafka binder.
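
To give a sense of what the binder abstracts away, the following is a minimal sketch of consuming records directly through Reactor Kafka's KafkaReceiver. The bootstrap server, group id, and topic name are placeholder values, and the binder writes this kind of plumbing for you; the sketch is only meant to show the reactive consumer API underneath.

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

public class ReactorKafkaReceiverSketch {

    public static void main(String[] args) {
        // Placeholder consumer settings; replace with your own environment values.
        ReceiverOptions<byte[], byte[]> options = ReceiverOptions.<byte[], byte[]>create(Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ConsumerConfig.GROUP_ID_CONFIG, "demo-group",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class))
            .subscription(List.of("demo-topic"));

        // receive() exposes the consumed records as a backpressure-aware Flux of ReceiverRecords.
        Flux<ReceiverRecord<byte[], byte[]>> inbound = KafkaReceiver.create(options).receive();

        inbound
            .doOnNext(record -> {
                System.out.println("received: " + new String(record.value()));
                // Acknowledge the record so that its offset becomes eligible for commit.
                record.receiverOffset().acknowledge();
            })
            .blockLast(); // keep the main thread alive while records stream in
    }
}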

Consuming Records in the Raw Format

In the above uppercase function, we consume the record as Flux<String> and then produce it as Flux<String>. There might be occasions on which you need to receive the record in the original received format - the ReceiverRecord. Here is such a function.

@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
    return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}

In this function, note that we consume the record as Flux<ReceiverRecord<byte[], byte[]>> and then produce it as Flux<String>. ReceiverRecord is the basic received record, a specialized Kafka ConsumerRecord in Reactor Kafka. When using the reactive Kafka binder, the above function gives you access to the ReceiverRecord type for each incoming record. However, in this case, you need to provide a custom implementation of a RecordMessageConverter. By default, the reactive Kafka binder uses a MessagingMessageConverter that converts the payload and headers from the ConsumerRecord. Therefore, by the time your handler method receives it, the payload has already been extracted from the received record and passed on to the method, as in the case of the first function we looked at above. By providing a custom RecordMessageConverter implementation in the application, you can override the default behavior. For example, if you want to consume the record as a raw Flux<ReceiverRecord<byte[], byte[]>>, then you can provide the following bean definition in the application.

@Bean
RecordMessageConverter fullRawReceivedRecord() {
    return new RecordMessageConverter() {

        private final RecordMessageConverter converter = new MessagingMessageConverter();

        @Override
        public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                Consumer<?, ?> consumer, Type payloadType) {
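            // Wrap the raw received record itself as the message payload.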
            return MessageBuilder.withPayload(record).build();
        }

        @Override
        public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
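            // Delegate outbound conversion to the default converter.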
            return this.converter.fromMessage(message, defaultTopic);
        }

    };
}

Then, you need to instruct the framework to use this converter for the required binding. Here is an example based on our lowercase function.

spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord

lowercase-in-0 is the input binding name for our lowercase function. For the outbound binding (lowercase-out-0), we still use the regular MessagingMessageConverter.

In the toMessage implementation above, we receive the raw ConsumerRecord (a ReceiverRecord, since we are in a reactive binder context) and then wrap it inside a Message. That message's payload, which is the ReceiverRecord, is then provided to the user method.
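
Since the payload is now the full ReceiverRecord, the handler can read any of the record's metadata in addition to its value. Below is an illustrative variant of the lowercase function; the lowercaseWithMetadata name is hypothetical, and its input binding (lowercaseWithMetadata-in-0) would need the same converterBeanName property shown above.

@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercaseWithMetadata() {
    return flux -> flux.map(rec -> {
        // ReceiverRecord extends ConsumerRecord, so the usual metadata accessors are available.
        System.out.printf("topic=%s partition=%d offset=%d%n",
                rec.topic(), rec.partition(), rec.receiverOffset().offset());
        return new String(rec.value()).toLowerCase();
    });
}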

Concurrency

When using reactive functions with the reactive Kafka binder, if you set concurrency on the consumer binding, then the binder creates as many dedicated KafkaReceiver objects as the concurrency value specifies. In other words, this creates multiple reactive streams with separate Flux implementations. This can be useful when you are consuming records from a partitioned topic.

For example, assume that the incoming topic has at least three partitions. Then you can set the following property.

spring.cloud.stream.bindings.lowercase-in-0.consumer.concurrency=3

That creates three dedicated KafkaReceiver objects, which generate three separate Flux implementations and stream them to the handler method.
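
For the three receivers to divide the topic's partitions among themselves, rather than each receiving every record, they typically need to share a consumer group. A minimal sketch, with a hypothetical group name:

spring.cloud.stream.bindings.lowercase-in-0.group=lowercase-group
spring.cloud.stream.bindings.lowercase-in-0.consumer.concurrency=3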