Class JdbcChannelMessageStore

java.lang.Object
org.springframework.integration.jdbc.store.JdbcChannelMessageStore
All Implemented Interfaces:
org.springframework.beans.factory.InitializingBean, org.springframework.integration.store.BasicMessageGroupStore, org.springframework.integration.store.ChannelMessageStore, org.springframework.integration.store.PriorityCapableChannelMessageStore

@ManagedResource
public class JdbcChannelMessageStore
extends java.lang.Object
implements org.springframework.integration.store.PriorityCapableChannelMessageStore, org.springframework.beans.factory.InitializingBean

Channel-specific implementation of MessageGroupStore using a relational database via JDBC. This message store should be used for message channels only.

Because it is tailored to channels, the JdbcChannelMessageStore uses database-specific SQL queries.

Contrary to the JdbcMessageStore, this implementation uses a single database table, optimized to operate like a queue. The SQL scripts for creating the table are packaged under org/springframework/integration/jdbc/schema-*.sql, where * denotes the target database type.

Since:
2.2
  • Field Summary

    Fields 
    Modifier and Type Field Description
    static java.lang.String DEFAULT_REGION
    Default region property, used to partition the message store.
    static java.lang.String DEFAULT_TABLE_PREFIX
    Default value for the table prefix property.
  • Constructor Summary

    Constructors 
    Constructor Description
    JdbcChannelMessageStore()
    Convenient constructor for configuration use.
    JdbcChannelMessageStore​(javax.sql.DataSource dataSource)
    Create a MessageStore with all mandatory properties.
  • Method Summary

    Modifier and Type Method Description
    void addAllowedPatterns​(java.lang.String... patterns)
    Add patterns for packages/classes that are allowed to be deserialized.
    org.springframework.integration.store.MessageGroup addMessageToGroup​(java.lang.Object groupId, org.springframework.messaging.Message<?> message)
    Store a message in the database.
    void afterPropertiesSet()
    protected org.springframework.messaging.Message<?> doPollForMessage​(java.lang.String groupIdKey)
    This method executes a call to the DB to get the oldest Message in the MessageGroup, which in the context of the JdbcChannelMessageStore is identified by the channel identifier.
    org.springframework.integration.store.MessageGroup getMessageGroup​(java.lang.Object groupId)
    Not fully used.
    int getMessageGroupCount()
    Method not implemented.
    protected org.springframework.integration.store.MessageGroupFactory getMessageGroupFactory()  
    protected java.lang.String getQuery​(org.springframework.integration.jdbc.store.JdbcChannelMessageStore.Query queryName, java.util.function.Supplier<java.lang.String> queryProvider)
    Replace patterns in the input to produce a valid SQL query.
    int getSizeOfIdCache()
    Returns the size of the Message Id Cache, which caches Message Ids for those messages that are currently being processed.
    boolean isPriorityEnabled()  
    int messageGroupSize​(java.lang.Object groupId)
    Returns the number of messages persisted for the specified channel id (groupId) and the specified region (setRegion(String)).
    org.springframework.messaging.Message<?> pollMessageFromGroup​(java.lang.Object groupId)
    Polls the database for a new message that is persisted for the given group id, which represents the channel identifier.
    void removeFromIdCache​(java.lang.String messageId)
    Remove a Message Id from the idCache.
    void removeMessageGroup​(java.lang.Object groupId)  
    void setChannelMessageStoreQueryProvider​(ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider)
    Sets the database-specific ChannelMessageStoreQueryProvider to use.
    void setDataSource​(javax.sql.DataSource dataSource)
    The JDBC DataSource to use when interacting with the database.
    void setDeserializer​(org.springframework.core.serializer.Deserializer<? extends org.springframework.messaging.Message<?>> deserializer)
    A converter for deserializing byte arrays to messages.
    void setJdbcTemplate​(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate)
    The JdbcOperations to use when interacting with the database.
    void setLobHandler​(org.springframework.jdbc.support.lob.LobHandler lobHandler)
    Override the LobHandler that is used to create and unpack large objects in SQL queries.
    void setMessageGroupFactory​(org.springframework.integration.store.MessageGroupFactory messageGroupFactory)
    Specify the MessageGroupFactory used to create MessageGroup objects where necessary.
    void setMessageRowMapper​(MessageRowMapper messageRowMapper)
    Allows for passing in a custom MessageRowMapper.
    void setPreparedStatementSetter​(ChannelMessageStorePreparedStatementSetter preparedStatementSetter)
    Set a ChannelMessageStorePreparedStatementSetter used to insert messages into the database.
    void setPriorityEnabled​(boolean priorityEnabled)  
    void setRegion​(java.lang.String region)
    A unique grouping identifier for all messages persisted with this store.
    void setSerializer​(org.springframework.core.serializer.Serializer<? super org.springframework.messaging.Message<?>> serializer)
    A converter for serializing messages to byte arrays for storage.
    void setTablePrefix​(java.lang.String tablePrefix)
    Public setter for the table prefix property.
    void setUsingIdCache​(boolean usingIdCache)
    Consider using this property when polling the database transactionally using multiple parallel threads, that is, when the poller is configured with a task executor.

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Field Details

    • DEFAULT_REGION

      public static final java.lang.String DEFAULT_REGION
      Default region property, used to partition the message store. For example, a separate Spring Integration application with overlapping channel names may use the same message store by providing a distinct region name.
      See Also:
      Constant Field Values
    • DEFAULT_TABLE_PREFIX

      public static final java.lang.String DEFAULT_TABLE_PREFIX
      Default value for the table prefix property.
      See Also:
      Constant Field Values
  • Constructor Details

    • JdbcChannelMessageStore

      public JdbcChannelMessageStore()
      Convenient constructor for configuration use.
    • JdbcChannelMessageStore

      public JdbcChannelMessageStore​(javax.sql.DataSource dataSource)
      Create a MessageStore with all mandatory properties. The passed-in DataSource is used to instantiate a JdbcTemplate with JdbcTemplate.setFetchSize(int) set to 1 and with JdbcTemplate.setMaxRows(int) set to 1.
      Parameters:
      dataSource - a DataSource
  • Method Details

    • setDataSource

      public void setDataSource​(javax.sql.DataSource dataSource)
      The JDBC DataSource to use when interacting with the database. The passed-in DataSource is used to instantiate a JdbcTemplate with JdbcTemplate.setFetchSize(int) set to 1 and with JdbcTemplate.setMaxRows(int) set to 1.
      Parameters:
      dataSource - a DataSource
    • setDeserializer

      public void setDeserializer​(org.springframework.core.serializer.Deserializer<? extends org.springframework.messaging.Message<?>> deserializer)
      A converter for deserializing byte arrays to messages.
      Parameters:
      deserializer - the deserializer to set
    • addAllowedPatterns

      public void addAllowedPatterns​(java.lang.String... patterns)
      Add patterns for packages/classes that are allowed to be deserialized. A class can be fully qualified or a wildcard '*' is allowed at the beginning or end of the class name. Examples: com.foo.*, *.MyClass.
      Parameters:
      patterns - the patterns.
      Since:
      5.4
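The wildcard rule described above can be illustrated with a small stand-alone sketch. It is not the framework's implementation: the class `AllowedPatternSketch` and its methods are hypothetical names, used only to show how a leading or trailing `*` might be interpreted.

```java
import java.util.List;

// Hypothetical illustration only; not part of Spring Integration.
final class AllowedPatternSketch {

    // Returns true if className matches the pattern: an exact name,
    // a trailing '*' (prefix match) or a leading '*' (suffix match).
    static boolean matches(String pattern, String className) {
        if (pattern.endsWith("*")) {
            return className.startsWith(pattern.substring(0, pattern.length() - 1));
        }
        if (pattern.startsWith("*")) {
            return className.endsWith(pattern.substring(1));
        }
        return pattern.equals(className);
    }

    // A class is allowed if at least one configured pattern matches it.
    static boolean isAllowed(List<String> patterns, String className) {
        return patterns.stream().anyMatch(p -> matches(p, className));
    }

    public static void main(String[] args) {
        List<String> patterns = List.of("com.foo.*", "*.MyClass");
        System.out.println(isAllowed(patterns, "com.foo.Order"));   // true
        System.out.println(isAllowed(patterns, "org.bar.MyClass")); // true
        System.out.println(isAllowed(patterns, "org.bar.Other"));   // false
    }
}
```

With the real store, the equivalent configuration would be a call such as addAllowedPatterns("com.foo.*", "*.MyClass").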
    • setJdbcTemplate

      public void setJdbcTemplate​(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate)
      The JdbcOperations to use when interacting with the database. Set either this property or the dataSource. Consider passing in a JdbcTemplate with a fetchSize property of 1. This is particularly important for Oracle to ensure First In, First Out (FIFO) message retrieval characteristics.
      Parameters:
      jdbcTemplate - a JdbcOperations
    • setLobHandler

      public void setLobHandler​(org.springframework.jdbc.support.lob.LobHandler lobHandler)
      Override the LobHandler that is used to create and unpack large objects in SQL queries. The default is fine for almost all platforms, but some Oracle drivers require a native implementation.
      Parameters:
      lobHandler - a LobHandler
    • setMessageRowMapper

      public void setMessageRowMapper​(MessageRowMapper messageRowMapper)
      Allows for passing in a custom MessageRowMapper. The MessageRowMapper is used to convert the selected database row representing the persisted message into the actual Message object.
      Parameters:
      messageRowMapper - Must not be null
    • setPreparedStatementSetter

      public void setPreparedStatementSetter​(ChannelMessageStorePreparedStatementSetter preparedStatementSetter)
      Set a ChannelMessageStorePreparedStatementSetter used to insert messages into the database.
      Parameters:
      preparedStatementSetter - ChannelMessageStorePreparedStatementSetter to use. Must not be null
      Since:
      5.0
    • setChannelMessageStoreQueryProvider

      public void setChannelMessageStoreQueryProvider​(ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider)
      Sets the database-specific ChannelMessageStoreQueryProvider to use. The provider supplies the SQL queries that the JdbcChannelMessageStore uses to retrieve messages from the database. See the Javadoc of ChannelMessageStoreQueryProvider (all known implementing classes) for the implementations provided by the framework.

      You can provide your own query implementations, if you need to support additional databases and/or need to fine-tune the queries for your requirements.

      Parameters:
      channelMessageStoreQueryProvider - Must not be null.
    • setRegion

      public void setRegion​(java.lang.String region)
      A unique grouping identifier for all messages persisted with this store. Using multiple regions allows the store to be partitioned (if necessary) for different purposes. Defaults to DEFAULT_REGION.
      Parameters:
      region - the region name to set
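The partitioning effect of a region can be pictured with a small in-memory model. This is a sketch under assumptions, not the store's code: `RegionSketch`, the map standing in for the shared table, and the region names `APP_A` and `APP_B` are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical model of region partitioning; not the real store.
final class RegionSketch {

    private final Map<String, Queue<String>> table; // stand-in for the shared table
    private final String region;

    RegionSketch(Map<String, Queue<String>> table, String region) {
        this.table = table;
        this.region = region;
    }

    // Every row is addressed by region plus channel, so equal channel names do not collide.
    private String key(String channel) {
        return region + "|" + channel;
    }

    void add(String channel, String payload) {
        table.computeIfAbsent(key(channel), k -> new ArrayDeque<>()).add(payload);
    }

    // Returns the oldest payload for this region and channel, or null if there is none.
    String poll(String channel) {
        Queue<String> queue = table.get(key(channel));
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        Map<String, Queue<String>> shared = new HashMap<>();
        RegionSketch appA = new RegionSketch(shared, "APP_A");
        RegionSketch appB = new RegionSketch(shared, "APP_B");
        appA.add("orders", "from A");
        appB.add("orders", "from B");
        System.out.println(appA.poll("orders")); // from A
        System.out.println(appB.poll("orders")); // from B
    }
}
```

Both hypothetical applications use a channel named "orders" against the same backing map, yet each one only sees its own messages.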
    • setSerializer

      public void setSerializer​(org.springframework.core.serializer.Serializer<? super org.springframework.messaging.Message<?>> serializer)
      A converter for serializing messages to byte arrays for storage.
      Parameters:
      serializer - The serializer to set
    • setTablePrefix

      public void setTablePrefix​(java.lang.String tablePrefix)
      Public setter for the table prefix property. This will be prefixed to all the table names before queries are executed. Defaults to DEFAULT_TABLE_PREFIX.
      Parameters:
      tablePrefix - the tablePrefix to set
    • setUsingIdCache

      public void setUsingIdCache​(boolean usingIdCache)

      Consider using this property when polling the database transactionally using multiple parallel threads, that is, when the poller is configured with a task executor.

      The issue is that pollMessageFromGroup(Object) looks for the oldest entry for a given channel (groupKey) and region (setRegion(String)). If you do that with multiple threads and you are using transactions, other threads may be waiting for that same locked row.

      If using the provided OracleChannelMessageStoreQueryProvider, don't set usingIdCache to true, as the Oracle query will ignore locked rows.

      Using the id cache, the JdbcChannelMessageStore will store each message id in an in-memory collection for the duration of processing. With that, any polling threads will explicitly exclude those messages from being polled.

      For this to work, you must set up the corresponding TransactionSynchronizationFactory:

       
       <int:transaction-synchronization-factory id="syncFactory">
           <int:after-commit   expression="@jdbcChannelMessageStore.removeFromIdCache(headers.id.toString())" />
           <int:after-rollback expression="@jdbcChannelMessageStore.removeFromIdCache(headers.id.toString())" />
       </int:transaction-synchronization-factory>
       
       
      This TransactionSynchronizationFactory is then referenced in the transaction configuration of the poller:
       
       <int:poller fixed-delay="300" receive-timeout="500"
           max-messages-per-poll="1" task-executor="pool">
           <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
               isolation="READ_COMMITTED" transaction-manager="transactionManager" />
       </int:poller>
       
       
      Parameters:
      usingIdCache - When true the id cache will be used.
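The id cache mechanism described above can be sketched without a database. The following is a simplified in-memory model, not the store's actual code: `IdCacheSketch`, its list-backed rows, and the `commit` helper are assumptions made for illustration. Only the names `removeFromIdCache` and `getSizeOfIdCache` mirror methods documented on this page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Simplified in-memory model of the id cache idea; not the real store.
final class IdCacheSketch {

    // Stand-in for the table rows, oldest first. Rows stay in place until commit.
    private final List<String> rows = new ArrayList<>();

    // Ids of messages currently being processed by some polling thread.
    private final Set<String> idCache = ConcurrentHashMap.newKeySet();

    synchronized void add(String messageId) {
        rows.add(messageId);
    }

    // Returns the oldest id that no other poller has claimed, or null.
    synchronized String poll() {
        for (String id : rows) {
            if (idCache.add(id)) { // add returns false if the id is already claimed
                return id;
            }
        }
        return null;
    }

    // Simulates a successful transaction: the row is deleted and the claim released.
    synchronized void commit(String messageId) {
        rows.remove(messageId);
        removeFromIdCache(messageId);
    }

    // Releases the claim; after a rollback the row is still present and can be polled again.
    void removeFromIdCache(String messageId) {
        idCache.remove(messageId);
    }

    int getSizeOfIdCache() {
        return idCache.size();
    }

    public static void main(String[] args) {
        IdCacheSketch store = new IdCacheSketch();
        store.add("m1");
        store.add("m2");
        String first = store.poll();  // claims m1
        String second = store.poll(); // skips m1, claims m2
        System.out.println(first + " " + second + " cached=" + store.getSizeOfIdCache());
        store.removeFromIdCache(first); // simulated rollback: m1 becomes visible again
        System.out.println(store.poll());
    }
}
```

In this model the second poller never blocks on the first row; it moves on to the next unclaimed id. That is the behavior the id cache is meant to provide for databases whose queries do not skip locked rows.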
    • setPriorityEnabled

      public void setPriorityEnabled​(boolean priorityEnabled)
    • isPriorityEnabled

      public boolean isPriorityEnabled()
      Specified by:
      isPriorityEnabled in interface org.springframework.integration.store.PriorityCapableChannelMessageStore
    • setMessageGroupFactory

      public void setMessageGroupFactory​(org.springframework.integration.store.MessageGroupFactory messageGroupFactory)
      Specify the MessageGroupFactory used to create MessageGroup objects where necessary. Defaults to SimpleMessageGroupFactory.
      Parameters:
      messageGroupFactory - the MessageGroupFactory to use.
      Since:
      4.3
    • getMessageGroupFactory

      protected org.springframework.integration.store.MessageGroupFactory getMessageGroupFactory()
    • afterPropertiesSet

      public void afterPropertiesSet()
      Check mandatory properties (DataSource and setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider)). If no MessageRowMapper or ChannelMessageStorePreparedStatementSetter was explicitly set using setMessageRowMapper(MessageRowMapper) or setPreparedStatementSetter(ChannelMessageStorePreparedStatementSetter) respectively, the default MessageRowMapper and ChannelMessageStorePreparedStatementSetter will be instantiated using the specified deserializer and lobHandler. Also, if the jdbcTemplate's fetchSize property (JdbcTemplate.getFetchSize()) is not 1, a warning will be logged. When using the JdbcChannelMessageStore with Oracle, a fetchSize value of 1 is needed to ensure FIFO characteristics of polled messages. See the Oracle ChannelMessageStoreQueryProvider for more details.
      Specified by:
      afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean
    • addMessageToGroup

      public org.springframework.integration.store.MessageGroup addMessageToGroup​(java.lang.Object groupId, org.springframework.messaging.Message<?> message)
      Store a message in the database. The groupId identifies the channel for which the message is to be stored. Keep in mind that the actual groupId (Channel Identifier) is converted to a String-based UUID identifier.
      Specified by:
      addMessageToGroup in interface org.springframework.integration.store.BasicMessageGroupStore
      Parameters:
      groupId - the group id to store the message under
      message - a message
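The conversion of an arbitrary group id into a String-based UUID can be pictured as a name-based UUID derived from the id's string form. The sketch below uses only the JDK. Whether the store derives its key in exactly this way is an assumption, and `GroupKeySketch`, `toKey`, and the channel name `ordersChannel` are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical illustration of a deterministic group key; not the real store.
final class GroupKeySketch {

    // Keeps a real UUID as-is; otherwise derives a deterministic name-based UUID.
    static String toKey(Object groupId) {
        if (groupId instanceof UUID) {
            return groupId.toString();
        }
        byte[] bytes = groupId.toString().getBytes(StandardCharsets.UTF_8);
        return UUID.nameUUIDFromBytes(bytes).toString();
    }

    public static void main(String[] args) {
        String key = toKey("ordersChannel");
        System.out.println(key.length());                       // 36
        System.out.println(key.equals(toKey("ordersChannel"))); // true
    }
}
```

The practical consequence is that the value stored as the group key is not the readable channel name, so ad hoc SQL against the table needs the derived key rather than the original id.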
    • getMessageGroup

      public org.springframework.integration.store.MessageGroup getMessageGroup​(java.lang.Object groupId)
      Not fully used. Only wraps the provided group id.
      Specified by:
      getMessageGroup in interface org.springframework.integration.store.BasicMessageGroupStore
    • getMessageGroupCount

      @ManagedAttribute public int getMessageGroupCount()
      Method not implemented.
      Returns:
      The message group count.
      Throws:
      java.lang.UnsupportedOperationException - Method not supported.
    • getQuery

      protected java.lang.String getQuery​(org.springframework.integration.jdbc.store.JdbcChannelMessageStore.Query queryName, java.util.function.Supplier<java.lang.String> queryProvider)
      Replace patterns in the input to produce a valid SQL query. This implementation lazily initializes a simple map-based cache, only replacing the table prefix on the first access to a named query. Further accesses will be resolved from the cache.
      Parameters:
      queryName - The JdbcChannelMessageStore.Query to be transformed.
      queryProvider - a supplier to provide the query template.
      Returns:
      A transformed query with replacements.
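The lazy caching described above can be approximated with a map that fills in the table prefix only once per named query. This is a sketch under assumptions, not the class's source: the `%PREFIX%` placeholder, the `QueryCacheSketch` class, the String query names, and the sample SQL are illustrative.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical model of a prefix-replacing query cache; not the real store.
final class QueryCacheSketch {

    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final String tablePrefix;

    QueryCacheSketch(String tablePrefix) {
        this.tablePrefix = tablePrefix;
    }

    // Resolves the template on first access only; later calls are served from the cache.
    String getQuery(String queryName, Supplier<String> queryProvider) {
        return cache.computeIfAbsent(queryName,
                name -> queryProvider.get().replace("%PREFIX%", tablePrefix));
    }

    public static void main(String[] args) {
        QueryCacheSketch queries = new QueryCacheSketch("INT_");
        String sql = queries.getQuery("COUNT",
                () -> "SELECT COUNT(*) FROM %PREFIX%CHANNEL_MESSAGE WHERE GROUP_KEY=? AND REGION=?");
        System.out.println(sql);
    }
}
```

Passing a Supplier rather than a String means the query template is only built when the cache misses, which matches the "first access" behavior described for getQuery.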
    • messageGroupSize

      @ManagedAttribute public int messageGroupSize​(java.lang.Object groupId)
      Returns the number of messages persisted for the specified channel id (groupId) and the specified region (setRegion(String)).
      Specified by:
      messageGroupSize in interface org.springframework.integration.store.BasicMessageGroupStore
      Returns:
      The message group size.
    • removeMessageGroup

      public void removeMessageGroup​(java.lang.Object groupId)
      Specified by:
      removeMessageGroup in interface org.springframework.integration.store.BasicMessageGroupStore
    • pollMessageFromGroup

      public org.springframework.messaging.Message<?> pollMessageFromGroup​(java.lang.Object groupId)
      Polls the database for a new message that is persisted for the given group id, which represents the channel identifier.
      Specified by:
      pollMessageFromGroup in interface org.springframework.integration.store.BasicMessageGroupStore
    • doPollForMessage

      protected org.springframework.messaging.Message<?> doPollForMessage​(java.lang.String groupIdKey)
      This method executes a call to the DB to get the oldest Message in the MessageGroup, which in the context of the JdbcChannelMessageStore is identified by the channel identifier.
      Parameters:
      groupIdKey - String representation of message group (Channel) ID
      Returns:
      a message; could be null if query produced no Messages
    • removeFromIdCache

      public void removeFromIdCache​(java.lang.String messageId)

      Remove a Message Id from the idCache. Should be used in conjunction with the Spring Integration Transaction Synchronization feature to remove a message from the Message Id cache once a transaction either succeeded or rolled back.

      Only applicable if setUsingIdCache(boolean) is set to true.
      Parameters:
      messageId - The message identifier.
    • getSizeOfIdCache

      @ManagedMetric public int getSizeOfIdCache()
      Returns the size of the Message Id Cache, which caches Message Ids for those messages that are currently being processed.
      Returns:
      The size of the Message Id Cache