public class ReactiveRedisStreamMessageProducer
extends org.springframework.integration.endpoint.MessageProducerSupport
A MessageProducerSupport implementation that reads messages from a Redis Stream and publishes them to the provided
output channel.
By default, this adapter reads messages as a standalone client using the Redis XREAD command. It can be switched to the
Consumer Group feature (XREADGROUP) by setting the consumerName field.
By default, the Consumer Group name is the id of this bean: IntegrationObjectSupport.getBeanName().

Fields inherited from class org.springframework.integration.endpoint.AbstractEndpoint: lifecycleCondition, lifecycleLock

| Constructor and Description |
|---|
| ReactiveRedisStreamMessageProducer(org.springframework.data.redis.connection.ReactiveRedisConnectionFactory reactiveConnectionFactory, java.lang.String streamKey) |
| Modifier and Type | Method and Description |
|---|---|
| protected void | doStart() |
| java.lang.String | getComponentType() |
| protected void | onInit() |
| void | setAutoAck(boolean autoAck): Set whether to acknowledge messages read in the Consumer Group. |
| void | setConsumerGroup(java.lang.String consumerGroup): Set the name of the Consumer Group. |
| void | setConsumerName(java.lang.String consumerName): Set the name of the consumer. |
| void | setCreateConsumerGroup(boolean createConsumerGroup): Create the Consumer Group if and only if it does not exist. |
| void | setExtractPayload(boolean extractPayload): Configure whether this channel adapter extracts the message payload. |
| void | setReadOffset(org.springframework.data.redis.connection.stream.ReadOffset readOffset): Define the offset from which to read messages. |
| void | setStreamReceiverOptions(org.springframework.data.redis.stream.StreamReceiver.StreamReceiverOptions<java.lang.String,?> streamReceiverOptions): Set the StreamReceiverOptions used to customize the StreamReceiver. |
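
The setters summarized above might be combined as in the following Spring configuration fragment, which switches the adapter to Consumer Group mode. This is only a sketch: the stream key "orders", the group and consumer names, the channel bean, the one-second poll timeout, and the package org.springframework.integration.redis.inbound are assumptions that do not come from this page, and the fragment needs Spring Integration Redis and Spring Data Redis on the classpath.

```java
import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.stream.StreamReceiver.StreamReceiverOptions;
import org.springframework.integration.channel.FluxMessageChannel;
// Assumed package; this page does not state it.
import org.springframework.integration.redis.inbound.ReactiveRedisStreamMessageProducer;
import org.springframework.messaging.MessageChannel;

@Configuration
public class RedisStreamInboundConfig {

    // Hypothetical output channel that receives the messages read from the stream.
    @Bean
    public MessageChannel ordersChannel() {
        return new FluxMessageChannel();
    }

    @Bean
    public ReactiveRedisStreamMessageProducer ordersStreamProducer(
            ReactiveRedisConnectionFactory connectionFactory) {

        // "orders" is an illustrative stream key.
        ReactiveRedisStreamMessageProducer producer =
                new ReactiveRedisStreamMessageProducer(connectionFactory, "orders");
        producer.setOutputChannel(ordersChannel());

        // Setting a consumer name switches the adapter from XREAD to XREADGROUP.
        producer.setConsumerName("worker-1");
        producer.setConsumerGroup("order-processors"); // defaults to the bean name if omitted
        producer.setCreateConsumerGroup(true);         // false by default
        producer.setAutoAck(true);                     // true by default
        producer.setReadOffset(ReadOffset.lastConsumed());
        producer.setExtractPayload(true);              // true by default

        // Replace the infinite default poll timeout with an illustrative one-second timeout.
        StreamReceiverOptions<String, MapRecord<String, String, String>> options =
                StreamReceiverOptions.builder()
                        .pollTimeout(Duration.ofSeconds(1))
                        .build();
        producer.setStreamReceiverOptions(options);

        return producer;
    }
}
```

Leaving out the setConsumerName call would keep the adapter in its default standalone XREAD mode.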
Methods inherited from class org.springframework.integration.endpoint.MessageProducerSupport: afterSingletonsInstantiated, buildErrorMessage, doStop, getErrorChannel, getErrorMessageAttributes, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisher

Methods inherited from class org.springframework.integration.endpoint.AbstractEndpoint: destroy, doStop, getPhase, getRole, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop

Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport: afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString

public ReactiveRedisStreamMessageProducer(org.springframework.data.redis.connection.ReactiveRedisConnectionFactory reactiveConnectionFactory,
java.lang.String streamKey)

public void setReadOffset(org.springframework.data.redis.connection.stream.ReadOffset readOffset)
Define the offset from which to read messages. By default, ReadOffset.latest() is used.
ReadOffset.latest() is equal to '$', the id used with XREAD to get new data added to
the stream. Note that when switching to the Consumer Group feature, the offset is set to
ReadOffset.lastConsumed() if it is still equal to ReadOffset.latest().
Parameters:
readOffset - the desired offset

public void setExtractPayload(boolean extractPayload)
Configure whether this channel adapter extracts the message payload.
Parameters:
extractPayload - default true

public void setAutoAck(boolean autoAck)
Set whether to acknowledge messages read in the Consumer Group. true by default.
Parameters:
autoAck - the acknowledge option.

public void setConsumerGroup(@Nullable java.lang.String consumerGroup)
Set the name of the Consumer Group. See also createConsumerGroup. If not set, the defined bean name IntegrationObjectSupport.getBeanName() is used.
Parameters:
consumerGroup - the Consumer Group on which this adapter should register to listen for messages.

public void setConsumerName(@Nullable java.lang.String consumerName)
Set the name of the consumer.
Parameters:
consumerName - the consumer name in the Consumer Group

public void setCreateConsumerGroup(boolean createConsumerGroup)
Create the Consumer Group if and only if it does not exist. See MKSTREAM (an option of the Redis XGROUP CREATE command).
Parameters:
createConsumerGroup - whether to create the Consumer Group; false by default

public void setStreamReceiverOptions(@Nullable org.springframework.data.redis.stream.StreamReceiver.StreamReceiverOptions<java.lang.String,?> streamReceiverOptions)
Set the StreamReceiverOptions used to customize the StreamReceiver.
They provide a way to set the polling timeout and the serialization context.
By default, the polling timeout is infinite and StringRedisSerializer is used.
Parameters:
streamReceiverOptions - the desired receiver options

public java.lang.String getComponentType()
Specified by:
getComponentType in interface org.springframework.integration.support.context.NamedComponent
Overrides:
getComponentType in class org.springframework.integration.context.IntegrationObjectSupport

protected void onInit()
Overrides:
onInit in class org.springframework.integration.endpoint.MessageProducerSupport

protected void doStart()
Overrides:
doStart in class org.springframework.integration.endpoint.MessageProducerSupport
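
The defaulting rules documented for setConsumerName, setConsumerGroup, and setReadOffset can be modeled with a small standalone sketch. This is not the adapter's implementation: the class StreamReadModeSketch, its ReadPlan holder, and the resolve method are hypothetical names, and offsets are represented as plain strings ('$' for ReadOffset.latest(), '>' for ReadOffset.lastConsumed()) so that the example needs no Spring dependency.

```java
public class StreamReadModeSketch {

    // String stand-ins for ReadOffset.latest() and ReadOffset.lastConsumed().
    public static final String LATEST = "$";
    public static final String LAST_CONSUMED = ">";

    /** Hypothetical holder for the resolved settings; not a Spring type. */
    public static final class ReadPlan {
        public final String command;
        public final String group;
        public final String offset;

        ReadPlan(String command, String group, String offset) {
            this.command = command;
            this.group = group;
            this.offset = offset;
        }

        @Override
        public String toString() {
            return command + " group=" + group + " offset=" + offset;
        }
    }

    /**
     * Models the documented defaults:
     * no consumer name means standalone XREAD;
     * a consumer name means XREADGROUP, with the group defaulting to the bean name
     * and an offset still equal to latest() replaced by lastConsumed().
     */
    public static ReadPlan resolve(String beanName, String consumerName,
                                   String consumerGroup, String readOffset) {
        String offset = (readOffset != null) ? readOffset : LATEST;
        if (consumerName == null) {
            return new ReadPlan("XREAD", null, offset);
        }
        String group = (consumerGroup != null) ? consumerGroup : beanName;
        if (LATEST.equals(offset)) {
            offset = LAST_CONSUMED;
        }
        return new ReadPlan("XREADGROUP", group, offset);
    }

    public static void main(String[] args) {
        // Standalone mode, then Consumer Group mode with defaults, then with explicit values.
        System.out.println(resolve("redisInbound", null, null, null));
        System.out.println(resolve("redisInbound", "worker-1", null, null));
        System.out.println(resolve("redisInbound", "worker-1", "orders", "0-0"));
    }
}
```

An explicitly chosen offset such as "0-0" is left untouched in this model; only an offset that is still the default latest() value is swapped when a consumer name is set.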