I've got a system with many writers and a single reader, each running in a separate thread. The writers notify the reader when work is available, and the reader blocks until it is notified.

Given the number of writers, I want to use a lock-free implementation to notify the reader. Every time the reader wakes up, it resets the notification flag, does some work, and blocks waiting for more notifications to arrive. Essentially I'm looking for the equivalent of an AtomicBoolean with an ability to block until its value becomes true.

What I've tried so far:

  • My current implementation uses a Semaphore.
  • The semaphore starts out with no permits.
  • The reader blocks trying to acquire a permit.
  • Writers invoke Semaphore.release() in order to notify the reader.
  • The reader invokes Semaphore.drainPermits(), does some work, and blocks again on Semaphore.acquire.

What I don't like about the Semaphore approach:

  • It seems a bit heavy-handed. I only care about about the first notification arriving. I don't need to keep a count of how many other notifications came in.
  • Semaphores throw an exception if their count surpasses Integer.MAX_VALUE. This is more of a theoretical problem than practical but still not ideal.

Is there a data structure that is equivalent to AtomicBoolean with an ability to block waiting on a particular value?

Alternatively, is there a thread-safe manner to ensure that Semaphore's number of permits never surpass a certain value?

推荐答案

BlockingQueue<Singleton> would do this adequately.

You would create, for example, an ArrayBlockingQueue<Singleton>(1), and then your waiter would look like:

queue.take();

… and the notifier would look like:

queue.offer(Singleton.INSTANCE)

… with the use of offer ensuring that multiple releases are combined together.

Java相关问答推荐

原子性对并发哈希映射中的 computeIfAbsent 意味着什么?原子性与同步

Spring 对线程和数据库连接的更多调试见解

保存新文档时将旧版本的文档移动到历史索引(elasticsearch)

代码被推送到主线程,如何修复?

不能在子类的构造函数中抛出异常

捕获的读/写操作?在Java

Thymeleaf - th:method="delete/put" 导致:不支持请求方法'POST'

在没有线程暂停的情况下执行 ExecutorService 中的任务

如何在 Jakarta EE 应用程序中激活我自己的 Jakarta HttpAuthenticationMechanism 实现

try 将 JSON 文件或字符串转换为 CSV 文件会得到一个空的 CSV 文件

如何在 Java 中格式化两个 LocalDate 变量的范围?

java.util.Base64.getDecoder().decode 和 org.asynchttpclient.util.Base64.decode 有什么区别?

聊天应用 - 更新令牌聊天片段 - 新版本

无法在列表中添加元素

我正在使用 Java Spring 框架从我的部分数据中映射数据,而我的路径变量不会让我访问任何数据

为什么将方法作为 getOrDefault 的参数,无论如何都会调用该方法?

使用 JSP 包含指令、JSP 包含操作和使用 JSP 标记文件包含文件有什么区别?

无法从 START_OBJECT 令牌中反序列化 java.util.ArrayList 的实例

如何在 spring 启动中设置休息的基本网址?

Java 中的 Class 是什么意思?