Class ZeroMqChannel
- All Implemented Interfaces:
org.springframework.beans.factory.Aware, org.springframework.beans.factory.BeanFactoryAware, org.springframework.beans.factory.BeanNameAware, org.springframework.beans.factory.DisposableBean, org.springframework.beans.factory.InitializingBean, org.springframework.context.ApplicationContextAware, org.springframework.integration.context.ExpressionCapable, org.springframework.integration.IntegrationPattern, org.springframework.integration.support.context.NamedComponent, org.springframework.integration.support.management.IntegrationManagement, org.springframework.integration.support.management.TrackableComponent, org.springframework.messaging.MessageChannel, org.springframework.messaging.SubscribableChannel, org.springframework.messaging.support.InterceptableChannel
public class ZeroMqChannel
extends org.springframework.integration.channel.AbstractMessageChannel
implements org.springframework.messaging.SubscribableChannel
SubscribableChannel implementation over ZeroMQ sockets.
It can work in two messaging models:
- push-pull, where sent messages are distributed to subscribers in a round-robin manner
according to the logic of the ZeroMQ SocketType.PUSH and SocketType.PULL socket types;
- pub-sub, where sent messages are distributed to all subscribers.
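The difference between the two models can be illustrated without any sockets. The following is a minimal in-memory sketch, not the channel's implementation: the class name DistributionModelsSketch and its methods are hypothetical, and the "subscribers" are plain lists standing in for message handlers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: models delivery semantics, not ZeroMQ sockets.
final class DistributionModelsSketch {

    /** Push-pull: each message goes to exactly one subscriber, chosen in rotation. */
    static List<List<String>> pushPull(List<String> messages, int subscriberCount) {
        List<List<String>> inboxes = emptyInboxes(subscriberCount);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % subscriberCount).add(messages.get(i));
        }
        return inboxes;
    }

    /** Pub-sub: every message is delivered to every subscriber. */
    static List<List<String>> pubSub(List<String> messages, int subscriberCount) {
        List<List<String>> inboxes = emptyInboxes(subscriberCount);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println("push-pull: " + pushPull(messages, 2));
        System.out.println("pub-sub:   " + pubSub(messages, 2));
    }
}
```

With two subscribers and four messages, the push-pull variant gives each subscriber two messages, while the pub-sub variant gives each subscriber all four.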
This message channel can work in local mode, in which a pair of ZeroMQ sockets of the SocketType.PAIR type
connects the publisher (send operation) and the subscriber over an inter-thread transport binding.
In distributed mode this channel has to be connected to an externally managed ZeroMQ proxy.
The value passed to setConnectUrl(String) must be a standard ZeroMQ connect string with an extra port
after a second colon: the two ports represent the frontend and backend sockets of the ZeroMQ proxy.
For example: tcp://localhost:6001:6002.
Another option is to provide a reference to the ZeroMqProxy instance managed in the same application:
frontend and backend ports are evaluated from this proxy and the respective connection string is built from them.
This way sending and receiving operations on this channel are similar to interaction over a messaging broker.
The internal logic of this message channel implementation is based on Project Reactor, using its
Mono, Flux and Scheduler APIs for a better thread model and flow control, and to avoid
concurrency primitives for multi-publisher (multi-subscriber) communication within the same application.
- Since:
- 5.4
-
Nested Class Summary
-
Field Summary
Fields
- static java.time.Duration DEFAULT_CONSUME_DELAY
Fields inherited from class org.springframework.integration.channel.AbstractMessageChannel
interceptors, meters
Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, logger
-
Constructor Summary
Constructors
- ZeroMqChannel(org.zeromq.ZContext context): Create a channel instance based on the provided ZContext with the push/pull communication model.
- ZeroMqChannel(org.zeromq.ZContext context, boolean pubSub): Create a channel instance based on the provided ZContext and the provided communication model.
-
Method Summary
- void destroy()
- protected boolean doSend(org.springframework.messaging.Message<?> message, long timeout)
- protected void onInit()
- void setConnectUrl(java.lang.String connectUrl): Configure a connection to the ZeroMQ proxy with the pair of ports separated by a colon for the proxy frontend and backend sockets.
- void setConsumeDelay(java.time.Duration consumeDelay): Specify a Duration to delay consumption when no data is received.
- void setMessageMapper(org.springframework.integration.mapping.BytesMessageMapper messageMapper): Provide a BytesMessageMapper to convert to/from messages when a send or receive happens on the sockets.
- void setSendSocketConfigurer(java.util.function.Consumer<org.zeromq.ZMQ.Socket> sendSocketConfigurer): The Consumer callback to configure a publishing socket.
- void setSubscribeSocketConfigurer(java.util.function.Consumer<org.zeromq.ZMQ.Socket> subscribeSocketConfigurer): The Consumer callback to configure a consuming socket.
- void setZeroMqProxy(ZeroMqProxy zeroMqProxy): Specify a reference to a ZeroMqProxy instance in the same application to rely on its ports configuration and make a natural lifecycle dependency without guessing when the proxy is started.
- boolean subscribe(org.springframework.messaging.MessageHandler handler)
- boolean unsubscribe(org.springframework.messaging.MessageHandler handler)
Methods inherited from class org.springframework.integration.channel.AbstractMessageChannel
addInterceptor, addInterceptor, getComponentType, getFullChannelName, getIChannelInterceptorList, getIntegrationPatternType, getInterceptors, getMetricsCaptor, getOverrides, isLoggingEnabled, registerMetricsCaptor, removeInterceptor, removeInterceptor, send, send, setDatatypes, setInterceptors, setLoggingEnabled, setMessageConverter, setShouldTrack
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
Field Details
-
DEFAULT_CONSUME_DELAY
public static final java.time.Duration DEFAULT_CONSUME_DELAY
-
-
Constructor Details
-
ZeroMqChannel
public ZeroMqChannel(org.zeromq.ZContext context)
Create a channel instance based on the provided ZContext with the push/pull communication model.
- Parameters:
context - the ZContext to use.
-
ZeroMqChannel
public ZeroMqChannel(org.zeromq.ZContext context, boolean pubSub)
Create a channel instance based on the provided ZContext and the provided communication model.
- Parameters:
context - the ZContext to use.
pubSub - the communication model: true for pub/sub, false for push/pull.
-
-
Method Details
-
setConnectUrl
public void setConnectUrl(@Nullable java.lang.String connectUrl)
Configure a connection to the ZeroMQ proxy with the pair of ports separated by a colon for the proxy frontend and backend sockets. Mutually exclusive with setZeroMqProxy(ZeroMqProxy).
- Parameters:
connectUrl - the connection string in the format PROTOCOL://HOST:FRONTEND_PORT:BACKEND_PORT, e.g. tcp://localhost:6001:6002
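To make the two-port format concrete, the sketch below splits such a string into two ordinary ZeroMQ endpoints, one per proxy socket. It is an assumption-laden illustration and not the parsing code used by ZeroMqChannel: the class ProxyConnectUrlSketch, its fields and its parse method are hypothetical names.

```java
// Hypothetical helper: splits PROTOCOL://HOST:FRONTEND_PORT:BACKEND_PORT
// into one standard endpoint per proxy socket. Not the library's own parser.
final class ProxyConnectUrlSketch {

    final String frontendUrl;
    final String backendUrl;

    private ProxyConnectUrlSketch(String frontendUrl, String backendUrl) {
        this.frontendUrl = frontendUrl;
        this.backendUrl = backendUrl;
    }

    static ProxyConnectUrlSketch parse(String connectUrl) {
        int schemeEnd = connectUrl.indexOf("://");
        // The last colon separates the backend port, the one before it the frontend port.
        int backendSeparator = connectUrl.lastIndexOf(':');
        int frontendSeparator = connectUrl.lastIndexOf(':', backendSeparator - 1);
        // Reject strings where the "frontend" colon is actually the scheme colon.
        if (schemeEnd < 0 || frontendSeparator <= schemeEnd + 2) {
            throw new IllegalArgumentException(
                    "Expected PROTOCOL://HOST:FRONTEND_PORT:BACKEND_PORT but got: " + connectUrl);
        }
        String base = connectUrl.substring(0, frontendSeparator);
        // Integer.parseInt throws NumberFormatException for a non-numeric port.
        int frontendPort = Integer.parseInt(connectUrl.substring(frontendSeparator + 1, backendSeparator));
        int backendPort = Integer.parseInt(connectUrl.substring(backendSeparator + 1));
        return new ProxyConnectUrlSketch(base + ':' + frontendPort, base + ':' + backendPort);
    }

    public static void main(String[] args) {
        ProxyConnectUrlSketch urls = parse("tcp://localhost:6001:6002");
        System.out.println("frontend: " + urls.frontendUrl);
        System.out.println("backend:  " + urls.backendUrl);
    }
}
```

For tcp://localhost:6001:6002 this yields tcp://localhost:6001 for the frontend and tcp://localhost:6002 for the backend; a string with only one port is rejected with an IllegalArgumentException.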
-
setZeroMqProxy
public void setZeroMqProxy(ZeroMqProxy zeroMqProxy)
Specify a reference to a ZeroMqProxy instance in the same application to rely on its ports configuration and make a natural lifecycle dependency without guessing when the proxy is started. Mutually exclusive with setConnectUrl(String).
- Parameters:
zeroMqProxy - the ZeroMqProxy instance to use
-
setConsumeDelay
public void setConsumeDelay(java.time.Duration consumeDelay)
Specify a Duration to delay consumption when no data is received.
- Parameters:
consumeDelay - the Duration to delay consumption when empty; defaults to DEFAULT_CONSUME_DELAY.
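The idea of delaying consumption on an empty read can be sketched as a simple polling loop. This is only a blocking analogue written under assumptions: the channel itself is described as Reactor-based, and the names ConsumeDelaySketch, receive and nonBlockingRead are hypothetical, with a null return standing in for "no data received".

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

// Hypothetical illustration of "wait consumeDelay when nothing was received".
final class ConsumeDelaySketch {

    /** Repeats a non-blocking read until it yields data, sleeping between empty reads. */
    static <T> T receive(Supplier<T> nonBlockingRead, Duration consumeDelay) throws InterruptedException {
        while (true) {
            T data = nonBlockingRead.get();
            if (data != null) {
                return data;
            }
            // Nothing available: back off instead of spinning on the source.
            Thread.sleep(consumeDelay.toMillis());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Two empty reads followed by one payload.
        Iterator<String> reads = Arrays.asList((String) null, null, "payload").iterator();
        System.out.println(receive(reads::next, Duration.ofMillis(10)));
    }
}
```

A shorter delay lowers latency for the first message after an idle period at the cost of more frequent empty reads; a longer delay does the opposite.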
-
setMessageMapper
public void setMessageMapper(org.springframework.integration.mapping.BytesMessageMapper messageMapper)
Provide a BytesMessageMapper to convert to/from messages when a send or receive happens on the sockets.
- Parameters:
messageMapper - the BytesMessageMapper to use; defaults to EmbeddedJsonHeadersMessageMapper.
-
setSendSocketConfigurer
public void setSendSocketConfigurer(java.util.function.Consumer<org.zeromq.ZMQ.Socket> sendSocketConfigurer)
The Consumer callback to configure a publishing socket.
- Parameters:
sendSocketConfigurer - the Consumer to use.
-
setSubscribeSocketConfigurer
public void setSubscribeSocketConfigurer(java.util.function.Consumer<org.zeromq.ZMQ.Socket> subscribeSocketConfigurer)
The Consumer callback to configure a consuming socket.
- Parameters:
subscribeSocketConfigurer - the Consumer to use.
-
onInit
protected void onInit()
- Overrides:
onInit in class org.springframework.integration.channel.AbstractMessageChannel
-
doSend
protected boolean doSend(org.springframework.messaging.Message<?> message, long timeout)
- Specified by:
doSend in class org.springframework.integration.channel.AbstractMessageChannel
-
subscribe
public boolean subscribe(org.springframework.messaging.MessageHandler handler)
- Specified by:
subscribe in interface org.springframework.messaging.SubscribableChannel
-
unsubscribe
public boolean unsubscribe(org.springframework.messaging.MessageHandler handler)
- Specified by:
unsubscribe in interface org.springframework.messaging.SubscribableChannel
-
destroy
public void destroy()
- Specified by:
destroy in interface org.springframework.beans.factory.DisposableBean
- Specified by:
destroy in interface org.springframework.integration.support.management.IntegrationManagement
- Overrides:
destroy in class org.springframework.integration.channel.AbstractMessageChannel
-