public class ZeroMqChannel extends AbstractMessageChannel implements SubscribableChannel
SubscribableChannel implementation over ZeroMQ sockets.
It can work in two messaging models:
- push-pull, where sent messages are distributed to subscribers in a round-robin manner
according to the logic of the respective ZeroMQ SocketType.PUSH and SocketType.PULL socket types;
- pub-sub, where sent messages are distributed to all subscribers.
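The difference between the two models can be illustrated with a small, library-free sketch. It is a toy model only: DeliveryModelSketch and its methods are hypothetical names, no ZeroMQ sockets are involved, and real PUSH sockets distribute among the peers that are connected at the time rather than by a fixed index.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the two delivery modes; NOT the ZeroMqChannel implementation. */
final class DeliveryModelSketch {

    /** Push-pull: each message goes to exactly one subscriber, chosen in turn. */
    static List<List<String>> roundRobin(List<String> messages, int subscribers) {
        List<List<String>> inboxes = emptyInboxes(subscribers);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % subscribers).add(messages.get(i));
        }
        return inboxes;
    }

    /** Pub-sub: every subscriber receives every message. */
    static List<List<String>> fanOut(List<String> messages, int subscribers) {
        List<List<String>> inboxes = emptyInboxes(subscribers);
        for (List<String> inbox : inboxes) {
            inbox.addAll(messages);
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println(roundRobin(messages, 2)); // [[m1, m3], [m2, m4]]
        System.out.println(fanOut(messages, 2));     // [[m1, m2, m3, m4], [m1, m2, m3, m4]]
    }
}
```

In terms of this class, the model is selected with the pubSub constructor argument of ZeroMqChannel(org.zeromq.ZContext, boolean).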
This message channel can work in local mode, where a pair of ZeroMQ sockets of type SocketType.PAIR
connects the publisher (send operation) and the subscriber over an inter-thread transport binding.
In distributed mode this channel has to be connected to an externally managed ZeroMQ proxy.
The value passed to setConnectUrl(String) has to be a standard ZeroMQ connect string, but with an extra port
after a second colon: the two ports represent the frontend and backend socket pair on the ZeroMQ proxy.
For example: tcp://localhost:6001:6002.
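To make the two-port format concrete, here is a minimal sketch of how such a string could be split into two ordinary ZeroMQ endpoints. ConnectUrlSketch and split are hypothetical names introduced for illustration; this is not the parsing code used by ZeroMqChannel, and it assumes the PROTOCOL://HOST:FRONTEND_PORT:BACKEND_PORT shape with numeric ports.

```java
/** Illustrative parser for PROTOCOL://HOST:FRONTEND_PORT:BACKEND_PORT; hypothetical helper, not library code. */
final class ConnectUrlSketch {

    /** Returns { frontendUrl, backendUrl } as ordinary single-port connect strings. */
    static String[] split(String connectUrl) {
        int schemeColon = connectUrl.indexOf("://");
        int lastColon = connectUrl.lastIndexOf(':');
        if (schemeColon < 0 || lastColon <= schemeColon) {
            throw new IllegalArgumentException("Not a two-port connect URL: " + connectUrl);
        }
        // Everything before the last colon is already a standard connect string.
        String frontendUrl = connectUrl.substring(0, lastColon);
        String backendPort = connectUrl.substring(lastColon + 1);

        int frontendColon = frontendUrl.lastIndexOf(':');
        if (frontendColon <= schemeColon) {
            // Only one port was supplied, e.g. tcp://localhost:6001
            throw new IllegalArgumentException("Missing backend port: " + connectUrl);
        }
        // Validate both ports; NumberFormatException is an IllegalArgumentException.
        Integer.parseInt(frontendUrl.substring(frontendColon + 1));
        Integer.parseInt(backendPort);

        String protocolAndHost = frontendUrl.substring(0, frontendColon);
        return new String[] { frontendUrl, protocolAndHost + ":" + backendPort };
    }

    public static void main(String[] args) {
        String[] urls = split("tcp://localhost:6001:6002");
        System.out.println(urls[0]); // tcp://localhost:6001
        System.out.println(urls[1]); // tcp://localhost:6002
    }
}
```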
Another option is to provide a reference to the ZeroMqProxy instance managed in the same application:
frontend and backend ports are evaluated from this proxy and the respective connection string is built from them.
This way, sending and receiving operations on this channel are similar to interaction through a messaging broker.
The internal logic of this message channel implementation is based on Project Reactor, using its
Mono, Flux and Scheduler API for a better thread model and flow control, and to avoid
concurrency primitives for multi-publisher (or multi-subscriber) communication within the same application.
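The general idea of letting several publishers share one socket without explicit locking can be sketched with thread confinement. The example below is an assumption-laden illustration: it swaps in a plain java.util.concurrent single-thread executor instead of Reactor's Scheduler, uses an unsynchronized list as a stand-in for a socket, and ThreadConfinementSketch is a hypothetical name. It does not reproduce how ZeroMqChannel is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Thread-confinement sketch using java.util.concurrent, NOT Reactor and NOT library code. */
final class ThreadConfinementSketch {

    /** Stand-in for a socket that is not thread-safe: a plain list with no locking. */
    private final List<String> wire = new ArrayList<>();

    /** Every access to 'wire' is executed on this single thread. */
    private final ExecutorService socketThread = Executors.newSingleThreadExecutor();

    /** Callable from any thread; the actual write is handed over to the socket thread. */
    void send(String payload) {
        socketThread.execute(() -> wire.add(payload));
    }

    /** Stops accepting sends, waits for queued sends to finish, then returns what was written. */
    List<String> closeAndDrain() throws InterruptedException {
        socketThread.shutdown();
        if (!socketThread.awaitTermination(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Queued sends did not complete in time");
        }
        return wire;
    }

    /** Starts several publisher threads and returns everything that reached the stand-in socket. */
    static List<String> run(int publishers, int messagesEach) {
        ThreadConfinementSketch channel = new ThreadConfinementSketch();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < publishers; p++) {
            int id = p;
            Thread thread = new Thread(() -> {
                for (int m = 0; m < messagesEach; m++) {
                    channel.send("p" + id + "-m" + m);
                }
            });
            threads.add(thread);
            thread.start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
            return channel.closeAndDrain();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for publishers", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000).size()); // 4000
    }
}
```

No message is lost even though the list itself is unsynchronized, because only the executor thread ever touches it; publishers merely enqueue work.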
Inherited nested classes/interfaces: AbstractMessageChannel.ChannelInterceptorList, IntegrationManagement.ManagementOverrides

| Modifier and Type | Field and Description |
|---|---|
| static java.time.Duration | DEFAULT_CONSUME_DELAY |

Inherited fields:
- interceptors, meters
- EXPRESSION_PARSER, logger
- INDEFINITE_TIMEOUT
- METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME

| Constructor and Description |
|---|
| ZeroMqChannel(org.zeromq.ZContext context) |
| ZeroMqChannel(org.zeromq.ZContext context, boolean pubSub) |

| Modifier and Type | Method and Description |
|---|---|
| void | destroy() |
| protected boolean | doSend(Message<?> message, long timeout): Subclasses must implement this method. |
| protected void | onInit(): Subclasses may implement this for initialization logic. |
| void | setConnectUrl(String connectUrl): Configure a connection to the ZeroMQ proxy with a pair of colon-separated ports for the proxy frontend and backend sockets. |
| void | setConsumeDelay(java.time.Duration consumeDelay) |
| void | setMessageMapper(BytesMessageMapper messageMapper) |
| void | setSendSocketConfigurer(java.util.function.Consumer<org.zeromq.ZMQ.Socket> sendSocketConfigurer) |
| void | setSubscribeSocketConfigurer(java.util.function.Consumer<org.zeromq.ZMQ.Socket> subscribeSocketConfigurer) |
| void | setZeroMqProxy(ZeroMqProxy zeroMqProxy): Specify a reference to a ZeroMqProxy instance in the same application to rely on its ports configuration and make a natural lifecycle dependency without guessing when the proxy is started. |
| boolean | subscribe(MessageHandler handler) |
| boolean | unsubscribe(MessageHandler handler) |

Inherited methods:
- addInterceptor, addInterceptor, getComponentType, getFullChannelName, getIChannelInterceptorList, getIntegrationPatternType, getInterceptors, getMetricsCaptor, getOverrides, isLoggingEnabled, registerMetricsCaptor, removeInterceptor, removeInterceptor, send, send, setDatatypes, setInterceptors, setLoggingEnabled, setMessageConverter, setShouldTrack
- afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
- clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
- send, send
- getManagedName, getManagedType, getThisAs, setManagedName, setManagedType
- getBeanName, getComponentName

public ZeroMqChannel(org.zeromq.ZContext context)

public ZeroMqChannel(org.zeromq.ZContext context, boolean pubSub)

public void setConnectUrl(@Nullable String connectUrl)
Configure a connection to the ZeroMQ proxy with a pair of colon-separated ports for the proxy frontend and backend sockets.
Mutually exclusive with setZeroMqProxy(ZeroMqProxy).
Parameters: connectUrl - the connection string in the format PROTOCOL://HOST:FRONTEND_PORT:BACKEND_PORT,
e.g. tcp://localhost:6001:6002

public void setZeroMqProxy(@Nullable ZeroMqProxy zeroMqProxy)
Specify a reference to a ZeroMqProxy instance in the same application
to rely on its ports configuration and make a natural lifecycle dependency without guessing
when the proxy is started. Mutually exclusive with setConnectUrl(String).
Parameters: zeroMqProxy - the ZeroMqProxy instance to use

public void setConsumeDelay(java.time.Duration consumeDelay)
public void setMessageMapper(BytesMessageMapper messageMapper)
public void setSendSocketConfigurer(java.util.function.Consumer<org.zeromq.ZMQ.Socket> sendSocketConfigurer)
public void setSubscribeSocketConfigurer(java.util.function.Consumer<org.zeromq.ZMQ.Socket> subscribeSocketConfigurer)

protected void onInit()
Description copied from class IntegrationObjectSupport: Subclasses may implement this for initialization logic.
Overrides: onInit in class AbstractMessageChannel

protected boolean doSend(Message<?> message, long timeout)
Description copied from class AbstractMessageChannel: Subclasses must implement this method.
Specified by: doSend in class AbstractMessageChannel
Parameters: message - The message. timeout - The timeout.

public boolean subscribe(MessageHandler handler)
Specified by: subscribe in interface SubscribableChannel

public boolean unsubscribe(MessageHandler handler)
Specified by: unsubscribe in interface SubscribableChannel

public void destroy()
Specified by: destroy in interface DisposableBean
Specified by: destroy in interface IntegrationManagement
Overrides: destroy in class AbstractMessageChannel