我有以下场景:一个自定义线程池需要能够暂停.暂停时,删除当前排队的任务.在暂停期间,它可以接受任务,并在恢复时执行"新"排队任务.我目前的实现就是在做上述要求.线程池实现:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class QueriesThreadPoolExecutor extends ThreadPoolExecutor {
    private final List<Future<?>> futuresList = Collections.synchronizedList(new ArrayList<>());
    private boolean isPaused = false;
    private Lock pauseLock = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();

    public QueriesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
//        System.out.println("beforeExecute " + t.getName());
        pauseLock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException e) {
            t.interrupt();
        } finally {
            pauseLock.unlock();
        }
    }

    @Override
    public void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
//        System.out.println("afterExecute " + Thread.currentThread().getName());
    }

    @Override
    public void terminated() {
        super.terminated();
        System.out.println("Thread pool terminated");
    }

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks = super.shutdownNow();

        System.out.println("Shutting down thread pool, active tasks remaining: " + tasks.size());
        return tasks;
    }

    public <T> Future<T> submit(Callable<T> task) {
        System.out.println("submit task");
        Future<T> res = super.submit(task);
        futuresList.add(res);

        return res;
    }

    public void pause() {
        cancelRunningTasks();

        System.out.println("pause");
        pauseLock.lock();
        try {
            isPaused = true;
        } finally {
            pauseLock.unlock();
        }
    }

    public void resume() {
        System.out.println("resume");
        pauseLock.lock();
        try {
            isPaused = false;
            unpaused.signal();
        } finally {
            pauseLock.unlock();
        }
    }

    public void cancelRunningTasks() {
        synchronized (futuresList) {
            for (Future<?> future : futuresList) {
                if (!future.isDone()) {
                    future.cancel(true);
                    System.out.println("cancelRunningTasks");
                }
            }
        }
    }
}

单元测试

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class QueriesThreadPoolExecutorTests {
    private static QueriesThreadPoolExecutor queriesThreadPoolExecutor;
    private AtomicInteger longCounter = new AtomicInteger(0);
    private AtomicInteger shortCounter = new AtomicInteger(0);

    @BeforeAll
    static void setup() {
        queriesThreadPoolExecutor = new QueriesThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @AfterAll
    static void tearDown() {
        queriesThreadPoolExecutor.shutdownNow();
    }

    @Test
    void testCancelResumeTasks() {
        Callable<String> longRunningTask = () -> {
            Thread.sleep(20_000);
            return "long running task" + longCounter.incrementAndGet();
        };

        Callable<String> shortRunningTask = () -> {
            Thread.sleep(1_000);
            return "short running task" + shortCounter.incrementAndGet();
        };
        List<Future<String>> futureList = new ArrayList<>();

        submitTasks(futureList, shortRunningTask, longRunningTask);

        ScheduledExecutorService cancelScheduledExecutor = Executors.newScheduledThreadPool(2);
        cancelScheduledExecutor.schedule(() -> {
            queriesThreadPoolExecutor.pause();
        }, 2, TimeUnit.SECONDS);

        printResults(futureList);

        submitTasks(futureList, shortRunningTask, longRunningTask);

        queriesThreadPoolExecutor.resume();

        printResults(futureList);
    }

    private void submitTasks(List<Future<String>> futureList, Callable<String> shortRunningTask, Callable<String> longRunningTask) {
        for (int i = 0; i < 5; i++) {
            futureList.add(queriesThreadPoolExecutor.submit(shortRunningTask));
        }
        for (int i = 0; i < 5; i++) {
            futureList.add(queriesThreadPoolExecutor.submit(longRunningTask));
        }
    }

    private void printResults(List<Future<String>> futureList) {
        System.out.println("***********Printing results*****************");
        for (Future<String> future : futureList) {
            try {
                String status = future.isCancelled() ? "cancelled" : "done";
                String result = future.isCancelled() ? "cancelled" : future.get();
                System.out.println("task status - " + status + " - result " + result);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            } catch (CancellationException e) {
                System.out.println("Exception - Task was cancelled");
            }
        }
    }
}

由于某种原因,当线程池暂停时,一个或多个任务从单元测试中引发异常:

task status - done - result short running task1
cancelRunningTasks
cancelRunningTasks
cancelRunningTasks
cancelRunningTasks
cancelRunningTasks
Exception - Task was cancelled  <--------------------WHY?
task status - cancelled - result cancelled
task status - cancelled - result cancelled
task status - cancelled - result cancelled
task status - cancelled - result cancelled
submit task
pause
submit task

也许我错过了一些琐碎的东西.

推荐答案

所以这里发生的事情:

您有一个包含5个线程的池,并并行启动5个任务.

短任务需要1秒,长任务需要20秒. 您还计划取消2s.

这里有两个有趣的地方,一个是printResults每次循环一个任务,另一个是future.get()阻塞.

打印循环将在任何长任务被取消之前到达该语句,并开始阻塞第一个长任务.

与此同时,cancelScheduledExecutor将启动并取消所有长期运行的任务.

当前被阻止的任务现在被取消,并将引发异常.

其余任务将在.isCancelled()上正确判断并注销.

Java相关问答推荐

是否有一种格式模式,可以在除0之外的数字前面有正负符号?

Saxon 9:如何从Java扩展函数中的net.sf.saxon.expr. XPathContent中获取声明的变量

根据对象和值的参数将映射<;T、值&>转换为列表<;T&>

使用联接和分页的SpringBoot Spring数据JPA

Hibernate 6支持Joda DateTime吗?

解释左移在Java中的工作原理

Jenv-相同的Java版本,但带有前缀

用OSQL创建索引

如何为JavaFX Spring Boot应用程序制作Windows/MacOS/Linux安装程序

Java连接池无法正常工作

使用While循环打印素数,无法正常工作

基于配置switch 的@Controller的条件摄取

如何创建模块信息类文件并将其添加到JAR中?

除0错误/抱歉我的句子是PT

在应用getCellFormula()时,Excel引用中的文件名始终为";[1]";使用Apache POI()

基于Java中mm/dd/yy格式的最近日期对数组列表进行排序

JavaFX,GridPane:在GridPane的列中生成元素将保持所有列的宽度

如何设计包含已知和未知键值对映射的Java类?

如何在Java上为循环数组从synchronized迁移到ReentrantLock

将在 Docker 中运行的 Spring Boot 连接到在 Docker 中运行的 PostgreSQL,无需 compose 文件?