背景:

我能想到的一种方法是要求操作员知道当前元素是最后一个元素.

AtomicReference<Item> itemRef = new AtomicReference();
itemRef.set(new Item());
Flowable<Item> accumulateOrProcessFlowable = source.
    flatMap(item -> {
        if(item.shouldBeAccumulated()) {
            //Accumulate data into reference
            itemRef.set(itemRef.get().addData(item.getData()));
            //Return empty to throw away consumed item;
            return Flowable.empty();
        } else {
            item.updateProperty();
            return Flowable.just(item);
        }
    })
    .applyIfLastElement(item -> {
        if (item.shouldBeAccumulated()) {
            return Flowable.just(itemRef.get());
        }
    })

推荐答案

下面是如何做到这一点(在RxJava2.x中,它与RxJava3.x非常接近).诀窍是使用defer(封装Flowable状态的最佳方式,以便可以多次订阅)和concatWith.defer还可以在last的情况下启用延迟判断.还请注意,作为一种性能改进,您可能不关心我使用了单元素数组而不是原子引用对象(以避免不必要的易失性读取、设置等).

Flowable<Integer> result = Flowable.defer(() -> {
    boolean[] isFirst = new boolean[] { true };
    Integer[] state = new Integer[1];
    Maybe<Integer> last = Maybe.defer(() -> {
        if (state[0] == null) {
            return Maybe.empty();
        } else {
            return Maybe.just(state[0]);
        }
    });
    return source //
            .flatMap(x -> {
                if (state[0] != null || isFirst[0] && shouldBeAccumulated(x)) {
                        // accumulate
                        state[0] = state[0] == null ? 0 : state[0] + x;
                        isFirst[0] = false;
                        return Flowable.empty();
                    } else {
                        isFirst[0] = false;
                        return Flowable.just(x);
                    }
                })
            .concatWith(last);
    });

Java相关问答推荐

具有默认分支的JUnit代码覆盖率切换声明

Java Stream,需要更新列表对象列表

无法传递消费者<;>;实例

S的字符串表示是双重精确的吗?

Spring Boot Maven包

我不能再在Android Studio Hedgehog上用Java语言创建新项目了吗?

如何让DTO接受空字符串字段,但如果它们不为空,则应用JPA验证?

由于 list 中的权限错误,Android未生成

如何使用带有谓词参数的方法,而不使用lambda表达式

未找到适用于响应类型[类java.io.InputStream]和内容类型[Text/CSV]的HttpMessageConverter

GetChildren().emoveAll()不会删除 node

try 将JSON字符串响应从API转换为映射字符串、对象>;时出错

使用正则表达式从字符串中提取多个值

在Ubuntu 23.10上使用mp3创建JavaFX MediaPlayer时出错

如何从HttpResponse实例获取Entity对象的内容?

向Java进程发送`kill-11`会引发NullPointerException吗?

在Spring Boot中使用咖啡因进行缓存

Java编译器是否进行了持续的折叠优化,以及如何进行判断?

java中的网上购物车解析错误

如何使用 JDBC 更改 Postgres Enum 类型