我有一个演示Spring集成项目,是接收卡夫卡消息,聚合它们,然后释放它们.我正试着在这个项目中增加JdbcMessageStore个.问题是它失败了,并出现了错误:

Caused by: java.lang.IllegalArgumentException: Cannot store messages without an ID header
    at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.2.15.RELEASE.jar:5.2.15.RELEASE]
    at org.springframework.integration.jdbc.store.JdbcMessageStore.addMessage(JdbcMessageStore.java:314) ~[spring-integration-jdbc-5.3.8.RELEASE.jar:5.3.8.RELEASE]

调试后,我发现它需要该消息中的UUID头id.但问题是我不能手动设置Kafka Header ID-它是被禁止的(与timestamp Header相同)-我在不同项目的Kafka Producer中try 过这样做.

如果我使用名为Big Data Tools的IDEA插件并从那里发送一条消息,我可以设置id头,但它是以字节数组的形式被我的项目接收的,它失败并出现错误

IllegalArgumentException Incorrect type specified for header 'id'. Expected [UUID] but actual type is [B]

关于如何解决这个问题,我找不到任何解决方案.我需要以某种方式设置这个id头,以便能够在数据库中存储消息. 提前谢谢你

推荐答案

KafkaMessageDrivenChannelAdapter有一个选项:

/**
 * Set the message converter to use with a record-based consumer.
 * @param messageConverter the converter.
 */
public void setRecordMessageConverter(RecordMessageConverter messageConverter) {

其中,您可以通过以下方式设置MessagingMessageConverter:

/**
 * Generate {@link Message} {@code ids} for produced messages. If set to {@code false},
 * will try to use a default value. By default set to {@code false}.
 * @param generateMessageId true if a message id should be generated
 */
public void setGenerateMessageId(boolean generateMessageId) {
    this.generateMessageId = generateMessageId;
}

/**
 * Generate {@code timestamp} for produced messages. If set to {@code false}, -1 is
 * used instead. By default set to {@code false}.
 * @param generateTimestamp true if a timestamp should be generated
 */
public void setGenerateTimestamp(boolean generateTimestamp) {
    this.generateTimestamp = generateTimestamp;
}

设置为true.

这样,从ConsumerRecord创建的Message将具有各自的idtimestamp报头.

您还可以简单地使用一个"虚拟"转换器来返回传入的有效负载,框架将在其中创建一个新的Message来生成这些标头.

Java相关问答推荐

内容处置 destruct 了PSP请求

Spring Webocket:尽管凭据设置为False,但MLhttpsify和Fetch请求之间的CORS行为存在差异

Java Streams在矩阵遍历中的性能影响

Java .类参数不通过构造函数传递

Chunk(Int)已弃用并标记为要删除

在Spring Boot中使用哪个Java类来存储创建时间戳?

有没有办法让扩展变得多态?

类型集合的Jackson JsonNode:类型引用的对象读取器应该是Singleton吗?

Java Mooc.fi Part 12_01.Hideout -返回和删除方法

匹配一组字符或另一组字符

内存和硬盘中的Zip不同,这会导致下载后的Zip损坏

如果按钮符合某些期望,如何修改它的文本?

Spring Boot&;Docker:无法执行目标org.springframework.boot:spring-boot-maven-plugin:3.2.0:build-image

在macOS上读取文件会导致FileNotFound,即使文件存在(并且具有权限)

带有可选部分的Java DateTimeForMatter

JavaFX:无论何时显示应用程序,如何更改组件/ node 位置?

Spring Validator批注不起作用

在Spring Boot中使用咖啡因进行缓存-根据输出控制缓存

";home/runner/work/中没有文件...匹配到[**/pom.xml];Maven项目的构建过程中出现错误

为什么Instant没有从UTC转换为PostgreSQL的时区?