Class MessageGroupQueue
java.lang.Object
java.util.AbstractCollection<E>
java.util.AbstractQueue<org.springframework.messaging.Message<?>>
org.springframework.integration.store.MessageGroupQueue
- All Implemented Interfaces:
java.lang.Iterable<org.springframework.messaging.Message<?>>,java.util.Collection<org.springframework.messaging.Message<?>>,java.util.concurrent.BlockingQueue<org.springframework.messaging.Message<?>>,java.util.Queue<org.springframework.messaging.Message<?>>
public class MessageGroupQueue
extends java.util.AbstractQueue<org.springframework.messaging.Message<?>>
implements java.util.concurrent.BlockingQueue<org.springframework.messaging.Message<?>>
A
BlockingQueue that is backed by a MessageGroupStore. Can be used to ensure guaranteed delivery in
the face of transaction rollback (assuming the store is transactional) and also to ensure messages are not lost if
the process dies (assuming the store is durable). To use the queue across process re-starts, the same group id
must be provided, so it needs to be unique but identifiable with a single logical instance of the queue.- Since:
- 2.0
-
Constructor Summary
Constructors Constructor Description MessageGroupQueue(BasicMessageGroupStore messageGroupStore, java.lang.Object groupId)MessageGroupQueue(BasicMessageGroupStore messageGroupStore, java.lang.Object groupId, int capacity)MessageGroupQueue(BasicMessageGroupStore messageGroupStore, java.lang.Object groupId, int capacity, java.util.concurrent.locks.Lock storeLock)MessageGroupQueue(BasicMessageGroupStore messageGroupStore, java.lang.Object groupId, java.util.concurrent.locks.Lock storeLock) -
Method Summary
Modifier and Type Method Description protected booleandoOffer(org.springframework.messaging.Message<?> message)It is assumed that the 'storeLock' is being held by the caller, otherwise IllegalMonitorStateException may be thrownprotected org.springframework.messaging.Message<?>doPoll()It is assumed that the 'storeLock' is being held by the caller, otherwise IllegalMonitorStateException may be thrownintdrainTo(java.util.Collection<? super org.springframework.messaging.Message<?>> c)intdrainTo(java.util.Collection<? super org.springframework.messaging.Message<?>> collection, int maxElements)protected BasicMessageGroupStoregetMessageGroupStore()Get the store.protected java.util.Collection<org.springframework.messaging.Message<?>>getMessages()protected java.util.concurrent.locks.ConditiongetMessageStoreNotEmpty()Get the not empty condition.protected java.util.concurrent.locks.ConditiongetMessageStoreNotFull()Get the not full condition.protected java.util.concurrent.locks.LockgetStoreLock()Get the store lock.java.util.Iterator<org.springframework.messaging.Message<?>>iterator()booleanoffer(org.springframework.messaging.Message<?> message)booleanoffer(org.springframework.messaging.Message<?> message, long timeout, java.util.concurrent.TimeUnit unit)org.springframework.messaging.Message<?>peek()org.springframework.messaging.Message<?>poll()org.springframework.messaging.Message<?>poll(long timeout, java.util.concurrent.TimeUnit unit)voidput(org.springframework.messaging.Message<?> message)intremainingCapacity()voidsetPriority(boolean priority)If true, ensures that the message store supports priority.intsize()java.util.stream.Stream<org.springframework.messaging.Message<?>>stream()org.springframework.messaging.Message<?>take()Methods inherited from class java.util.AbstractCollection
contains, containsAll, isEmpty, remove, removeAll, retainAll, toArray, toArray, toStringMethods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
Constructor Details
-
MessageGroupQueue
-
MessageGroupQueue
public MessageGroupQueue(BasicMessageGroupStore messageGroupStore, java.lang.Object groupId, int capacity) -
MessageGroupQueue
public MessageGroupQueue(BasicMessageGroupStore messageGroupStore, java.lang.Object groupId, java.util.concurrent.locks.Lock storeLock) -
MessageGroupQueue
public MessageGroupQueue(BasicMessageGroupStore messageGroupStore, java.lang.Object groupId, int capacity, java.util.concurrent.locks.Lock storeLock)
-
-
Method Details
-
setPriority
public void setPriority(boolean priority)If true, ensures that the message store supports priority. If false WARNs if the message store uses priority to determine the message order when receiving.- Parameters:
priority- true if priority is expected to be used.
-
iterator
public java.util.Iterator<org.springframework.messaging.Message<?>> iterator()- Specified by:
iteratorin interfacejava.util.Collection<org.springframework.messaging.Message<?>>- Specified by:
iteratorin interfacejava.lang.Iterable<org.springframework.messaging.Message<?>>- Specified by:
iteratorin classjava.util.AbstractCollection<org.springframework.messaging.Message<?>>
-
getMessageGroupStore
Get the store.- Returns:
- the store.
- Since:
- 5.0.11
-
getStoreLock
protected java.util.concurrent.locks.Lock getStoreLock()Get the store lock.- Returns:
- the lock.
- Since:
- 5.0.11
-
getMessageStoreNotFull
protected java.util.concurrent.locks.Condition getMessageStoreNotFull()Get the not full condition.- Returns:
- the condition.
- Since:
- 5.0.11
-
getMessageStoreNotEmpty
protected java.util.concurrent.locks.Condition getMessageStoreNotEmpty()Get the not empty condition.- Returns:
- the condition.
- Since:
- 5.0.11
-
size
public int size()- Specified by:
sizein interfacejava.util.Collection<org.springframework.messaging.Message<?>>- Specified by:
sizein classjava.util.AbstractCollection<org.springframework.messaging.Message<?>>
-
peek
public org.springframework.messaging.Message<?> peek()- Specified by:
peekin interfacejava.util.Queue<org.springframework.messaging.Message<?>>
-
poll
public org.springframework.messaging.Message<?> poll(long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException- Specified by:
pollin interfacejava.util.concurrent.BlockingQueue<org.springframework.messaging.Message<?>>- Throws:
java.lang.InterruptedException
-
poll
public org.springframework.messaging.Message<?> poll()- Specified by:
pollin interfacejava.util.Queue<org.springframework.messaging.Message<?>>
-
drainTo
public int drainTo(java.util.Collection<? super org.springframework.messaging.Message<?>> c)- Specified by:
drainToin interfacejava.util.concurrent.BlockingQueue<org.springframework.messaging.Message<?>>
-
drainTo
public int drainTo(java.util.Collection<? super org.springframework.messaging.Message<?>> collection, int maxElements)- Specified by:
drainToin interfacejava.util.concurrent.BlockingQueue<org.springframework.messaging.Message<?>>
-
offer
public boolean offer(org.springframework.messaging.Message<?> message)- Specified by:
offerin interfacejava.util.concurrent.BlockingQueue<org.springframework.messaging.Message<?>>- Specified by:
offerin interfacejava.util.Queue<org.springframework.messaging.Message<?>>
-
offer
public boolean offer(org.springframework.messaging.Message<?> message, long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException- Specified by:
offerin interfacejava.util.concurrent.BlockingQueue<org.springframework.messaging.Message<?>>- Throws:
java.lang.InterruptedException
-
put
public void put(org.springframework.messaging.Message<?> message) throws java.lang.InterruptedException- Specified by:
putin interfacejava.util.concurrent.BlockingQueue<org.springframework.messaging.Message<?>>- Throws:
java.lang.InterruptedException
-
remainingCapacity
public int remainingCapacity()- Specified by:
remainingCapacityin interfacejava.util.concurrent.BlockingQueue<org.springframework.messaging.Message<?>>
-
take
public org.springframework.messaging.Message<?> take() throws java.lang.InterruptedException- Specified by:
takein interfacejava.util.concurrent.BlockingQueue<org.springframework.messaging.Message<?>>- Throws:
java.lang.InterruptedException
-
getMessages
protected java.util.Collection<org.springframework.messaging.Message<?>> getMessages() -
stream
public java.util.stream.Stream<org.springframework.messaging.Message<?>> stream()- Specified by:
streamin interfacejava.util.Collection<org.springframework.messaging.Message<?>>
-
doPoll
protected org.springframework.messaging.Message<?> doPoll()It is assumed that the 'storeLock' is being held by the caller, otherwise IllegalMonitorStateException may be thrown- Returns:
- a message // TODO @Nullable
-
doOffer
protected boolean doOffer(org.springframework.messaging.Message<?> message)It is assumed that the 'storeLock' is being held by the caller, otherwise IllegalMonitorStateException may be thrown- Parameters:
message- the message to offer.- Returns:
- true if offered.
-