# MQTT 支持

# MQTT 支持

Spring 集成提供了入站和出站信道适配器,以支持消息队列遥测传输协议。

你需要在项目中包含此依赖项:

Maven

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.9</version>
</dependency>

Gradle

compile "org.springframework.integration:spring-integration-mqtt:5.5.9"

当前的实现使用Eclipse PAHO MQTT 客户端 (opens new window)库。

XML 配置和本章的大部分内容都是关于 MQTTV3.1 协议支持和相应的 PAHO 客户机的。
有关相应的协议支持,请参见MQTT V5 支持段。

这两个适配器的配置都是使用DefaultMqttPahoClientFactory实现的。有关配置选项的更多信息,请参阅泛美卫生组织的文档。

我们建议配置MqttConnectOptions对象并将其注入工厂,而不是在工厂本身上设置(不推荐的)选项。

# 入站(消息驱动)通道适配器

入站通道适配器由MqttPahoMessageDrivenChannelAdapter实现。为了方便起见,你可以使用名称空间来配置它。最小配置可以如下所示:

<bean id="clientFactory"
        class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="connectionOptions">
        <bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
            <property name="userName" value="${mqtt.username}"/>
            <property name="password" value="${mqtt.password}"/>
        </bean>
    </property>
</bean>

<int-mqtt:message-driven-channel-adapter id="mqttInbound"
    client-id="${mqtt.default.client.id}.src"
    url="${mqtt.url}"
    topics="sometopic"
    client-factory="clientFactory"
    channel="output"/>

下面的清单显示了可用的属性:

<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
    client-id="foo"  (1)
    url="tcp://localhost:1883"  (2)
    topics="bar,baz"  (3)
    qos="1,2"  (4)
    converter="myConverter"  (5)
    client-factory="clientFactory"  (6)
    send-timeout="123"  (7)
    error-channel="errors"  (8)
    recovery-interval="10000"  (9)
    manual-acks="false" (10)
    channel="out" />
1 客户 ID。
2 代理 URL。
3 以逗号分隔的主题列表,此适配器从中接收消息。
4 用逗号分隔的 QoS 值列表。
它可以是应用于所有主题的单个值,也可以是每个主题的值(在这种情况下,列表的长度必须相同)。
5 MqttMessageConverter(可选)。
默认情况下,默认的DefaultPahoMessageConverter生成带有String有效负载的消息,其标题如下:

mqtt_topic:接收消息的主题
mqtt_duplicate:true如果消息是重复的

mqtt_qos:服务质量
你可以将DefaultPahoMessageConverter配置为返回有效负载中的 RAWbyte[],方法是将其声明为<bean/>,并将payloadAsBytes属性设置为true
6 客户工厂。
7 Send Timeout.
只有在信道可能阻塞时才适用(例如当前已满的有界QueueChannel)。
8 错误通道。
下游异常被发送到此通道,如果提供的话,在ErrorMessage中。
有效负载是一个MessagingException,其中包含失败的消息和原因。
9 恢复间隔。
它控制适配器在失败后尝试重新连接的间隔。
它默认为10000ms(十秒)。
10 确认模式;为手动确认设置为 true。
从版本 4.1 开始,你可以省略 URL。
相反,你可以在serverURIs属性的serverURIs中提供服务器 URI。
这样做,例如,可以启用到高可用集群的连接。

从版本 4.2.2 开始,当适配器成功订阅主题时,将发布MqttSubscribedEvent事件。当连接或订阅失败时,将发布MqttConnectionFailedEvent事件。实现ApplicationListener的 Bean 可以接收这些事件。

另外,一个名为recoveryInterval的新属性控制适配器在失败后尝试重新连接的时间间隔。它默认为10000ms(10 秒)。

在版本 4.2.3 之前,当适配器停止时,客户端总是取消订阅。
这是不正确的,因为如果客户端 QoS 大于 0,我们需要保持订阅活动,以便在适配器停止时到达
的消息在下一个开始时被传递。
这还需要将客户端工厂上的cleanSession属性设置为false
它默认为true
从 4.2.3 版本开始,如果cleanSession属性是false,则适配器不会取消订阅(默认情况下)。

可以通过在工厂上设置consumerCloseAction属性来重写此行为。,
它可以具有值:UNSUBSCRIBE_ALWAYSUNSUBSCRIBE_NEVER,并且UNSUBSCRIBE_CLEAN.
只有当cleanSession属性是true时,后者(默认)才会取消订阅。

要恢复到 pre-4.2.3 行为,请使用UNSUBSCRIBE_ALWAYS
从版本 5.0 开始,topicqosretained属性被映射到.RECEIVED_…​标题(MqttHeaders.RECEIVED_TOPICMqttHeaders.RECEIVED_QOSMqttHeaders.RECEIVED_RETAINED),以避免无意中传播到出站消息,即(默认情况下)使用MqttHeaders.TOPICMqttHeaders.QOSMqttHeaders.RETAINED标题。

