# Java DSL

# Java DSL

Spring 集成 Java 配置和 DSL 提供了一组方便的构建器和 Fluent API,允许你配置 Spring 来自 Spring @Configuration类的集成消息流。

(另见Kotlin DSL

用于 Spring 集成的 Java DSL 本质上是用于 Spring 集成的外观。DSL 提供了一种简单的方式,通过使用 FluentBuilder模式以及 Spring Framework 和 Spring Integration 中的现有 Java 配置,将 Spring 集成消息流嵌入到你的应用程序中。我们还使用并支持 Lambdas(在 Java8 中可用),以进一步简化 Java 配置。

cafe (opens new window)提供了使用 DSL 的一个很好的示例。

DSL 是由IntegrationFlows工厂为IntegrationFlowBuilder提供的。这将生成IntegrationFlow组件,该组件应注册为 Spring Bean(通过使用@Bean注释)。Builder 模式用于将任意复杂的结构表示为可以接受 lambda 作为参数的方法的层次结构。

IntegrationFlowBuilder只在IntegrationFlow Bean 中收集集成组件(MessageChannel实例,AbstractEndpoint实例,以此类推),用于通过IntegrationFlowBeanPostProcessor在应用程序上下文中进一步解析和注册具体 bean。

Java DSL 直接使用 Spring 集成类,并绕过任何 XML 生成和解析。然而,DSL 在 XML 之上提供的不仅仅是语法方面的优势。它最引人注目的特性之一是能够定义内联 lambda 来实现端点逻辑,从而消除了对外部类来实现自定义逻辑的需求。从某种意义上说, Spring 集成对 Spring 表达式语言和内联脚本的支持解决了这一问题,但 Lambda 更简单,功能也更强大。

下面的示例展示了如何使用 Java 配置进行 Spring 集成:

@Configuration
@EnableIntegration
public class MyConfiguration {

    @Bean
    public AtomicInteger integerSource() {
        return new AtomicInteger();
    }

    @Bean
    public IntegrationFlow myFlow() {
        return IntegrationFlows.fromSupplier(integerSource()::getAndIncrement,
                                         c -> c.poller(Pollers.fixedRate(100)))
                    .channel("inputChannel")
                    .filter((Integer p) -> p > 0)
                    .transform(Object::toString)
                    .channel(MessageChannels.queue())
                    .get();
    }
}

前面的配置示例的结果是,它在ApplicationContext启动后创建 Spring 集成端点和消息通道。Java 配置既可以用来替换 XML 配置,也可以用来增强 XML 配置。你不需要替换所有现有的 XML 配置来使用 Java 配置。

# DSL 基础知识

org.springframework.integration.dsl包包含前面提到的IntegrationFlowBuilderAPI 和许多IntegrationComponentSpec实现,它们也是构建器,并提供 Fluent API 来配置具体的端点。IntegrationFlowBuilder基础架构为基于消息的应用程序(例如通道、端点、poller 和通道拦截器)提供了常见的Enterprise 整合模式 (opens new window)

端点在 DSL 中用动词表示,以提高可读性。以下列表包括常见的 DSL 方法名称和相关的 EIP 端点:

  • 转换Transformer

  • 过滤器Filter

  • 句柄ServiceActivator

  • 分割Splitter

  • 总计Aggregator

  • RouteRouter

  • 桥梁Bridge

从概念上讲,集成过程是通过将这些端点组合成一个或多个消息流来构建的。请注意,EIP 并未正式定义术语“消息流”,但将其视为使用众所周知的消息传递模式的工作单元是有用的。DSL 提供了一个IntegrationFlow组件来定义通道和端点之间的组合,但是现在IntegrationFlow只扮演配置角色来填充应用程序上下文中的实际 bean,并且在运行时不使用。然而,对于IntegrationFlow的 Bean 可以自动连线为Lifecycle,以控制start()stop()的整个流,该流被委派给与此IntegrationFlow相关的所有 Spring 集成组件。下面的示例使用IntegrationFlows工厂,通过使用IntegrationFlowBuilder中的 eip-methods 来定义IntegrationFlow Bean:

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlows.from("input")
            .<String, Integer>transform(Integer::parseInt)
            .get();
}

transform方法接受一个 lambda 作为端点参数来对消息有效负载进行操作。该方法的实参为GenericTransformer<S, T>。因此,所提供的任何变压器(ObjectToJsonTransformerFileToStringTransformer等)都可以在此使用。

在封面下,IntegrationFlowBuilder识别MessageHandler及其端点,分别为MessageTransformingHandlerConsumerEndpointFactoryBean。再举一个例子:

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlows.from("input")
                .filter("World"::equals)
                .transform("Hello "::concat)
                .handle(System.out::println)
                .get();
}

前面的示例构成了Filter → Transformer → Service Activator的序列。流程是“单向”。也就是说,它不提供回复消息,而只将有效负载打印到 Stdout。端点通过直接通道自动连接在一起。

lambdas 和Message<?>参数

在 EIP 方法中使用 lambdas 时,“input”参数通常是消息有效载荷。
如果你希望访问整个消息,请使用一个重载方法,该方法将Class<?>作为第一个参数。,例如,这将不起作用:

<br/>.<Message<?>, Foo>transform(m -> newFooFromMessage(m))<br/>

这将在运行时使用ClassCastException时失败,因为 lambda 不保留参数类型,并且框架将尝试将有效负载强制转换为

,而是使用:
r=“112”/><gt="96"/>
Bean 定义重写

该 Java DSL 可以为在流定义中在线定义的对象注册 bean,并且可以重用现有的、注入的 bean。
在为在线对象定义的名称与现有的 Bean 定义相同的情况下,抛出一个BeanDefinitionOverrideException,表示这样的配置是错误的。
但是,当你处理prototypebean 时,没有办法从集成流处理器中检测到现有的 Bean 定义,因为每次我们从BeanFactory调用prototype Bean 时,都会得到一个新的实例。
这样,在IntegrationFlow中就会使用所提供的实例。正如没有任何 Bean 注册和任何可能的检查现有的prototype Bean 定义。
但是BeanFactory.initializeBean()是为这个对象调用的,如果它有一个显式的id并且 Bean 这个名称的定义在prototype范围内。

