Class KinesisMessageDrivenChannelAdapter
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.endpoint.AbstractEndpoint
org.springframework.integration.endpoint.MessageProducerSupport
org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter
- All Implemented Interfaces:
org.springframework.beans.factory.Aware
org.springframework.beans.factory.BeanFactoryAware
org.springframework.beans.factory.BeanNameAware
org.springframework.beans.factory.DisposableBean
org.springframework.beans.factory.InitializingBean
org.springframework.beans.factory.SmartInitializingSingleton
org.springframework.context.ApplicationContextAware
org.springframework.context.ApplicationEventPublisherAware
org.springframework.context.Lifecycle
org.springframework.context.Phased
org.springframework.context.SmartLifecycle
org.springframework.integration.context.ExpressionCapable
org.springframework.integration.core.MessageProducer
org.springframework.integration.IntegrationPattern
org.springframework.integration.support.context.NamedComponent
org.springframework.integration.support.management.IntegrationInboundManagement
org.springframework.integration.support.management.IntegrationManagement
org.springframework.integration.support.management.ManageableLifecycle
org.springframework.integration.support.management.ManageableSmartLifecycle
org.springframework.integration.support.management.TrackableComponent
@ManagedResource
@IntegrationManagedResource
public class KinesisMessageDrivenChannelAdapter
extends org.springframework.integration.endpoint.MessageProducerSupport
implements org.springframework.beans.factory.DisposableBean, org.springframework.context.ApplicationEventPublisherAware
The MessageProducerSupport implementation for receiving data from Amazon Kinesis stream(s).
- Since:
- 1.1
- Author:
- Artem Bilan, Krzysztof Witkowski, Hervé Fortin, Dirk Bonhomme, Greg Eales, Asiel Caballero, Jonathan Nagayoshi
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
org.springframework.integration.support.management.IntegrationManagement.ManagementOverrides
-
Field Summary
Fields inherited from class org.springframework.integration.endpoint.AbstractEndpoint
lifecycleCondition, lifecycleLock
Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, logger
Fields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Fields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE
-
Constructor Summary
Constructors
Constructor    Description
KinesisMessageDrivenChannelAdapter(software.amazon.awssdk.services.kinesis.KinesisAsyncClient amazonKinesis, String... streams)
KinesisMessageDrivenChannelAdapter(software.amazon.awssdk.services.kinesis.KinesisAsyncClient amazonKinesis, KinesisShardOffset... shardOffsets)
-
Method Summary
Modifier and Type    Method    Description
void destroy()
protected void doStart()
protected void doStop()
protected org.springframework.core.AttributeAccessor getErrorMessageAttributes(org.springframework.messaging.Message<?> message)
protected void onInit()
void resetCheckpointForShardAtTimestamp(String stream, String shard, long timestamp)
void resetCheckpointForShardToLatest(String stream, String shard)
void resetCheckpointForShardToSequenceNumber(String stream, String shard, String sequenceNumber)
void resetCheckpointForShardToTrimHorizon(String stream, String shard)
void resetCheckpoints()
void setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher)
void setBindSourceRecord(boolean bindSourceRecord)
  Set to true to bind the source consumer record in the header named IntegrationMessageHeaderAccessor.SOURCE_DATA.
void setCheckpointMode(CheckpointMode checkpointMode)
void setCheckpointsInterval(long checkpointsInterval)
  Sets the interval between two checkpoints.
void setCheckpointStore(org.springframework.integration.metadata.ConcurrentMetadataStore checkpointStore)
void setConcurrency(int concurrency)
  The maximum number of concurrent KinesisMessageDrivenChannelAdapter.ConsumerInvokers running.
void setConsumerBackoff(int consumerBackoff)
void setConsumerExecutor(Executor executor)
void setConsumerGroup(String consumerGroup)
void setConverter(org.springframework.core.convert.converter.Converter<byte[], Object> converter)
  Specify a Converter to deserialize the byte[] from the record's body.
void setDescribeStreamBackoff(int describeStreamBackoff)
void setDescribeStreamRetries(int describeStreamRetries)
void setDispatcherExecutor(Executor dispatcherExecutor)
void setEmbeddedHeadersMapper(org.springframework.integration.mapping.InboundMessageMapper<byte[]> embeddedHeadersMapper)
  Specify an InboundMessageMapper to extract message headers embedded into the record data.
void setIdleBetweenPolls(int idleBetweenPolls)
  The sleep interval in milliseconds used in the main loop between shard polling cycles.
void setListenerMode(ListenerMode listenerMode)
void setLockRegistry(org.springframework.integration.support.locks.LockRegistry lockRegistry)
  Specify a LockRegistry for exclusive access to provided streams.
void setLockRenewalTimeout(long lockRenewalTimeout)
  Configure a timeout in milliseconds to wait for lock on shard renewal.
void setRecordsLimit(int recordsLimit)
  The maximum number of records to poll per get-records request.
void setShardListFilter(Function<List<software.amazon.awssdk.services.kinesis.model.Shard>, List<software.amazon.awssdk.services.kinesis.model.Shard>> shardListFilter)
  Specify a Function<List<Shard>, List<Shard>> to filter the shards which will be read from.
void setStartTimeout(int startTimeout)
void setStreamInitialSequence(KinesisShardOffset streamInitialSequence)
void startConsumer(String stream, String shard)
void stopConsumer(String stream, String shard)
String toString()
Methods inherited from class org.springframework.integration.endpoint.MessageProducerSupport
afterSingletonsInstantiated, buildErrorMessage, getErrorChannel, getErrorMessageStrategy, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, isObserved, registerObservationRegistry, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setObservationConvention, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisher
Methods inherited from class org.springframework.integration.endpoint.AbstractEndpoint
doStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getComponentType, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.springframework.integration.support.management.IntegrationManagement
getManagedName, getManagedType, getOverrides, getThisAs, isLoggingEnabled, registerMetricsCaptor, setLoggingEnabled, setManagedName, setManagedType
Methods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName, getComponentType
-
Constructor Details
-
KinesisMessageDrivenChannelAdapter
public KinesisMessageDrivenChannelAdapter(software.amazon.awssdk.services.kinesis.KinesisAsyncClient amazonKinesis, String... streams)
-
KinesisMessageDrivenChannelAdapter
public KinesisMessageDrivenChannelAdapter(software.amazon.awssdk.services.kinesis.KinesisAsyncClient amazonKinesis, KinesisShardOffset... shardOffsets)
-
-
Method Details
-
setApplicationEventPublisher
public void setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher)
- Specified by:
setApplicationEventPublisher in interface org.springframework.context.ApplicationEventPublisherAware
-
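setConsumerGroup
public void setConsumerGroup(String consumerGroup)
-
setCheckpointStore
public void setCheckpointStore(org.springframework.integration.metadata.ConcurrentMetadataStore checkpointStore)
-
setConsumerExecutor
public void setConsumerExecutor(Executor executor)
-
setDispatcherExecutor
public void setDispatcherExecutor(Executor dispatcherExecutor)
-
setStreamInitialSequence
public void setStreamInitialSequence(KinesisShardOffset streamInitialSequence)
-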
setConverter
public void setConverter(org.springframework.core.convert.converter.Converter<byte[], Object> converter)
Specify a Converter to deserialize the byte[] from the record's body. Can be null, meaning no deserialization.
- Parameters:
converter - the Converter to use, or null
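The null-tolerant contract described above can be sketched in plain Java. This is only an illustration: `Function<byte[], Object>` stands in for Spring's `Converter<byte[], Object>`, and the class name `Utf8RecordConverterSketch` and helper `toPayload` are hypothetical, not part of the adapter.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class Utf8RecordConverterSketch {

    // Stand-in for a Converter<byte[], Object>: decodes the record body as UTF-8 text.
    static final Function<byte[], Object> UTF8 =
            bytes -> new String(bytes, StandardCharsets.UTF_8);

    // Hypothetical helper mirroring the documented contract:
    // a null converter means the payload stays a raw byte[].
    static Object toPayload(byte[] body, Function<byte[], Object> converter) {
        return converter == null ? body : converter.apply(body);
    }
}
```

With the real adapter, a lambda of the same shape (for example `bytes -> new String(bytes, StandardCharsets.UTF_8)`) could presumably be passed to `setConverter`, since `Converter` declares a single `convert` method.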
-
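setListenerMode
public void setListenerMode(ListenerMode listenerMode)
-
setCheckpointMode
public void setCheckpointMode(CheckpointMode checkpointMode)
-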
setCheckpointsInterval
public void setCheckpointsInterval(long checkpointsInterval)
Sets the interval between two checkpoints. Only used when checkpointMode is periodic.
- Parameters:
checkpointsInterval - interval between two checkpoints (in milliseconds)
- Since:
- 2.2
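As a rough illustration of what a periodic checkpoint interval means, the sketch below checkpoints only when at least the configured number of milliseconds has passed since the previous checkpoint. The class `PeriodicCheckpointSketch` is hypothetical and does not reproduce the adapter's internal logic, which this page does not describe.

```java
class PeriodicCheckpointSketch {

    private final long intervalMs;   // plays the role of checkpointsInterval
    private long lastCheckpointMs;   // time of the most recent checkpoint

    PeriodicCheckpointSketch(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.lastCheckpointMs = startMs;
    }

    // Returns true, and records the time, when at least intervalMs
    // has elapsed since the last checkpoint; otherwise returns false.
    boolean shouldCheckpoint(long nowMs) {
        if (nowMs - lastCheckpointMs >= intervalMs) {
            lastCheckpointMs = nowMs;
            return true;
        }
        return false;
    }
}
```

Passing the clock value in as a parameter, rather than reading `System.currentTimeMillis()` inside the method, keeps the sketch deterministic and easy to test.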
-
setRecordsLimit
public void setRecordsLimit(int recordsLimit)
The maximum number of records to poll per get-records request. Not greater than 10000.
- Parameters:
recordsLimit - the number of records to poll per get-records request.
- See Also:
- GetRecordsRequest.Builder.limit(Integer)
-
setConsumerBackoff
public void setConsumerBackoff(int consumerBackoff)
-
setDescribeStreamBackoff
public void setDescribeStreamBackoff(int describeStreamBackoff)
-
setDescribeStreamRetries
public void setDescribeStreamRetries(int describeStreamRetries)
-
setStartTimeout
public void setStartTimeout(int startTimeout)
-
setLockRenewalTimeout
public void setLockRenewalTimeout(long lockRenewalTimeout)
Configure a timeout in milliseconds to wait for lock on shard renewal.
- Parameters:
lockRenewalTimeout - the timeout to wait for lock renewal, in milliseconds.
- Since:
- 2.3.5
-
setConcurrency
public void setConcurrency(int concurrency)
The maximum number of concurrent KinesisMessageDrivenChannelAdapter.ConsumerInvokers running. The KinesisMessageDrivenChannelAdapter.ShardConsumers are evenly distributed between the KinesisMessageDrivenChannelAdapter.ConsumerInvokers. Messages from within the same shard are processed sequentially. In other words, each shard is tied to a particular thread. By default, the concurrency is unlimited and each shard is processed in the consumerExecutor directly.
- Parameters:
concurrency - the maximum concurrency
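To make the "evenly distributed" wording concrete, here is a minimal sketch that spreads shard ids over at most `concurrency` groups. Round-robin assignment is an assumption made for illustration; this page does not say which strategy the adapter uses, and `ShardDistributionSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

class ShardDistributionSketch {

    // Assigns each shard id to one of at most `concurrency` groups, round-robin.
    // Every shard lands in exactly one group, so its records stay on one worker.
    static List<List<String>> distribute(List<String> shardIds, int concurrency) {
        if (concurrency <= 0) {
            throw new IllegalArgumentException("concurrency must be positive");
        }
        int groups = Math.min(concurrency, shardIds.size());
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < shardIds.size(); i++) {
            result.get(i % groups).add(shardIds.get(i));
        }
        return result;
    }
}
```

Because a shard never appears in two groups, sequential processing within a group preserves per-shard ordering, which is the property the description above emphasizes.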
-
setIdleBetweenPolls
public void setIdleBetweenPolls(int idleBetweenPolls)
The sleep interval in milliseconds used in the main loop between shard polling cycles. Defaults to 1000; the minimum is 250.
- Parameters:
idleBetweenPolls - the interval to sleep between shard polling cycles.
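The default and minimum stated above can be expressed as a small helper. Whether the setter silently raises a too-small value to the minimum or rejects it is not stated on this page; the sketch assumes the former, and `IdleBetweenPollsSketch` is a hypothetical name.

```java
class IdleBetweenPollsSketch {

    static final int DEFAULT_IDLE_MS = 1000; // documented default
    static final int MIN_IDLE_MS = 250;      // documented minimum

    // Assumption: values below the minimum are raised to it rather than rejected.
    static int effectiveIdle(int requestedMs) {
        return Math.max(MIN_IDLE_MS, requestedMs);
    }
}
```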
-
setEmbeddedHeadersMapper
public void setEmbeddedHeadersMapper(org.springframework.integration.mapping.InboundMessageMapper<byte[]> embeddedHeadersMapper)
Specify an InboundMessageMapper to extract message headers embedded into the record data.
- Parameters:
embeddedHeadersMapper - the InboundMessageMapper to use.
- Since:
- 2.0
-
setLockRegistry
public void setLockRegistry(org.springframework.integration.support.locks.LockRegistry lockRegistry)
Specify a LockRegistry for exclusive access to provided streams. This is not used when shards-based configuration is provided.
- Parameters:
lockRegistry - the LockRegistry to use.
- Since:
- 2.0
-
setBindSourceRecord
public void setBindSourceRecord(boolean bindSourceRecord)
Set to true to bind the source consumer record in the header named IntegrationMessageHeaderAccessor.SOURCE_DATA. Does not apply to batch listeners.
- Parameters:
bindSourceRecord - true to bind.
- Since:
- 2.2
-
setShardListFilter
public void setShardListFilter(Function<List<software.amazon.awssdk.services.kinesis.model.Shard>, List<software.amazon.awssdk.services.kinesis.model.Shard>> shardListFilter)
Specify a Function<List<Shard>, List<Shard>> to filter the shards which will be read from.
- Parameters:
shardListFilter - the filter Function<List<Shard>, List<Shard>>
- Since:
- 2.3.4
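A shard list filter is simply a function from one list of shards to another. The sketch below keeps only shards whose id appears in an allow-list. To stay self-contained it uses a hypothetical nested `Shard` class as a stand-in for `software.amazon.awssdk.services.kinesis.model.Shard`; the names `ShardFilterSketch` and `onlyShards` are likewise illustrative.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

class ShardFilterSketch {

    // Hypothetical stand-in for the AWS SDK Shard model; only the id is modelled.
    static final class Shard {
        private final String shardId;

        Shard(String shardId) {
            this.shardId = shardId;
        }

        String shardId() {
            return shardId;
        }
    }

    // Builds a filter that keeps only the shards whose id is in allowedIds,
    // preserving the order of the incoming list.
    static Function<List<Shard>, List<Shard>> onlyShards(Set<String> allowedIds) {
        return shards -> shards.stream()
                .filter(shard -> allowedIds.contains(shard.shardId()))
                .collect(Collectors.toList());
    }
}
```

With the real adapter, a function of the same shape written against the SDK's `Shard` type would be the argument to `setShardListFilter`.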
-
onInit
protected void onInit()
- Overrides:
onInit in class org.springframework.integration.endpoint.MessageProducerSupport
-
destroy
public void destroy()
- Specified by:
destroy in interface org.springframework.beans.factory.DisposableBean
- Specified by:
destroy in interface org.springframework.integration.support.management.IntegrationManagement
- Overrides:
destroy in class org.springframework.integration.endpoint.AbstractEndpoint
-
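stopConsumer
public void stopConsumer(String stream, String shard)
-
startConsumer
public void startConsumer(String stream, String shard)
-
resetCheckpointForShardToLatest
public void resetCheckpointForShardToLatest(String stream, String shard)
-
resetCheckpointForShardToTrimHorizon
public void resetCheckpointForShardToTrimHorizon(String stream, String shard)
-
resetCheckpointForShardToSequenceNumber
public void resetCheckpointForShardToSequenceNumber(String stream, String shard, String sequenceNumber)
-
resetCheckpointForShardAtTimestamp
public void resetCheckpointForShardAtTimestamp(String stream, String shard, long timestamp)
-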
resetCheckpoints
@ManagedOperation
public void resetCheckpoints()
-
doStart
protected void doStart()
- Overrides:
doStart in class org.springframework.integration.endpoint.MessageProducerSupport
-
doStop
protected void doStop()
- Overrides:
doStop in class org.springframework.integration.endpoint.MessageProducerSupport
-
getErrorMessageAttributes
protected org.springframework.core.AttributeAccessor getErrorMessageAttributes(org.springframework.messaging.Message<?> message)
- Overrides:
getErrorMessageAttributes in class org.springframework.integration.endpoint.MessageProducerSupport
-
toString
public String toString()
- Overrides:
toString in class org.springframework.integration.context.IntegrationObjectSupport
-