# 在运行时添加和删除主题

从版本 4.1 开始,你可以通过编程方式更改订阅适配器的主题。 Spring 集成提供了addTopic()removeTopic()方法。在添加主题时,你可以选择指定QoS(默认值:1)。你还可以通过向具有适当有效负载的<control-bus/>发送适当的消息来修改主题,例如:"myMqttAdapter.addTopic('foo', 1)"

停止和启动适配器对主题列表没有影响(它不会恢复到配置中的原始设置)。更改不会保留到应用程序上下文的生命周期之后。新的应用程序上下文将恢复到已配置的设置。

当适配器停止(或与代理断开连接)时更改主题将在下一次建立连接时生效。

# 手动 ACK

从版本 5.3 开始,你可以将manualAcks属性设置为 true。通常用于异步确认交付。当设置为true时,header(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK)将添加到消息中,其值为SimpleAcknowledgment。你必须调用acknowledge()方法来完成交付。有关更多信息,请参见IMqttClient``setManualAcks()messageArrivedComplete()的 Javadocs。为了方便起见,提供了一个头访问器:

StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();

从版本5.2.11开始,当消息转换器抛出异常或从MqttMessage转换返回null时,MqttPahoMessageDrivenChannelAdapterErrorMessage发送到errorChannel,如果提供的话。将此转换错误重新抛出到 MQTT 客户端回调中。

# 使用 Java 配置进行配置

下面的 Spring 引导应用程序展示了如何使用 Java 配置配置入站适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
                .web(false)
                .run(args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
                                                 "topic1", "topic2");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

# 使用 Java DSL 进行配置

下面的 Spring 引导应用程序提供了一个使用 Java DSL 配置入站适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow mqttInbound() {
        return IntegrationFlows.from(
                         new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
                                        "testClient", "topic1", "topic2");)
                .handle(m -> System.out.println(m.getPayload()))
                .get();
    }

}

# 出站通道适配器

出站通道适配器由MqttPahoMessageHandler实现,它包装在ConsumerEndpoint中。为了方便起见,你可以使用名称空间来配置它。

从版本 4.1 开始,适配器支持异步发送操作,在确认交付之前避免阻塞。如果需要,可以发出应用程序事件以使应用程序能够确认交付。

下面的清单显示了出站通道适配器可用的属性:

<int-mqtt:outbound-channel-adapter id="withConverter"
    client-id="foo"  (1)
    url="tcp://localhost:1883"  (2)
    converter="myConverter"  (3)
    client-factory="clientFactory"  (4)
    default-qos="1"  (5)
    qos-expression="" (6)
    default-retained="true"  (7)
    retained-expression="" (8)
    default-topic="bar"  (9)
    topic-expression="" (10)
    async="false"  (11)
    async-events="false"  (12)
    channel="target" />
1 客户 ID。
2 代理 URL。
3 MqttMessageConverter(可选)。
缺省的DefaultPahoMessageConverter可以识别以下标题:

mqtt_topic:将消息发送到的主题

mqtt_retained:true如果要保留消息
<><133"/>:服务质量<>
4 客户工厂。
5 默认的服务质量。
如果没有找到mqtt_qos标头,或者qos-expression返回null,则使用它。
如果你提供自定义converter,则不使用它。
6 一个表达式来求值以确定 QoS。
默认值是headers[mqtt_qos]
7 保留标志的默认值。
如果没有mqtt_retained标头,则使用它。
如果提供了自定义converter,则不使用它。
8 计算表达式以确定保留的布尔值。
默认值是headers[mqtt_retained]
9 消息发送到的默认主题(如果没有mqtt_topic头,则使用该主题)。
10 一个表达式来求值以确定目标主题。
默认值是headers['mqtt_topic']
11 true时,调用者不会阻塞。
相反,它会在发送消息时等待发送确认。
缺省值是false(发送阻塞直到确认发送)。
12 asyncasync-eventstrue时,会发出一个MqttMessageSentEvent(参见Events)。
它包含消息、主题、客户库生成的messageIdclientId,和clientInstance(每次连接客户机时递增)。
当交付被客户库确认时,将发出一个MqttMessageDeliveredEvent
它包含messageIdclientIdclientInstance,启用与发送相关的传递。
任何ApplicationListener或事件入站通道适配器都可以接收到这些事件。
注意,对于MqttMessageDeliveredEvent有可能在MqttMessageSentEvent之前接收到的
默认值是false
从版本 4.1 开始,可以省略 URL。
相反,可以在serverURIsserverURIs属性中提供服务器 URI。
例如,这可以实现到高可用集群的连接。

# 使用 Java 配置进行配置

下面的 Spring 引导应用程序展示了如何使用 Java 配置出站适配器的示例:

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo");
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
        options.setUserName("username");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data);

    }

}

