Class JdbcChannelMessageStore
- All Implemented Interfaces:
org.springframework.beans.factory.InitializingBean, org.springframework.integration.store.BasicMessageGroupStore, org.springframework.integration.store.ChannelMessageStore, org.springframework.integration.store.PriorityCapableChannelMessageStore
@ManagedResource
public class JdbcChannelMessageStore
extends java.lang.Object
implements org.springframework.integration.store.PriorityCapableChannelMessageStore, org.springframework.beans.factory.InitializingBean
Channel-specific implementation of MessageGroupStore using a relational database via JDBC.
Use this message store for message channels only; to that end, the JdbcChannelMessageStore uses database-specific SQL queries.
Unlike the JdbcMessageStore, this implementation uses a single database table, optimized to operate like a queue.
The SQL scripts for creating the table are packaged
under org/springframework/integration/jdbc/schema-*.sql,
where * denotes the target database type.
- Since:
- 2.2
-
Field Summary
Fields (Modifier and Type, Field, Description):
static java.lang.String DEFAULT_REGION - Default region property, used to partition the message store.
static java.lang.String DEFAULT_TABLE_PREFIX - Default value for the table prefix property.
-
Constructor Summary
Constructors (Constructor, Description):
JdbcChannelMessageStore() - Convenient constructor for configuration use.
JdbcChannelMessageStore(javax.sql.DataSource dataSource) - Create a MessageStore with all mandatory properties.
-
Method Summary
Methods (Modifier and Type, Method, Description):
void addAllowedPatterns(java.lang.String... patterns) - Add patterns for packages/classes that are allowed to be deserialized.
org.springframework.integration.store.MessageGroup addMessageToGroup(java.lang.Object groupId, org.springframework.messaging.Message<?> message) - Store a message in the database.
void afterPropertiesSet() - Check mandatory properties (DataSource and setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider)).
protected org.springframework.messaging.Message<?> doPollForMessage(java.lang.String groupIdKey) - This method executes a call to the DB to get the oldest Message in the MessageGroup which in the context of the JdbcChannelMessageStore means the channel identifier.
org.springframework.integration.store.MessageGroup getMessageGroup(java.lang.Object groupId) - Not fully used.
int getMessageGroupCount() - Method not implemented.
protected org.springframework.integration.store.MessageGroupFactory getMessageGroupFactory()
protected java.lang.String getQuery(org.springframework.integration.jdbc.store.JdbcChannelMessageStore.Query queryName, java.util.function.Supplier<java.lang.String> queryProvider) - Replace patterns in the input to produce a valid SQL query.
int getSizeOfIdCache() - Returns the size of the Message Id Cache, which caches Message Ids for those messages that are currently being processed.
boolean isPriorityEnabled()
int messageGroupSize(java.lang.Object groupId) - Returns the number of messages persisted for the specified channel id (groupId) and the specified region (setRegion(String)).
org.springframework.messaging.Message<?> pollMessageFromGroup(java.lang.Object groupId) - Polls the database for a new message that is persisted for the given group id which represents the channel identifier.
void removeFromIdCache(java.lang.String messageId) - Remove a Message Id from the idCache.
void removeMessageGroup(java.lang.Object groupId)
void setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider) - Sets the database specific ChannelMessageStoreQueryProvider to use.
void setDataSource(javax.sql.DataSource dataSource) - The JDBC DataSource to use when interacting with the database.
void setDeserializer(org.springframework.core.serializer.Deserializer<? extends org.springframework.messaging.Message<?>> deserializer) - A converter for deserializing byte arrays to messages.
void setJdbcTemplate(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate) - The JdbcOperations to use when interacting with the database.
void setLobHandler(org.springframework.jdbc.support.lob.LobHandler lobHandler) - Override the LobHandler that is used to create and unpack large objects in SQL queries.
void setMessageGroupFactory(org.springframework.integration.store.MessageGroupFactory messageGroupFactory) - Specify the MessageGroupFactory to create MessageGroup object where it is necessary.
void setMessageRowMapper(MessageRowMapper messageRowMapper) - Allows for passing in a custom MessageRowMapper.
void setPreparedStatementSetter(ChannelMessageStorePreparedStatementSetter preparedStatementSetter) - Set a ChannelMessageStorePreparedStatementSetter to insert message into the database.
void setPriorityEnabled(boolean priorityEnabled)
void setRegion(java.lang.String region) - A unique grouping identifier for all messages persisted with this store.
void setSerializer(org.springframework.core.serializer.Serializer<? super org.springframework.messaging.Message<?>> serializer) - A converter for serializing messages to byte arrays for storage.
void setTablePrefix(java.lang.String tablePrefix) - Public setter for the table prefix property.
void setUsingIdCache(boolean usingIdCache) - Consider using this property when polling the database transactionally using multiple parallel threads, meaning when the configured poller is configured using a task executor.
-
Field Details
-
DEFAULT_REGION
public static final java.lang.String DEFAULT_REGION
Default region property, used to partition the message store. For example, a separate Spring Integration application with overlapping channel names may use the same message store by providing a distinct region name.
- See Also:
- Constant Field Values
-
DEFAULT_TABLE_PREFIX
public static final java.lang.String DEFAULT_TABLE_PREFIX
Default value for the table prefix property.
- See Also:
- Constant Field Values
-
-
Constructor Details
-
JdbcChannelMessageStore
public JdbcChannelMessageStore()
Convenient constructor for configuration use.
-
JdbcChannelMessageStore
public JdbcChannelMessageStore(javax.sql.DataSource dataSource)
Create a MessageStore with all mandatory properties. The passed-in DataSource is used to instantiate a JdbcTemplate with JdbcTemplate.setFetchSize(int) set to 1 and with JdbcTemplate.setMaxRows(int) set to 1.
- Parameters:
dataSource - a DataSource
-
-
Method Details
-
setDataSource
public void setDataSource(javax.sql.DataSource dataSource)
The JDBC DataSource to use when interacting with the database. The passed-in DataSource is used to instantiate a JdbcTemplate with JdbcTemplate.setFetchSize(int) set to 1 and with JdbcTemplate.setMaxRows(int) set to 1.
- Parameters:
dataSource - a DataSource
-
setDeserializer
public void setDeserializer(org.springframework.core.serializer.Deserializer<? extends org.springframework.messaging.Message<?>> deserializer)
A converter for deserializing byte arrays to messages.
- Parameters:
deserializer - the deserializer to set
-
addAllowedPatterns
public void addAllowedPatterns(java.lang.String... patterns)
Add patterns for packages/classes that are allowed to be deserialized. A class can be fully qualified, or a wildcard '*' is allowed at the beginning or end of the class name. Examples: com.foo.*, *.MyClass.
- Parameters:
patterns - the patterns.
- Since:
- 5.4
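The wildcard rule described above can be illustrated with a small, framework-independent sketch. The class AllowedPatternSketch and its isAllowed method are hypothetical names introduced for illustration; this is not the filter the framework applies during deserialization, only one plausible reading of "wildcard at the beginning or end".

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: a hypothetical matcher, not the framework's deserialization filter. */
final class AllowedPatternSketch {

    private final List<String> patterns = new ArrayList<>();

    /** Registers additional patterns, mirroring the varargs shape of addAllowedPatterns. */
    void addAllowedPatterns(String... newPatterns) {
        for (String pattern : newPatterns) {
            patterns.add(pattern);
        }
    }

    /** Returns true if the class name matches at least one registered pattern. */
    boolean isAllowed(String className) {
        for (String pattern : patterns) {
            if (matches(pattern, className)) {
                return true;
            }
        }
        return false;
    }

    private static boolean matches(String pattern, String className) {
        if (pattern.endsWith("*")) {
            // Trailing wildcard, e.g. com.foo.* : compare the prefix.
            return className.startsWith(pattern.substring(0, pattern.length() - 1));
        }
        if (pattern.startsWith("*")) {
            // Leading wildcard, e.g. *.MyClass : compare the suffix.
            return className.endsWith(pattern.substring(1));
        }
        // No wildcard: the fully qualified name must match exactly.
        return pattern.equals(className);
    }
}
```

With the patterns com.foo.* and *.MyClass registered, this sketch accepts com.foo.Bar and org.example.MyClass and rejects everything else.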
-
setJdbcTemplate
public void setJdbcTemplate(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate)
The JdbcOperations to use when interacting with the database. Set either this property or the dataSource. Consider passing in a JdbcTemplate with a fetchSize property of 1. This is particularly important for Oracle to ensure First In, First Out (FIFO) message retrieval characteristics.
- Parameters:
jdbcTemplate - a JdbcOperations
-
setLobHandler
public void setLobHandler(org.springframework.jdbc.support.lob.LobHandler lobHandler)
Override the LobHandler that is used to create and unpack large objects in SQL queries. The default is fine for almost all platforms, but some Oracle drivers require a native implementation.
- Parameters:
lobHandler - a LobHandler
-
setMessageRowMapper
public void setMessageRowMapper(MessageRowMapper messageRowMapper)
Allows for passing in a custom MessageRowMapper. The MessageRowMapper is used to convert the selected database row representing the persisted message into the actual Message object.
- Parameters:
messageRowMapper - Must not be null
-
setPreparedStatementSetter
public void setPreparedStatementSetter(ChannelMessageStorePreparedStatementSetter preparedStatementSetter)
Set a ChannelMessageStorePreparedStatementSetter to insert messages into the database.
- Parameters:
preparedStatementSetter - ChannelMessageStorePreparedStatementSetter to use. Must not be null
- Since:
- 5.0
-
setChannelMessageStoreQueryProvider
public void setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider)
Sets the database-specific ChannelMessageStoreQueryProvider to use. The ChannelMessageStoreQueryProvider supplies the SQL queries that the JdbcChannelMessageStore uses to retrieve messages from the database. See the Javadoc for ChannelMessageStoreQueryProvider (all known implementing classes) for the implementations provided by the framework.
You can provide your own query implementations if you need to support additional databases and/or need to fine-tune the queries for your requirements.
- Parameters:
channelMessageStoreQueryProvider - Must not be null.
-
setRegion
public void setRegion(java.lang.String region)
A unique grouping identifier for all messages persisted with this store. Using multiple regions allows the store to be partitioned (if necessary) for different purposes. Defaults to DEFAULT_REGION.
- Parameters:
region - the region name to set
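To make the partitioning idea concrete, here is a minimal in-memory sketch of two stores that share one "table" but use distinct regions. RegionPartitionSketch, the composite key format, and the region names in the usage note are all assumptions made for illustration; the real store keeps the region in the database rather than in a map.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

/** Illustrative only: an in-memory stand-in for one shared table, partitioned by region. */
final class RegionPartitionSketch {

    private final Map<String, Deque<String>> sharedTable;
    private final String region;

    RegionPartitionSketch(Map<String, Deque<String>> sharedTable, String region) {
        this.sharedTable = sharedTable;
        this.region = region;
    }

    /** Appends a payload to the queue identified by this store's region and the channel name. */
    void add(String channel, String payload) {
        sharedTable.computeIfAbsent(key(channel), k -> new ArrayDeque<>()).addLast(payload);
    }

    /** Removes and returns the oldest payload for the channel in this region, or null if empty. */
    String poll(String channel) {
        Deque<String> queue = sharedTable.get(key(channel));
        return queue == null ? null : queue.pollFirst();
    }

    /** Counts only the entries that belong to this region and channel. */
    int size(String channel) {
        Deque<String> queue = sharedTable.get(key(channel));
        return queue == null ? 0 : queue.size();
    }

    // Hypothetical composite key; the separator is arbitrary.
    private String key(String channel) {
        return region + "|" + channel;
    }
}
```

Two instances created over the same map with regions such as BILLING and SHIPPING can both use a channel named input without seeing each other's messages.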
-
setSerializer
public void setSerializer(org.springframework.core.serializer.Serializer<? super org.springframework.messaging.Message<?>> serializer)
A converter for serializing messages to byte arrays for storage.
- Parameters:
serializer - The serializer to set
-
setTablePrefix
public void setTablePrefix(java.lang.String tablePrefix)
Public setter for the table prefix property. This will be prefixed to all the table names before queries are executed. Defaults to DEFAULT_TABLE_PREFIX.
- Parameters:
tablePrefix - the tablePrefix to set
-
setUsingIdCache
public void setUsingIdCache(boolean usingIdCache)
Consider using this property when polling the database transactionally using multiple parallel threads, that is, when the poller is configured with a task executor.
The issue is that pollMessageFromGroup(Object) looks for the oldest entry for a given channel (groupKey) and region (setRegion(String)). If you do that with multiple threads and you are using transactions, other threads may be waiting for that same locked row.
If using the provided OracleChannelMessageStoreQueryProvider, don't set usingIdCache to true, as the Oracle query will ignore locked rows.
Using the id cache, the JdbcChannelMessageStore will store each message id in an in-memory collection for the duration of processing. With that, any polling threads will explicitly exclude those messages from being polled.
For this to work, you must set up the corresponding TransactionSynchronizationFactory:
    <int:transaction-synchronization-factory id="syncFactory">
        <int:after-commit expression="@jdbcChannelMessageStore.removeFromIdCache(headers.id.toString())" />
        <int:after-rollback expression="@jdbcChannelMessageStore.removeFromIdCache(headers.id.toString())" />
    </int:transaction-synchronization-factory>
This TransactionSynchronizationFactory is then referenced in the transaction configuration of the poller:
    <int:poller fixed-delay="300" receive-timeout="500" max-messages-per-poll="1" task-executor="pool">
        <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory" isolation="READ_COMMITTED" transaction-manager="transactionManager" />
    </int:poller>
- Parameters:
usingIdCache - When true the id cache will be used.
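The id-cache behavior described above can be sketched without a database. IdCacheSketch below is a hypothetical, simplified model: a list stands in for the table, and commit/rollback are explicit method calls rather than transaction synchronization callbacks. It only illustrates why a poller skips ids that another thread is still processing.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: a simplified in-memory model of polling with an id cache. */
final class IdCacheSketch {

    // Stand-in for the channel table: message ids in insertion (FIFO) order.
    private final List<String> rows = new ArrayList<>();

    // Ids of messages that have been polled but whose transaction has not finished.
    private final Set<String> idCache = new HashSet<>();

    synchronized void add(String messageId) {
        rows.add(messageId);
    }

    /** Returns the oldest id that is not currently in the cache, or null if none is available. */
    synchronized String poll() {
        for (String id : rows) {
            if (idCache.add(id)) {
                // add() returned true, so no other poller is processing this id.
                return id;
            }
        }
        return null;
    }

    /** Models after-commit: the row is gone for good and the id leaves the cache. */
    synchronized void commit(String messageId) {
        rows.remove(messageId);
        idCache.remove(messageId);
    }

    /** Models after-rollback: the row stays, but the id becomes pollable again. */
    synchronized void rollback(String messageId) {
        idCache.remove(messageId);
    }

    synchronized int getSizeOfIdCache() {
        return idCache.size();
    }
}
```

In this model, two consecutive polls return two different ids even though neither has been committed yet, which is the effect the id cache is meant to achieve for parallel polling threads.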
-
setPriorityEnabled
public void setPriorityEnabled(boolean priorityEnabled)
-
isPriorityEnabled
public boolean isPriorityEnabled()
- Specified by:
isPriorityEnabled in interface org.springframework.integration.store.PriorityCapableChannelMessageStore
-
setMessageGroupFactory
public void setMessageGroupFactory(org.springframework.integration.store.MessageGroupFactory messageGroupFactory)
Specify the MessageGroupFactory used to create MessageGroup objects where necessary. Defaults to SimpleMessageGroupFactory.
- Parameters:
messageGroupFactory - the MessageGroupFactory to use.
- Since:
- 4.3
-
getMessageGroupFactory
protected org.springframework.integration.store.MessageGroupFactory getMessageGroupFactory()
-
afterPropertiesSet
public void afterPropertiesSet()
Check mandatory properties (DataSource and setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider)). If no MessageRowMapper and ChannelMessageStorePreparedStatementSetter were explicitly set using setMessageRowMapper(MessageRowMapper) and setPreparedStatementSetter(ChannelMessageStorePreparedStatementSetter) respectively, the default MessageRowMapper and ChannelMessageStorePreparedStatementSetter will be instantiated using the specified deserializer and lobHandler. Also, if the jdbcTemplate's fetchSize property (JdbcTemplate.getFetchSize()) is not 1, a warning will be logged. When using the JdbcChannelMessageStore with Oracle, a fetchSize value of 1 is needed to ensure FIFO characteristics of polled messages. See the OracleChannelMessageStoreQueryProvider for more details.
- Specified by:
afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean
-
addMessageToGroup
public org.springframework.integration.store.MessageGroup addMessageToGroup(java.lang.Object groupId, org.springframework.messaging.Message<?> message)
Store a message in the database. The groupId identifies the channel for which the message is to be stored. Keep in mind that the actual groupId (Channel Identifier) is converted to a String-based UUID identifier.
- Specified by:
addMessageToGroup in interface org.springframework.integration.store.BasicMessageGroupStore
- Parameters:
groupId - the group id to store the message under
message - a message
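The conversion of a group id to a String-based UUID can be pictured with the following sketch. GroupKeySketch and toGroupKey are hypothetical names, and deriving a name-based UUID from the id's string form is an assumption made for illustration; the exact conversion the framework performs may differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/** Illustrative only: one plausible way to derive a stable UUID string from a group id. */
final class GroupKeySketch {

    private GroupKeySketch() {
    }

    /** Returns a 36-character UUID string that is stable for equal group ids. */
    static String toGroupKey(Object groupId) {
        if (groupId instanceof UUID) {
            // Already a UUID: use it unchanged.
            return groupId.toString();
        }
        // Assumption: hash the string form into a name-based UUID.
        byte[] bytes = String.valueOf(groupId).getBytes(StandardCharsets.UTF_8);
        return UUID.nameUUIDFromBytes(bytes).toString();
    }
}
```

The practical consequence is that the channel name itself does not appear in the stored key, so rows have to be looked up by the derived key rather than by the original channel name.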
-
getMessageGroup
public org.springframework.integration.store.MessageGroup getMessageGroup(java.lang.Object groupId)
Not fully used. Only wraps the provided group id.
- Specified by:
getMessageGroup in interface org.springframework.integration.store.BasicMessageGroupStore
-
getMessageGroupCount
@ManagedAttribute
public int getMessageGroupCount()
Method not implemented.
- Returns:
- The message group count.
- Throws:
java.lang.UnsupportedOperationException - Method not supported.
-
getQuery
protected java.lang.String getQuery(org.springframework.integration.jdbc.store.JdbcChannelMessageStore.Query queryName, java.util.function.Supplier<java.lang.String> queryProvider)
Replace patterns in the input to produce a valid SQL query. This implementation lazily initializes a simple map-based cache, only replacing the table prefix on the first access to a named query. Further accesses will be resolved from the cache.
- Parameters:
queryName - The JdbcChannelMessageStore.Query to be transformed.
queryProvider - a supplier to provide the query template.
- Returns:
- A transformed query with replacements.
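The lazy, map-based caching described above might look roughly like the sketch below. QueryCacheSketch, its Query enum constants, the %PREFIX% placeholder, and the templateLookups counter are all assumptions introduced for illustration; they are not taken from the class itself.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Illustrative only: lazily caches queries after substituting the table prefix once. */
final class QueryCacheSketch {

    /** Hypothetical query names; the real enum is JdbcChannelMessageStore.Query. */
    enum Query { POLL, COUNT }

    private final Map<Query, String> cache = new ConcurrentHashMap<>();
    private final String tablePrefix;

    // Counts how often a template was actually requested, to make the caching visible.
    int templateLookups;

    QueryCacheSketch(String tablePrefix) {
        this.tablePrefix = tablePrefix;
    }

    /** Resolves the named query, asking the supplier for the template only on first access. */
    String getQuery(Query queryName, Supplier<String> queryProvider) {
        return cache.computeIfAbsent(queryName, name -> {
            templateLookups++;
            // Assumed placeholder syntax; substitution happens once per query name.
            return queryProvider.get().replace("%PREFIX%", tablePrefix);
        });
    }
}
```

Passing a Supplier rather than a String means the template is only produced when the cache misses, so repeated polls do not pay for the replacement again.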
-
messageGroupSize
@ManagedAttribute
public int messageGroupSize(java.lang.Object groupId)
Returns the number of messages persisted for the specified channel id (groupId) and the specified region (setRegion(String)).
- Specified by:
messageGroupSize in interface org.springframework.integration.store.BasicMessageGroupStore
- Returns:
- The message group size.
-
removeMessageGroup
public void removeMessageGroup(java.lang.Object groupId)
- Specified by:
removeMessageGroup in interface org.springframework.integration.store.BasicMessageGroupStore
-
pollMessageFromGroup
public org.springframework.messaging.Message<?> pollMessageFromGroup(java.lang.Object groupId)
Polls the database for a new message that is persisted for the given group id, which represents the channel identifier.
- Specified by:
pollMessageFromGroup in interface org.springframework.integration.store.BasicMessageGroupStore
-
doPollForMessage
protected org.springframework.messaging.Message<?> doPollForMessage(java.lang.String groupIdKey)
Executes a call to the database to get the oldest Message in the MessageGroup, which in the context of the JdbcChannelMessageStore corresponds to the channel identifier.
- Parameters:
groupIdKey - String representation of message group (Channel) ID
- Returns:
- a message; could be null if query produced no Messages
-
removeFromIdCache
public void removeFromIdCache(java.lang.String messageId)
Remove a Message Id from the idCache. Should be used in conjunction with the Spring Integration Transaction Synchronization feature to remove a message from the Message Id cache once a transaction has either committed or rolled back.
Only applicable if setUsingIdCache(boolean) is set to true.
- Parameters:
messageId - The message identifier.
-
getSizeOfIdCache
@ManagedMetric
public int getSizeOfIdCache()
Returns the size of the Message Id Cache, which caches Message Ids for those messages that are currently being processed.
- Returns:
- The size of the Message Id Cache
-