# 消息通道

除了带有 EIP 方法的IntegrationFlowBuilder之外,Java DSL 还提供了一个 Fluent API 来配置MessageChannel实例。为此,提供了MessageChannels建设者工厂。下面的示例展示了如何使用它:

@Bean
public MessageChannel priorityChannel() {
    return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
                        .interceptor(wireTap())
                        .get();
}

相同的MessageChannelsBuilder Factory 可以在channel()EIP 方法中从IntegrationFlowBuilder连接到端点,类似于在 XML 配置中连接一个input-channel/output-channel对。默认情况下,端点用DirectChannel实例连接,其中 Bean 名称基于以下模式:[IntegrationFlow.beanName].channel#[channelNameIndex]。此规则也适用于 inlineMessageChannelsBuilder Factory Usage 产生的未命名通道。但是,所有MessageChannels方法都有一个知道channelId的变体,你可以使用它为MessageChannel实例设置 Bean 名称。MessageChannel引用和beanName引用可以用作 Bean-方法调用。下面的示例显示了使用channel()EIP 方法的可能方法:

@Bean
public MessageChannel queueChannel() {
    return MessageChannels.queue().get();
}

@Bean
public MessageChannel publishSubscribe() {
    return MessageChannels.publishSubscribe().get();
}

@Bean
public IntegrationFlow channelFlow() {
    return IntegrationFlows.from("input")
                .fixedSubscriberChannel()
                .channel("queueChannel")
                .channel(publishSubscribe())
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor))
                .channel("output")
                .get();
}
  • from("input")表示“用”输入“ID 查找并使用MessageChannel,或者创建一个”。

  • fixedSubscriberChannel()生成FixedSubscriberChannel的实例,并将其注册为channelFlow.channel#0

  • channel("queueChannel")的工作方式相同,但使用现有的queueChannel Bean。

  • channel(publishSubscribe())是 Bean-方法引用。

  • channel(MessageChannels.executor("executorChannel", this.taskExecutor))是将IntegrationFlowBuilder暴露给IntegrationComponentSpecExecutorChannel并将其注册为executorChannelIntegrationFlowBuilder

  • channel("output")DirectChannel Bean 为名称注册DirectChannel Bean,只要没有这个名称的 bean 已经存在。

注意:前面的IntegrationFlow定义是有效的,它的所有通道都应用于具有BridgeHandler实例的端点。

注意通过MessageChannels工厂从不同的IntegrationFlow实例使用相同的内联通道定义。
即使 DSL 解析器将不存在的对象注册为 bean,也无法从不同的IntegrationFlow容器中确定相同的对象(MessageChannel)。
以下示例错误:
@Bean
public IntegrationFlow startFlow() {
    return IntegrationFlows.from("input")
                .transform(...)
                .channel(MessageChannels.queue("queueChannel"))
                .get();
}

@Bean
public IntegrationFlow endFlow() {
    return IntegrationFlows.from(MessageChannels.queue("queueChannel"))
                .handle(...)
                .get();
}

这个坏例子的结果是出现了以下例外:

Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
     there is already object [queueChannel] bound
	    at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)

要使其工作,你需要为该通道声明@Bean,并从不同的IntegrationFlow实例中使用其 Bean 方法。

# 调查者

Spring 集成还提供了一个 Fluent API,它允许你为PollerMetadata实现配置AbstractPollingEndpoint。你可以使用PollersBuilder 工厂来配置公共 Bean 定义或从IntegrationFlowBuilderEIP 方法创建的定义,如下例所示:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
    return Pollers.fixedRate(500)
        .errorChannel("myErrors");
}

参见[Pollers](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/dsl/pollers.html)和[PollerSpec](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/dsl/pollerspec.html)中的 Javadoc 以获取更多信息。

如果使用 DSL 构造PollerSpec作为@Bean,请不要调用 Bean 定义中的get()方法。
PollerSpec是一个FactoryBean,它从规范中生成PollerMetadata对象并初始化其所有属性。

# `端点

从版本 5.5 开始,ConsumerEndpointSpec提供了一个reactive()配置属性,并带有一个可选的定制程序Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>。此选项将目标端点配置为ReactiveStreamsConsumer实例,独立于输入通道类型,该类型通过IntegrationReactiveUtils.messageChannelToFlux()转换为Flux。所提供的函数是使用来自Flux.transform()的操作符来自定义(publishOn()log()doOnNext()等)来自输入通道的反应流的源。

下面的示例演示了如何将发布线程从独立于最终订阅者和生产者的输入通道更改为DirectChannel:

@Bean
public IntegrationFlow reactiveEndpointFlow() {
    return IntegrationFlows
            .from("inputChannel")
            .<String, Integer>transform(Integer::parseInt,
                    e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())))
            .get();
}

有关更多信息,请参见反应流支持

# DSL 和端点配置

所有IntegrationFlowBuilderEIP 方法都有一个变体,该变体应用 lambda 参数为AbstractEndpoint实例提供选项:SmartLifecyclePollerMetadatarequest-handler-advice-chain和其他实例。每个参数都有通用参数,因此它允许你在上下文中配置一个端点,甚至它的MessageHandler,如下例所示:

@Bean
public IntegrationFlow flow2() {
    return IntegrationFlows.from(this.inputChannel)
                .transform(new PayloadSerializingTransformer(),
                       c -> c.autoStartup(false).id("payloadSerializingTransformer"))
                .transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
                .get();
}

此外,EndpointSpec提供了一个id()方法,使你可以用给定的 Bean 名称注册端点 Bean,而不是生成的名称。

如果MessageHandler被引用为 Bean,那么如果 DSL 定义中存在.advice()方法,则任何现有的adviceChain配置都将被重写:

