Class KinesisMessageDrivenChannelAdapter

java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.endpoint.AbstractEndpoint
org.springframework.integration.endpoint.MessageProducerSupport
org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter
All Implemented Interfaces:
org.springframework.beans.factory.Aware, org.springframework.beans.factory.BeanFactoryAware, org.springframework.beans.factory.BeanNameAware, org.springframework.beans.factory.DisposableBean, org.springframework.beans.factory.InitializingBean, org.springframework.beans.factory.SmartInitializingSingleton, org.springframework.context.ApplicationContextAware, org.springframework.context.ApplicationEventPublisherAware, org.springframework.context.Lifecycle, org.springframework.context.Phased, org.springframework.context.SmartLifecycle, org.springframework.integration.context.ExpressionCapable, org.springframework.integration.core.MessageProducer, org.springframework.integration.IntegrationPattern, org.springframework.integration.support.context.NamedComponent, org.springframework.integration.support.management.ManageableLifecycle, org.springframework.integration.support.management.ManageableSmartLifecycle, org.springframework.integration.support.management.TrackableComponent

@ManagedResource @IntegrationManagedResource public class KinesisMessageDrivenChannelAdapter extends org.springframework.integration.endpoint.MessageProducerSupport implements org.springframework.beans.factory.DisposableBean, org.springframework.context.ApplicationEventPublisherAware
The MessageProducerSupport implementation for receiving data from Amazon Kinesis stream(s).
Since:
1.1
Author:
Artem Bilan, Krzysztof Witkowski, Hervé Fortin, Dirk Bonhomme, Greg Eales, Asiel Caballero, Jonathan Nagayoshi
  • Constructor Details

    • KinesisMessageDrivenChannelAdapter

      public KinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis, String... streams)
    • KinesisMessageDrivenChannelAdapter

      public KinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis, KinesisShardOffset... shardOffsets)
  • Method Details

    • setApplicationEventPublisher

      public void setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher)
      Specified by:
      setApplicationEventPublisher in interface org.springframework.context.ApplicationEventPublisherAware
    • setConsumerGroup

      public void setConsumerGroup(String consumerGroup)
    • setCheckpointStore

      public void setCheckpointStore(org.springframework.integration.metadata.ConcurrentMetadataStore checkpointStore)
    • setConsumerExecutor

      public void setConsumerExecutor(Executor executor)
    • setDispatcherExecutor

      public void setDispatcherExecutor(Executor dispatcherExecutor)
    • setStreamInitialSequence

      public void setStreamInitialSequence(KinesisShardOffset streamInitialSequence)
    • setConverter

      public void setConverter(org.springframework.core.convert.converter.Converter<byte[],Object> converter)
      Specify a Converter to deserialize the byte[] from the record's body. Can be null, meaning no deserialization is applied.
      Parameters:
      converter - the Converter to use or null
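As an illustration, the logic of a converter that decodes the record body as UTF-8 text might look like the sketch below. To keep it runnable without Spring on the classpath, it uses java.util.function.Function as a stand-in for org.springframework.core.convert.converter.Converter<byte[], Object>; the class name Utf8RecordConverter is hypothetical and not part of this library.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical stand-in: in a real application this logic would live in an
// org.springframework.core.convert.converter.Converter<byte[], Object>.
final class Utf8RecordConverter implements Function<byte[], Object> {

    @Override
    public Object apply(byte[] recordBody) {
        // Decode the raw record bytes as UTF-8 text.
        return new String(recordBody, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(new Utf8RecordConverter().apply(body)); // prints "hello"
    }
}
```

Since Converter is a functional interface, the same decoding could presumably be passed to setConverter as a lambda such as bytes -> new String(bytes, StandardCharsets.UTF_8).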
    • setListenerMode

      public void setListenerMode(ListenerMode listenerMode)
    • setCheckpointMode

      public void setCheckpointMode(CheckpointMode checkpointMode)
    • setCheckpointsInterval

      public void setCheckpointsInterval(long checkpointsInterval)
      Set the interval, in milliseconds, between two consecutive checkpoints. Only used when checkpointMode is periodic.
      Parameters:
      checkpointsInterval - interval between 2 checkpoints (in milliseconds)
      Since:
      2.2
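The general idea of a periodic checkpoint can be sketched as a simple time gate: a checkpoint is stored only when at least checkpointsInterval milliseconds have passed since the previous one. The class below is a hypothetical helper written for illustration; it is not the adapter's actual implementation, whose timing details are not described on this page.

```java
// Hypothetical helper, not part of the adapter: decides whether enough time
// has passed since the last checkpoint to store a new one.
final class PeriodicCheckpointGate {

    private final long checkpointsIntervalMillis;

    private long lastCheckpointMillis;

    PeriodicCheckpointGate(long checkpointsIntervalMillis, long startMillis) {
        this.checkpointsIntervalMillis = checkpointsIntervalMillis;
        this.lastCheckpointMillis = startMillis;
    }

    // Returns true (and remembers the time) when the interval has elapsed.
    boolean shouldCheckpoint(long nowMillis) {
        if (nowMillis - this.lastCheckpointMillis >= this.checkpointsIntervalMillis) {
            this.lastCheckpointMillis = nowMillis;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        PeriodicCheckpointGate gate = new PeriodicCheckpointGate(5_000L, 0L);
        System.out.println(gate.shouldCheckpoint(1_000L)); // prints "false"
        System.out.println(gate.shouldCheckpoint(5_000L)); // prints "true"
        System.out.println(gate.shouldCheckpoint(6_000L)); // prints "false"
    }
}
```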
    • setRecordsLimit

      public void setRecordsLimit(int recordsLimit)
      The maximum number of records to poll per get-records request. Must not be greater than 10000.
      Parameters:
      recordsLimit - the number of records to poll per get-records request.
    • setConsumerBackoff

      public void setConsumerBackoff(int consumerBackoff)
    • setDescribeStreamBackoff

      public void setDescribeStreamBackoff(int describeStreamBackoff)
    • setDescribeStreamRetries

      public void setDescribeStreamRetries(int describeStreamRetries)
    • setStartTimeout

      public void setStartTimeout(int startTimeout)
    • setLockRenewalTimeout

      public void setLockRenewalTimeout(long lockRenewalTimeout)
      Configure a timeout in milliseconds to wait for the renewal of a lock on a shard.
      Parameters:
      lockRenewalTimeout - the timeout to wait for the lock renewal, in milliseconds.
      Since:
      2.3.5
    • setConcurrency

      public void setConcurrency(int concurrency)
      The maximum number of concurrent KinesisMessageDrivenChannelAdapter.ConsumerInvokers running. The KinesisMessageDrivenChannelAdapter.ShardConsumers are evenly distributed between KinesisMessageDrivenChannelAdapter.ConsumerInvokers. Messages from the same shard are processed sequentially; in other words, each shard is tied to a particular thread. By default the concurrency is unlimited and each shard is processed directly in the consumerExecutor.
      Parameters:
      concurrency - the maximum concurrency
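To make the "evenly distributed" wording concrete, here is a minimal sketch of one way shards could be spread across a fixed number of invokers in round-robin order, so that each shard always belongs to exactly one invoker. The class ShardDistribution and its distribute method are hypothetical; the adapter's internal assignment strategy may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration, not the adapter's internal code.
final class ShardDistribution {

    // Assigns shard ids to 'concurrency' buckets in round-robin order.
    static List<List<String>> distribute(List<String> shardIds, int concurrency) {
        if (concurrency < 1) {
            throw new IllegalArgumentException("concurrency must be at least 1");
        }
        List<List<String>> invokers = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            invokers.add(new ArrayList<>());
        }
        for (int i = 0; i < shardIds.size(); i++) {
            // A shard stays in one bucket, so its records are handled sequentially.
            invokers.get(i % concurrency).add(shardIds.get(i));
        }
        return invokers;
    }

    public static void main(String[] args) {
        List<String> shards =
                Arrays.asList("shard-0", "shard-1", "shard-2", "shard-3", "shard-4");
        System.out.println(distribute(shards, 2));
        // prints [[shard-0, shard-2, shard-4], [shard-1, shard-3]]
    }
}
```

With five shards and a concurrency of 2, one invoker handles three shards and the other two, which is as even as the split can be.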
    • setIdleBetweenPolls

      public void setIdleBetweenPolls(int idleBetweenPolls)
      The sleep interval in milliseconds used in the main loop between shards polling cycles. Defaults to 1000; the minimum is 250.
      Parameters:
      idleBetweenPolls - the interval to sleep between shards polling cycles.
    • setEmbeddedHeadersMapper

      public void setEmbeddedHeadersMapper(org.springframework.integration.mapping.InboundMessageMapper<byte[]> embeddedHeadersMapper)
      Specify an InboundMessageMapper to extract message headers embedded into the record data.
      Parameters:
      embeddedHeadersMapper - the InboundMessageMapper to use.
      Since:
      2.0
    • setLockRegistry

      public void setLockRegistry(org.springframework.integration.support.locks.LockRegistry lockRegistry)
      Specify a LockRegistry for exclusive access to the provided streams. This is not used when a shards-based configuration is provided.
      Parameters:
      lockRegistry - the LockRegistry to use.
      Since:
      2.0
    • setBindSourceRecord

      public void setBindSourceRecord(boolean bindSourceRecord)
      Set to true to bind the source consumer record in the header named IntegrationMessageHeaderAccessor.SOURCE_DATA. Does not apply to batch listeners.
      Parameters:
      bindSourceRecord - true to bind.
      Since:
      2.2
    • setShardListFilter

      public void setShardListFilter(Function<List<Shard>,List<Shard>> shardListFilter)
      Specify a Function<List<Shard>, List<Shard>> to filter the shards which will be read from.
      Parameters:
      shardListFilter - the filter Function<List<Shard>, List<Shard>>
      Since:
      2.3.4
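A shard list filter is an ordinary Function from a list of shards to a (usually smaller) list of shards. The sketch below keeps only open shards, relying on the Kinesis convention that a closed shard has an ending sequence number. To stay runnable without the AWS SDK, it uses a simplified, hypothetical Shard class; with the real SDK model the ending sequence number is reached through the shard's sequence number range.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

final class OpenShardsFilterExample {

    // Hypothetical, simplified stand-in for the AWS SDK Shard model class.
    static final class Shard {

        final String shardId;

        final String endingSequenceNumber; // null while the shard is still open

        Shard(String shardId, String endingSequenceNumber) {
            this.shardId = shardId;
            this.endingSequenceNumber = endingSequenceNumber;
        }
    }

    // Keeps only shards without an ending sequence number, i.e. open shards.
    static final Function<List<Shard>, List<Shard>> OPEN_SHARDS_ONLY =
            shards -> shards.stream()
                    .filter(shard -> shard.endingSequenceNumber == null)
                    .collect(Collectors.toList());

    public static void main(String[] args) {
        List<Shard> all = Arrays.asList(
                new Shard("closed-shard", "100"),
                new Shard("open-shard", null));
        for (Shard shard : OPEN_SHARDS_ONLY.apply(all)) {
            System.out.println(shard.shardId); // prints "open-shard"
        }
    }
}
```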
    • onInit

      protected void onInit()
      Overrides:
      onInit in class org.springframework.integration.endpoint.MessageProducerSupport
    • destroy

      public void destroy()
      Specified by:
      destroy in interface org.springframework.beans.factory.DisposableBean
      Overrides:
      destroy in class org.springframework.integration.endpoint.AbstractEndpoint
    • stopConsumer

      @ManagedOperation public void stopConsumer(String stream, String shard)
    • startConsumer

      @ManagedOperation public void startConsumer(String stream, String shard)
    • resetCheckpointForShardToLatest

      @ManagedOperation public void resetCheckpointForShardToLatest(String stream, String shard)
    • resetCheckpointForShardToTrimHorizon

      @ManagedOperation public void resetCheckpointForShardToTrimHorizon(String stream, String shard)
    • resetCheckpointForShardToSequenceNumber

      @ManagedOperation public void resetCheckpointForShardToSequenceNumber(String stream, String shard, String sequenceNumber)
    • resetCheckpointForShardAtTimestamp

      @ManagedOperation public void resetCheckpointForShardAtTimestamp(String stream, String shard, long timestamp)
    • resetCheckpoints

      @ManagedOperation public void resetCheckpoints()
    • doStart

      protected void doStart()
      Overrides:
      doStart in class org.springframework.integration.endpoint.MessageProducerSupport
    • doStop

      protected void doStop()
      Overrides:
      doStop in class org.springframework.integration.endpoint.MessageProducerSupport
    • getErrorMessageAttributes

      protected org.springframework.core.AttributeAccessor getErrorMessageAttributes(org.springframework.messaging.Message<?> message)
      Overrides:
      getErrorMessageAttributes in class org.springframework.integration.endpoint.MessageProducerSupport
    • toString

      public String toString()
      Overrides:
      toString in class org.springframework.integration.context.IntegrationObjectSupport