public class ReactiveRedisStreamMessageProducer extends MessageProducerSupport
MessageProducerSupport for reading messages from a Redis Stream and publishing them into the provided
output channel.
By default this adapter reads message as a standalone client XREAD (Redis command) but can be switched to a
Consumer Group feature XREADGROUP by setting consumerName field.
By default the Consumer Group name is the id of this bean IntegrationObjectSupport.getBeanName().lifecycleCondition, lifecycleLockEXPRESSION_PARSER, loggerDEFAULT_PHASE| Constructor and Description |
|---|
ReactiveRedisStreamMessageProducer(ReactiveRedisConnectionFactory reactiveConnectionFactory,
String streamKey) |
| Modifier and Type | Method and Description |
|---|---|
protected void |
doStart()
Take no action by default.
|
String |
getComponentType()
Subclasses may implement this method to provide component type information.
|
protected void |
onInit()
Subclasses may implement this for initialization logic.
|
void |
setAutoAck(boolean autoAck)
Set whether or not acknowledge message read in the Consumer Group.
|
void |
setConsumerGroup(String consumerGroup)
Set the name of the Consumer Group.
|
void |
setConsumerName(String consumerName)
Set the name of the consumer.
|
void |
setCreateConsumerGroup(boolean createConsumerGroup)
Create the Consumer Group if and only if it does not exist.
|
void |
setExtractPayload(boolean extractPayload)
Configure this channel adapter to extract or not the message payload.
|
void |
setReadOffset(ReadOffset readOffset)
Define the offset from which we want to read message.
|
void |
setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions<String,?> streamReceiverOptions)
Set
ReactiveStreamOperations used to customize the StreamReceiver. |
afterSingletonsInstantiated, buildErrorMessage, doStop, getErrorChannel, getErrorMessageAttributes, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisherdestroy, doStop, getPhase, getRole, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stopafterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toStringclone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, waitgetBeanName, getComponentNamepublic ReactiveRedisStreamMessageProducer(ReactiveRedisConnectionFactory reactiveConnectionFactory, String streamKey)
public void setReadOffset(ReadOffset readOffset)
ReadOffset.latest() is used.
ReadOffset.latest() is equal to '$', which is the Id used with XREAD to get new data added to
the stream. Note that when switching to the Consumer Group feature, we set it to
ReadOffset.lastConsumed() if it is still equal to ReadOffset.latest().readOffset - the desired offsetpublic void setExtractPayload(boolean extractPayload)
extractPayload - default truepublic void setAutoAck(boolean autoAck)
true by default.autoAck - the acknowledge option.public void setConsumerGroup(@Nullable String consumerGroup)
createConsumerGroup. If not set, the defined bean name IntegrationObjectSupport.getBeanName() is used.consumerGroup - the Consumer Group on which this adapter should register to listen messages.public void setConsumerName(@Nullable String consumerName)
consumerName - the consumer name in the Consumer Grouppublic void setCreateConsumerGroup(boolean createConsumerGroup)
MKSTREAM.createConsumerGroup - specify if we should create the Consumer Group, false by defaultpublic void setStreamReceiverOptions(@Nullable StreamReceiver.StreamReceiverOptions<String,?> streamReceiverOptions)
ReactiveStreamOperations used to customize the StreamReceiver.
It provides a way to set the polling timeout and the serialization context.
By default the polling timeout is set to infinite and
StringRedisSerializer is used.streamReceiverOptions - the desired receiver optionspublic String getComponentType()
IntegrationObjectSupportgetComponentType in interface NamedComponentgetComponentType in class IntegrationObjectSupportprotected void onInit()
IntegrationObjectSupportonInit in class MessageProducerSupportprotected void doStart()
MessageProducerSupportdoStart in class MessageProducerSupport