# R2DBC 支持

# R2DBC 支持

Spring 集成通过使用通过R2DBC (opens new window)驱动程序对数据库的反应性访问,为接收和发送消息提供了通道适配器。

你需要在项目中包含此依赖项:

Maven

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-r2dbc</artifactId>
    <version>5.5.9</version>
</dependency>

Gradle

compile "org.springframework.integration:spring-integration-r2dbc:5.5.9"

# R2DBC 入站通道适配器

R2dbcMessageSource是一个基于R2dbcEntityOperations的 pollableMessageSource实现,并生成带有FluxMono的消息,作为根据expectSingleResult选项从数据库中获取数据的有效负载。可以静态地提供对SELECT的查询,也可以基于对每个receive()调用求值的 SPEL 表达式。R2dbcMessageSource.SelectCreator作为求值上下文的根对象存在,以允许使用StatementMapper.SelectSpecfluent API。默认情况下,此通道适配器将从 SELECT 映射到LinkedCaseInsensitiveMap实例中的记录。可以根据this.r2dbcEntityOperations.getConverter()定制提供payloadType选项,该选项由EntityRowMapper在下面使用。updateSql是可选的,用于在数据库中 Mark Read 记录,以便跳过后续的轮询。UPDATE操作可以提供一个BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec>,根据SELECT结果中的记录,将值绑定到UPDATE中。

此通道适配器的典型配置可能如下所示:

@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
    R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
            "SELECT * FROM person WHERE name='Name'");
    r2dbcMessageSource.setPayloadType(Person.class);
    r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
    r2dbcMessageSource.setBindFunction(
				(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
    return r2dbcMessageSource;
}

对于 Java DSL,此通道适配器的配置如下所示:

@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
    return IntegrationFlows
        .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
            (selectCreator) ->
                    selectCreator.createSelect("person")
                        .withProjection("*")
                        .withCriteria(Criteria.where("id").is(1)))
                    .expectSingleResult(true)
                    .payloadType(Person.class)
                    .updateSql("UPDATE Person SET id='2' where id = :id")
                    .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
                            bindSpec.bind("id", o.getId())),
            e -> e.poller(p -> p.fixedDelay(100)))
        .<Mono<?>>handle((p, h) -> p, e -> e.async(true))
        .channel(MessageChannels.flux())
        .get();
}

# R2DBC 出站通道适配器

R2dbcMessageHandler是一个ReactiveMessageHandler实现,用于在数据库中使用提供的R2dbcEntityOperations执行INSERT(默认)、UPDATEDELETE查询。R2dbcMessageHandler.Type可以静态配置,也可以通过针对请求消息的 SPEL 表达式配置。要执行的查询可以基于tableNamevaluescriteria表达式选项,或者(如果不提供tableName)将整个消息有效负载作为org.springframework.data.relational.core.mapping.Table实体来执行 SQL。将包org.springframework.data.relational.core.query注册为导入到 SPEL 评估上下文中,以直接访问CriteriaFluent API,该 API 用于UPDATEDELETE查询。valuesExpressionINSERTUPDATE中使用,并且必须对Map进行求值,以便对列值对在目标表中执行针对请求消息的更改。

此通道适配器的典型配置可能如下所示:

@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
    R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
    messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
    messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
    messageHandler.setCriteriaExpression(
        EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
    return messageHandler;
}

对于 Java DSL,此通道适配器的配置如下所示:

.handle(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
        .queryType(R2dbcMessageHandler.Type.UPDATE)
        .tableNameExpression("payload.class.simpleName")
        .criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
        .values("{age:36}"))