# Java DSL
# Java DSL
Spring 集成 Java 配置和 DSL 提供了一组方便的构建器和 Fluent API,允许你配置 Spring 来自 Spring @Configuration
类的集成消息流。
(另见Kotlin DSL。
用于 Spring 集成的 Java DSL 本质上是用于 Spring 集成的外观。DSL 提供了一种简单的方式,通过使用 FluentBuilder
模式以及 Spring Framework 和 Spring Integration 中的现有 Java 配置,将 Spring 集成消息流嵌入到你的应用程序中。我们还使用并支持 Lambdas(在 Java8 中可用),以进一步简化 Java 配置。
cafe (opens new window)提供了使用 DSL 的一个很好的示例。
DSL 是由IntegrationFlows
工厂为IntegrationFlowBuilder
提供的。这将生成IntegrationFlow
组件,该组件应注册为 Spring Bean(通过使用@Bean
注释)。Builder 模式用于将任意复杂的结构表示为可以接受 lambda 作为参数的方法的层次结构。
IntegrationFlowBuilder
只在IntegrationFlow
Bean 中收集集成组件(MessageChannel
实例,AbstractEndpoint
实例,以此类推),用于通过IntegrationFlowBeanPostProcessor
在应用程序上下文中进一步解析和注册具体 bean。
Java DSL 直接使用 Spring 集成类,并绕过任何 XML 生成和解析。然而,DSL 在 XML 之上提供的不仅仅是语法方面的优势。它最引人注目的特性之一是能够定义内联 lambda 来实现端点逻辑,从而消除了对外部类来实现自定义逻辑的需求。从某种意义上说, Spring 集成对 Spring 表达式语言和内联脚本的支持解决了这一问题,但 Lambda 更简单,功能也更强大。
下面的示例展示了如何使用 Java 配置进行 Spring 集成:
@Configuration
@EnableIntegration
public class MyConfiguration {
@Bean
public AtomicInteger integerSource() {
return new AtomicInteger();
}
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlows.fromSupplier(integerSource()::getAndIncrement,
c -> c.poller(Pollers.fixedRate(100)))
.channel("inputChannel")
.filter((Integer p) -> p > 0)
.transform(Object::toString)
.channel(MessageChannels.queue())
.get();
}
}
前面的配置示例的结果是,它在ApplicationContext
启动后创建 Spring 集成端点和消息通道。Java 配置既可以用来替换 XML 配置,也可以用来增强 XML 配置。你不需要替换所有现有的 XML 配置来使用 Java 配置。
# DSL 基础知识
org.springframework.integration.dsl
包包含前面提到的IntegrationFlowBuilder
API 和许多IntegrationComponentSpec
实现,它们也是构建器,并提供 Fluent API 来配置具体的端点。IntegrationFlowBuilder
基础架构为基于消息的应用程序(例如通道、端点、poller 和通道拦截器)提供了常见的Enterprise 整合模式 (opens new window)。
端点在 DSL 中用动词表示,以提高可读性。以下列表包括常见的 DSL 方法名称和相关的 EIP 端点:
转换
Transformer
过滤器
Filter
句柄
ServiceActivator
分割
Splitter
总计
Aggregator
Route
Router
桥梁
Bridge
从概念上讲,集成过程是通过将这些端点组合成一个或多个消息流来构建的。请注意,EIP 并未正式定义术语“消息流”,但将其视为使用众所周知的消息传递模式的工作单元是有用的。DSL 提供了一个IntegrationFlow
组件来定义通道和端点之间的组合,但是现在IntegrationFlow
只扮演配置角色来填充应用程序上下文中的实际 bean,并且在运行时不使用。然而,对于IntegrationFlow
的 Bean 可以自动连线为Lifecycle
,以控制start()
和stop()
的整个流,该流被委派给与此IntegrationFlow
相关的所有 Spring 集成组件。下面的示例使用IntegrationFlows
工厂,通过使用IntegrationFlowBuilder
中的 eip-methods 来定义IntegrationFlow
Bean:
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlows.from("input")
.<String, Integer>transform(Integer::parseInt)
.get();
}
transform
方法接受一个 lambda 作为端点参数来对消息有效负载进行操作。该方法的实参为GenericTransformer<S, T>
。因此,所提供的任何变压器(ObjectToJsonTransformer
、FileToStringTransformer
等)都可以在此使用。
在封面下,IntegrationFlowBuilder
识别MessageHandler
及其端点,分别为MessageTransformingHandler
和ConsumerEndpointFactoryBean
。再举一个例子:
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlows.from("input")
.filter("World"::equals)
.transform("Hello "::concat)
.handle(System.out::println)
.get();
}
前面的示例构成了Filter → Transformer → Service Activator
的序列。流程是“单向”。也就是说,它不提供回复消息,而只将有效负载打印到 Stdout。端点通过直接通道自动连接在一起。
lambdas 和Message<?> 参数在 EIP 方法中使用 lambdas 时,“input”参数通常是消息有效载荷。 如果你希望访问整个消息,请使用一个重载方法,该方法将 Class<?> 作为第一个参数。,例如,这将不起作用:<br/>.<Message<?>, Foo>transform(m -> newFooFromMessage(m))<br/> 这将在运行时使用 ClassCastException 时失败,因为 lambda 不保留参数类型,并且框架将尝试将有效负载强制转换为,而是使用: r=“112”/><gt="96"/> |
---|
Bean 定义重写 该 Java DSL 可以为在流定义中在线定义的对象注册 bean,并且可以重用现有的、注入的 bean。 在为在线对象定义的名称与现有的 Bean 定义相同的情况下,抛出一个 BeanDefinitionOverrideException ,表示这样的配置是错误的。但是,当你处理 prototype bean 时,没有办法从集成流处理器中检测到现有的 Bean 定义,因为每次我们从BeanFactory 调用prototype Bean 时,都会得到一个新的实例。这样,在 IntegrationFlow 中就会使用所提供的实例。正如没有任何 Bean 注册和任何可能的检查现有的prototype Bean 定义。但是 BeanFactory.initializeBean() 是为这个对象调用的,如果它有一个显式的id 并且 Bean 这个名称的定义在prototype 范围内。 |
---|
# 消息通道
除了带有 EIP 方法的IntegrationFlowBuilder
之外,Java DSL 还提供了一个 Fluent API 来配置MessageChannel
实例。为此,提供了MessageChannels
建设者工厂。下面的示例展示了如何使用它:
@Bean
public MessageChannel priorityChannel() {
return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
.interceptor(wireTap())
.get();
}
相同的MessageChannels
Builder Factory 可以在channel()
EIP 方法中从IntegrationFlowBuilder
连接到端点,类似于在 XML 配置中连接一个input-channel
/output-channel
对。默认情况下,端点用DirectChannel
实例连接,其中 Bean 名称基于以下模式:[IntegrationFlow.beanName].channel#[channelNameIndex]
。此规则也适用于 inlineMessageChannels
Builder Factory Usage 产生的未命名通道。但是,所有MessageChannels
方法都有一个知道channelId
的变体,你可以使用它为MessageChannel
实例设置 Bean 名称。MessageChannel
引用和beanName
引用可以用作 Bean-方法调用。下面的示例显示了使用channel()
EIP 方法的可能方法:
@Bean
public MessageChannel queueChannel() {
return MessageChannels.queue().get();
}
@Bean
public MessageChannel publishSubscribe() {
return MessageChannels.publishSubscribe().get();
}
@Bean
public IntegrationFlow channelFlow() {
return IntegrationFlows.from("input")
.fixedSubscriberChannel()
.channel("queueChannel")
.channel(publishSubscribe())
.channel(MessageChannels.executor("executorChannel", this.taskExecutor))
.channel("output")
.get();
}
from("input")
表示“用”输入“ID 查找并使用MessageChannel
,或者创建一个”。fixedSubscriberChannel()
生成FixedSubscriberChannel
的实例,并将其注册为channelFlow.channel#0
。channel("queueChannel")
的工作方式相同,但使用现有的queueChannel
Bean。channel(publishSubscribe())
是 Bean-方法引用。channel(MessageChannels.executor("executorChannel", this.taskExecutor))
是将IntegrationFlowBuilder
暴露给IntegrationComponentSpec
的ExecutorChannel
并将其注册为executorChannel
的IntegrationFlowBuilder
。channel("output")
以DirectChannel
Bean 为名称注册DirectChannel
Bean,只要没有这个名称的 bean 已经存在。
注意:前面的IntegrationFlow
定义是有效的,它的所有通道都应用于具有BridgeHandler
实例的端点。
注意通过MessageChannels 工厂从不同的IntegrationFlow 实例使用相同的内联通道定义。即使 DSL 解析器将不存在的对象注册为 bean,也无法从不同的 IntegrationFlow 容器中确定相同的对象(MessageChannel )。以下示例错误: |
---|
@Bean
public IntegrationFlow startFlow() {
return IntegrationFlows.from("input")
.transform(...)
.channel(MessageChannels.queue("queueChannel"))
.get();
}
@Bean
public IntegrationFlow endFlow() {
return IntegrationFlows.from(MessageChannels.queue("queueChannel"))
.handle(...)
.get();
}
这个坏例子的结果是出现了以下例外:
Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
there is already object [queueChannel] bound
at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)
要使其工作,你需要为该通道声明@Bean
,并从不同的IntegrationFlow
实例中使用其 Bean 方法。
# 调查者
Spring 集成还提供了一个 Fluent API,它允许你为PollerMetadata
实现配置AbstractPollingEndpoint
。你可以使用Pollers
Builder 工厂来配置公共 Bean 定义或从IntegrationFlowBuilder
EIP 方法创建的定义,如下例所示:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedRate(500)
.errorChannel("myErrors");
}
参见[Pollers
](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/dsl/pollers.html)和[PollerSpec
](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/dsl/pollerspec.html)中的 Javadoc 以获取更多信息。
如果使用 DSL 构造PollerSpec 作为@Bean ,请不要调用 Bean 定义中的get() 方法。PollerSpec 是一个FactoryBean ,它从规范中生成PollerMetadata 对象并初始化其所有属性。 |
---|
# `端点
从版本 5.5 开始,ConsumerEndpointSpec
提供了一个reactive()
配置属性,并带有一个可选的定制程序Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
。此选项将目标端点配置为ReactiveStreamsConsumer
实例,独立于输入通道类型,该类型通过IntegrationReactiveUtils.messageChannelToFlux()
转换为Flux
。所提供的函数是使用来自Flux.transform()
的操作符来自定义(publishOn()
、log()
、doOnNext()
等)来自输入通道的反应流的源。
下面的示例演示了如何将发布线程从独立于最终订阅者和生产者的输入通道更改为DirectChannel
:
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlows
.from("inputChannel")
.<String, Integer>transform(Integer::parseInt,
e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())))
.get();
}
有关更多信息,请参见反应流支持。
# DSL 和端点配置
所有IntegrationFlowBuilder
EIP 方法都有一个变体,该变体应用 lambda 参数为AbstractEndpoint
实例提供选项:SmartLifecycle
、PollerMetadata
、request-handler-advice-chain
和其他实例。每个参数都有通用参数,因此它允许你在上下文中配置一个端点,甚至它的MessageHandler
,如下例所示:
@Bean
public IntegrationFlow flow2() {
return IntegrationFlows.from(this.inputChannel)
.transform(new PayloadSerializingTransformer(),
c -> c.autoStartup(false).id("payloadSerializingTransformer"))
.transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
.get();
}
此外,EndpointSpec
提供了一个id()
方法,使你可以用给定的 Bean 名称注册端点 Bean,而不是生成的名称。
如果MessageHandler
被引用为 Bean,那么如果 DSL 定义中存在.advice()
方法,则任何现有的adviceChain
配置都将被重写:
@Bean
public TcpOutboundGateway tcpOut() {
TcpOutboundGateway gateway = new TcpOutboundGateway();
gateway.setConnectionFactory(cf());
gateway.setAdviceChain(Collections.singletonList(fooAdvice()));
return gateway;
}
@Bean
public IntegrationFlow clientTcpFlow() {
return f -> f
.handle(tcpOut(), e -> e.advice(testAdvice()))
.transform(Transformers.objectToString());
}
即它们不是合并的,在这种情况下只使用testAdvice()
Bean。
# 变形金刚
DSL API 提供了一个方便的、fluentTransformers
工厂,可用作.transform()
EIP 方法中的内联目标对象定义。下面的示例展示了如何使用它:
@Bean
public IntegrationFlow transformFlow() {
return IntegrationFlows.from("input")
.transform(Transformers.fromJson(MyPojo.class))
.transform(Transformers.serializer())
.get();
}
它避免了使用 setter 进行不方便的编码,并使流定义更加简单。请注意,你可以使用Transformers
将目标Transformer
实例声明为@Bean
实例,并再次将它们从IntegrationFlow
定义为 Bean 方法。尽管如此,DSL 解析器处理 Bean 内联对象的声明,如果它们还没有被定义为 bean 的话。
有关更多信息和支持的工厂方法,请参见 Javadoc 中的变形金刚 (opens new window)。
另请参见[lambdas 和Message<?>
参数]。
# 入站通道适配器
通常,消息流从入站通道适配器开始(例如<int-jdbc:inbound-channel-adapter>
)。适配器配置为<poller>
,并要求MessageSource<?>
定期生成消息。Java DSL 也允许从MessageSource<?>
开始IntegrationFlow
。为此,IntegrationFlows
Builder 工厂提供了一个重载的IntegrationFlows.from(MessageSource<?> messageSource)
方法。你可以将MessageSource<?>
配置为 Bean,并将其作为该方法的参数。IntegrationFlows.from()
的第二个参数是一个Consumer<SourcePollingChannelAdapterSpec>
lambda,它允许你为PollerMetadata
或SmartLifecycle
提供选项。下面的示例展示了如何使用 Fluent API 和 lambda 来创建IntegrationFlow
:
@Bean
public MessageSource<Object> jdbcMessageSource() {
return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}
@Bean
public IntegrationFlow pollingFlow() {
return IntegrationFlows.from(jdbcMessageSource(),
c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
.transform(Transformers.toJson())
.channel("furtherProcessChannel")
.get();
}
对于那些不需要直接构建Message
对象的情况,可以使用基于java.util.function.Supplier
的IntegrationFlows.fromSupplier()
变体。Supplier.get()
的结果会自动包装在Message
中(如果它还不是Message
)。
# 消息路由器
Spring 集成原生地提供了专门的路由器类型,包括:
HeaderValueRouter
PayloadTypeRouter
ExceptionTypeRouter
RecipientListRouter
XPathRouter
与许多其他 DSLIntegrationFlowBuilder
EIP 方法一样,route()
方法可以应用任何AbstractMessageRouter
实现,或者为了方便起见,将String
作为 spel 表达式或ref
-method
对。此外,你可以使用 lambda 配置route()
,并使用 lambda 实现Consumer<RouterSpec<MethodInvokingRouter>>
。Fluent API 还提供AbstractMappingMessageRouter
选项,如channelMapping(String key, String channelName)
对,如下例所示:
@Bean
public IntegrationFlow routeFlowByLambda() {
return IntegrationFlows.from("routerInput")
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.suffix("Channel")
.channelMapping(true, "even")
.channelMapping(false, "odd")
)
.get();
}
下面的示例展示了一个简单的基于表达式的路由器:
@Bean
public IntegrationFlow routeFlowByExpression() {
return IntegrationFlows.from("routerInput")
.route("headers['destChannel']")
.get();
}
routeToRecipients()
方法接受Consumer<RecipientListRouterSpec>
,如下例所示:
@Bean
public IntegrationFlow recipientListFlow() {
return IntegrationFlows.from("recipientListInput")
.<String, String>transform(p -> p.replaceFirst("Payload", ""))
.routeToRecipients(r -> r
.recipient("thing1-channel", "'thing1' == payload")
.recipientMessageSelector("thing2-channel", m ->
m.getHeaders().containsKey("recipient")
&& (boolean) m.getHeaders().get("recipient"))
.recipientFlow("'thing1' == payload or 'thing2' == payload or 'thing3' == payload",
f -> f.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("recipientListSubFlow1Result")))
.recipientFlow((String p) -> p.startsWith("thing3"),
f -> f.transform("Hello "::concat)
.channel(c -> c.queue("recipientListSubFlow2Result")))
.recipientFlow(new FunctionExpression<Message<?>>(m ->
"thing3".equals(m.getPayload())),
f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
.defaultOutputToParentFlow())
.get();
}
.routeToRecipients()
定义的.defaultOutputToParentFlow()
允许你将路由器的defaultOutput
设置为网关,以继续处理主流中不匹配的消息。
另请参见[lambdas 和Message<?>
参数]。
# 分离器
要创建拆分器,请使用split()
EIP 方法。默认情况下,如果有效负载是Iterable
、Iterator
、Array
、Stream
或活性Publisher
,则split()
方法将每个项输出为单独的消息。它接受 lambda、spel 表达式或任何AbstractMessageSplitter
实现。或者,你可以使用它而不需要参数来提供DefaultMessageSplitter
。下面的示例展示了如何通过提供 lambda 来使用split()
方法:
@Bean
public IntegrationFlow splitFlow() {
return IntegrationFlows.from("splitInput")
.split(s -> s.applySequence(false).delimiters(","))
.channel(MessageChannels.executor(taskExecutor()))
.get();
}
前面的示例创建了一个拆分器,该拆分包含以逗号分隔的String
的消息。
另请参见[lambdas 和Message<?>
参数]。
# 聚合器和重排序程序
Aggregator
在概念上与Splitter
相反。它将单个消息的序列聚合到单个消息中,并且必然更复杂。默认情况下,聚合器返回一条包含来自传入消息的有效负载集合的消息。同样的规则也适用于Resequencer
。下面的示例展示了拆分器-聚合器模式的典型示例:
@Bean
public IntegrationFlow splitAggregateFlow() {
return IntegrationFlows.from("splitAggregateInput")
.split()
.channel(MessageChannels.executor(this.taskExecutor()))
.resequence()
.aggregate()
.get();
}
split()
方法将列表分割为单个消息,并将它们发送到ExecutorChannel
。resequence()
方法根据消息标题中的序列详细信息对消息进行重新排序。aggregate()
方法收集这些消息。
但是,你可以通过指定发布策略和相关策略等来更改默认行为。考虑以下示例:
.aggregate(a ->
a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
.releaseStrategy(g -> g.size() > 10)
.messageStore(messageStore()))
前面的示例将具有myCorrelationKey
头的消息关联起来,并在至少积累了 10 个头的情况下释放这些消息。
对于resequence()
EIP 方法,提供了类似的 lambda 配置。
# `方法
.handle()
EIP 方法的目标是在某些 POJO 上调用任何MessageHandler
实现或任何方法。另一种选择是使用 lambda 表达式来定义“活动”。因此,我们引入了一个通用的GenericHandler<P>
功能接口。它的handle
方法需要两个参数:P payload
和MessageHeaders headers
(从版本 5.1 开始)。有了这一点,我们可以将流定义如下:
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlows.from("flow3Input")
.<Integer>handle((p, h) -> p * 2)
.get();
}
前面的示例将它接收到的任何整数加倍。
然而, Spring 集成的一个主要目标是loose coupling
,通过从消息有效负载到消息处理程序的目标参数的运行时类型转换。由于 Java 不支持 lambda 类的泛型类型解析,因此我们引入了一个解决方案,为大多数 EIP 方法和LambdaMessageProcessor
提供了一个额外的payloadType
参数。这样就将困难的转换工作委托给 Spring 的ConversionService
,它使用提供的type
和请求的消息来实现目标方法参数。下面的示例显示了结果IntegrationFlow
可能是什么样子的:
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlows.from("input")
.<byte[], String>transform(p - > new String(p, "UTF-8"))
.handle(Integer.class, (p, h) -> p * 2)
.get();
}
我们还可以在ConversionService
中注册一些BytesToIntegerConverter
,以消除附加的.transform()
:
@Bean
@IntegrationConverter
public BytesToIntegerConverter bytesToIntegerConverter() {
return new BytesToIntegerConverter();
}
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlows.from("input")
.handle(Integer.class, (p, h) -> p * 2)
.get();
}
另请参见[lambdas 和Message<?>
参数]。
# 运营商网关()
在IntegrationFlow
定义中的gateway()
运算符是一种特殊的服务激活器实现,通过其输入通道调用一些其他端点或集成流并等待应答。从技术上讲,它在<chain>
定义中扮演与嵌套<gateway>
组件相同的角色(参见从一条链子中调用一条链子),并允许流更干净、更直接。从逻辑上和业务角度来看,它是一个消息传递网关,允许在目标集成解决方案的不同部分之间分配和重用功能(参见消息传递网关)。对于不同的目标,这个操作符有几个重载:
gateway(String requestChannel)
按其名称向某个端点的输入通道发送消息;gateway(MessageChannel requestChannel)
通过直接注入向某个端点的输入通道发送消息;gateway(IntegrationFlow flow)
向所提供的输入通道发送消息IntegrationFlow
。
所有这些都具有一个具有第二个Consumer<GatewayEndpointSpec>
参数的变体,以配置目标GatewayMessageHandler
和相应的AbstractEndpoint
。此外,基于IntegrationFlow
的方法还允许调用现有的IntegrationFlow
Bean 或通过用于IntegrationFlow
功能接口的就地 lambda 将流声明为子流,或者在private
方法中将其提取为更干净的代码样式:
@Bean
IntegrationFlow someFlow() {
return IntegrationFlows
.from(...)
.gateway(subFlow())
.handle(...)
.get();
}
private static IntegrationFlow subFlow() {
return f -> f
.scatterGather(s -> s.recipientFlow(...),
g -> g.outputProcessor(MessageGroup::getOne))
}
如果下游流并不总是返回一个响应,你应该将requestTimeout 设置为 0,以防止无限期地挂起调用线程。在这种情况下,流将在该点结束,线程将被释放以进行进一步的工作。 |
---|
# 运算符日志()
为了方便起见,为了记录通过 Spring 集成流(<logging-channel-adapter>
)的消息行程,呈现了一个log()
操作符。在内部,它由WireTap``ChannelInterceptor
表示,其订阅服务器为LoggingHandler
。它负责将传入消息记录到下一个端点或当前通道中。下面的示例展示了如何使用LoggingHandler
:
.filter(...)
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(...)
在前面的示例中,id
头在ERROR
级别记录到test.category
中,仅用于传递过滤器和路由之前的消息。
当这个操作符在流程的末尾使用时,它是单向处理程序,流程结束。要使其成为一个产生回复的流,你可以在log()
之后使用一个简单的bridge()
,或者,从版本 5.1 开始,你可以使用一个logAndReply()
操作符。logAndReply
只能在流的末尾使用。
# 操作符截获()
从版本 5.3 开始,intercept()
操作符允许在流程中的当前ChannelInterceptor
处注册一个或多个ChannelInterceptor
实例。这是通过MessageChannels
API 创建显式MessageChannel
API 的一种替代方法。下面的示例使用MessageSelectingInterceptor
来拒绝某些异常消息:
.transform(...)
.intercept(new MessageSelectingInterceptor(m -> m.getPayload().isValid()))
.handle(...)
# `
Spring 集成包括.wireTap()
Fluent APIMessageChannelSpec
Builders。下面的示例展示了如何使用wireTap
方法记录输入:
@Bean
public QueueChannelSpec myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input");
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
如果MessageChannel 是InterceptableChannel 的实例,则将log() 、wireTap() 或intercept() 运算符应用于当前的MessageChannel 。否则,将向当前配置的端点的流中注入一个中间件 DirectChannel 。在下面的示例中,将WireTap 拦截器直接添加到myChannel 中,因为DirectChannel 实现了InterceptableChannel :<br/>@Bean<br/>MessageChannel myChannel() {<br/> return new DirectChannel();<br/>}<br/><br/>...<br/> .channel(myChannel())<br/> .log()<br/>}<br/> |
---|
当当前的MessageChannel
不实现InterceptableChannel
时,一个隐式的DirectChannel
和BridgeHandler
被注入到IntegrationFlow
中,并且WireTap
被添加到这个新的DirectChannel
中。以下示例没有任何通道声明:
.handle(...)
.log()
}
在前面的示例中(并且在没有信道被声明的任何时间),隐式的DirectChannel
被注入在IntegrationFlow
的当前位置中,并用作当前配置的ServiceActivatingHandler
的输出信道(来自.handle()
,前面描述的)。
# 处理消息流
IntegrationFlowBuilder
提供了一个顶级 API 来生成连接到消息流的集成组件。当你的集成可以用单个流(通常是这种情况)来完成时,这是很方便的。交替地IntegrationFlow
实例可以通过MessageChannel
实例进行连接。
默认情况下,MessageFlow
在 Spring 集成术语中表现为“链”。也就是说,端点由DirectChannel
实例自动隐式连接。消息流实际上并不是作为一个链构建的,这提供了更多的灵活性。例如,如果你知道它的inputChannel
名称(也就是说,如果你显式地定义了它),那么你可以向流中的任何组件发送消息。你还可以在流中引用外部定义的通道,以允许使用通道适配器(以启用远程传输协议、文件 I/O 等),而不是直接通道。因此,DSL 不支持 Spring Integrationchain
元素,因为在这种情况下它不会增加太多的值。
由于 Spring 集成 Java DSL 产生了与任何其他配置选项相同的 Bean 定义模型,并且是基于现有的 Spring 框架@Configuration
基础设施的,因此可以与 XML 定义一起使用并有线 Spring 集成消息注释配置。
你还可以使用 lambda 定义直接IntegrationFlow
实例。下面的示例展示了如何做到这一点:
@Bean
public IntegrationFlow lambdaFlow() {
return f -> f.filter("World"::equals)
.transform("Hello "::concat)
.handle(System.out::println);
}
这个定义的结果是使用隐式直接通道连接的同一组集成组件。这里的唯一限制是,这个流是从一个命名的直接通道开始的-lambdaFlow.input
。此外,lambda 流不能从MessageSource
或MessageProducer
开始。
从版本 5.1 开始,这种IntegrationFlow
被包装到代理,以公开生命周期控制,并提供对内部关联inputChannel
的inputChannel
的访问。
从版本 5.0.6 开始,在IntegrationFlow
中为组件生成的 Bean 名称包括流 Bean,后面跟着一个点(.
)作为前缀。例如,前面示例中的ConsumerEndpointFactoryBean
的.transform("Hello "::concat)
的结果是 Bean 名称lambdaFlow.o.s.i.config.ConsumerEndpointFactoryBean#0
。(o.s.i
是从org.springframework.integration
到适合页面的缩写。)该端点的Transformer
实现 Bean 的 Bean 名称为lambdaFlow.transformer#0
(从版本 5.1 开始),其中使用的是其组件类型,而不是MethodInvokingTransformer
类的完全限定名称。当必须在流中生成 Bean 名称时,对所有NamedComponent
s 应用相同的模式。 Bean 这些生成的名称与流 ID 一起前置,以用于诸如解析日志或在某些分析工具中将组件分组在一起的目的,以及在运行时并发注册集成流时避免竞争条件。有关更多信息,请参见动态和运行时集成流。
# FunctionExpression
我们引入了FunctionExpression
类(spel 的Expression
接口的一种实现)来让我们使用 lambdas 和generics
。当存在来自核心 Spring 集成的隐式Strategy
变量时,将为 DSL 组件提供Function<T, R>
选项以及expression
选项。下面的示例展示了如何使用函数表达式:
.enrich(e -> e.requestChannel("enrichChannel")
.requestPayload(Message::getPayload)
.propertyFunction("date", m -> new Date()))
FunctionExpression
还支持运行时类型转换,就像SpelExpression
中所做的那样。
# 子流支持
一些if…else
和publish-subscribe
组件提供了通过使用子流指定其逻辑或映射的能力。最简单的示例是.publishSubscribeChannel()
,如下例所示:
@Bean
public IntegrationFlow subscribersFlow() {
return flow -> flow
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.<Integer>handle((p, h) -> p / 2)
.channel(c -> c.queue("subscriber1Results")))
.subscribe(f -> f
.<Integer>handle((p, h) -> p * 2)
.channel(c -> c.queue("subscriber2Results"))))
.<Integer>handle((p, h) -> p * 3)
.channel(c -> c.queue("subscriber3Results"));
}
你可以使用单独的IntegrationFlow``@Bean
定义来实现相同的结果,但是我们希望你发现逻辑组合的子流样式是有用的。我们发现,这会导致代码更短(因此也更可读)。
从版本 5.3 开始,提供了一个基于BroadcastCapableChannel
的publishSubscribeChannel()
实现,以在代理支持的消息通道上配置子流订阅服务器。例如,我们现在可以将几个订阅服务器配置为Jms.publishSubscribeChannel()
上的子流:
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(jmsConnectionFactory())
.destination("pubsub")
.get();
}
@Bean
public IntegrationFlow pubSubFlow() {
return f -> f
.publishSubscribeChannel(jmsPublishSubscribeChannel(),
pubsub -> pubsub
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel1")))
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel(ConnectionFactory jmsConnectionFactory) {
return (BroadcastCapableChannel) Jms.publishSubscribeChannel(jmsConnectionFactory)
.destination("pubsub")
.get();
}
类似的publish-subscribe
子流组合提供了.routeToRecipients()
方法。
另一个例子是在.filter()
方法上使用.discardFlow()
而不是.discardChannel()
。
.route()
值得特别关注。考虑以下示例:
@Bean
public IntegrationFlow routeFlow() {
return f -> f
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.channelMapping("true", "evenChannel")
.subFlowMapping("false", sf ->
sf.<Integer>handle((p, h) -> p * 3)))
.transform(Object::toString)
.channel(c -> c.queue("oddChannel"));
}
.channelMapping()
继续工作,就像在常规Router
映射中一样,但是.subFlowMapping()
将该子流绑定到主流。换句话说,任何路由器的子流在.route()
之后返回到主流。
有时,你需要从.subFlowMapping() 中引用一个已存在的IntegrationFlow``@Bean 。下面的示例展示了如何这样做: <br/>@Bean<br/>public IntegrationFlow splitRouteAggregate() {<br/> return f -> f<br/> .split()<br/> .<Integer, Boolean>route(o -> o % 2 == 0,<br/> m -> m<br/> .subFlowMapping(true, oddFlow())<br/> .subFlowMapping(false, sf -> sf.gateway(evenFlow())))<br/> .aggregate();<br/>}<br/><br/>@Bean<br/>public IntegrationFlow oddFlow() {<br/> return f -> f.handle(m -> System.out.println("odd"));<br/>}<br/><br/>@Bean<br/>public IntegrationFlow evenFlow() {<br/> return f -> f.handle((p, h) -> "even");<br/>}<br/> 在这种情况下,当你需要从这样的子流收到回复并继续主流时,这个 IntegrationFlow Bean 引用(或其输入通道)必须用.gateway() 进行包装,如前面的示例所示。前面示例中的 oddFlow() 引用没有包装到.gateway() 。因此,我们不期望从这个路由分支得到答复。 否则,你最终会遇到一个类似于以下情况的异常: <br/>Caused by: org.springframework.beans.factory.BeanCreationException:<br/> The 'currentComponent' ([email protected]a51c)<br/> is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'.<br/> This is the end of the integration flow.<br/> 当你将一个子流配置为 lambda 时,框架将处理与子流的请求-回复交互,并且不需要网关。 |
---|
子流可以嵌套到任何深度,但我们不建议这样做。事实上,即使是在路由器的情况下,在一个流中添加复杂的子流,很快就会开始看起来像一盘意大利面条,并且对于人类来说很难进行解析。
在 DSL 支持子流配置的情况下,当所配置的组件通常需要一个通道,并且该子流以channel() 元素开始时,框架会隐式地在组件输出通道和流的输入通道之间放置一个bridge() 。,例如,在这个filter 定义中:<br/>.filter(p -> p instanceof String, e -> e<br/> .discardFlow(df -> df<br/> .channel(MessageChannels.queue())<br/> ...)<br/> 框架内部创建了一个 DirectChannel Bean 用于注入MessageFilter.discardChannel 。然后它将子流包装到一个 IntegrationFlow 中,从这个隐式通道开始,用于订阅并在流中指定的channel() 之前放置一个bridge 。当现有的 IntegrationFlow Bean 被用作子流引用(而不是内置子流,例如 lambda)时,没有这样的桥需要,因为框架可以解析来自第一通道的流 Bean。具有内联子流的输入通道尚不可用。 |
---|
# 使用协议适配器
到目前为止所示的所有示例都说明了 DSL 如何通过使用 Spring 集成编程模型来支持消息传递体系结构。然而,我们还没有进行任何真正的整合。这样做需要通过 HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP 等访问远程资源,或者访问本地文件系统。 Spring 集成支持所有这些和更多内容。理想情况下,DSL 应该为所有这些功能提供一流的支持,但是实现所有这些功能并随着新的适配器被添加到 Spring 集成中而跟上,这是一项艰巨的任务。因此,期望是 DSL 不断赶上 Spring 集成。
因此,我们提供了高级 API 来无缝地定义特定于协议的消息传递。我们用工厂和建造商的模式以及 lambdas 来实现这一点。你可以将工厂类视为“命名空间工厂”,因为它们与来自具体协议特定的 Spring 集成模块的组件的 XML 命名空间扮演相同的角色。目前, Spring 集成 Java DSL 支持Amqp
、Feed
、Jms
、Files
、(S)Ftp
、Http
、JPA
、TCP/UDP
、Mail
、和Scripts
名称空间工厂。下面的示例显示了如何使用其中的三个(Amqp
,Jms
和Mail
):
@Bean
public IntegrationFlow amqpFlow() {
return IntegrationFlows.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
.transform("hello "::concat)
.transform(String.class, String::toUpperCase)
.get();
}
@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
return IntegrationFlows.from("jmsOutboundGatewayChannel")
.handle(Jms.outboundGateway(this.jmsConnectionFactory)
.replyContainer(c ->
c.concurrentConsumers(3)
.sessionTransacted(true))
.requestDestination("jmsPipelineTest"))
.get();
}
@Bean
public IntegrationFlow sendMailFlow() {
return IntegrationFlows.from("sendMailChannel")
.handle(Mail.outboundAdapter("localhost")
.port(smtpPort)
.credentials("user", "pw")
.protocol("smtp")
.javaMailProperties(p -> p.put("mail.debug", "true")),
e -> e.id("sendMailEndpoint"))
.get();
}
前面的示例展示了如何使用“名称空间工厂”作为内联适配器声明。但是,你可以从@Bean
定义中使用它们,以使IntegrationFlow
方法链更具可读性。
我们正在征求社区对这些命名空间工厂的反馈意见,然后再将精力花在其他工厂上。 我们还感谢对优先级排序的任何输入,我们下一步应该支持哪些适配器和网关。 |
---|
你可以在整个参考手册中的特定于协议的章节中找到更多的 Java DSL 示例。
所有其他协议通道适配器可以被配置为通用 bean 并连接到IntegrationFlow
,如下例所示:
@Bean
public QueueChannelSpec wrongMessagesChannel() {
return MessageChannels
.queue()
.wireTap("wrongMessagesWireTapChannel");
}
@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
return IntegrationFlows.from("inputChannel")
.filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
e -> e.discardChannel(wrongMessagesChannel))
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(xpathRouter(wrongMessagesChannel))
.get();
}
@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
XPathRouter router = new XPathRouter("local-name(/*)");
router.setEvaluateAsString(true);
router.setResolutionRequired(false);
router.setDefaultOutputChannel(wrongMessagesChannel);
router.setChannelMapping("Tags", "splittingChannel");
router.setChannelMapping("Tag", "receivedChannel");
return router;
}
# IntegrationFlowAdapter
IntegrationFlow
接口可以直接实现并指定为用于扫描的组件,如下例所示:
@Component
public class MyFlow implements IntegrationFlow {
@Override
public void configure(IntegrationFlowDefinition<?> f) {
f.<String, String>transform(String::toUpperCase);
}
}
它由IntegrationFlowBeanPostProcessor
拾取,并在应用程序上下文中进行了正确的解析和注册。
为了方便和获得松耦合体系结构的好处,我们提供了IntegrationFlowAdapter
基类实现。它需要一个buildFlow()
方法实现来通过使用from()
方法中的一个来生成IntegrationFlowDefinition
方法,如下例所示:
@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {
private final AtomicBoolean invoked = new AtomicBoolean();
public Date nextExecutionTime(TriggerContext triggerContext) {
return this.invoked.getAndSet(true) ? null : new Date();
}
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(this::messageSource,
e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
.split(this)
.transform(this)
.aggregate(a -> a.processor(this, null), null)
.enrichHeaders(Collections.singletonMap("thing1", "THING1"))
.filter(this)
.handle(this)
.channel(c -> c.queue("myFlowAdapterOutput"));
}
public String messageSource() {
return "T,H,I,N,G,2";
}
@Splitter
public String[] split(String payload) {
return StringUtils.commaDelimitedListToStringArray(payload);
}
@Transformer
public String transform(String payload) {
return payload.toLowerCase();
}
@Aggregator
public String aggregate(List<String> payloads) {
return payloads.stream().collect(Collectors.joining());
}
@Filter
public boolean filter(@Header Optional<String> thing1) {
return thing1.isPresent();
}
@ServiceActivator
public String handle(String payload, @Header String thing1) {
return payload + ":" + thing1;
}
}
# 动态和运行时集成流
IntegrationFlow
及其所有相关组件都可以在运行时注册。在版本 5.0 之前,我们使用BeanFactory.registerSingleton()
钩子。从 Spring 框架5.0
开始,我们使用instanceSupplier
钩子进行程序化的BeanDefinition
注册。下面的示例显示了如何以编程方式注册 Bean:
BeanDefinition beanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
.getRawBeanDefinition();
((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);
注意,在前面的示例中,instanceSupplier
钩子是genericBeanDefinition
方法的最后一个参数,在本例中由 lambda 提供。
Bean 所有必要的初始化和生命周期都是自动完成的,就像使用标准上下文配置 Bean 定义一样。
Spring 为了简化开发体验,集成引入了IntegrationFlowContext
来在运行时注册和管理IntegrationFlow
实例,如下例所示:
@Autowired
private AbstractServerConnectionFactory server1;
@Autowired
private IntegrationFlowContext flowContext;
...
@Test
public void testTcpGateways() {
TestingUtilities.waitListening(this.server1, null);
IntegrationFlow flow = f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client1"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}
当我们有多个配置选项并且必须创建几个类似流的实例时,这是有用的。为此,我们可以迭代我们的选项,并在循环中创建和注册IntegrationFlow
实例。另一种变体是当我们的数据源不是基于 Spring 的,并且我们必须动态地创建它时。这样的示例是反应流事件源,如下例所示:
Flux<Message<?>> messageFlux =
Flux.just("1,2,3,4")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.map(GenericMessage<Integer>::new);
QueueChannel resultChannel = new QueueChannel();
IntegrationFlow integrationFlow =
IntegrationFlows.from(messageFlux)
.<Integer, Integer>transform(p -> p * 2)
.channel(resultChannel)
.get();
this.integrationFlowContext.registration(integrationFlow)
.register();
IntegrationFlowRegistrationBuilder
(作为IntegrationFlowContext.registration()
的结果)可用于为IntegrationFlow
指定一个 Bean 名称来进行注册,以控制其autoStartup
,并注册非 Spring 集成 bean。通常,这些额外的 bean 是连接工厂(AMQP、JMS、(s)FTP、TCP/UDP 等)、序列化器和反序列化器,或任何其他所需的支持组件。
你可以使用IntegrationFlowRegistration.destroy()
回调来删除动态注册的IntegrationFlow
及其所有依赖的 bean,当你不再需要它们时。有关更多信息,请参见[IntegrationFlowContext
Javadoc](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/dsl/context/integrationflowcontext.html)。
从版本 5.0.6 开始,在IntegrationFlow 定义中生成的所有 Bean 名称都以流 ID 作为前缀,我们建议始终指定显式的流 ID, 否则,将在 IntegrationFlowContext 中启动同步障碍,要为IntegrationFlow 生成 Bean 名称并注册其 bean。我们在这两个操作上进行同步,以避免当相同的生成的 Bean 名称可用于不同的 IntegrationFlow 实例时的竞争条件。 |
---|
另外,从版本 5.0.6 开始,Registration Builder API 有一个新的方法:useFlowIdAsPrefix()
。如果你希望声明同一流的多个实例,并在流中的组件具有相同 ID 时避免 Bean 名称冲突,这是非常有用的,如下例所示:
private void registerFlows() {
IntegrationFlowRegistration flow1 =
this.flowContext.registration(buildFlow(1234))
.id("tcp1")
.useFlowIdAsPrefix()
.register();
IntegrationFlowRegistration flow2 =
this.flowContext.registration(buildFlow(1235))
.id("tcp2")
.useFlowIdAsPrefix()
.register();
}
private IntegrationFlow buildFlow(int port) {
return f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
}
在这种情况下,用于第一个流的消息处理程序可以使用 Bean tcp1.client.handler
的名称来引用。
当你使用useFlowIdAsPrefix() 时,需要一个id 属性。 |
---|
# IntegrationFlow
作为网关
IntegrationFlow
可以从提供GatewayProxyFactoryBean
组件的服务接口开始,如下例所示:
public interface ControlBusGateway {
void send(String command);
}
...
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from(ControlBusGateway.class)
.controlBus()
.get();
}
接口方法的所有代理都提供了将消息发送到IntegrationFlow
中的下一个集成组件的通道。你可以使用@MessagingGateway
注释标记服务接口,并使用@Gateway
注释标记方法。尽管如此,requestChannel
中的下一个组件的内部通道忽略并重写了IntegrationFlow
。否则,使用IntegrationFlow
创建这样的配置是没有意义的。
默认情况下,GatewayProxyFactoryBean
会得到一个传统的 Bean 名称,例如[FLOW_BEAN_NAME.gateway]
。你可以通过使用@MessagingGateway.name()
属性或重载IntegrationFlows.from(Class<?> serviceInterface, Consumer<GatewayProxySpec> endpointConfigurer)
工厂方法来更改该 ID。此外,接口上@MessagingGateway
注释的所有属性都应用于目标GatewayProxyFactoryBean
。当注释配置不适用时,Consumer<GatewayProxySpec>
变体可用于为目标代理提供适当的选项。此 DSL 方法从版本 5.2 开始可用。
使用 Java8,你甚至可以创建带有java.util.function
接口的集成网关,如下例所示:
@Bean
public IntegrationFlow errorRecovererFlow() {
return IntegrationFlows.from(Function.class, (gateway) -> gateway.beanName("errorRecovererFunction"))
.handle((GenericHandler<?>) (p, h) -> {
throw new RuntimeException("intentional");
}, e -> e.advice(retryAdvice()))
.get();
}
errorRecovererFlow
可以如下使用:
@Autowired
@Qualifier("errorRecovererFunction")
private Function<String, String> errorRecovererFlowGateway;
# DSL 扩展
从版本 5.3 开始,引入了IntegrationFlowExtension
,以允许使用自定义或组合的 EIP 操作符扩展现有的 Java DSL。所需要的只是这个类的一个扩展,它提供了可以在IntegrationFlow
Bean 定义中使用的方法。扩展类还可以用于自定义IntegrationComponentSpec
配置;例如,可以在现有的IntegrationComponentSpec
扩展中实现错过的或默认的选项。下面的示例演示了一个复合自定义运算符和AggregatorSpec
扩展的使用,用于默认的自定义outputProcessor
:
public class CustomIntegrationFlowDefinition
extends IntegrationFlowExtension<CustomIntegrationFlowDefinition> {
public CustomIntegrationFlowDefinition upperCaseAfterSplit() {
return split()
.transform("payload.toUpperCase()");
}
public CustomIntegrationFlowDefinition customAggregate(Consumer<CustomAggregatorSpec> aggregator) {
return register(new CustomAggregatorSpec(), aggregator);
}
}
public class CustomAggregatorSpec extends AggregatorSpec {
CustomAggregatorSpec() {
outputProcessor(group ->
group.getMessages()
.stream()
.map(Message::getPayload)
.map(String.class::cast)
.collect(Collectors.joining(", ")));
}
}
对于链流方法,这些扩展中的新 DSL 操作符必须返回扩展类。这样,目标IntegrationFlow
定义将与新的和现有的 DSL 操作符一起工作:
@Bean
public IntegrationFlow customFlowDefinition() {
return
new CustomIntegrationFlowDefinition()
.log()
.upperCaseAfterSplit()
.channel("innerChannel")
.customAggregate(customAggregatorSpec ->
customAggregatorSpec.expireGroupsUponCompletion(true))
.logAndReply();
}
# 积分流组合
在 Spring 集成中,将MessageChannel
抽象作为第一类公民,始终假定集成流的组成。流中任何端点的输入通道都可以用于从任何其他端点发送消息,而不仅仅是从具有该通道作为输出的端点发送消息。此外,有了@MessagingGateway
契约、内容更丰富的组件、像<chain>
这样的复合端点,以及现在有了IntegrationFlow
bean(例如IntegrationFlowAdapter
),在更短的、可重用的部分之间分配业务逻辑就足够简单了。最后一篇作文所需要的只是关于MessageChannel
发送到或接收到的信息的知识。
从版本5.5.4
开始,为了从MessageChannel
中提取更多内容并从最终用户隐藏实现细节,IntegrationFlows
引入了from(IntegrationFlow)
工厂方法,以允许从现有流的输出中启动当前的IntegrationFlow
:
@Bean
IntegrationFlow templateSourceFlow() {
return IntegrationFlows.fromSupplier(() -> "test data")
.channel("sourceChannel")
.get();
}
@Bean
IntegrationFlow compositionMainFlow(IntegrationFlow templateSourceFlow) {
return IntegrationFlows.from(templateSourceFlow)
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("compositionMainFlowResult"))
.get();
}
另一方面,在IntegrationFlowDefinition
中添加了一个to(IntegrationFlow)
终端操作符,以继续在输入通道处的当前流的一些其他流:
@Bean
IntegrationFlow mainFlow(IntegrationFlow otherFlow) {
return f -> f
.<String, String>transform(String::toUpperCase)
.to(otherFlow);
}
@Bean
IntegrationFlow otherFlow() {
return f -> f
.<String, String>transform(p -> p + " from other flow")
.channel(c -> c.queue("otherFlowResultChannel"));
}
在流程中间的合成是可以通过现有的gateway(IntegrationFlow)
EIP 方法简单地实现的。通过这种方式,我们可以通过从更简单、可重用的逻辑块中组合流来构建具有任何复杂性的流。例如,你可以添加一个IntegrationFlow
bean 的库作为依赖项,并且它只需要将它们的配置类导入到最终的项目中并自动连接到你的IntegrationFlow
定义中即可。
← 消息传递端点 Kotlin DSL →