我需要合并多个慢源,但保持秩序. 如果代码执行两次,则顺序必须相同.

为此,非常简单的解决方案如下:

 Source
    .from(partions())
    .flatMapConcat(partition -> slowSource(partition))

这是可行的,但是第二个分区的慢速源代码在第一个分区之后执行.我想并行运行速度较慢的源代码,但将结果与稳定的顺序合并.

我试着这样做:

Source
    .completionStage(Source
        .from(partions())
        .map(partition -> slowSource(partition).runWith(Sink.seq(), actorSystem))
        .runWith(Sink.seq(), actorSystem))
    .mapConcat(i -> i)
    .mapAsync(partitions, stage -> stage)
    .mapConcat(i -> i) 

它正在工作,但我需要 for each 分区创建一个列表. 有没有更好的方法来实现这一点?

推荐答案

类似的东西应该接近你似乎想要的东西:

Source.from(partitions())
  .map(partition -> (slowSource(partition).buffer(1, OverflowStrategy.backpressure())).preMaterialize(actorSystem))
  .flatMapConcat(Pair::second)

通过将源附加到缓冲区并将其具体化,您可以通过初始化它并向源发送需求信号来有效地"启动泵". 你可以调整缓冲区的大小(例如,可以使用statefulMap在后面的分区上有更大的缓冲区). 如果源缓慢的主要原因是在第一个元素出现之前有很长的延迟,但之后它们出现得相当快,这将允许每个分区快速设置.

Java相关问答推荐

Jooq隐式地将bigint转换为数字,并且索引不起作用

try Dockerize Maven应用程序,但发布版本21不支持"

Java 21虚拟线程会解决转向react 式单线程框架的主要原因吗?

在Java中测试DAO方法:假实现与内存数据库

参数值[...]与预期类型java.util.Date不匹配

工件部署期间出错[Tomcat 8.5.45]

是否保证在事务性块的末尾标记违反约束?

现场观看Android Studio中的变化

RichFaces 3.x-Spring Boot-迁移web.xml

按属性值从流中筛选出重复项

Java Mooc.fi Part 12_01.Hideout -返回和删除方法

迁移到Java 17后,日期显示不准确

在VS代码中,如何启用Java Main函数的&Q;Run|DEBUG&Q;代码?

Java组件项目中的JavaFX对话框国际化

Spring Framework6.1中引入的新RestClient是否有适合于测试的变体,就像RestTemplate和TestRestTemplate一样?

try 使用预准备语句占位符获取信息时出现Try-With-Resources错误

如果执行@BeForeEach#repository.save(),则测试中的UnitTest最终UUID会发生更改

未调用OnBackPressedCallback-Activitiy立即终止

如何设计包含已知和未知键值对映射的Java类?

java.exe如何执行java源代码?