@Bean
public TcpOutboundGateway tcpOut() {
    TcpOutboundGateway gateway = new TcpOutboundGateway();
    gateway.setConnectionFactory(cf());
    gateway.setAdviceChain(Collections.singletonList(fooAdvice()));
    return gateway;
}

@Bean
public IntegrationFlow clientTcpFlow() {
    return f -> f
        .handle(tcpOut(), e -> e.advice(testAdvice()))
        .transform(Transformers.objectToString());
}

即它们不是合并的,在这种情况下只使用testAdvice() Bean。

# 变形金刚

DSL API 提供了一个方便的、fluentTransformers工厂,可用作.transform()EIP 方法中的内联目标对象定义。下面的示例展示了如何使用它:

@Bean
public IntegrationFlow transformFlow() {
    return IntegrationFlows.from("input")
            .transform(Transformers.fromJson(MyPojo.class))
            .transform(Transformers.serializer())
            .get();
}

它避免了使用 setter 进行不方便的编码,并使流定义更加简单。请注意,你可以使用Transformers将目标Transformer实例声明为@Bean实例,并再次将它们从IntegrationFlow定义为 Bean 方法。尽管如此,DSL 解析器处理 Bean 内联对象的声明,如果它们还没有被定义为 bean 的话。

有关更多信息和支持的工厂方法,请参见 Javadoc 中的变形金刚 (opens new window)

另请参见[lambdas 和Message<?>参数]。

# 入站通道适配器

通常,消息流从入站通道适配器开始(例如<int-jdbc:inbound-channel-adapter>)。适配器配置为<poller>,并要求MessageSource<?>定期生成消息。Java DSL 也允许从MessageSource<?>开始IntegrationFlow。为此,IntegrationFlowsBuilder 工厂提供了一个重载的IntegrationFlows.from(MessageSource<?> messageSource)方法。你可以将MessageSource<?>配置为 Bean,并将其作为该方法的参数。IntegrationFlows.from()的第二个参数是一个Consumer<SourcePollingChannelAdapterSpec>lambda,它允许你为PollerMetadataSmartLifecycle提供选项。下面的示例展示了如何使用 Fluent API 和 lambda 来创建IntegrationFlow:

@Bean
public MessageSource<Object> jdbcMessageSource() {
    return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}

@Bean
public IntegrationFlow pollingFlow() {
    return IntegrationFlows.from(jdbcMessageSource(),
                c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
            .transform(Transformers.toJson())
            .channel("furtherProcessChannel")
            .get();
}

对于那些不需要直接构建Message对象的情况,可以使用基于java.util.function.SupplierIntegrationFlows.fromSupplier()变体。Supplier.get()的结果会自动包装在Message中(如果它还不是Message)。

# 消息路由器

Spring 集成原生地提供了专门的路由器类型,包括:

  • HeaderValueRouter

  • PayloadTypeRouter

  • ExceptionTypeRouter

  • RecipientListRouter

  • XPathRouter

与许多其他 DSLIntegrationFlowBuilderEIP 方法一样,route()方法可以应用任何AbstractMessageRouter实现,或者为了方便起见,将String作为 spel 表达式或ref-method对。此外,你可以使用 lambda 配置route(),并使用 lambda 实现Consumer<RouterSpec<MethodInvokingRouter>>。Fluent API 还提供AbstractMappingMessageRouter选项,如channelMapping(String key, String channelName)对,如下例所示:

@Bean
public IntegrationFlow routeFlowByLambda() {
    return IntegrationFlows.from("routerInput")
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.suffix("Channel")
                            .channelMapping(true, "even")
                            .channelMapping(false, "odd")
            )
            .get();
}

下面的示例展示了一个简单的基于表达式的路由器:

@Bean
public IntegrationFlow routeFlowByExpression() {
    return IntegrationFlows.from("routerInput")
            .route("headers['destChannel']")
            .get();
}

routeToRecipients()方法接受Consumer<RecipientListRouterSpec>,如下例所示:

@Bean
public IntegrationFlow recipientListFlow() {
    return IntegrationFlows.from("recipientListInput")
            .<String, String>transform(p -> p.replaceFirst("Payload", ""))
            .routeToRecipients(r -> r
                    .recipient("thing1-channel", "'thing1' == payload")
                    .recipientMessageSelector("thing2-channel", m ->
                            m.getHeaders().containsKey("recipient")
                                    && (boolean) m.getHeaders().get("recipient"))
                    .recipientFlow("'thing1' == payload or 'thing2' == payload or 'thing3' == payload",
                            f -> f.<String, String>transform(String::toUpperCase)
                                    .channel(c -> c.queue("recipientListSubFlow1Result")))
                    .recipientFlow((String p) -> p.startsWith("thing3"),
                            f -> f.transform("Hello "::concat)
                                    .channel(c -> c.queue("recipientListSubFlow2Result")))
                    .recipientFlow(new FunctionExpression<Message<?>>(m ->
                                    "thing3".equals(m.getPayload())),
                            f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
                    .defaultOutputToParentFlow())
            .get();
}

.routeToRecipients()定义的.defaultOutputToParentFlow()允许你将路由器的defaultOutput设置为网关,以继续处理主流中不匹配的消息。

另请参见[lambdas 和Message<?>参数]。

# 分离器

要创建拆分器,请使用split()EIP 方法。默认情况下,如果有效负载是IterableIteratorArrayStream或活性Publisher,则split()方法将每个项输出为单独的消息。它接受 lambda、spel 表达式或任何AbstractMessageSplitter实现。或者,你可以使用它而不需要参数来提供DefaultMessageSplitter。下面的示例展示了如何通过提供 lambda 来使用split()方法:

@Bean
public IntegrationFlow splitFlow() {
    return IntegrationFlows.from("splitInput")
              .split(s -> s.applySequence(false).delimiters(","))
              .channel(MessageChannels.executor(taskExecutor()))
              .get();
}

前面的示例创建了一个拆分器,该拆分包含以逗号分隔的String的消息。

另请参见[lambdas 和Message<?>参数]。

# 聚合器和重排序程序

Aggregator在概念上与Splitter相反。它将单个消息的序列聚合到单个消息中,并且必然更复杂。默认情况下,聚合器返回一条包含来自传入消息的有效负载集合的消息。同样的规则也适用于Resequencer。下面的示例展示了拆分器-聚合器模式的典型示例:

@Bean
public IntegrationFlow splitAggregateFlow() {
    return IntegrationFlows.from("splitAggregateInput")
            .split()
            .channel(MessageChannels.executor(this.taskExecutor()))
            .resequence()
            .aggregate()
            .get();
}

split()方法将列表分割为单个消息,并将它们发送到ExecutorChannelresequence()方法根据消息标题中的序列详细信息对消息进行重新排序。aggregate()方法收集这些消息。

但是,你可以通过指定发布策略和相关策略等来更改默认行为。考虑以下示例:

.aggregate(a ->
        a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
            .releaseStrategy(g -> g.size() > 10)
            .messageStore(messageStore()))

前面的示例将具有myCorrelationKey头的消息关联起来,并在至少积累了 10 个头的情况下释放这些消息。

对于resequence()EIP 方法,提供了类似的 lambda 配置。

# `方法

.handle()EIP 方法的目标是在某些 POJO 上调用任何MessageHandler实现或任何方法。另一种选择是使用 lambda 表达式来定义“活动”。因此,我们引入了一个通用的GenericHandler<P>功能接口。它的handle方法需要两个参数:P payloadMessageHeaders headers(从版本 5.1 开始)。有了这一点,我们可以将流定义如下:

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlows.from("flow3Input")
        .<Integer>handle((p, h) -> p * 2)
        .get();
}

