我有一小段代码:

var blong = Stream.iterate(BigInteger.ZERO, bi -> bi.add(BigInteger.ONE))
    .collect(Collector.of(
        () -> Stream.of(),
        (s, bi) -> Stream.concat(s, Stream.of(bi)),
        (s1, s2) -> Stream.concat(s1, s2),
        s -> s
    ));

System.out.println(blong.getClass().getName());

它不能正常工作.我得了IllegalStateException分:

Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.base/java.util.stream.AbstractPipeline.spliterator(AbstractPipeline.java:346)
at java.base/java.util.stream.Stream.concat(Stream.java:1618)
at UninitializedTest.lambda$2(UninitializedTest.java:28)
at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.base/java.util.stream.Stream$1.tryAdvance(Stream.java:1469)
at java.base/java.util.Spliterator.forEachRemaining(Spliterator.java:332)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at UninitializedTest.main(UninitializedTest.java:27)

我的Supplier返回的Stream似乎在创建后立即关闭.

即使我从空的List或包含一些实际的BigInteger数据的Stream创建Stream,我也会得到相同的错误.

Why is my stream being closed?

推荐答案

首先,让我们仔细分析一下您创建的custom collector个:

Collector.of(
    () -> Stream.of(),                          // Container - this object is meant accumulate the result by consuming stream elements (in this case it can't really change its state, but nominally its still a container)
    (s, bi) -> Stream.concat(s, Stream.of(bi)), // Accumulator - is meant to define how stream elements should be accumulated in the container (again in this case we can't do anything with the container)
    (s1, s2) -> Stream.concat(s1, s2),          // Combiner - difines how containers should be merged while executing stream in parallel (NB: - we can replace it with a method reference Stream::concat)
    s -> s                                      // Finisher function - discribes the final transformation which should be performed with contaner (NB: since it doesn't perform any action we can omit this agument, there's an overloaded version which doesn't expect Finisher)
)

首先,值得指出的是,Stream等于not a container of data(就像一个集合).

因此,提供空流() -> Stream.of()作为collectorcontainer是错误的-container需要是可变的.但是我们can't把元素推入一条空流中.

其次,custom collector中的accumulator(Collector.of()的第二个参数)是not,它在做你可能期望它做的事情.

AccumulatorBiConsumer<R, T>,您已经按如下方式实现了它:

(s, bi) -> Stream.concat(s, Stream.of(bi))

在这里,Stream.concat()使用流sStream.of(bi)返回的流中的所有元素,并生成一个新的未命名流,该流很快就成为垃圾收集器的猎物.提醒:BiConsumer不返回值,因此concat()返回的流消失.

s保留(意味着collector知道它是引用),但它在执行concat()时已经被消耗,即它是closed.它发生在第一流元素(BigInteger.ZERO)获得处理时.当collectortry 处理第二个元素时,您会得到一个异常,因为concat()try 使用已经关闭的流s.


Consumer触发时,我预计Stream<Stream<BigInteger>>将被消耗,返回Stream<BigInteger>

首先,BiConsumerConsumer都有一个abstract方法accept(),即void,它不打算返回任何东西.

我的Supplier返回的流似乎正在关闭

其次,感觉你对Collection 家的工作方式有误解.mutable container的实例将在顺序执行场景中创建only once(每个线程并行创建一个容器,除非您通过提供Collector.Characteristics.CONCURRENT将其指定为concurrent collector,在这种情况下,所有线程将共享相同的容器).

Container应该是一个可变对象(否则它不会像您的例子那样有用),它的工作是累加流元素.即container更改其状态,而collector使用流中的元素.

Stream.iterate(seed,f)返回的流将是Stream<BigInteger>类型.

supplier () -> Stream.of()生成的Container将是Stream<Object>类型,因为编译器无法推断空流的类型,除非您使用类似.<BigInteger>of()的Type-Witness显式指定它.

因此,由accumulator侧的concat()返回的流也将是Stream<Object>类型.并提醒您,此流将被忽略.

在任何地方都不会有像Stream<Stream<BigInteger>>这样的野兽,无论是在管道中,还是在收集器内部.

最后,我要重申的是,将一个元素加到Stream中,本质上是impossible.

Java相关问答推荐

在现有Json文件中添加新记录

获取拦截器内部的IP地址

我想了解Java中的模块化.编译我的应用程序时,我有一个ResolutionException

错误:在Liferay7.4中找不到符号导入com.liferay.portal.kernel.uuid.PortalUUID;";

Java中实现的归并排序算法给出ArrayIndexOutOfBound异常

使用Mockito进行的Junit测试失败

放气总是压缩整个街区吗?

按属性值从流中筛选出重复项

如何在JavaFX中处理多个按钮

如何在一行中使用Dijkstra中的Java Stream

如何在JavaFX中制作鼠标透明stage

Java SSLKeyManager出厂密码

找出承载Cargo 的最小成本

Java在操作多个属性和锁定锁对象时使用同步和易失性

如何在IntelliJ IDEA的Build.sbt中添加外部JAR文件?

视图被推出线性布局-Android

未调用OnBackPressedCallback-Activitiy立即终止

不能在 map 上移除折线

如何修复Spring Boot应用程序中的RestDocumentationGenerationException:java.io.FileNotFoundException:/curl-request.adoc(只读文件系统)?

Eureka客户端无法使用用户/通行证注册到Eureka服务器