我有一个Spring Boot微服务,可以消费来自RabbitMQ的消息、编写邮箱并将其发送到RTP服务器. 它由以下组件组成:

  1. 构成邮箱端的邮箱发件人将其发送到RTP服务器:
@Service
@RequiredArgsConstructor
public class MessageSender {

    private final JavaMailSender sender;

    public void sendMessage(final RabbitEmailDto emailDto) {
        MimeMessage message = sender.createMimeMessage();
        message.setRecipients(Message.RecipientType.TO, emailDto.getTo());

        MimeMessageHelper helper = new MimeMessageHelper(message, CharEncoding.UTF_8);
        helper.setSubject(emailDto.getData().getEmail().getSubject());
        helper.setText(emailDto.getHtml(), true);
        helper.setFrom(emailDto.getFrom());

        sender.send(message);
    }
}
  1. 消息处理器获取RabbitMQ消息列表, for each 消息在单独的虚拟线程中调用消息发送者,并返回邮箱发送结果的future 列表
@Service
@RequiredArgsConstructor
public class MessageProcessor {

    private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
    private final MessageSender messageSender;
    private final Jackson2JsonMessageConverter converter;

    public List<MessageProcessingFuture> processMessages(List<Message> messages) {
        List<MessageProcessingFuture> processingFutures = new ArrayList<>();

        for (Message message : messages) {
            MessageProcessingFuture messageProcessingFuture = new MessageProcessingFuture(
                    message.getMessageProperties().getDeliveryTag(),
                    processMessageAsync(message, executor)
            );
            processingFutures.add(messageProcessingFuture);
        }

        return processingFutures;
    }

    private Future<?> processMessageAsync(final Message message) {
        RabbitEmailDto rabbitEmailDto = (RabbitEmailDto) converter.fromMessage(message);
        MessageDeliverySystemEmailDto email = rabbitEmailDto.getData().getEmail();

        return executor.submit(() -> messageSender.sendMessage(rabbitEmailDto));
    }
}
  1. RabbitMQ消息监听器,它消费Rabbit队列中的消息,将它们传递给处理器,然后根据Future.get()是否引发异常,通过向RabbitMQ发送确认或拒绝来处理从处理器获得的future .
@Component
@RequiredArgsConstructor
public class BatchMessageListener implements ChannelAwareBatchMessageListener {

    private final MessageProcessor messageProcessor;

    @Override
    @MeasureExecutionTime
    public void onMessageBatch(final List<Message> messages, final Channel channel) {
        messageProcessor.processMessages(messages)
                .forEach(processingFuture -> processFuture(processingFuture, channel));
    }

    private void processFuture(final MessageProcessingFuture future, final Channel channel) {
        try {
            future.deliveryFuture().get();
            channel.basicAck(future.deliveryTag(), false);
        } catch (Exception e) {
            channel.basicReject(future.deliveryTag(), false);
        }
    }
}

