# ZeroMQ 支持

# zeromq 支持

Spring 集成提供组件以支持ZeroMQ (opens new window)在应用程序中的通信。该实现基于JeroMQ (opens new window)库中得到良好支持的 Java API。所有组件都封装了 ZeroMQ 套接字生命周期,并在内部为它们管理线程,从而与这些组件进行无锁和线程安全的交互。

你需要在项目中包含此依赖项:

Maven

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-zeromq</artifactId>
    <version>5.5.9</version>
</dependency>

Gradle

compile "org.springframework.integration:spring-integration-zeromq:5.5.9"

# Zeromq 代理

ZeroMqProxy是内置ZMQ.proxy()function (opens new window)的 Spring 友好的包装器。它封装了套接字生命周期和线程管理。该代理的客户端仍然可以使用标准的 Zeromq 套接字连接和交互 API。除了标准ZContext之外,它还需要一种著名的 ZeroMQ 代理模式:sub/pub、pull/push 或 router/dealer。这样,代理的前端和后端将使用一对适当的 ZeroMQ 套接字类型。详见ZeroMqProxy.Type

ZeroMqProxy实现SmartLifecycle来创建、绑定和配置套接字,并在专用线程中从Executor(如果有的话)启动ZMQ.proxy()。前端和后端套接字的绑定是通过tcp://协议在所有可用的网络接口和提供的端口上完成的。否则,它们被绑定到随机端口,这可以在以后通过相应的getFrontendPort()getBackendPort()API 方法获得。

控制套接字作为SocketType.PAIR公开,并在"inproc://" + beanName + ".control"地址上具有线程间传输;可以通过getControlAddress()获得。它应该与来自另一个SocketType.PAIR套接字的相同应用程序一起使用,以发送ZMQ.PROXY_TERMINATEZMQ.PROXY_PAUSE和/或ZMQ.PROXY_RESUME命令。当ZeroMqProxy的生命周期调用stop()时,ZMQ.PROXY_TERMINATE执行一个ZMQ.PROXY_TERMINATE命令,以终止ZMQ.proxy()循环并优雅地关闭所有绑定的套接字。

setExposeCaptureSocket(boolean)选项将使该组件与SocketType.PUB绑定一个额外的线程间套接字,以捕获并发布前端和后端套接字之间的所有通信,因为它使用ZMQ.proxy()实现。这个套接字绑定到"inproc://" + beanName + ".capture"地址,并且不需要任何特定的订阅来进行过滤。

前端和后端套接字可以使用其他属性进行定制,例如读/写超时或安全性。这种定制可分别通过setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)setBackendSocketConfigurer(Consumer<ZMQ.Socket>)回调进行。

ZeroMqProxy可以像这样简单地提供:

@Bean
ZeroMqProxy zeroMqProxy() {
    ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
    proxy.setExposeCaptureSocket(true);
    proxy.setFrontendPort(6001);
    proxy.setBackendPort(6002);
    return proxy;
}

所有的客户端节点都应该通过tcp://连接到这个代理的主机,并使用各自感兴趣的端口。

# zeromq 消息通道

ZeroMqChannel是一个SubscribableChannel,它使用一对 Zeromq 套接字来连接发布者和订阅者以进行消息交互。它可以在 pub/sub 模式下工作(默认为 push/pull);它也可以用作本地线程间通道(使用PAIR套接字)-在这种情况下不提供connectUrl。在分布式模式下,它必须连接到外部管理的 ZeroMQ 代理,在该代理中,它可以与连接到同一代理的其他类似通道交换消息。Connect URL 选项是一个标准的 ZeroMQ 连接字符串,其中包含协议和主机,以及一对用于 ZeroMQ 代理的前端和后端套接字的冒号端口。为了方便起见,如果通道在与代理相同的应用程序中配置,则可以提供ZeroMqProxy实例,而不是连接字符串。

发送和接收套接字都在自己的专用线程中进行管理,这使得该通道易于并发。通过这种方式,我们可以在不进行同步的情况下,从ZeroMqChannel从不同的线程发布和使用ZeroMqChannel

默认情况下,ZeroMqChannel使用EmbeddedJsonHeadersMessageMapper使用 JacksonJSON 处理器将Message(包括头)从/到byte[]序列化。这个逻辑可以通过setMessageMapper(BytesMessageMapper)进行配置。

发送和接收套接字可以通过相应的setSendSocketConfigurer(Consumer<ZMQ.Socket>)setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>)回调为任何选项(读/写超时,安全性等)定制。

