我有一个包含任务的BlockingQueue,并且只有一个守护进程线程执行它们:

public class TaskManager {

  private final BlockingQueue<Task> taskQueue;

  public TaskManager() {
    this.taskQueue = new ArrayBlockingQueue<>(5); //Max 5 tasks waiting
    Thread taskRunner = new Thread(this::runTask);
    taskRunner.setDaemon(true); //Infinite loop inside shouldn't prevent JVM shutdown
    taskRunner.start();
  }

  //Called async
  public void enqueueTask(Task task) {
    boolean added = false;
    try {
        added = taskQueue.offer(task, 5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {/*no-op*/}
    if (!added) {
        publishEvent(new Fail(task)); //Async notify the client their task won't run
    }
  }

  //Also called async
  public void dequeueTask(Task task) {
    this.taskQueue.remove(task);
  }

  void runTask() {
    while (true) {
      try {
        Task task = taskQueue.take(); //Blocks indefinitely
        doStuff(task);
      } catch (InterruptedException e) {
          //Can only be a spurious interruption. Ignore.
      }
    }
  }
}

看起来没什么特别的.但.感觉就像我通过自己管理队列和守护进程来重新发明轮子.我本以为Executor(Service)分会很容易做到这一点.我可以有一个有1个线程的ThreadPoolExecutor,给它一个有界队列.它为我提供了一个将任务出列的remove方法.但是,如果排队花费的时间太长(在我的示例中,taskQueue.offer()上的5s超时),似乎没有办法超时.

我做的事是不是已经合法了只是我想多了?如果没有,我可以用Executor来代替它吗?

推荐答案

重要的是要理解,ThreadPoolExecutor从不等待队列变得"未满".它使用offer,如果容量耗尽,则立即返回false,如果发生这种情况,并且已达到配置的最大线程大小,则执行器将调用RejectedExecutionHandler.

因此,您可以指定最大线程计数为1,这是RejectedExecutionHandler:

RejectedExecutionHandler tryForFiveSecs = (r, es) -> {
    boolean added = false;
    try {
        BlockingQueue<Runnable> taskQueue = ((ThreadPoolExecutor)es).getQueue();
        added = taskQueue.offer(r, 5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {/*no-op*/}
    if(!added) {
        //assuming the Runnable and "Task" are interchangeable
        publishEvent(new Fail(r));
    }
};

使用构造器ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler).

请注意,这包括execute的使用,这对于您现有的工作流来说已经足够了.如果您开始使用submit和返回的期货,您必须考虑失败作业(job)返回的期货永远不会结束.除非您扩展处理程序,例如:

RejectedExecutionHandler tryForFiveSecs = (r, es) -> {
    boolean added = false;
    try {
        BlockingQueue<Runnable> taskQueue = ((ThreadPoolExecutor)es).getQueue();
        added = taskQueue.offer(r, 5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {/*no-op*/}
    if(!added) {
        //assuming the Runnable and "Task" are interchangeable
        publishEvent(new Fail(r));
        if(r instanceof Future<?> f) f.cancel(false);
    }
};

Java相关问答推荐

使用ExecutorService时在ThreadFactory中触发自定义newThread函数

将Nimbus设置为计算机上运行的所有Java应用程序的默认外观

如何使用Java API在Oracle ODI中运行模拟?

Spring Boot@Cachebale批注未按预期工作

如何确定springboot在将json字段转换为Dto时如何处理它?

测试容器无法加载类路径初始化脚本

类型集合的Jackson JsonNode:类型引用的对象读取器应该是Singleton吗?

JNI:将代码打包成自包含的二进制文件

Android Java:已设置但未读取SharedPreferences

AbstractList保证溢出到其他方法

如何在不作为类出现的表上执行原生查询?

除0错误/抱歉我的句子是PT

在一行中检索字符分隔字符串的第n个值

如何处理两个几乎相同的XSD文件?

Oracle中从JSON中提取和插入数据

在Eclipse中可以使用外部字体吗?

协同 routine 似乎并不比JVM线程占用更少的资源

[jdk21][Foreign Function&;Memory API]MemoryLayout::varHandle通过可变数组进行 struct 化的问题

使用原子整数的共享计数器并发增量

如何使用 Java 替换位于特定标记内的 XML 标记的 CDATA 内的值