我们使用ExecutorService并行地对云存储进行各种REST调用,我们担心池可能会忙于被阻止的调用,这些调用已经失控,然后长时间阻止较新的调用(云超时可能在经历所有重试后几分钟).当然,我们总是可以添加越来越多的线程,但我用C++实现的一种解决方案是,如果任务尚未开始,就让Future.get()在调用者线程中运行.Java almostExecutors.newWorkStealingQueue()中实现了这一点,但只有在调用方本身是池线程时,才会启用调用方逻辑:

class ForkJoinTask {
  private int awaitDone(...) {
      ...
      if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
        // execute in caller thread

这就是工作窃取防止递归算法耗尽线程的方式.但如果调用方是普通线程,则不会触发此逻辑.ForkJoinPool也没什么不同,因为Executors.newWorkStealingQueue()只是一个包装.Java中有没有什么东西支持我想要的东西?

PS:我们还不能使用JDK 21的虚拟线程:-(

更新:更多关于用例的解释.我们发现,为了回答有关云存储的某些问题,例如,应该将此路径视为目录还是文件,我们必须问几个问题:

  • 是否存在具有该名称的斑点?
  • 是否存在名称后跟/的blob?
  • 是否存在以该路径为前缀的其他BLOB?

执行器允许我们并行地提出这些问题,并减少延迟.因此,我们有一个共享的执行者,每个人都可以问这些关于云存储终端的问题.但是...有时,某个终端会崩溃,在恢复或超时之前不回答问题,这可能需要几分钟的时间.当这种情况发生时,我们不希望延迟的请求阻塞其他所有人的执行程序,并使every个请求等待."调用者忙时执行"模式给了我们一个安全阀,在几分钟内一切都会稍微慢一点,但不会慢lot分钟.

推荐答案

如果您使用标准的ThreadPoolExecutor,则"Celler Runs"策略将执行您想要的操作:

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

Javadoc:

直接在中运行拒绝任务的拒绝任务处理程序 Execute方法的调用线程,除非执行器已 关闭,在这种情况下,任务将被丢弃.

以下是关于拒绝是如何发生的解释:

任何BlockingQueue都可用于传输和保留提交的任务. 此队列的使用与池大小调整相互作用:

  • 如果正在运行的线程少于corePoolSize线程,则Executor始终倾向于添加新线程,而不是排队.
  • 如果corePoolSize或更多线程正在运行,则Executor始终倾向于将请求排队,而不是添加新线程.
  • If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be
    rejected.

UPDATE FROM OP:使用SynchronousQueue来避免重复,它工作起来很有魅力.

exec = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new SynchronousQueue<>());
exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

Java相关问答推荐

try 使用Java 9或更高版本对特殊对象图进行解析时出现NullPointerException

Springdoc Whitelabel Error Page with Spring V3

为什么一个Test的instance?& gt;在构造函数中接受非空对象?

如何找到MongoDB文档并进行本地化?

如何在运行时动态创建表(使用Java、JPA、SprringBoot)

Java中的死锁及其重入锁和锁

将响应转换为带值的键

与不同顺序的组进行匹配,不重复组但分开

用OSQL创建索引

try 从REST API返回对象列表时出错

如何以编程方式保存workBench.xmi?

在Spring Boot JPA for MySQL中为我的所有类创建Bean时出错?

没有Tomcat,IntelliJ如何在本地运行API?

我该如何为我的类编写getter和setter方法?

Java HashMap保留所有时间复杂性

为什么Instant没有从UTC转换为PostgreSQL的时区?

Spring Boot Security-每个端点都被403禁止,Spring记录一个BasicErrorController#错误(HttpServlet请求)

如何从指定某些字段的父对象创建子对象

java构造函数中的冻结操作何时发生?

为什么child-pom会创建一个新版本