Class KinesisMessageDrivenChannelAdapter

java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.endpoint.AbstractEndpoint
org.springframework.integration.endpoint.MessageProducerSupport
org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter
All Implemented Interfaces:
org.springframework.beans.factory.Aware, org.springframework.beans.factory.BeanFactoryAware, org.springframework.beans.factory.BeanNameAware, org.springframework.beans.factory.DisposableBean, org.springframework.beans.factory.InitializingBean, org.springframework.beans.factory.SmartInitializingSingleton, org.springframework.context.ApplicationContextAware, org.springframework.context.ApplicationEventPublisherAware, org.springframework.context.Lifecycle, org.springframework.context.Phased, org.springframework.context.SmartLifecycle, org.springframework.integration.context.ExpressionCapable, org.springframework.integration.core.MessageProducer, org.springframework.integration.IntegrationPattern, org.springframework.integration.support.context.NamedComponent, org.springframework.integration.support.management.IntegrationInboundManagement, org.springframework.integration.support.management.IntegrationManagement, org.springframework.integration.support.management.ManageableLifecycle, org.springframework.integration.support.management.ManageableSmartLifecycle, org.springframework.integration.support.management.TrackableComponent

@ManagedResource @IntegrationManagedResource public class KinesisMessageDrivenChannelAdapter extends org.springframework.integration.endpoint.MessageProducerSupport implements org.springframework.beans.factory.DisposableBean, org.springframework.context.ApplicationEventPublisherAware
The MessageProducerSupport implementation for receiving data from Amazon Kinesis stream(s).
Since:
1.1
Author:
Artem Bilan, Krzysztof Witkowski, Hervé Fortin, Dirk Bonhomme, Greg Eales, Asiel Caballero, Jonathan Nagayoshi
  • Nested Class Summary

    Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement

    org.springframework.integration.support.management.IntegrationManagement.ManagementOverrides
  • Field Summary

    Fields inherited from class org.springframework.integration.endpoint.AbstractEndpoint

    lifecycleCondition, lifecycleLock

    Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport

    EXPRESSION_PARSER, logger

    Fields inherited from interface org.springframework.integration.support.management.IntegrationManagement

    METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME

    Fields inherited from interface org.springframework.context.SmartLifecycle

    DEFAULT_PHASE
  • Constructor Summary

    Constructors
    Constructor
    Description
    KinesisMessageDrivenChannelAdapter(software.amazon.awssdk.services.kinesis.KinesisAsyncClient amazonKinesis, String... streams)
     
    KinesisMessageDrivenChannelAdapter(software.amazon.awssdk.services.kinesis.KinesisAsyncClient amazonKinesis, KinesisShardOffset... shardOffsets)
     
  • Method Summary

    Modifier and Type
    Method
    Description
    void
     
    protected void
     
    protected void
     
    protected org.springframework.core.AttributeAccessor
    getErrorMessageAttributes(org.springframework.messaging.Message<?> message)
     
    protected void
     
    void
    resetCheckpointForShardAtTimestamp(String stream, String shard, long timestamp)
     
    void
     
    void
     
    void
     
    void
     
    void
    setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher)
     
    void
    setBindSourceRecord(boolean bindSourceRecord)
    Set to true to bind the source consumer record in the header named IntegrationMessageHeaderAccessor.SOURCE_DATA.
    void
     
    void
    setCheckpointsInterval(long checkpointsInterval)
    Sets the interval between 2 checkpoints.
    void
    setCheckpointStore(org.springframework.integration.metadata.ConcurrentMetadataStore checkpointStore)
     
    void
    setConcurrency(int concurrency)
    The maximum number of concurrent KinesisMessageDrivenChannelAdapter.ConsumerInvokers running.
    void
    setConsumerBackoff(int consumerBackoff)
     
    void
     
    void
    setConsumerGroup(String consumerGroup)
     
    void
    setConverter(org.springframework.core.convert.converter.Converter<byte[],Object> converter)
    Specify a Converter to deserialize the byte[] from record's body.
    void
    setDescribeStreamBackoff(int describeStreamBackoff)
     
    void
    setDescribeStreamRetries(int describeStreamRetries)
     
    void
    setDispatcherExecutor(Executor dispatcherExecutor)
     
    void
    setEmbeddedHeadersMapper(org.springframework.integration.mapping.InboundMessageMapper<byte[]> embeddedHeadersMapper)
    Specify an InboundMessageMapper to extract message headers embedded into the record data.
    void
    setIdleBetweenPolls(int idleBetweenPolls)
    The sleep interval in milliseconds used in the main loop between shards polling cycles.
    void
     
    void
    setLockRegistry(org.springframework.integration.support.locks.LockRegistry lockRegistry)
    Specify a LockRegistry for exclusive access to provided streams.
    void
    setLockRenewalTimeout(long lockRenewalTimeout)
    Configure a timeout in milliseconds to wait for lock on shard renewal.
    void
    setRecordsLimit(int recordsLimit)
    The maximum record to poll per on get-records request.
    void
    setShardListFilter(Function<List<software.amazon.awssdk.services.kinesis.model.Shard>,List<software.amazon.awssdk.services.kinesis.model.Shard>> shardListFilter)
    Specify a Function<List<Shard>, List<Shard>> to filter the shards which will be read from.
    void
    setStartTimeout(int startTimeout)
     
    void
     
    void
    startConsumer(String stream, String shard)
     
    void
    stopConsumer(String stream, String shard)
     
     

    Methods inherited from class org.springframework.integration.endpoint.MessageProducerSupport

    afterSingletonsInstantiated, buildErrorMessage, getErrorChannel, getErrorMessageStrategy, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, isObserved, registerObservationRegistry, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setObservationConvention, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisher

    Methods inherited from class org.springframework.integration.endpoint.AbstractEndpoint

    doStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop

    Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport

    afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getComponentType, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait

    Methods inherited from interface org.springframework.integration.support.management.IntegrationManagement

    getManagedName, getManagedType, getOverrides, getThisAs, isLoggingEnabled, registerMetricsCaptor, setLoggingEnabled, setManagedName, setManagedType

    Methods inherited from interface org.springframework.integration.support.context.NamedComponent

    getBeanName, getComponentName, getComponentType
  • Constructor Details

    • KinesisMessageDrivenChannelAdapter

      public KinesisMessageDrivenChannelAdapter(software.amazon.awssdk.services.kinesis.KinesisAsyncClient amazonKinesis, String... streams)
    • KinesisMessageDrivenChannelAdapter

      public KinesisMessageDrivenChannelAdapter(software.amazon.awssdk.services.kinesis.KinesisAsyncClient amazonKinesis, KinesisShardOffset... shardOffsets)
  • Method Details

    • setApplicationEventPublisher

      public void setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher)
      Specified by:
      setApplicationEventPublisher in interface org.springframework.context.ApplicationEventPublisherAware
    • setConsumerGroup

      public void setConsumerGroup(String consumerGroup)
    • setCheckpointStore

      public void setCheckpointStore(org.springframework.integration.metadata.ConcurrentMetadataStore checkpointStore)
    • setConsumerExecutor

      public void setConsumerExecutor(Executor executor)
    • setDispatcherExecutor

      public void setDispatcherExecutor(Executor dispatcherExecutor)
    • setStreamInitialSequence

      public void setStreamInitialSequence(KinesisShardOffset streamInitialSequence)
    • setConverter

      public void setConverter(org.springframework.core.convert.converter.Converter<byte[],Object> converter)
      Specify a Converter to deserialize the byte[] from record's body. Can be null meaning no deserialization.
      Parameters:
      converter - the Converter to use or null
    • setListenerMode

      public void setListenerMode(ListenerMode listenerMode)
    • setCheckpointMode

      public void setCheckpointMode(CheckpointMode checkpointMode)
    • setCheckpointsInterval

      public void setCheckpointsInterval(long checkpointsInterval)
      Sets the interval between 2 checkpoints. Only used when checkpointMode is periodic.
      Parameters:
      checkpointsInterval - interval between 2 checkpoints (in milliseconds)
      Since:
      2.2
    • setRecordsLimit

      public void setRecordsLimit(int recordsLimit)
      The maximum record to poll per on get-records request. Not greater then 10000.
      Parameters:
      recordsLimit - the number of records to for per on get-records request.
      See Also:
      • GetRecordsRequest.Builder.limit(Integer)
    • setConsumerBackoff

      public void setConsumerBackoff(int consumerBackoff)
    • setDescribeStreamBackoff

      public void setDescribeStreamBackoff(int describeStreamBackoff)
    • setDescribeStreamRetries

      public void setDescribeStreamRetries(int describeStreamRetries)
    • setStartTimeout

      public void setStartTimeout(int startTimeout)
    • setLockRenewalTimeout

      public void setLockRenewalTimeout(long lockRenewalTimeout)
      Configure a timeout in milliseconds to wait for lock on shard renewal.
      Parameters:
      lockRenewalTimeout - the timeout to wait for lock renew in milliseconds.
      Since:
      2.3.5
    • setConcurrency

      public void setConcurrency(int concurrency)
      The maximum number of concurrent KinesisMessageDrivenChannelAdapter.ConsumerInvokers running. The KinesisMessageDrivenChannelAdapter.ShardConsumers are evenly distributed between KinesisMessageDrivenChannelAdapter.ConsumerInvokers. Messages from within the same shard will be processed sequentially. In other words each shard is tied with the particular thread. By default the concurrency is unlimited and shard is processed in the consumerExecutor directly.
      Parameters:
      concurrency - the concurrency maximum number
    • setIdleBetweenPolls

      public void setIdleBetweenPolls(int idleBetweenPolls)
      The sleep interval in milliseconds used in the main loop between shards polling cycles. Defaults to 1000l minimum 250.
      Parameters:
      idleBetweenPolls - the interval to sleep between shards polling cycles.
    • setEmbeddedHeadersMapper

      public void setEmbeddedHeadersMapper(org.springframework.integration.mapping.InboundMessageMapper<byte[]> embeddedHeadersMapper)
      Specify an InboundMessageMapper to extract message headers embedded into the record data.
      Parameters:
      embeddedHeadersMapper - the InboundMessageMapper to use.
      Since:
      2.0
    • setLockRegistry

      public void setLockRegistry(org.springframework.integration.support.locks.LockRegistry lockRegistry)
      Specify a LockRegistry for exclusive access to provided streams. This is not used when shards-based configuration is provided.
      Parameters:
      lockRegistry - the LockRegistry to use.
      Since:
      2.0
    • setBindSourceRecord

      public void setBindSourceRecord(boolean bindSourceRecord)
      Set to true to bind the source consumer record in the header named IntegrationMessageHeaderAccessor.SOURCE_DATA. Does not apply to batch listeners.
      Parameters:
      bindSourceRecord - true to bind.
      Since:
      2.2
    • setShardListFilter

      public void setShardListFilter(Function<List<software.amazon.awssdk.services.kinesis.model.Shard>,List<software.amazon.awssdk.services.kinesis.model.Shard>> shardListFilter)
      Specify a Function<List<Shard>, List<Shard>> to filter the shards which will be read from.
      Parameters:
      shardListFilter - the filter Function<List<Shard>, List<Shard>>
      Since:
      2.3.4
    • onInit

      protected void onInit()
      Overrides:
      onInit in class org.springframework.integration.endpoint.MessageProducerSupport
    • destroy

      public void destroy()
      Specified by:
      destroy in interface org.springframework.beans.factory.DisposableBean
      Specified by:
      destroy in interface org.springframework.integration.support.management.IntegrationManagement
      Overrides:
      destroy in class org.springframework.integration.endpoint.AbstractEndpoint
    • stopConsumer

      @ManagedOperation public void stopConsumer(String stream, String shard)
    • startConsumer

      @ManagedOperation public void startConsumer(String stream, String shard)
    • resetCheckpointForShardToLatest

      @ManagedOperation public void resetCheckpointForShardToLatest(String stream, String shard)
    • resetCheckpointForShardToTrimHorizon

      @ManagedOperation public void resetCheckpointForShardToTrimHorizon(String stream, String shard)
    • resetCheckpointForShardToSequenceNumber

      @ManagedOperation public void resetCheckpointForShardToSequenceNumber(String stream, String shard, String sequenceNumber)
    • resetCheckpointForShardAtTimestamp

      @ManagedOperation public void resetCheckpointForShardAtTimestamp(String stream, String shard, long timestamp)
    • resetCheckpoints

      @ManagedOperation public void resetCheckpoints()
    • doStart

      protected void doStart()
      Overrides:
      doStart in class org.springframework.integration.endpoint.MessageProducerSupport
    • doStop

      protected void doStop()
      Overrides:
      doStop in class org.springframework.integration.endpoint.MessageProducerSupport
    • getErrorMessageAttributes

      protected org.springframework.core.AttributeAccessor getErrorMessageAttributes(org.springframework.messaging.Message<?> message)
      Overrides:
      getErrorMessageAttributes in class org.springframework.integration.endpoint.MessageProducerSupport
    • toString

      public String toString()
      Overrides:
      toString in class org.springframework.integration.context.IntegrationObjectSupport