前面的示例将它接收到的任何整数加倍。

然而, Spring 集成的一个主要目标是loose coupling,通过从消息有效负载到消息处理程序的目标参数的运行时类型转换。由于 Java 不支持 lambda 类的泛型类型解析,因此我们引入了一个解决方案,为大多数 EIP 方法和LambdaMessageProcessor提供了一个额外的payloadType参数。这样就将困难的转换工作委托给 Spring 的ConversionService,它使用提供的type和请求的消息来实现目标方法参数。下面的示例显示了结果IntegrationFlow可能是什么样子的:

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlows.from("input")
            .<byte[], String>transform(p - > new String(p, "UTF-8"))
            .handle(Integer.class, (p, h) -> p * 2)
            .get();
}

我们还可以在ConversionService中注册一些BytesToIntegerConverter,以消除附加的.transform():

@Bean
@IntegrationConverter
public BytesToIntegerConverter bytesToIntegerConverter() {
   return new BytesToIntegerConverter();
}

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlows.from("input")
             .handle(Integer.class, (p, h) -> p * 2)
            .get();
}

另请参见[lambdas 和Message<?>参数]。

# 运营商网关()

IntegrationFlow定义中的gateway()运算符是一种特殊的服务激活器实现,通过其输入通道调用一些其他端点或集成流并等待应答。从技术上讲,它在<chain>定义中扮演与嵌套<gateway>组件相同的角色(参见从一条链子中调用一条链子),并允许流更干净、更直接。从逻辑上和业务角度来看,它是一个消息传递网关,允许在目标集成解决方案的不同部分之间分配和重用功能(参见消息传递网关)。对于不同的目标,这个操作符有几个重载:

  • gateway(String requestChannel)按其名称向某个端点的输入通道发送消息;

  • gateway(MessageChannel requestChannel)通过直接注入向某个端点的输入通道发送消息;

  • gateway(IntegrationFlow flow)向所提供的输入通道发送消息IntegrationFlow

所有这些都具有一个具有第二个Consumer<GatewayEndpointSpec>参数的变体,以配置目标GatewayMessageHandler和相应的AbstractEndpoint。此外,基于IntegrationFlow的方法还允许调用现有的IntegrationFlow Bean 或通过用于IntegrationFlow功能接口的就地 lambda 将流声明为子流,或者在private方法中将其提取为更干净的代码样式:

@Bean
IntegrationFlow someFlow() {
        return IntegrationFlows
                .from(...)
                .gateway(subFlow())
                .handle(...)
                .get();
}

private static IntegrationFlow subFlow() {
        return f -> f
                .scatterGather(s -> s.recipientFlow(...),
                        g -> g.outputProcessor(MessageGroup::getOne))
}
如果下游流并不总是返回一个响应,你应该将requestTimeout设置为 0,以防止无限期地挂起调用线程。
在这种情况下,流将在该点结束,线程将被释放以进行进一步的工作。

# 运算符日志()

为了方便起见,为了记录通过 Spring 集成流(<logging-channel-adapter>)的消息行程,呈现了一个log()操作符。在内部,它由WireTap``ChannelInterceptor表示,其订阅服务器为LoggingHandler。它负责将传入消息记录到下一个端点或当前通道中。下面的示例展示了如何使用LoggingHandler:

.filter(...)
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(...)

在前面的示例中,id头在ERROR级别记录到test.category中,仅用于传递过滤器和路由之前的消息。

当这个操作符在流程的末尾使用时,它是单向处理程序,流程结束。要使其成为一个产生回复的流,你可以在log()之后使用一个简单的bridge(),或者,从版本 5.1 开始,你可以使用一个logAndReply()操作符。logAndReply只能在流的末尾使用。

# 操作符截获()

从版本 5.3 开始,intercept()操作符允许在流程中的当前ChannelInterceptor处注册一个或多个ChannelInterceptor实例。这是通过MessageChannelsAPI 创建显式MessageChannelAPI 的一种替代方法。下面的示例使用MessageSelectingInterceptor来拒绝某些异常消息:

.transform(...)
.intercept(new MessageSelectingInterceptor(m -> m.getPayload().isValid()))
.handle(...)

# `

Spring 集成包括.wireTap()Fluent APIMessageChannelSpecBuilders。下面的示例展示了如何使用wireTap方法记录输入:

