@ManagedResource
@IntegrationManagedResource
public class KinesisMessageDrivenChannelAdapter
extends org.springframework.integration.endpoint.MessageProducerSupport
implements org.springframework.beans.factory.DisposableBean
The MessageProducerSupport implementation for receiving data from Amazon Kinesis stream(s).
Constructor and Description |
---|
KinesisMessageDrivenChannelAdapter(com.amazonaws.services.kinesis.AmazonKinesis amazonKinesis,
KinesisShardOffset... shardOffsets) |
KinesisMessageDrivenChannelAdapter(com.amazonaws.services.kinesis.AmazonKinesis amazonKinesis,
java.lang.String... streams) |
Modifier and Type | Method and Description |
---|---|
void |
destroy() |
protected void |
doStart() |
protected void |
doStop() |
protected org.springframework.core.AttributeAccessor |
getErrorMessageAttributes(org.springframework.messaging.Message<?> message) |
protected void |
onInit() |
void |
resetCheckpointForShardAtTimestamp(java.lang.String stream,
java.lang.String shard,
long timestamp) |
void |
resetCheckpointForShardToLatest(java.lang.String stream,
java.lang.String shard) |
void |
resetCheckpointForShardToSequenceNumber(java.lang.String stream,
java.lang.String shard,
java.lang.String sequenceNumber) |
void |
resetCheckpointForShardToTrimHorizon(java.lang.String stream,
java.lang.String shard) |
void |
resetCheckpoints() |
void |
setCheckpointMode(CheckpointMode checkpointMode) |
void |
setCheckpointStore(org.springframework.integration.metadata.MetadataStore checkpointStore) |
void |
setConcurrency(int concurrency)
The maximum number of concurrent ConsumerInvokers running. |
void |
setConsumerBackoff(int consumerBackoff) |
void |
setConsumerExecutor(java.util.concurrent.Executor executor) |
void |
setConsumerGroup(java.lang.String consumerGroup) |
void |
setConverter(org.springframework.core.convert.converter.Converter<byte[],java.lang.Object> converter)
Specify a Converter to deserialize the byte[] from the record's body. |
void |
setDescribeStreamBackoff(int describeStreamBackoff) |
void |
setDescribeStreamRetries(int describeStreamRetries) |
void |
setDispatcherExecutor(java.util.concurrent.Executor dispatcherExecutor) |
void |
setIdleBetweenPolls(int idleBetweenPolls)
The sleep interval in milliseconds used in the main loop between shards polling cycles.
|
void |
setListenerMode(ListenerMode listenerMode) |
void |
setRecordsLimit(int recordsLimit)
The maximum number of records to poll per get-records request.
|
void |
setStartTimeout(int startTimeout) |
void |
setStreamInitialSequence(KinesisShardOffset streamInitialSequence) |
void |
startConsumer(java.lang.String stream,
java.lang.String shard) |
void |
stopConsumer(java.lang.String stream,
java.lang.String shard) |
java.lang.String |
toString() |
afterSingletonsInstantiated, buildErrorMessage, getErrorChannel, getMessagingTemplate, getOutputChannel, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack
doStop, getPhase, getRole, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, setTaskScheduler, start, stop, stop
afterPropertiesSet, extractTypeIfPossible, getApplicationContext, getApplicationContextId, getBeanFactory, getChannelResolver, getComponentName, getComponentType, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression
public KinesisMessageDrivenChannelAdapter(com.amazonaws.services.kinesis.AmazonKinesis amazonKinesis, java.lang.String... streams)
public KinesisMessageDrivenChannelAdapter(com.amazonaws.services.kinesis.AmazonKinesis amazonKinesis, KinesisShardOffset... shardOffsets)
public void setConsumerGroup(java.lang.String consumerGroup)
public void setCheckpointStore(org.springframework.integration.metadata.MetadataStore checkpointStore)
public void setConsumerExecutor(java.util.concurrent.Executor executor)
public void setDispatcherExecutor(java.util.concurrent.Executor dispatcherExecutor)
public void setStreamInitialSequence(KinesisShardOffset streamInitialSequence)
public void setConverter(org.springframework.core.convert.converter.Converter<byte[],java.lang.Object> converter)
Specify a Converter to deserialize the byte[] from the record's body.
Can be null, meaning no deserialization.
Parameters: converter - the Converter to use, or null.
public void setListenerMode(ListenerMode listenerMode)
public void setCheckpointMode(CheckpointMode checkpointMode)
public void setRecordsLimit(int recordsLimit)
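The maximum number of records to poll per get-records request. Must not be greater than 10000.
Parameters: recordsLimit - the number of records to poll per get-records request.
See Also: GetRecordsRequest.setLimit(java.lang.Integer)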
public void setConsumerBackoff(int consumerBackoff)
public void setDescribeStreamBackoff(int describeStreamBackoff)
public void setDescribeStreamRetries(int describeStreamRetries)
public void setStartTimeout(int startTimeout)
public void setConcurrency(int concurrency)
The maximum number of concurrent ConsumerInvokers running.
The ShardConsumers are evenly distributed between the ConsumerInvokers.
Messages from within the same shard will be processed sequentially.
In other words, each shard is tied to a particular thread.
By default the concurrency is unlimited and each shard is processed in the consumerExecutor directly.
Parameters: concurrency - the maximum concurrency.
public void setIdleBetweenPolls(int idleBetweenPolls)
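The sleep interval in milliseconds used in the main loop between shards polling cycles.
Defaults to 1000; the minimum is 250.
Parameters: idleBetweenPolls - the interval to sleep between shards polling cycles.
protected void onInit()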
Overrides: onInit in class org.springframework.integration.endpoint.MessageProducerSupport
public void destroy()
Specified by: destroy in interface org.springframework.beans.factory.DisposableBean
Overrides: destroy in class org.springframework.integration.endpoint.AbstractEndpoint
@ManagedOperation public void stopConsumer(java.lang.String stream, java.lang.String shard)
@ManagedOperation public void startConsumer(java.lang.String stream, java.lang.String shard)
@ManagedOperation public void resetCheckpointForShardToLatest(java.lang.String stream, java.lang.String shard)
@ManagedOperation public void resetCheckpointForShardToTrimHorizon(java.lang.String stream, java.lang.String shard)
@ManagedOperation public void resetCheckpointForShardToSequenceNumber(java.lang.String stream, java.lang.String shard, java.lang.String sequenceNumber)
@ManagedOperation public void resetCheckpointForShardAtTimestamp(java.lang.String stream, java.lang.String shard, long timestamp)
@ManagedOperation public void resetCheckpoints()
protected void doStart()
Overrides: doStart in class org.springframework.integration.endpoint.MessageProducerSupport
protected void doStop()
Overrides: doStop in class org.springframework.integration.endpoint.MessageProducerSupport
protected org.springframework.core.AttributeAccessor getErrorMessageAttributes(org.springframework.messaging.Message<?> message)
Overrides: getErrorMessageAttributes in class org.springframework.integration.endpoint.MessageProducerSupport
public java.lang.String toString()
Overrides: toString in class org.springframework.integration.context.IntegrationObjectSupport