我已经看到了关于accessing flux at the middle of an IntegrationFlow的问题,我想知道为什么我能用以下方式成功地在通量中编写逻辑:

public void writeToSolr(IntegrationFlowDefinition<?> flowDefinition) {
    flowDefinition
        .bridge(e -> e.reactive(flux -> a ->
           flux.log("write to solr")
               .flatMap(writeToSolr)
               .subscribe()));
}

首先,我想知道为什么我从未将错误抛出控制台,但在调试时我看到了错误.

public void writeToSolr(IntegrationFlowDefinition<?> flowDefinition) {
    flowDefinition
        .bridge(e -> e.reactive(flux ->
           flux.log("write to solr")
               .flatMap(writeToSolr)
               .subscribe()));
}

我得到一个异常Bad return type in lambda expression: Disposable cannot be converted to Publisher<Message<?>>-例如,由于类型问题,代码无法编译代码.

推荐答案

你在那e.reactive()中写的逻辑是不正确的.

/**
 * Make the consumer endpoint as reactive independently of an input channel and
 * apply the provided function into the {@link Flux#transform(Function)} operator.
 * @param reactiveCustomizer the function to transform {@link Flux} for the input channel.
 * @return the spec
 * @since 5.5
 */
public S reactive(Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> reactiveCustomizer) {

https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-reactive

在此配置器中,您无法执行像flatMap()这样的"活动"操作.您完全消除了流的目的.必须在下游的处理程序中完成.这样的计划毫无意义.它只是将当前的流动状态转变为无功状态.当handle()reactive()配置器相似时,它也会做同样的事情,但也会应用这个处理程序的一个用途——它的句柄部分.

.subscribe()这个数字根本不正确.让框架在Spring集成的基础上处理提供的react 流!这就是为什么你用flux -> a ->来误导自己.它确实可以编译并让您运行,因为它不是编译或配置的一部分.当您已经发送消息时,它实际上是在运行时判断的回调.

writeToSolr可以这样使用:

flowDefinition
    .channel(c -> c.flux())
    .handle(new ReactiveMessageHandlerAdapter((message) -> writeToSolr(message.getPayload())))

我认为我们将修改reactive()个端点,只公开那些仅用于配置的Flux个操作符.其余部分不在当前端点配置中:必须在目标处理程序方法逻辑中完成.

此外,我认为我们可以引入handleReactive(ReactiveMessageHandler)作为IntegrationFlow的终端运营商,以简化ReactiveMessageHandlerAdapter的使用.

Java相关问答推荐

Java FFM,如何将Java对象与C struct 链接起来?

android Document File. isDirector()返回意外结果

ActivityCompat.请求收件箱自动拒绝权限

JPackage-results已安装-如何添加系统属性?

为什么我的画布没有显示在PFA应用程序中?

如何将kotlin代码转换为java

Java Stream,需要更新列表对象列表

Java .类参数不通过构造函数传递

对于几乎不涉及逻辑的请求,您是否应该使用命令模式?

在JavaFX项目中注册组合框的控件FX验证器时,模块系统出错

try 将JSON字符串响应从API转换为映射字符串、对象>;时出错

如何在JavaFX循环中完美地制作一个AudioClip/MediaPlayer?

Java创建带有扩展通配符的抽象处理器

寻找Thread.sky()方法的清晰度

每次我需要时创建和关闭数据库连接会有什么效果吗?

用于Java的Visual Studio代码完成不起作用

为什么创建Java动态代理需要接口参数

通过/失败的参数化junit测试方法执行数

Intellij 2023 IDE:始终在顶部显示菜单栏

如何使用外部函数从Java中获取C++ struct 的返回值&;内存API