@Bean
public QueueChannelSpec myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input");
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}
如果MessageChannelInterceptableChannel的实例,则将log()wireTap()intercept()运算符应用于当前的MessageChannel
否则,将向当前配置的端点的流中注入一个中间件DirectChannel。在下面的示例中,将WireTap拦截器直接添加到myChannel中,因为DirectChannel实现了InterceptableChannel:

<br/>@Bean<br/>MessageChannel myChannel() {<br/> return new DirectChannel();<br/>}<br/><br/>...<br/> .channel(myChannel())<br/> .log()<br/>}<br/>

当当前的MessageChannel不实现InterceptableChannel时,一个隐式的DirectChannelBridgeHandler被注入到IntegrationFlow中,并且WireTap被添加到这个新的DirectChannel中。以下示例没有任何通道声明:

.handle(...)
.log()
}

在前面的示例中(并且在没有信道被声明的任何时间),隐式的DirectChannel被注入在IntegrationFlow的当前位置中,并用作当前配置的ServiceActivatingHandler的输出信道(来自.handle()前面描述的)。

# 处理消息流

IntegrationFlowBuilder提供了一个顶级 API 来生成连接到消息流的集成组件。当你的集成可以用单个流(通常是这种情况)来完成时,这是很方便的。交替地IntegrationFlow实例可以通过MessageChannel实例进行连接。

默认情况下,MessageFlow在 Spring 集成术语中表现为“链”。也就是说,端点由DirectChannel实例自动隐式连接。消息流实际上并不是作为一个链构建的,这提供了更多的灵活性。例如,如果你知道它的inputChannel名称(也就是说,如果你显式地定义了它),那么你可以向流中的任何组件发送消息。你还可以在流中引用外部定义的通道,以允许使用通道适配器(以启用远程传输协议、文件 I/O 等),而不是直接通道。因此,DSL 不支持 Spring Integrationchain元素,因为在这种情况下它不会增加太多的值。

由于 Spring 集成 Java DSL 产生了与任何其他配置选项相同的 Bean 定义模型,并且是基于现有的 Spring 框架@Configuration基础设施的,因此可以与 XML 定义一起使用并有线 Spring 集成消息注释配置。

你还可以使用 lambda 定义直接IntegrationFlow实例。下面的示例展示了如何做到这一点:

@Bean
public IntegrationFlow lambdaFlow() {
    return f -> f.filter("World"::equals)
                   .transform("Hello "::concat)
                   .handle(System.out::println);
}

这个定义的结果是使用隐式直接通道连接的同一组集成组件。这里的唯一限制是,这个流是从一个命名的直接通道开始的-lambdaFlow.input。此外,lambda 流不能从MessageSourceMessageProducer开始。

从版本 5.1 开始,这种IntegrationFlow被包装到代理,以公开生命周期控制,并提供对内部关联inputChannelinputChannel的访问。

从版本 5.0.6 开始,在IntegrationFlow中为组件生成的 Bean 名称包括流 Bean,后面跟着一个点(.)作为前缀。例如,前面示例中的ConsumerEndpointFactoryBean.transform("Hello "::concat)的结果是 Bean 名称lambdaFlow.o.s.i.config.ConsumerEndpointFactoryBean#0。(o.s.i是从org.springframework.integration到适合页面的缩写。)该端点的Transformer实现 Bean 的 Bean 名称为lambdaFlow.transformer#0(从版本 5.1 开始),其中使用的是其组件类型,而不是MethodInvokingTransformer类的完全限定名称。当必须在流中生成 Bean 名称时,对所有NamedComponents 应用相同的模式。 Bean 这些生成的名称与流 ID 一起前置,以用于诸如解析日志或在某些分析工具中将组件分组在一起的目的,以及在运行时并发注册集成流时避免竞争条件。有关更多信息,请参见动态和运行时集成流

# FunctionExpression

我们引入了FunctionExpression类(spel 的Expression接口的一种实现)来让我们使用 lambdas 和generics。当存在来自核心 Spring 集成的隐式Strategy变量时,将为 DSL 组件提供Function<T, R>选项以及expression选项。下面的示例展示了如何使用函数表达式:

.enrich(e -> e.requestChannel("enrichChannel")
            .requestPayload(Message::getPayload)
            .propertyFunction("date", m -> new Date()))

FunctionExpression还支持运行时类型转换,就像SpelExpression中所做的那样。

# 子流支持

一些if…​elsepublish-subscribe组件提供了通过使用子流指定其逻辑或映射的能力。最简单的示例是.publishSubscribeChannel(),如下例所示:

@Bean
public IntegrationFlow subscribersFlow() {
    return flow -> flow
            .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p / 2)
                            .channel(c -> c.queue("subscriber1Results")))
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p * 2)
                            .channel(c -> c.queue("subscriber2Results"))))
            .<Integer>handle((p, h) -> p * 3)
            .channel(c -> c.queue("subscriber3Results"));
}

你可以使用单独的IntegrationFlow``@Bean定义来实现相同的结果,但是我们希望你发现逻辑组合的子流样式是有用的。我们发现,这会导致代码更短(因此也更可读)。

从版本 5.3 开始,提供了一个基于BroadcastCapableChannelpublishSubscribeChannel()实现,以在代理支持的消息通道上配置子流订阅服务器。例如,我们现在可以将几个订阅服务器配置为Jms.publishSubscribeChannel()上的子流:

@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
    return Jms.publishSubscribeChannel(jmsConnectionFactory())
                .destination("pubsub")
                .get();
}

@Bean
public IntegrationFlow pubSubFlow() {
    return f -> f
            .publishSubscribeChannel(jmsPublishSubscribeChannel(),
                    pubsub -> pubsub
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel1")))
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}

@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel(ConnectionFactory jmsConnectionFactory) {
    return (BroadcastCapableChannel) Jms.publishSubscribeChannel(jmsConnectionFactory)
            .destination("pubsub")
            .get();
}

类似的publish-subscribe子流组合提供了.routeToRecipients()方法。

另一个例子是在.filter()方法上使用.discardFlow()而不是.discardChannel()

.route()值得特别关注。考虑以下示例:

@Bean
public IntegrationFlow routeFlow() {
    return f -> f
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.channelMapping("true", "evenChannel")
                            .subFlowMapping("false", sf ->
                                    sf.<Integer>handle((p, h) -> p * 3)))
            .transform(Object::toString)
            .channel(c -> c.queue("oddChannel"));
}