ZeroMqChannel的内部逻辑是基于经由工程反应器的反应流FluxMono的操作符。这提供了更容易的线程控制,并允许无锁的并发发布和从通道到/从通道消费。本地发布/订阅逻辑被实现为一个Flux.publish()操作符,以允许该信道的所有本地订阅者接收相同的发布消息,作为PUB套接字的分布式订阅者。

以下是ZeroMqChannel配置的一个简单示例:

@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
    ZeroMqChannel channel = new ZeroMqChannel(context, true);
    channel.setConnectUrl("tcp://localhost:6001:6002");
    channel.setConsumeDelay(Duration.ofMillis(100));
    return channel;
}

# zeromq 入站通道适配器

ZeroMqMessageProducer是一个具有反应语义的MessageProducerSupport实现。它以非阻塞的方式不断地从 ZeroMQ 套接字读取数据,并将消息发布到由Flux订阅的无限FluxMessageChannel,或者在start()方法中显式地订阅的Flux,如果输出通道不是反应性的。当套接字上没有接收到数据时,将在下一次读取尝试之前应用consumeDelay(默认值为 1 秒)。

只有SocketType.PAIRSocketType.PULLSocketType.SUB是由ZeroMqMessageProducer支持的。该组件可以连接到远程套接字,也可以通过提供的端口或随机端口绑定到 TCP 协议。在启动此组件并绑定 ZeroMQ 套接字之后,可以通过getBoundPort()获得实际的端口。可以通过setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)回调配置套接字选项(例如,安全性或写超时)。

如果receiveRaw选项被设置为true,则从套接字中消费的ZMsg将在产生的Message的有效负载中按原样发送:由下游流来解析和转换ZMsg。否则,将使用InboundMessageMapper将所消耗的数据转换为Message。如果接收到的ZMsg是多帧的,则将第一个帧作为ZeroMqHeaders.TOPIC头来处理此 zeromq 消息已发布到。

对于SocketType.SUBZeroMqMessageProducer使用所提供的topics选项进行订阅;默认情况下订阅所有内容。订阅可以在运行时使用subscribeToTopics()unsubscribeFromTopics()``@ManagedOperations 进行调整。

下面是ZeroMqMessageProducer配置的示例:

@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
    ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
    messageProducer.setOutputChannel(outputChannel);
    messageProducer.setTopics("some");
    messageProducer.setReceiveRaw(true);
    messageProducer.setBindPort(7070);
    messageProducer.setConsumeDelay(Duration.ofMillis(100));
    return messageProducer;
}

# zeromq 出站通道适配器

ZeroMqMessageHandler是一个ReactiveMessageHandler实现,用于将发布消息生成到 Zeromq 套接字中。只支持SocketType.PAIRSocketType.PUSHSocketType.PUBZeroMqMessageHandler只支持连接 zeromq 套接字;不支持绑定。当使用SocketType.PUB时,将根据请求消息对topicExpression进行求值,以便在不为空的情况下将主题框架注入到 ZeroMQ 消息中。订阅方方(SocketType.SUB)在解析实际数据之前必须首先接收主题帧。当请求消息的有效负载是ZMsg时,不执行转换或主题提取:ZMsg按原样发送到套接字中,并且不会为可能的进一步重用而销毁它。否则,将使用OutboundMessageMapper<byte[]>将请求消息(或仅其有效负载)转换为要发布的 zeromq 帧。默认情况下,ConvertingBytesMessageMapper是与ConfigurableCompositeMessageConverter一起使用的。可以通过setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)回调配置套接字选项(例如,安全性或写超时)。

下面是ZeroMqMessageHandler配置的示例:

@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
    ZeroMqMessageHandler messageHandler =
                  new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
    messageHandler.setTopicExpression(
                  new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
    messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}

# Zeromq Java DSL 支持

spring-integration-zeromq通过ZeroMq工厂和IntegrationComponentSpec实现,为上面提到的组件提供了一个方便的 Java DSL Fluent API。

这是ZeroMqChannel的 Java DSL 示例:

.channel(ZeroMq.zeroMqChannel(this.context)
            .connectUrl("tcp://localhost:6001:6002")
            .consumeDelay(Duration.ofMillis(100)))
}

Zeromq Java DSL 的入站通道适配器为:

IntegrationFlows.from(
            ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
                        .connectUrl("tcp://localhost:9000")
                        .topics("someTopic")
                        .receiveRaw(true)
                        .consumeDelay(Duration.ofMillis(100)))
}

Zeromq Java DSL 的出站通道适配器为:

.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
                  .topicFunction(message -> message.getHeaders().get("myTopic")))
}