# ZeroMQ 支持
# zeromq 支持
Spring 集成提供组件以支持ZeroMQ (opens new window)在应用程序中的通信。该实现基于JeroMQ (opens new window)库中得到良好支持的 Java API。所有组件都封装了 ZeroMQ 套接字生命周期,并在内部为它们管理线程,从而与这些组件进行无锁和线程安全的交互。
你需要在项目中包含此依赖项:
Maven
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-zeromq</artifactId>
<version>5.5.9</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-zeromq:5.5.9"
# Zeromq 代理
ZeroMqProxy
是内置ZMQ.proxy()
function (opens new window)的 Spring 友好的包装器。它封装了套接字生命周期和线程管理。该代理的客户端仍然可以使用标准的 Zeromq 套接字连接和交互 API。除了标准ZContext
之外,它还需要一种著名的 ZeroMQ 代理模式:sub/pub、pull/push 或 router/dealer。这样,代理的前端和后端将使用一对适当的 ZeroMQ 套接字类型。详见ZeroMqProxy.Type
。
ZeroMqProxy
实现SmartLifecycle
来创建、绑定和配置套接字,并在专用线程中从Executor
(如果有的话)启动ZMQ.proxy()
。前端和后端套接字的绑定是通过tcp://
协议在所有可用的网络接口和提供的端口上完成的。否则,它们被绑定到随机端口,这可以在以后通过相应的getFrontendPort()
和getBackendPort()
API 方法获得。
控制套接字作为SocketType.PAIR
公开,并在"inproc://" + beanName + ".control"
地址上具有线程间传输;可以通过getControlAddress()
获得。它应该与来自另一个SocketType.PAIR
套接字的相同应用程序一起使用,以发送ZMQ.PROXY_TERMINATE
、ZMQ.PROXY_PAUSE
和/或ZMQ.PROXY_RESUME
命令。当ZeroMqProxy
的生命周期调用stop()
时,ZMQ.PROXY_TERMINATE
执行一个ZMQ.PROXY_TERMINATE
命令,以终止ZMQ.proxy()
循环并优雅地关闭所有绑定的套接字。
setExposeCaptureSocket(boolean)
选项将使该组件与SocketType.PUB
绑定一个额外的线程间套接字,以捕获并发布前端和后端套接字之间的所有通信,因为它使用ZMQ.proxy()
实现。这个套接字绑定到"inproc://" + beanName + ".capture"
地址,并且不需要任何特定的订阅来进行过滤。
前端和后端套接字可以使用其他属性进行定制,例如读/写超时或安全性。这种定制可分别通过setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)
和setBackendSocketConfigurer(Consumer<ZMQ.Socket>)
回调进行。
ZeroMqProxy
可以像这样简单地提供:
@Bean
ZeroMqProxy zeroMqProxy() {
ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
proxy.setExposeCaptureSocket(true);
proxy.setFrontendPort(6001);
proxy.setBackendPort(6002);
return proxy;
}
所有的客户端节点都应该通过tcp://
连接到这个代理的主机,并使用各自感兴趣的端口。
# zeromq 消息通道
ZeroMqChannel
是一个SubscribableChannel
,它使用一对 Zeromq 套接字来连接发布者和订阅者以进行消息交互。它可以在 pub/sub 模式下工作(默认为 push/pull);它也可以用作本地线程间通道(使用PAIR
套接字)-在这种情况下不提供connectUrl
。在分布式模式下,它必须连接到外部管理的 ZeroMQ 代理,在该代理中,它可以与连接到同一代理的其他类似通道交换消息。Connect URL 选项是一个标准的 ZeroMQ 连接字符串,其中包含协议和主机,以及一对用于 ZeroMQ 代理的前端和后端套接字的冒号端口。为了方便起见,如果通道在与代理相同的应用程序中配置,则可以提供ZeroMqProxy
实例,而不是连接字符串。
发送和接收套接字都在自己的专用线程中进行管理,这使得该通道易于并发。通过这种方式,我们可以在不进行同步的情况下,从ZeroMqChannel
从不同的线程发布和使用ZeroMqChannel
。
默认情况下,ZeroMqChannel
使用EmbeddedJsonHeadersMessageMapper
使用 JacksonJSON 处理器将Message
(包括头)从/到byte[]
序列化。这个逻辑可以通过setMessageMapper(BytesMessageMapper)
进行配置。
发送和接收套接字可以通过相应的setSendSocketConfigurer(Consumer<ZMQ.Socket>)
和setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>)
回调为任何选项(读/写超时,安全性等)定制。
该ZeroMqChannel
的内部逻辑是基于经由工程反应器的反应流Flux
和Mono
的操作符。这提供了更容易的线程控制,并允许无锁的并发发布和从通道到/从通道消费。本地发布/订阅逻辑被实现为一个Flux.publish()
操作符,以允许该信道的所有本地订阅者接收相同的发布消息,作为PUB
套接字的分布式订阅者。
以下是ZeroMqChannel
配置的一个简单示例:
@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
ZeroMqChannel channel = new ZeroMqChannel(context, true);
channel.setConnectUrl("tcp://localhost:6001:6002");
channel.setConsumeDelay(Duration.ofMillis(100));
return channel;
}
# zeromq 入站通道适配器
ZeroMqMessageProducer
是一个具有反应语义的MessageProducerSupport
实现。它以非阻塞的方式不断地从 ZeroMQ 套接字读取数据,并将消息发布到由Flux
订阅的无限FluxMessageChannel
,或者在start()
方法中显式地订阅的Flux
,如果输出通道不是反应性的。当套接字上没有接收到数据时,将在下一次读取尝试之前应用consumeDelay
(默认值为 1 秒)。
只有SocketType.PAIR
、SocketType.PULL
和SocketType.SUB
是由ZeroMqMessageProducer
支持的。该组件可以连接到远程套接字,也可以通过提供的端口或随机端口绑定到 TCP 协议。在启动此组件并绑定 ZeroMQ 套接字之后,可以通过getBoundPort()
获得实际的端口。可以通过setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
回调配置套接字选项(例如,安全性或写超时)。
如果receiveRaw
选项被设置为true
,则从套接字中消费的ZMsg
将在产生的Message
的有效负载中按原样发送:由下游流来解析和转换ZMsg
。否则,将使用InboundMessageMapper
将所消耗的数据转换为Message
。如果接收到的ZMsg
是多帧的,则将第一个帧作为ZeroMqHeaders.TOPIC
头来处理此 zeromq 消息已发布到。
对于SocketType.SUB
,ZeroMqMessageProducer
使用所提供的topics
选项进行订阅;默认情况下订阅所有内容。订阅可以在运行时使用subscribeToTopics()
和unsubscribeFromTopics()``@ManagedOperation
s 进行调整。
下面是ZeroMqMessageProducer
配置的示例:
@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
messageProducer.setOutputChannel(outputChannel);
messageProducer.setTopics("some");
messageProducer.setReceiveRaw(true);
messageProducer.setBindPort(7070);
messageProducer.setConsumeDelay(Duration.ofMillis(100));
return messageProducer;
}
# zeromq 出站通道适配器
ZeroMqMessageHandler
是一个ReactiveMessageHandler
实现,用于将发布消息生成到 Zeromq 套接字中。只支持SocketType.PAIR
、SocketType.PUSH
和SocketType.PUB
。ZeroMqMessageHandler
只支持连接 zeromq 套接字;不支持绑定。当使用SocketType.PUB
时,将根据请求消息对topicExpression
进行求值,以便在不为空的情况下将主题框架注入到 ZeroMQ 消息中。订阅方方(SocketType.SUB
)在解析实际数据之前必须首先接收主题帧。当请求消息的有效负载是ZMsg
时,不执行转换或主题提取:ZMsg
按原样发送到套接字中,并且不会为可能的进一步重用而销毁它。否则,将使用OutboundMessageMapper<byte[]>
将请求消息(或仅其有效负载)转换为要发布的 zeromq 帧。默认情况下,ConvertingBytesMessageMapper
是与ConfigurableCompositeMessageConverter
一起使用的。可以通过setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
回调配置套接字选项(例如,安全性或写超时)。
下面是ZeroMqMessageHandler
配置的示例:
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}
# Zeromq Java DSL 支持
spring-integration-zeromq
通过ZeroMq
工厂和IntegrationComponentSpec
实现,为上面提到的组件提供了一个方便的 Java DSL Fluent API。
这是ZeroMqChannel
的 Java DSL 示例:
.channel(ZeroMq.zeroMqChannel(this.context)
.connectUrl("tcp://localhost:6001:6002")
.consumeDelay(Duration.ofMillis(100)))
}
Zeromq Java DSL 的入站通道适配器为:
IntegrationFlows.from(
ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
.connectUrl("tcp://localhost:9000")
.topics("someTopic")
.receiveRaw(true)
.consumeDelay(Duration.ofMillis(100)))
}
Zeromq Java DSL 的出站通道适配器为:
.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
.topicFunction(message -> message.getHeaders().get("myTopic")))
}
← XMPP 支持 Zookeeper 支持 →