.channelMapping()继续工作,就像在常规Router映射中一样,但是.subFlowMapping()将该子流绑定到主流。换句话说,任何路由器的子流在.route()之后返回到主流。

有时,你需要从.subFlowMapping()中引用一个已存在的IntegrationFlow``@Bean
下面的示例展示了如何这样做:
<br/>@Bean<br/>public IntegrationFlow splitRouteAggregate() {<br/> return f -> f<br/> .split()<br/> .<Integer, Boolean>route(o -> o % 2 == 0,<br/> m -> m<br/> .subFlowMapping(true, oddFlow())<br/> .subFlowMapping(false, sf -> sf.gateway(evenFlow())))<br/> .aggregate();<br/>}<br/><br/>@Bean<br/>public IntegrationFlow oddFlow() {<br/> return f -> f.handle(m -> System.out.println("odd"));<br/>}<br/><br/>@Bean<br/>public IntegrationFlow evenFlow() {<br/> return f -> f.handle((p, h) -> "even");<br/>}<br/>
在这种情况下,当你需要从这样的子流收到回复并继续主流时,这个IntegrationFlow Bean 引用(或其输入通道)必须用.gateway()进行包装,如前面的示例所示。
前面示例中的oddFlow()引用没有包装到.gateway()
因此,我们不期望从这个路由分支得到答复。
否则,你最终会遇到一个类似于以下情况的异常:

<br/>Caused by: org.springframework.beans.factory.BeanCreationException:<br/> The 'currentComponent' ([email protected]a51c)<br/> is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'.<br/> This is the end of the integration flow.<br/>

当你将一个子流配置为 lambda 时,框架将处理与子流的请求-回复交互,并且不需要网关。

子流可以嵌套到任何深度,但我们不建议这样做。事实上,即使是在路由器的情况下,在一个流中添加复杂的子流,很快就会开始看起来像一盘意大利面条,并且对于人类来说很难进行解析。

在 DSL 支持子流配置的情况下,当所配置的组件通常需要一个通道,并且该子流以channel()元素开始时,框架会隐式地在组件输出通道和流的输入通道之间放置一个bridge()。,例如,在这个filter定义中:

<br/>.filter(p -> p instanceof String, e -> e<br/> .discardFlow(df -> df<br/> .channel(MessageChannels.queue())<br/> ...)<br/>

框架内部创建了一个DirectChannel Bean 用于注入MessageFilter.discardChannel
然后它将子流包装到一个IntegrationFlow中,从这个隐式通道开始,用于订阅并在流中指定的channel()之前放置一个bridge
当现有的IntegrationFlow Bean 被用作子流引用(而不是内置子流,例如 lambda)时,没有这样的桥需要,因为框架可以解析来自第一通道的流 Bean。具有内联子流的输入通道尚不可用。

# 使用协议适配器

到目前为止所示的所有示例都说明了 DSL 如何通过使用 Spring 集成编程模型来支持消息传递体系结构。然而,我们还没有进行任何真正的整合。这样做需要通过 HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP 等访问远程资源,或者访问本地文件系统。 Spring 集成支持所有这些和更多内容。理想情况下,DSL 应该为所有这些功能提供一流的支持,但是实现所有这些功能并随着新的适配器被添加到 Spring 集成中而跟上,这是一项艰巨的任务。因此,期望是 DSL 不断赶上 Spring 集成。

因此,我们提供了高级 API 来无缝地定义特定于协议的消息传递。我们用工厂和建造商的模式以及 lambdas 来实现这一点。你可以将工厂类视为“命名空间工厂”,因为它们与来自具体协议特定的 Spring 集成模块的组件的 XML 命名空间扮演相同的角色。目前, Spring 集成 Java DSL 支持AmqpFeedJmsFiles(S)FtpHttpJPATCP/UDPMail、和Scripts名称空间工厂。下面的示例显示了如何使用其中的三个(AmqpJmsMail):

@Bean
public IntegrationFlow amqpFlow() {
    return IntegrationFlows.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
            .transform("hello "::concat)
            .transform(String.class, String::toUpperCase)
            .get();
}

@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
    return IntegrationFlows.from("jmsOutboundGatewayChannel")
            .handle(Jms.outboundGateway(this.jmsConnectionFactory)
                        .replyContainer(c ->
                                    c.concurrentConsumers(3)
                                            .sessionTransacted(true))
                        .requestDestination("jmsPipelineTest"))
            .get();
}

@Bean
public IntegrationFlow sendMailFlow() {
    return IntegrationFlows.from("sendMailChannel")
            .handle(Mail.outboundAdapter("localhost")
                            .port(smtpPort)
                            .credentials("user", "pw")
                            .protocol("smtp")
                            .javaMailProperties(p -> p.put("mail.debug", "true")),
                    e -> e.id("sendMailEndpoint"))
            .get();
}

前面的示例展示了如何使用“名称空间工厂”作为内联适配器声明。但是,你可以从@Bean定义中使用它们,以使IntegrationFlow方法链更具可读性。

我们正在征求社区对这些命名空间工厂的反馈意见,然后再将精力花在其他工厂上。
我们还感谢对优先级排序的任何输入,我们下一步应该支持哪些适配器和网关。

你可以在整个参考手册中的特定于协议的章节中找到更多的 Java DSL 示例。

所有其他协议通道适配器可以被配置为通用 bean 并连接到IntegrationFlow,如下例所示:

@Bean
public QueueChannelSpec wrongMessagesChannel() {
    return MessageChannels
            .queue()
            .wireTap("wrongMessagesWireTapChannel");
}

@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
    return IntegrationFlows.from("inputChannel")
            .filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
                    e -> e.discardChannel(wrongMessagesChannel))
            .log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
            .route(xpathRouter(wrongMessagesChannel))
            .get();
}

