我有一个以Avro种格式发送消息的制作人和一个收听这些消息的消费者.

我还通过在消费者中使用@RetryableTopic来处理错误,实现了非阻塞重试.

当消费者无法反序列化消息时(由于模式更改或其他原因),它不会将该消息放入-retry主题中.它直接将其发送到-dlt主题.

我还想重审DeserializationException次.原因是,在重试这些错误时,我可以在消费者中部署修复程序,以便重试最终能够成功.

我在@RetryableTopic年try 了include个选项,但在DeserializationException年似乎不起作用.

  @RetryableTopic(
    attempts = "${app.consumer.retry.topic.count:5}",
    backoff = @Backoff(delayExpression = "${app.consumer.retry.topic.back-off:2000}"),
    fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
    include = {DeserializationException.class}  // does not work
  )

这是@RetryableTopic中的一个bug还是有其他方法可以实现这一点?

推荐答案

正如您所描述的,因为Spring Kafka 2.8.3有一组global fatal exceptions,所以会导致记录直接转发到DLT.

处理此类异常的通常模式是,在部署修复程序后,使用某种控制台应用程序从DLT中检索失败的记录并重新处理,可能是通过将记录发送回第一个重试主题,以便主主题中没有重复的记录.

对于您描述的模式,您可以通过提供DestinationTopicResolver个bean来管理这组FATAL个全局异常,例如:

@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver topicResolver(ApplicationContext applicationContext) {
    DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(Clock.systemUTC(), applicationContext);
    ddtr.removeClassification(DeserializationException.class);
    return ddtr;
}

请让我知道这是否对你有效.谢谢

Java相关问答推荐

无法运行Java(已解决)

Java事件系统通用转换为有界通配符

如何粘合(合并)文件Lucene?

根据对象和值的参数将映射<;T、值&>转换为列表<;T&>

当我已经安装了其他版本的Java时,如何在Mac OSX 14.3.1上安装Java 6?

无法了解Java线程所消耗的时间

Mac上的全屏截图在使用JavaFX时不能正常工作吗?

为什么当我创建Robot对象时,JavaFX引发IlLegalStateException异常?

是否在settings.xml中使用条件Maven镜像?

在Ubuntu 23.10上使用mp3创建JavaFX MediaPlayer时出错

在Spring Boot应用程序中,server.port=0的默认端口范围是多少?

FETCH类型设置为LAZY,但它仍会发送第二个请求

无法将GSON导入到我的JavaFX Maven项目

组合连接以从两个表返回数据

如何通过Java java.lang.Foreign API访问本机字节数组

在具有Quarkus Panache的PostgreSQL中将JSON数据存储为JSONB时,会将其存储为转义字符串

在单例类上获取Java锁,了解原因

由于可为null,无法在kotlin中实现java接口

javax.crypto-密码对象-提供者服务是如何工作的?

双对象供应商