Using the RabbitMQ Stream Plugin
Version 2.4 introduces initial support for the Java client for the RabbitMQ Stream Plugin.
- RabbitStreamTemplate
- StreamListenerContainer
Sending Messages
The RabbitStreamTemplate provides a subset of the RabbitTemplate (AMQP) functionality.
public interface RabbitStreamOperations extends AutoCloseable {

    ListenableFuture<Boolean> send(Message message);

    ListenableFuture<Boolean> convertAndSend(Object message);

    ListenableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);

    ListenableFuture<Boolean> send(com.rabbitmq.stream.Message message);

    MessageBuilder messageBuilder();

    MessageConverter messageConverter();

    StreamMessageConverter streamMessageConverter();

    @Override
    void close() throws AmqpException;

}
The RabbitStreamTemplate implementation has the following constructor and properties:
public RabbitStreamTemplate(Environment environment, String streamName) {
}
public void setMessageConverter(MessageConverter messageConverter) {
}
public void setStreamConverter(StreamMessageConverter streamConverter) {
}
public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}
The MessageConverter is used in the convertAndSend methods to convert the object to a Spring AMQP Message.
The StreamMessageConverter is used to convert from a Spring AMQP Message to a native stream Message.
You can also send native stream Message objects directly; the messageBuilder() method provides access to the Producer's message builder.
The ProducerCustomizer provides a mechanism to customize the producer before it is built.
Refer to the Java Client Documentation for information about customizing the Environment and Producer.
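The conversion steps described above can be pictured with a small, library-free model. This is only a sketch under stated assumptions: AmqpMessage, StreamMessage, and convert are hypothetical stand-ins (not Spring AMQP or RabbitMQ client types), and running the post-processor between the two converters is an assumption about ordering rather than something the interface listing guarantees.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Library-free model of the convertAndSend pipeline.
// Every type and method here is a hypothetical stand-in, not a Spring AMQP API.
final class ConversionPipelineSketch {

    // Stand-in for a Spring AMQP Message: a body plus a content-type property.
    static final class AmqpMessage {
        final byte[] body;
        final String contentType;

        AmqpMessage(byte[] body, String contentType) {
            this.body = body;
            this.contentType = contentType;
        }
    }

    // Stand-in for a native stream message.
    static final class StreamMessage {
        final byte[] data;
        final String contentType;

        StreamMessage(byte[] data, String contentType) {
            this.data = data;
            this.contentType = contentType;
        }
    }

    // Step 1: object -> AMQP-style message (the MessageConverter role).
    // Step 2: optional post-processing (the MessagePostProcessor role; assumed to run here).
    // Step 3: AMQP-style message -> native message (the StreamMessageConverter role).
    static StreamMessage convert(Object payload,
            Function<Object, AmqpMessage> messageConverter,
            UnaryOperator<AmqpMessage> postProcessor,
            Function<AmqpMessage, StreamMessage> streamConverter) {
        AmqpMessage amqp = messageConverter.apply(payload);
        if (postProcessor != null) {
            amqp = postProcessor.apply(amqp);
        }
        return streamConverter.apply(amqp);
    }

    public static void main(String[] args) {
        StreamMessage out = convert("hello",
                o -> new AmqpMessage(o.toString().getBytes(StandardCharsets.UTF_8), "text/plain"),
                m -> new AmqpMessage(m.body, "text/plain;charset=UTF-8"),
                m -> new StreamMessage(m.body, m.contentType));
        System.out.println(new String(out.data, StandardCharsets.UTF_8) + " (" + out.contentType + ")");
    }
}
```

Sending a native stream message directly corresponds to skipping the first two steps of this model and handing the producer a message that is already in its final form.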
Receiving Messages
Asynchronous message reception is provided by the StreamListenerContainer (and the StreamRabbitListenerContainerFactory when using @RabbitListener).
The listener container requires an Environment as well as a single stream name.
You can either receive Spring AMQP Message objects using the classic MessageListener, or you can receive native stream Message objects using a new interface:
public interface StreamMessageListener extends MessageListener {

    void onStreamMessage(Message message, Context context);

}
See [containerAttributes] for information about supported properties.
Similar to the template, the container has a ConsumerCustomizer property.
Refer to the Java Client Documentation for information about customizing the Environment and Consumer.
When using @RabbitListener, configure a StreamRabbitListenerContainerFactory. At this time, most @RabbitListener properties (concurrency, etc.) are ignored; only id, queues, autoStartup, and containerFactory are supported.
In addition, queues can contain only one stream name.
Examples
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
    template.setProducerCustomizer((name, builder) -> builder.name("test"));
    return template;
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
    return new StreamRabbitListenerContainerFactory(env);
}

@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
    ...
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
    StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
    factory.setNativeListener(true);
    factory.setConsumerCustomizer((id, builder) -> {
        builder.name("myConsumer")
                .offset(OffsetSpecification.first())
                .manualTrackingStrategy();
    });
    return factory;
}

@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
    ...
    context.storeOffset();
}
Version 2.4.5 added the adviceChain property to the StreamListenerContainer (and its factory).
A new factory bean is also provided to create a stateless retry interceptor with an optional StreamMessageRecoverer for use when consuming raw stream messages.
@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
    StreamRetryOperationsInterceptorFactoryBean rfb =
            new StreamRetryOperationsInterceptorFactoryBean();
    rfb.setRetryOperations(retryTemplate);
    rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
        ...
    });
    return rfb;
}
Stateful retry is not supported with this container.
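To make the term concrete: with stateless retry, all attempts for a message happen inside a single delivery, on the calling thread, and the recoverer runs once the attempts are exhausted. The following library-free sketch models that behavior. It is not the implementation behind StreamRetryOperationsInterceptorFactoryBean; StatelessRetrySketch, Recoverer, and deliver are hypothetical names, the recoverer here omits the context argument for brevity, and the fixed attempt count stands in for whatever policy the RetryTemplate is configured with.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Library-free model of stateless retry with a recoverer.
// All names are hypothetical; this is not Spring Retry or Spring AMQP code.
final class StatelessRetrySketch {

    // Stand-in for a recoverer callback: receives the message and the last failure.
    interface Recoverer<M> {
        void recover(M message, Throwable cause);
    }

    // Invokes the listener up to maxAttempts times on the calling thread and
    // returns the number of attempts made. If every attempt fails, the recoverer
    // (when present) receives the message and the last exception; without a
    // recoverer the last exception is rethrown to the caller.
    static <M> int deliver(M message, Consumer<M> listener, int maxAttempts, Recoverer<M> recoverer) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.accept(message);
                return attempt; // success: no further attempts
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        if (recoverer == null) {
            throw last;
        }
        recoverer.recover(message, last);
        return maxAttempts;
    }

    public static void main(String[] args) {
        List<String> recovered = new ArrayList<>();
        int attempts = deliver("payload",
                m -> { throw new IllegalStateException("boom"); },
                3,
                (m, t) -> recovered.add(m + ":" + t.getMessage()));
        System.out.println(attempts + " attempts, recovered=" + recovered);
    }
}
```

Because no state survives between deliveries in this model, nothing depends on the broker redelivering a failed message, which is the property that distinguishes stateless from stateful retry.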