@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
    XPathRouter router = new XPathRouter("local-name(/*)");
    router.setEvaluateAsString(true);
    router.setResolutionRequired(false);
    router.setDefaultOutputChannel(wrongMessagesChannel);
    router.setChannelMapping("Tags", "splittingChannel");
    router.setChannelMapping("Tag", "receivedChannel");
    return router;
}

# IntegrationFlowAdapter

IntegrationFlow接口可以直接实现并指定为用于扫描的组件,如下例所示:

@Component
public class MyFlow implements IntegrationFlow {

    @Override
    public void configure(IntegrationFlowDefinition<?> f) {
        f.<String, String>transform(String::toUpperCase);
    }

}

它由IntegrationFlowBeanPostProcessor拾取,并在应用程序上下文中进行了正确的解析和注册。

为了方便和获得松耦合体系结构的好处,我们提供了IntegrationFlowAdapter基类实现。它需要一个buildFlow()方法实现来通过使用from()方法中的一个来生成IntegrationFlowDefinition方法,如下例所示:

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

    private final AtomicBoolean invoked = new AtomicBoolean();

    public Date nextExecutionTime(TriggerContext triggerContext) {
          return this.invoked.getAndSet(true) ? null : new Date();
    }

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(this::messageSource,
                      e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
                 .split(this)
                 .transform(this)
                 .aggregate(a -> a.processor(this, null), null)
                 .enrichHeaders(Collections.singletonMap("thing1", "THING1"))
                 .filter(this)
                 .handle(this)
                 .channel(c -> c.queue("myFlowAdapterOutput"));
    }

    public String messageSource() {
         return "T,H,I,N,G,2";
    }

    @Splitter
    public String[] split(String payload) {
         return StringUtils.commaDelimitedListToStringArray(payload);
    }

    @Transformer
    public String transform(String payload) {
         return payload.toLowerCase();
    }

    @Aggregator
    public String aggregate(List<String> payloads) {
           return payloads.stream().collect(Collectors.joining());
    }

    @Filter
    public boolean filter(@Header Optional<String> thing1) {
            return thing1.isPresent();
    }

    @ServiceActivator
    public String handle(String payload, @Header String thing1) {
           return payload + ":" + thing1;
    }

}

# 动态和运行时集成流

IntegrationFlow及其所有相关组件都可以在运行时注册。在版本 5.0 之前,我们使用BeanFactory.registerSingleton()钩子。从 Spring 框架5.0开始,我们使用instanceSupplier钩子进行程序化的BeanDefinition注册。下面的示例显示了如何以编程方式注册 Bean:

BeanDefinition beanDefinition =
         BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
               .getRawBeanDefinition();

((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);

注意,在前面的示例中,instanceSupplier钩子是genericBeanDefinition方法的最后一个参数,在本例中由 lambda 提供。

Bean 所有必要的初始化和生命周期都是自动完成的,就像使用标准上下文配置 Bean 定义一样。

Spring 为了简化开发体验,集成引入了IntegrationFlowContext来在运行时注册和管理IntegrationFlow实例,如下例所示:

@Autowired
private AbstractServerConnectionFactory server1;

@Autowired
private IntegrationFlowContext flowContext;

...

@Test
public void testTcpGateways() {
    TestingUtilities.waitListening(this.server1, null);

    IntegrationFlow flow = f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client1"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());

    IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
    assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}

当我们有多个配置选项并且必须创建几个类似流的实例时,这是有用的。为此,我们可以迭代我们的选项,并在循环中创建和注册IntegrationFlow实例。另一种变体是当我们的数据源不是基于 Spring 的,并且我们必须动态地创建它时。这样的示例是反应流事件源,如下例所示:

Flux<Message<?>> messageFlux =
    Flux.just("1,2,3,4")
        .map(v -> v.split(","))
        .flatMapIterable(Arrays::asList)
        .map(Integer::parseInt)
        .map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
    IntegrationFlows.from(messageFlux)
        .<Integer, Integer>transform(p -> p * 2)
        .channel(resultChannel)
        .get();

this.integrationFlowContext.registration(integrationFlow)
            .register();

IntegrationFlowRegistrationBuilder(作为IntegrationFlowContext.registration()的结果)可用于为IntegrationFlow指定一个 Bean 名称来进行注册,以控制其autoStartup,并注册非 Spring 集成 bean。通常,这些额外的 bean 是连接工厂(AMQP、JMS、(s)FTP、TCP/UDP 等)、序列化器和反序列化器,或任何其他所需的支持组件。

你可以使用IntegrationFlowRegistration.destroy()回调来删除动态注册的IntegrationFlow及其所有依赖的 bean,当你不再需要它们时。有关更多信息,请参见[IntegrationFlowContextJavadoc](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/dsl/context/integrationflowcontext.html)。

从版本 5.0.6 开始,在IntegrationFlow定义中生成的所有 Bean 名称都以流 ID 作为前缀,
我们建议始终指定显式的流 ID,
否则,将在IntegrationFlowContext中启动同步障碍,要为IntegrationFlow生成 Bean 名称并注册其 bean。
我们在这两个操作上进行同步,以避免当相同的生成的 Bean 名称可用于不同的IntegrationFlow实例时的竞争条件。

另外,从版本 5.0.6 开始,Registration Builder API 有一个新的方法:useFlowIdAsPrefix()。如果你希望声明同一流的多个实例,并在流中的组件具有相同 ID 时避免 Bean 名称冲突,这是非常有用的,如下例所示:

private void registerFlows() {
    IntegrationFlowRegistration flow1 =
              this.flowContext.registration(buildFlow(1234))
                    .id("tcp1")
                    .useFlowIdAsPrefix()
                    .register();

    IntegrationFlowRegistration flow2 =
              this.flowContext.registration(buildFlow(1235))
                    .id("tcp2")
                    .useFlowIdAsPrefix()
                    .register();
}

private IntegrationFlow buildFlow(int port) {
    return f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());
}

