我有一个Spring Boot微服务,可以消费来自RabbitMQ的消息、编写邮箱并将其发送到RTP服务器. 它由以下组件组成:
- 构成邮箱端的邮箱发件人将其发送到RTP服务器:
@Service
@RequiredArgsConstructor
public class MessageSender {
private final JavaMailSender sender;
public void sendMessage(final RabbitEmailDto emailDto) {
MimeMessage message = sender.createMimeMessage();
message.setRecipients(Message.RecipientType.TO, emailDto.getTo());
MimeMessageHelper helper = new MimeMessageHelper(message, CharEncoding.UTF_8);
helper.setSubject(emailDto.getData().getEmail().getSubject());
helper.setText(emailDto.getHtml(), true);
helper.setFrom(emailDto.getFrom());
sender.send(message);
}
}
- 消息处理器获取RabbitMQ消息列表, for each 消息在单独的虚拟线程中调用消息发送者,并返回邮箱发送结果的future 列表
@Service
@RequiredArgsConstructor
public class MessageProcessor {
private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
private final MessageSender messageSender;
private final Jackson2JsonMessageConverter converter;
public List<MessageProcessingFuture> processMessages(List<Message> messages) {
List<MessageProcessingFuture> processingFutures = new ArrayList<>();
for (Message message : messages) {
MessageProcessingFuture messageProcessingFuture = new MessageProcessingFuture(
message.getMessageProperties().getDeliveryTag(),
processMessageAsync(message, executor)
);
processingFutures.add(messageProcessingFuture);
}
return processingFutures;
}
private Future<?> processMessageAsync(final Message message) {
RabbitEmailDto rabbitEmailDto = (RabbitEmailDto) converter.fromMessage(message);
MessageDeliverySystemEmailDto email = rabbitEmailDto.getData().getEmail();
return executor.submit(() -> messageSender.sendMessage(rabbitEmailDto));
}
}
- RabbitMQ消息监听器,它消费Rabbit队列中的消息,将它们传递给处理器,然后根据Future.get()是否引发异常,通过向RabbitMQ发送确认或拒绝来处理从处理器获得的future .
@Component
@RequiredArgsConstructor
public class BatchMessageListener implements ChannelAwareBatchMessageListener {
private final MessageProcessor messageProcessor;
@Override
@MeasureExecutionTime
public void onMessageBatch(final List<Message> messages, final Channel channel) {
messageProcessor.processMessages(messages)
.forEach(processingFuture -> processFuture(processingFuture, channel));
}
private void processFuture(final MessageProcessingFuture future, final Channel channel) {
try {
future.deliveryFuture().get();
channel.basicAck(future.deliveryTag(), false);
} catch (Exception e) {
channel.basicReject(future.deliveryTag(), false);
}
}
}
我可以在日志(log)中看到MessageSender.sendMessage
方法确实正在虚拟线程中执行,标识为VirtualThread[#100]/runnable@ForkJoinPool-1-worker-1
.
我可以看到我们的生产服务器上有4名工人.(我认为这些工作人员是实际的平台线程或载体线程正确吗?)
我还发现,MessageSender.sendMessage
方法通常需要大约1秒才能完成,其中大部分时间都花在等待来自RTP服务器的响应上.
根据我对虚拟线程的了解,我预计处理一批BatchMessageListener
条消息(这是我为BatchMessageListener
条消息配置的批量大小)将需要大约1秒的时间,因为平台线程不会阻止对RTP服务器的调用.这4个平台线程将在BatchMessageListener
个虚拟线程中共享,从而有效地允许BatchMessageListener
次几乎同时调用到RTP服务器.
然而,在实践中,我观察到一次处理4条消息,处理所有100条消息大约需要25秒.
在我的计算机上进行本地测试期间,我故意在MessageSender
中的sender.send(message);
行之前添加Thread.sleep(1000);
来引入1秒的延迟,以模拟网络延迟.当时,一批Thread.sleep(1000);
条消息确实在大约1秒内得到了处理,尽管根据日志(log),我只有10个载体线程.
我很困惑.为什么运营商线程不会阻止Thead.sleep
呼叫,而是阻止外部服务的呼叫?我做错了什么吗?