# 使用 Java DSL 进行配置

下面的 Spring 引导应用程序提供了一个使用 Java DSL 配置出站适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

   	@Bean
   	public IntegrationFlow mqttOutboundFlow() {
   	    return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
    }

}

# 事件

某些应用程序事件由适配器发布。

  • MqttConnectionFailedEvent-如果连接失败或随后丢失连接,则由两个适配器发布。

  • MqttMessageSentEvent-如果以异步模式运行,则在消息已发送时由出站适配器发布。

  • MqttMessageDeliveredEvent-如果以异步模式运行,则由出站适配器在客户端指示消息已被传递时发布。

  • MqttSubscribedEvent-由入站适配器在订阅主题后发布。

这些事件可以通过ApplicationListener<MqttIntegrationEvent>@EventListener方法接收。

要确定事件的源,请使用以下方法;你可以检查 Bean 名称和/或连接选项(以访问服务器 URI 等)。

MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();

# MQTT v5 支持

从版本 5.5.5 开始,spring-integration-mqtt模块为 MQTV5 协议提供通道适配器实现。org.eclipse.paho:org.eclipse.paho.mqttv5.client是一个optional依赖项,因此必须显式地包含在目标项目中。

由于 MQTT V5 协议在 MQTT 消息中支持额外的任意属性,因此引入了MqttHeaderMapper实现,以在发布和接收操作时映射到/来自头。默认情况下(通过*模式),它映射所有接收到的PUBLISH帧属性(包括用户属性)。在出站方面,它映射了PUBLISH框架的这个子集的头:contentTypemqtt_messageExpiryIntervalmqtt_responseTopicmqtt_correlationData

MQTT V5 协议的出站通道适配器以Mqttv5PahoMessageHandler的形式存在。它需要clientId和 MQTT 代理 URL 或MqttConnectionOptions引用。它支持MqttClientPersistence选项,可以是async,并且在那种情况下可以发出MqttIntegrationEvent对象(参见asyncEvents选项)。如果请求消息的有效负载是org.eclipse.paho.mqttv5.common.MqttMessage,则它将通过内部IMqttAsyncClient按原样发布。如果有效载荷是byte[],则按原样用于发布目标MqttMessage有效载荷。如果有效负载是String,则将其转换为byte[]以进行发布。剩余的用例被委托给所提供的MessageConverter,这是来自应用程序上下文的IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME``ConfigurableCompositeMessageConverter Bean。注意:当请求的消息有效负载已经是MqttMessage时,将不使用所提供的HeaderMapper<MqttProperties>。下面的 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:

@Bean
public IntegrationFlow mqttOutFlow() {
    Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
    MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
    mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
    messageHandler.setHeaderMapper(mqttHeaderMapper);
    messageHandler.setAsync(true);
    messageHandler.setAsyncEvents(true);
    messageHandler.setConverter(mqttStringToBytesConverter());

    return f -> f.handle(messageHandler);
}
org.springframework.integration.mqtt.support.MqttMessageConverter不能与Mqttv5PahoMessageHandler一起使用,因为它的契约仅针对 MQTV3 协议。

有关更多信息,请参见Mqttv5PahoMessageHandlerJavadocs 及其超类。

MQTT V5 协议的入站通道适配器以Mqttv5PahoMessageDrivenChannelAdapter的形式存在。它需要clientId和 MQTT 代理 URL 或MqttConnectionOptions引用,以及要订阅和使用的主题。它支持MqttClientPersistence选项,默认情况下是在内存中。可以配置预期的payloadType(默认情况下byte[])并将其传播到所提供的SmartMessageConverter,用于从byte[]中转换所接收的MqttMessage。如果设置了manualAck选项,则在消息中添加一个IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK头,以生成SimpleAcknowledgment的实例。HeaderMapper<MqttProperties>用于将PUBLISH帧属性(包括用户属性)映射到目标消息头。标准的MqttMessage属性,如qosiddupretained,加上接收到的主题总是映射到标题。有关更多信息,请参见MqttHeaders

下面的 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:

@Bean
public IntegrationFlow mqttInFlow() {
    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
    messageProducer.setPayloadType(String.class);
    messageProducer.setMessageConverter(mqttStringToBytesConverter());
    messageProducer.setManualAcks(true);

    return IntegrationFlows.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}
org.springframework.integration.mqtt.support.MqttMessageConverter不能与Mqttv5PahoMessageDrivenChannelAdapter一起使用,因为其契约仅针对 MQTT v3 协议。

有关更多信息,请参见Mqttv5PahoMessageDrivenChannelAdapterJavadocs 及其超类。

建议将MqttConnectionOptions#setAutomaticReconnect(boolean)设置为 true,以让内部IMqttAsyncClient实例处理重新连接。否则,只有手动重新启动这些通道适配器才能处理重新连接,例如通过MqttConnectionFailedEvent处理断开连接。