我可以在日志(log)中看到MessageSender.sendMessage方法确实正在虚拟线程中执行,标识为VirtualThread[#100]/runnable@ForkJoinPool-1-worker-1.

我可以看到我们的生产服务器上有4名工人.(我认为这些工作人员是实际的平台线程或载体线程正确吗?)

我还发现,MessageSender.sendMessage方法通常需要大约1秒才能完成,其中大部分时间都花在等待来自RTP服务器的响应上.

根据我对虚拟线程的了解,我预计处理一批BatchMessageListener条消息(这是我为BatchMessageListener条消息配置的批量大小)将需要大约1秒的时间,因为平台线程不会阻止对RTP服务器的调用.这4个平台线程将在BatchMessageListener个虚拟线程中共享,从而有效地允许BatchMessageListener次几乎同时调用到RTP服务器.

然而,在实践中,我观察到一次处理4条消息,处理所有100条消息大约需要25秒.

在我的计算机上进行本地测试期间,我故意在MessageSender中的sender.send(message);行之前添加Thread.sleep(1000);来引入1秒的延迟,以模拟网络延迟.当时,一批Thread.sleep(1000);条消息确实在大约1秒内得到了处理,尽管根据日志(log),我只有10个载体线程.

我很困惑.为什么运营商线程不会阻止Thead.sleep呼叫,而是阻止外部服务的呼叫?我做错了什么吗?

推荐答案

Pinning

当虚拟线程没有从其载体线程中删除时,听起来像是固定.

固定通常是由于:

  • 在长时间运行的代码周围使用synchronized.
  • 通过JNI等调用本地代码.

如果您有四个载体线程(专门用于服务虚拟线程的平台线程),并且这些虚拟线程大部分时间都是固定的,那么您实际上已经将工作吞吐量限制在这四个载体线程上.100个任务中的其余96个任务必须等到前四个任务完成,然后92个任务必须等到下四个任务完成,以此类推.在这种情况下,您应该使用平台线程而不是虚拟线程.虚拟线程没有任何好处,实际上正在创造额外的工作.

检测钉扎

有关如何检测钉扎的指南,请参阅Todd Ginsberg的文章Java Virtual Thread Pinning.他描述了如何通过以下方式检测钉扎:

您可以查看带有JDK Mission Control (JMC)的JFR结果.

请注意,您可以调整检测钉扎的阈值.我模糊地记得默认值是20毫秒.但你应该验证并决定自己的有用价值.

解决方案/替代方案

你后来 comments 道:

我看到我使用的org.springframework.mail.javamail. JavMailHandler在发送邮箱时正在调用多个同步方法.在这种情况下,没有什么可以做的事情来从虚拟踩踏中受益,我的意思是正确的吗?

正确.如果synchronized代码长期运行.

虚拟线程不适合涉及synchronized或本机(JNI等)长期运行代码的任务.在这两种情况下,Java的虚拟线程调度器都无法看到此类代码何时被阻止,因此它仍被分配给平台载体线程.

对于偶尔的相遇来说,这种情况没什么大不了的.但对于重复或持续的接触,这种情况意味着您将从虚拟线程中获得很少或根本没有好处,而虚拟线程实际上可能会损害整体性能.听起来你的情况符合"重复或持续遭遇"类别.

Project Loom团队正在继续try 寻找绕过synchronized限制的方法,但Java 21 22方面尚未取得进展.

Replacing synchronized with ReentrantLock

如果synchronized代码是您自己的,则将synchronized的任何长期使用替换为ReentrantLock,以通过虚拟线程重新获得效率.

有些人将这一指南误解为"用ReentrantLock替换synchronizedall使用".这没有必要.Only long-running 100 code need be modified.简短的代码,例如保护对通常可用的变量的访问,可以保留在原处,因为在synchronized代码中花费的时间很少.

如果无法修改长期运行的synchronized个任务的源代码,请使用平台线程而不是虚拟线程.但请记住,如果这些任务具有多个可以多线程的子任务,则平台线程中的任务可能会受益于使用虚拟线程运行这些子任务.

欲了解更多信息

我不是这方面的专家.所以听专家的,而不是我的.

  • 研究概述功能的基本需求文档,JEP 444: Virtual Threads
  • 查看Project Loom名团队成员Alan BatemanRon Pressler的视频演示.
  • 另请参阅José Paumard的解释性谈话.

Java相关问答推荐

如何在Java中对自定义协议进行主机名验证?

有没有方法可以修复错误错误:无法初始化主类code_editor?

Jooq隐式地将bigint转换为数字,并且索引不起作用

try 使用Java 9或更高版本对特殊对象图进行解析时出现NullPointerException

在spring—data中自动发现native—sql查询期间遇到重复的SQL别名[id]

获取字符串中带空格的数字和Java中的字符

Mac上的全屏截图在使用JavaFX时不能正常工作吗?

Spring Security不允许加载js

在Ubuntu 23.10上使用mp3创建JavaFX MediaPlayer时出错

基于配置switch 的@Controller的条件摄取

FETCH类型设置为LAZY,但它仍会发送第二个请求

使用SWIG将C++自定义单元类型转换为基本Java类型

在JDK Flight Recorder中只记录单个线程

为什么我不能建立输入/输出流?Java ServerSocket

为什么项目名称出现在我的GET请求中?

如何在Selenium上继续使用最新的WebDriver版本

JOOQ:批处理CRUD操作使用动态表定义,如何?

为什么Java编译器为没有参数的方法(getter方法)创建桥接方法

在JPanel上使用GridBagLayout并将JButton放在里面时出现问题

为什么 log4j 过滤器在appender中不起作用