Class ZeroMqChannel

java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.channel.AbstractMessageChannel
org.springframework.integration.zeromq.channel.ZeroMqChannel
All Implemented Interfaces:
org.springframework.beans.factory.Aware, org.springframework.beans.factory.BeanFactoryAware, org.springframework.beans.factory.BeanNameAware, org.springframework.beans.factory.DisposableBean, org.springframework.beans.factory.InitializingBean, org.springframework.context.ApplicationContextAware, org.springframework.integration.context.ExpressionCapable, org.springframework.integration.IntegrationPattern, org.springframework.integration.support.context.NamedComponent, org.springframework.integration.support.management.IntegrationManagement, org.springframework.integration.support.management.TrackableComponent, org.springframework.messaging.MessageChannel, org.springframework.messaging.SubscribableChannel, org.springframework.messaging.support.InterceptableChannel

public class ZeroMqChannel
extends org.springframework.integration.channel.AbstractMessageChannel
implements org.springframework.messaging.SubscribableChannel
The SubscribableChannel implementation over ZeroMQ sockets. It can work in two messaging models:
- push-pull, where sent messages are distributed to subscribers in a round-robin manner, according to the logic of the respective ZeroMQ SocketType.PUSH and SocketType.PULL socket types;
- pub-sub, where sent messages are distributed to all subscribers.
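
The difference between the two models can be illustrated with a toy sketch in plain Java. It involves no ZeroMQ sockets and is not the channel's implementation; the class and method names (DistributionModels, roundRobin, broadcast) are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Toy model only: shows the delivery semantics, not the ZeroMQ transport.
public class DistributionModels {

    // push-pull: each message reaches exactly one subscriber, chosen in turn.
    public static <T> void roundRobin(List<Consumer<T>> subscribers, AtomicInteger cursor, T message) {
        int index = Math.floorMod(cursor.getAndIncrement(), subscribers.size());
        subscribers.get(index).accept(message);
    }

    // pub-sub: each message reaches every subscriber.
    public static <T> void broadcast(List<Consumer<T>> subscribers, T message) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }

    public static void main(String[] args) {
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        Consumer<String> firstSubscriber = first::add;
        Consumer<String> secondSubscriber = second::add;
        List<Consumer<String>> subscribers = List.of(firstSubscriber, secondSubscriber);

        AtomicInteger cursor = new AtomicInteger();
        roundRobin(subscribers, cursor, "m1");
        roundRobin(subscribers, cursor, "m2");
        roundRobin(subscribers, cursor, "m3");
        broadcast(subscribers, "m4");

        System.out.println(first);  // [m1, m3, m4]
        System.out.println(second); // [m2, m4]
    }
}
```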

This message channel can work in local mode, in which a pair of ZeroMQ sockets of the SocketType.PAIR type is connected between the publisher (send operation) and the subscriber using an inter-thread transport binding.

In distributed mode this channel has to be connected to an externally managed ZeroMQ proxy. The value passed to setConnectUrl(String) has to be a standard ZeroMQ connect string with an extra port appended after a second colon; the two ports represent the frontend and backend sockets of the ZeroMQ proxy. For example: tcp://localhost:6001:6002. Another option is to provide a reference to a ZeroMqProxy instance managed in the same application: the frontend and backend ports are taken from this proxy and the respective connection string is built from them.
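
To make the two-port format concrete, the following sketch splits such a string into one standard ZeroMQ connect string per proxy socket. It is an illustration in plain Java, not the channel's actual parsing code; the ConnectUrlSplitter class, its regular expression, and the simplification that the host contains no colon are assumptions made for this example.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: splits PROTOCOL://HOST:FRONTEND_PORT:BACKEND_PORT into two URLs.
public class ConnectUrlSplitter {

    // Assumes a host without colons (so no IPv6 literals) and numeric ports.
    private static final Pattern CONNECT_URL = Pattern.compile("(\\w+://[^:]+):(\\d+):(\\d+)");

    // Returns a two-element array: { frontendUrl, backendUrl }.
    public static String[] split(String connectUrl) {
        Matcher matcher = CONNECT_URL.matcher(connectUrl);
        if (!matcher.matches()) {
            throw new IllegalArgumentException(
                    "Expected PROTOCOL://HOST:FRONTEND_PORT:BACKEND_PORT but got: " + connectUrl);
        }
        String base = matcher.group(1);
        return new String[] { base + ":" + matcher.group(2), base + ":" + matcher.group(3) };
    }

    public static void main(String[] args) {
        String[] urls = split("tcp://localhost:6001:6002");
        System.out.println("frontend: " + urls[0]); // frontend: tcp://localhost:6001
        System.out.println("backend: " + urls[1]);  // backend: tcp://localhost:6002
    }
}
```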

This way sending and receiving operations on this channel are similar to interaction over a messaging broker.

The internal logic of this message channel implementation is based on Project Reactor, using its Mono, Flux and Scheduler APIs for a better thread model and flow control, and to avoid concurrency primitives for multi-publisher (or multi-subscriber) communication within the same application.

Since:
5.4
  • Nested Class Summary

    Nested classes/interfaces inherited from class org.springframework.integration.channel.AbstractMessageChannel

    org.springframework.integration.channel.AbstractMessageChannel.ChannelInterceptorList

    Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement

    org.springframework.integration.support.management.IntegrationManagement.ManagementOverrides
  • Field Summary

    Fields 
    Modifier and Type Field Description
    static java.time.Duration DEFAULT_CONSUME_DELAY  

    Fields inherited from class org.springframework.integration.channel.AbstractMessageChannel

    interceptors, meters

    Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport

    EXPRESSION_PARSER, logger

    Fields inherited from interface org.springframework.integration.support.management.IntegrationManagement

    METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME

    Fields inherited from interface org.springframework.messaging.MessageChannel

    INDEFINITE_TIMEOUT
  • Constructor Summary

    Constructors 
    Constructor Description
    ZeroMqChannel​(org.zeromq.ZContext context)
    Create a channel instance based on the provided ZContext with push/pull communication model.
    ZeroMqChannel​(org.zeromq.ZContext context, boolean pubSub)
    Create a channel instance based on the provided ZContext and provided communication model.
  • Method Summary

    Modifier and Type Method Description
    void destroy()  
    protected boolean doSend​(org.springframework.messaging.Message<?> message, long timeout)  
    protected void onInit()  
    void setConnectUrl​(java.lang.String connectUrl)
    Configure a connection to the ZeroMQ proxy with a colon-separated pair of ports for the proxy frontend and backend sockets.
    void setConsumeDelay​(java.time.Duration consumeDelay)
    Specify a Duration to delay consumption when no data is received.
    void setMessageMapper​(org.springframework.integration.mapping.BytesMessageMapper messageMapper)
    Provide a BytesMessageMapper to convert to/from messages when a send or receive happens on the sockets.
    void setSendSocketConfigurer​(java.util.function.Consumer<org.zeromq.ZMQ.Socket> sendSocketConfigurer)
    The Consumer callback to configure a publishing socket.
    void setSubscribeSocketConfigurer​(java.util.function.Consumer<org.zeromq.ZMQ.Socket> subscribeSocketConfigurer)
    The Consumer callback to configure a consuming socket.
    void setZeroMqProxy​(ZeroMqProxy zeroMqProxy)
    Specify a reference to a ZeroMqProxy instance in the same application to rely on its ports configuration and make a natural lifecycle dependency without guessing when the proxy is started.
    boolean subscribe​(org.springframework.messaging.MessageHandler handler)  
    boolean unsubscribe​(org.springframework.messaging.MessageHandler handler)  

    Methods inherited from class org.springframework.integration.channel.AbstractMessageChannel

    addInterceptor, addInterceptor, getComponentType, getFullChannelName, getIChannelInterceptorList, getIntegrationPatternType, getInterceptors, getMetricsCaptor, getOverrides, isLoggingEnabled, registerMetricsCaptor, removeInterceptor, removeInterceptor, send, send, setDatatypes, setInterceptors, setLoggingEnabled, setMessageConverter, setShouldTrack

    Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport

    afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait

    Methods inherited from interface org.springframework.integration.support.management.IntegrationManagement

    getManagedName, getManagedType, getThisAs, setManagedName, setManagedType

    Methods inherited from interface org.springframework.messaging.MessageChannel

    send, send

    Methods inherited from interface org.springframework.integration.support.context.NamedComponent

    getBeanName, getComponentName
  • Field Details

  • Constructor Details

    • ZeroMqChannel

      public ZeroMqChannel​(org.zeromq.ZContext context)
      Create a channel instance based on the provided ZContext with push/pull communication model.
      Parameters:
      context - the ZContext to use.
    • ZeroMqChannel

      public ZeroMqChannel​(org.zeromq.ZContext context, boolean pubSub)
      Create a channel instance based on the provided ZContext and provided communication model.
      Parameters:
      context - the ZContext to use.
      pubSub - the communication model: false for push/pull, true for pub/sub.
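
As a minimal sketch of the two constructors, the configuration below declares one channel per communication model, both in local mode because no connect URL or proxy is set. The configuration class and bean names are hypothetical, and passing true to select pub/sub is inferred from the parameter name and from the single-argument constructor defaulting to push/pull.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.zeromq.channel.ZeroMqChannel;
import org.zeromq.ZContext;

// Hypothetical configuration class; names are for illustration only.
@Configuration
public class ZeroMqChannelConfig {

    // One shared ZeroMQ context for all sockets in the application.
    @Bean
    ZContext zContext() {
        return new ZContext();
    }

    // Push/pull model: each message is handled by one subscriber in turn.
    @Bean
    ZeroMqChannel pushPullChannel(ZContext context) {
        return new ZeroMqChannel(context);
    }

    // Pub/sub model (assumed to be selected by pubSub = true): every subscriber gets each message.
    @Bean
    ZeroMqChannel pubSubChannel(ZContext context) {
        return new ZeroMqChannel(context, true);
    }
}
```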
  • Method Details

    • setConnectUrl

      public void setConnectUrl​(@Nullable java.lang.String connectUrl)
      Configure a connection to the ZeroMQ proxy with a colon-separated pair of ports for the proxy frontend and backend sockets. Mutually exclusive with setZeroMqProxy(ZeroMqProxy).
      Parameters:
      connectUrl - the connection string in format PROTOCOL://HOST:FRONTEND_PORT:BACKEND_PORT, e.g. tcp://localhost:6001:6002
    • setZeroMqProxy

      public void setZeroMqProxy​(@Nullable ZeroMqProxy zeroMqProxy)
      Specify a reference to a ZeroMqProxy instance in the same application to rely on its ports configuration and make a natural lifecycle dependency without guessing when the proxy is started. Mutually exclusive with setConnectUrl(String).
      Parameters:
      zeroMqProxy - the ZeroMqProxy instance to use
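
The following sketch shows one way a proxy managed in the same application might be wired into the channel. The ZeroMqProxy constructor, its Type.SUB_PUB constant, and the setFrontendPort and setBackendPort setters belong to ZeroMqProxy's own API, which this page does not document, so verify them against that class. The bean names and port numbers are hypothetical, and a ZContext bean is assumed to be defined elsewhere.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.zeromq.ZeroMqProxy;
import org.springframework.integration.zeromq.channel.ZeroMqChannel;
import org.zeromq.ZContext;

// Hypothetical configuration class; assumes a ZContext bean exists elsewhere.
@Configuration
public class ProxiedChannelConfig {

    // Proxy type chosen to match a pub/sub channel (assumption: SUB_PUB pairs with pubSub = true).
    @Bean
    ZeroMqProxy zeroMqProxy(ZContext context) {
        ZeroMqProxy proxy = new ZeroMqProxy(context, ZeroMqProxy.Type.SUB_PUB);
        proxy.setFrontendPort(6001); // example port
        proxy.setBackendPort(6002);  // example port
        return proxy;
    }

    // The channel takes its ports from the proxy, so setConnectUrl(String) is not called.
    @Bean
    ZeroMqChannel proxiedChannel(ZContext context, ZeroMqProxy zeroMqProxy) {
        ZeroMqChannel channel = new ZeroMqChannel(context, true);
        channel.setZeroMqProxy(zeroMqProxy);
        return channel;
    }
}
```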
    • setConsumeDelay

      public void setConsumeDelay​(java.time.Duration consumeDelay)
      Specify a Duration to delay consumption when no data is received.
      Parameters:
      consumeDelay - the Duration to delay consumption when empty; defaults to DEFAULT_CONSUME_DELAY.
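
For a channel connected to an externally managed proxy, a configuration along these lines could combine setConnectUrl(String) with a custom consume delay. The host, ports, the 100 ms delay, and the class and bean names are example values only, and a ZContext bean is assumed to be defined elsewhere.

```java
import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.zeromq.channel.ZeroMqChannel;
import org.zeromq.ZContext;

// Hypothetical configuration class; assumes a ZContext bean exists elsewhere.
@Configuration
public class DistributedChannelConfig {

    @Bean
    ZeroMqChannel distributedChannel(ZContext context) {
        ZeroMqChannel channel = new ZeroMqChannel(context, true);
        // Frontend port 6001 and backend port 6002 of an external proxy (example values).
        channel.setConnectUrl("tcp://localhost:6001:6002");
        // Wait 100 ms before polling again when no data was received (example value).
        channel.setConsumeDelay(Duration.ofMillis(100));
        return channel;
    }
}
```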
    • setMessageMapper

      public void setMessageMapper​(org.springframework.integration.mapping.BytesMessageMapper messageMapper)
      Provide a BytesMessageMapper to convert to/from messages when a send or receive happens on the sockets.
      Parameters:
      messageMapper - the BytesMessageMapper to use; defaults to EmbeddedJsonHeadersMessageMapper.
    • setSendSocketConfigurer

      public void setSendSocketConfigurer​(java.util.function.Consumer<org.zeromq.ZMQ.Socket> sendSocketConfigurer)
      The Consumer callback to configure a publishing socket.
      Parameters:
      sendSocketConfigurer - the Consumer to use.
    • setSubscribeSocketConfigurer

      public void setSubscribeSocketConfigurer​(java.util.function.Consumer<org.zeromq.ZMQ.Socket> subscribeSocketConfigurer)
      The Consumer callback to configure a consuming socket.
      Parameters:
      subscribeSocketConfigurer - the Consumer to use.
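
The two configurer callbacks receive the underlying org.zeromq.ZMQ.Socket, so any socket option can be set there. The sketch below adjusts the high-water marks; setSndHWM and setRcvHWM are JeroMQ socket methods not described on this page, and the value 1000 and the class and bean names are hypothetical. A ZContext bean is assumed to be defined elsewhere.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.zeromq.channel.ZeroMqChannel;
import org.zeromq.ZContext;

// Hypothetical configuration class; assumes a ZContext bean exists elsewhere.
@Configuration
public class TunedChannelConfig {

    @Bean
    ZeroMqChannel tunedChannel(ZContext context) {
        ZeroMqChannel channel = new ZeroMqChannel(context);
        // Limit the number of outbound messages queued on the publishing socket (example value).
        channel.setSendSocketConfigurer(socket -> socket.setSndHWM(1000));
        // Limit the number of inbound messages queued on the consuming socket (example value).
        channel.setSubscribeSocketConfigurer(socket -> socket.setRcvHWM(1000));
        return channel;
    }
}
```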
    • onInit

      protected void onInit()
      Overrides:
      onInit in class org.springframework.integration.channel.AbstractMessageChannel
    • doSend

      protected boolean doSend​(org.springframework.messaging.Message<?> message, long timeout)
      Specified by:
      doSend in class org.springframework.integration.channel.AbstractMessageChannel
    • subscribe

      public boolean subscribe​(org.springframework.messaging.MessageHandler handler)
      Specified by:
      subscribe in interface org.springframework.messaging.SubscribableChannel
    • unsubscribe

      public boolean unsubscribe​(org.springframework.messaging.MessageHandler handler)
      Specified by:
      unsubscribe in interface org.springframework.messaging.SubscribableChannel
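
A minimal usage sketch of subscribe, send and unsubscribe follows. It relies only on the SubscribableChannel contract, so it applies to an initialized ZeroMqChannel bean injected into the constructor; the ZeroMqChannelUsage class and its method names are hypothetical. Delivery to the handler is assumed to happen asynchronously, so a message may arrive after publish returns.

```java
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;

// Hypothetical helper around an already initialized channel (for example a ZeroMqChannel bean).
public class ZeroMqChannelUsage {

    private final SubscribableChannel channel;

    // Keep a reference to the handler so the same instance can be unsubscribed later.
    private final MessageHandler handler =
            message -> System.out.println("Received: " + message.getPayload());

    public ZeroMqChannelUsage(SubscribableChannel channel) {
        this.channel = channel;
    }

    // Registers the handler; returns whatever the channel reports for the subscription.
    public boolean start() {
        return this.channel.subscribe(this.handler);
    }

    // Sends a text payload through the channel.
    public boolean publish(String text) {
        return this.channel.send(new GenericMessage<>(text));
    }

    // Removes the handler registered in start().
    public boolean stop() {
        return this.channel.unsubscribe(this.handler);
    }
}
```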
    • destroy

      public void destroy()
      Specified by:
      destroy in interface org.springframework.beans.factory.DisposableBean
      Specified by:
      destroy in interface org.springframework.integration.support.management.IntegrationManagement
      Overrides:
      destroy in class org.springframework.integration.channel.AbstractMessageChannel