可以为Java 8 parallel stream指定自定义线程池吗?我哪儿都找不到.

假设我有一个服务器应用程序,我想使用并行流.但是这个应用程序很大,而且是多线程的,所以我想把它划分开来.我不希望在applicationblock任务的一个模块中有一个运行缓慢的任务从另一个模块中删除.

如果我不能为不同的模块使用不同的线程池,这意味着我不能在大多数实际情况下安全地使用并行流.

试试下面的例子.在不同的线程中执行一些CPU密集型任务.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

推荐答案

实际上,有一个技巧可以说明如何在特定的fork-join池中执行并行操作.如果您在fork-join池中将其作为任务执行,则它会留在那里,而不使用公共任务.

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

技巧基于ForkJoinTask.fork,其中指定:"安排在当前任务运行的池中异步执行此任务,如果适用,或者使用ForkJoinPool.commonPool(),如果不是inForkJoinPool()"

Java相关问答推荐

收听RDX中用户数据的变化

Cucumber TestNG Assert失败,出现java. lang. Numbercycle异常

路径映射未发生

JavaFX Maven Assembly插件一直打包到错误的JDK版本

获取字符串中带空格的数字和Java中的字符

蒙蒂霍尔比赛结果不正确

当返回Mono<;Something>;时,不会调用Mono<;void>;.flatMap

返回响应时,CamelCase命名约定不起作用

在向WebSphere中的文档添加元素时iText挂起

如何在不删除Java中已有内容的情况下覆盖文件内容?

在Java中将int[]矩阵添加到ArrayList中,但出现错误

是否在settings.xml中使用条件Maven镜像?

是否有一个Java Future实现可以在池繁忙时在调用者线程中执行?

如何在IntelliJ IDEA的Build.sbt中添加外部JAR文件?

除0错误/抱歉我的句子是PT

使用for循环时出现堆栈溢出错误,但如果使用if块执行相同的操作,则不会产生错误

未调用OnBackPressedCallback-Activitiy立即终止

TinyDB问题,无法解析符号';上下文&

Java中计算大n和k值模10^9+7的二项式系数的乘法公式输出错误值

如何使用Jackson读取以方括号开头的JSON?