在这种情况下,用于第一个流的消息处理程序可以使用 Bean tcp1.client.handler的名称来引用。

当你使用useFlowIdAsPrefix()时,需要一个id属性。

# IntegrationFlow作为网关

IntegrationFlow可以从提供GatewayProxyFactoryBean组件的服务接口开始,如下例所示:

public interface ControlBusGateway {

    void send(String command);
}

...

@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlows.from(ControlBusGateway.class)
            .controlBus()
            .get();
}

接口方法的所有代理都提供了将消息发送到IntegrationFlow中的下一个集成组件的通道。你可以使用@MessagingGateway注释标记服务接口,并使用@Gateway注释标记方法。尽管如此,requestChannel中的下一个组件的内部通道忽略并重写了IntegrationFlow。否则,使用IntegrationFlow创建这样的配置是没有意义的。

默认情况下,GatewayProxyFactoryBean会得到一个传统的 Bean 名称,例如[FLOW_BEAN_NAME.gateway]。你可以通过使用@MessagingGateway.name()属性或重载IntegrationFlows.from(Class<?> serviceInterface, Consumer<GatewayProxySpec> endpointConfigurer)工厂方法来更改该 ID。此外,接口上@MessagingGateway注释的所有属性都应用于目标GatewayProxyFactoryBean。当注释配置不适用时,Consumer<GatewayProxySpec>变体可用于为目标代理提供适当的选项。此 DSL 方法从版本 5.2 开始可用。

使用 Java8,你甚至可以创建带有java.util.function接口的集成网关,如下例所示:

@Bean
public IntegrationFlow errorRecovererFlow() {
    return IntegrationFlows.from(Function.class, (gateway) -> gateway.beanName("errorRecovererFunction"))
            .handle((GenericHandler<?>) (p, h) -> {
                throw new RuntimeException("intentional");
            }, e -> e.advice(retryAdvice()))
            .get();
}

errorRecovererFlow可以如下使用:

@Autowired
@Qualifier("errorRecovererFunction")
private Function<String, String> errorRecovererFlowGateway;

# DSL 扩展

从版本 5.3 开始,引入了IntegrationFlowExtension,以允许使用自定义或组合的 EIP 操作符扩展现有的 Java DSL。所需要的只是这个类的一个扩展,它提供了可以在IntegrationFlow Bean 定义中使用的方法。扩展类还可以用于自定义IntegrationComponentSpec配置;例如,可以在现有的IntegrationComponentSpec扩展中实现错过的或默认的选项。下面的示例演示了一个复合自定义运算符和AggregatorSpec扩展的使用,用于默认的自定义outputProcessor:

public class CustomIntegrationFlowDefinition
        extends IntegrationFlowExtension<CustomIntegrationFlowDefinition> {

    public CustomIntegrationFlowDefinition upperCaseAfterSplit() {
        return split()
                .transform("payload.toUpperCase()");
    }

    public CustomIntegrationFlowDefinition customAggregate(Consumer<CustomAggregatorSpec> aggregator) {
        return register(new CustomAggregatorSpec(), aggregator);
    }

}

public class CustomAggregatorSpec extends AggregatorSpec {

    CustomAggregatorSpec() {
        outputProcessor(group ->
                group.getMessages()
                        .stream()
                        .map(Message::getPayload)
                        .map(String.class::cast)
                        .collect(Collectors.joining(", ")));
    }

}

对于链流方法,这些扩展中的新 DSL 操作符必须返回扩展类。这样,目标IntegrationFlow定义将与新的和现有的 DSL 操作符一起工作:

@Bean
public IntegrationFlow customFlowDefinition() {
    return
            new CustomIntegrationFlowDefinition()
                    .log()
                    .upperCaseAfterSplit()
                    .channel("innerChannel")
                    .customAggregate(customAggregatorSpec ->
                            customAggregatorSpec.expireGroupsUponCompletion(true))
                    .logAndReply();
}

# 积分流组合

在 Spring 集成中,将MessageChannel抽象作为第一类公民,始终假定集成流的组成。流中任何端点的输入通道都可以用于从任何其他端点发送消息,而不仅仅是从具有该通道作为输出的端点发送消息。此外,有了@MessagingGateway契约、内容更丰富的组件、像<chain>这样的复合端点,以及现在有了IntegrationFlowbean(例如IntegrationFlowAdapter),在更短的、可重用的部分之间分配业务逻辑就足够简单了。最后一篇作文所需要的只是关于MessageChannel发送到或接收到的信息的知识。

从版本5.5.4开始,为了从MessageChannel中提取更多内容并从最终用户隐藏实现细节,IntegrationFlows引入了from(IntegrationFlow)工厂方法,以允许从现有流的输出中启动当前的IntegrationFlow:

@Bean
IntegrationFlow templateSourceFlow() {
    return IntegrationFlows.fromSupplier(() -> "test data")
            .channel("sourceChannel")
            .get();
}

@Bean
IntegrationFlow compositionMainFlow(IntegrationFlow templateSourceFlow) {
    return IntegrationFlows.from(templateSourceFlow)
            .<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("compositionMainFlowResult"))
            .get();
}

另一方面,在IntegrationFlowDefinition中添加了一个to(IntegrationFlow)终端操作符,以继续在输入通道处的当前流的一些其他流:

@Bean
IntegrationFlow mainFlow(IntegrationFlow otherFlow) {
    return f -> f
            .<String, String>transform(String::toUpperCase)
            .to(otherFlow);
}

@Bean
IntegrationFlow otherFlow() {
    return f -> f
            .<String, String>transform(p -> p + " from other flow")
            .channel(c -> c.queue("otherFlowResultChannel"));
}

在流程中间的合成是可以通过现有的gateway(IntegrationFlow)EIP 方法简单地实现的。通过这种方式,我们可以通过从更简单、可重用的逻辑块中组合流来构建具有任何复杂性的流。例如,你可以添加一个IntegrationFlowbean 的库作为依赖项,并且它只需要将它们的配置类导入到最终的项目中并自动连接到你的IntegrationFlow定义中即可。