@ManagedResource
@IntegrationManagedResource
public class KinesisMessageDrivenChannelAdapter
extends org.springframework.integration.endpoint.MessageProducerSupport
implements org.springframework.beans.factory.DisposableBean, org.springframework.context.ApplicationEventPublisherAware
The MessageProducerSupport implementation for receiving data from Amazon Kinesis
stream(s).

Inherited fields: lifecycleCondition, lifecycleLock

| Constructor and Description |
|---|
KinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis,
KinesisShardOffset... shardOffsets) |
KinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis,
java.lang.String... streams) |
| Modifier and Type | Method and Description |
|---|---|
void |
destroy() |
protected void |
doStart() |
protected void |
doStop() |
protected org.springframework.core.AttributeAccessor |
getErrorMessageAttributes(org.springframework.messaging.Message<?> message) |
protected void |
onInit() |
void |
resetCheckpointForShardAtTimestamp(java.lang.String stream,
java.lang.String shard,
long timestamp) |
void |
resetCheckpointForShardToLatest(java.lang.String stream,
java.lang.String shard) |
void |
resetCheckpointForShardToSequenceNumber(java.lang.String stream,
java.lang.String shard,
java.lang.String sequenceNumber) |
void |
resetCheckpointForShardToTrimHorizon(java.lang.String stream,
java.lang.String shard) |
void |
resetCheckpoints() |
void |
setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher) |
void |
setBindSourceRecord(boolean bindSourceRecord)
Set to true to bind the source consumer record in the header named
IntegrationMessageHeaderAccessor.SOURCE_DATA. |
void |
setCheckpointMode(CheckpointMode checkpointMode) |
void |
setCheckpointsInterval(long checkpointsInterval)
Sets the interval between 2 checkpoints.
|
void |
setCheckpointStore(org.springframework.integration.metadata.ConcurrentMetadataStore checkpointStore) |
void |
setConcurrency(int concurrency)
The maximum number of concurrent
ConsumerInvokers running. |
void |
setConsumerBackoff(int consumerBackoff) |
void |
setConsumerExecutor(java.util.concurrent.Executor executor) |
void |
setConsumerGroup(java.lang.String consumerGroup) |
void |
setConverter(org.springframework.core.convert.converter.Converter<byte[],java.lang.Object> converter)
Specify a
Converter to deserialize the byte[] from record's body. |
void |
setDescribeStreamBackoff(int describeStreamBackoff) |
void |
setDescribeStreamRetries(int describeStreamRetries) |
void |
setDispatcherExecutor(java.util.concurrent.Executor dispatcherExecutor) |
void |
setEmbeddedHeadersMapper(org.springframework.integration.mapping.InboundMessageMapper<byte[]> embeddedHeadersMapper)
Specify an
InboundMessageMapper to extract message headers embedded into the record
data. |
void |
setIdleBetweenPolls(int idleBetweenPolls)
The sleep interval in milliseconds used in the main loop between shards polling cycles.
|
void |
setListenerMode(ListenerMode listenerMode) |
void |
setLockRegistry(org.springframework.integration.support.locks.LockRegistry lockRegistry)
Specify a
LockRegistry for an exclusive access to provided streams. |
void |
setLockRenewalTimeout(long lockRenewalTimeout)
Configure a timeout in milliseconds to wait for lock on shard renewal.
|
void |
setRecordsLimit(int recordsLimit)
The maximum number of records to poll per get-records request.
|
void |
setShardListFilter(java.util.function.Function<java.util.List<Shard>,java.util.List<Shard>> shardListFilter)
Specify a
Function<List<Shard>, List<Shard>> to filter the shards which will
be read from. |
void |
setStartTimeout(int startTimeout) |
void |
setStreamInitialSequence(KinesisShardOffset streamInitialSequence) |
void |
startConsumer(java.lang.String stream,
java.lang.String shard) |
void |
stopConsumer(java.lang.String stream,
java.lang.String shard) |
java.lang.String |
toString() |
Methods inherited from class org.springframework.integration.endpoint.MessageProducerSupport: afterSingletonsInstantiated, buildErrorMessage, getErrorChannel, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisher
Methods inherited from class org.springframework.integration.endpoint.AbstractEndpoint: doStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport: afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getComponentType, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler
public KinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis, java.lang.String... streams)
public KinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis, KinesisShardOffset... shardOffsets)
public void setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher)
Specified by: setApplicationEventPublisher in interface org.springframework.context.ApplicationEventPublisherAware
public void setConsumerGroup(java.lang.String consumerGroup)
public void setCheckpointStore(org.springframework.integration.metadata.ConcurrentMetadataStore checkpointStore)
public void setConsumerExecutor(java.util.concurrent.Executor executor)
public void setDispatcherExecutor(java.util.concurrent.Executor dispatcherExecutor)
public void setStreamInitialSequence(KinesisShardOffset streamInitialSequence)
public void setConverter(org.springframework.core.convert.converter.Converter<byte[],java.lang.Object> converter)
Specify a Converter to deserialize the byte[] from record's body. Can be null, meaning no deserialization.
Parameters: converter - the Converter to use or null
public void setListenerMode(ListenerMode listenerMode)
public void setCheckpointMode(CheckpointMode checkpointMode)
public void setCheckpointsInterval(long checkpointsInterval)
Sets the interval between 2 checkpoints.
Parameters: checkpointsInterval - interval between 2 checkpoints (in milliseconds)
public void setRecordsLimit(int recordsLimit)
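The maximum number of records to poll per get-records request. Not greater than 10000.
Parameters: recordsLimit - the number of records to poll per get-records request.
See Also: GetRecordsRequest.setLimit(java.lang.Integer)
public void setConsumerBackoff(int consumerBackoff)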
public void setDescribeStreamBackoff(int describeStreamBackoff)
public void setDescribeStreamRetries(int describeStreamRetries)
public void setStartTimeout(int startTimeout)
public void setLockRenewalTimeout(long lockRenewalTimeout)
Configure a timeout in milliseconds to wait for lock on shard renewal.
Parameters: lockRenewalTimeout - the timeout to wait for lock renewal, in milliseconds.
public void setConcurrency(int concurrency)
The maximum number of concurrent ConsumerInvokers running. The ShardConsumers
are evenly distributed between ConsumerInvokers. Messages from within the same shard
are processed sequentially; in other words, each shard is tied to a particular thread.
By default the concurrency is unlimited and each shard is processed in the consumerExecutor
directly.
Parameters: concurrency - the maximum concurrency
public void setIdleBetweenPolls(int idleBetweenPolls)
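The sleep interval in milliseconds used in the main loop between shards polling cycles. Defaults to 1000; minimum 250.
Parameters: idleBetweenPolls - the interval to sleep between shards polling cycles.
public void setEmbeddedHeadersMapper(org.springframework.integration.mapping.InboundMessageMapper<byte[]> embeddedHeadersMapper)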
Specify an InboundMessageMapper to extract message headers embedded into the record
data.
Parameters: embeddedHeadersMapper - the InboundMessageMapper to use.
public void setLockRegistry(org.springframework.integration.support.locks.LockRegistry lockRegistry)
Specify a LockRegistry for exclusive access to the provided streams. This is not used
when shards-based configuration is provided.
Parameters: lockRegistry - the LockRegistry to use.
public void setBindSourceRecord(boolean bindSourceRecord)
Set to true to bind the source consumer record in the header named IntegrationMessageHeaderAccessor.SOURCE_DATA. Does not apply to batch listeners.
Parameters: bindSourceRecord - true to bind.
public void setShardListFilter(java.util.function.Function<java.util.List<Shard>,java.util.List<Shard>> shardListFilter)
Specify a Function<List<Shard>, List<Shard>> to filter the shards which will
be read from.
Parameters: shardListFilter - the filter Function<List<Shard>, List<Shard>>
protected void onInit()
Overrides: onInit in class org.springframework.integration.endpoint.MessageProducerSupport
public void destroy()
Specified by: destroy in interface org.springframework.beans.factory.DisposableBean
Overrides: destroy in class org.springframework.integration.endpoint.AbstractEndpoint
@ManagedOperation
public void stopConsumer(java.lang.String stream,
java.lang.String shard)
@ManagedOperation
public void startConsumer(java.lang.String stream,
java.lang.String shard)
@ManagedOperation
public void resetCheckpointForShardToLatest(java.lang.String stream,
java.lang.String shard)
@ManagedOperation
public void resetCheckpointForShardToTrimHorizon(java.lang.String stream,
java.lang.String shard)
@ManagedOperation
public void resetCheckpointForShardToSequenceNumber(java.lang.String stream,
java.lang.String shard,
java.lang.String sequenceNumber)
@ManagedOperation
public void resetCheckpointForShardAtTimestamp(java.lang.String stream,
java.lang.String shard,
long timestamp)
@ManagedOperation
public void resetCheckpoints()
protected void doStart()
Overrides: doStart in class org.springframework.integration.endpoint.MessageProducerSupport
protected void doStop()
Overrides: doStop in class org.springframework.integration.endpoint.MessageProducerSupport
protected org.springframework.core.AttributeAccessor getErrorMessageAttributes(org.springframework.messaging.Message<?> message)
Overrides: getErrorMessageAttributes in class org.springframework.integration.endpoint.MessageProducerSupport
public java.lang.String toString()
Overrides: toString in class org.springframework.integration.context.IntegrationObjectSupport