我在一个Spring Boot应用程序中使用了Spring Kafka.我正在try 使用卡夫卡ConsumerInterceptor
来拦截偏移量promise 的时间.
这似乎是工作生产者事务未启用,但事务已打开,拦截者:拦截者.:onCommit
不再被称为.
以下是最小的示例,一切都按预期运行:
@SpringBootApplication
@EnableKafka
class Application {
@KafkaListener(topics = ["test"])
fun onMessage(message: String) {
log.warn("onMessage: $message")
}
拦截者:拦截者.
class Interceptor : ConsumerInterceptor<String, String> {
override fun onCommit(offsets: MutableMap<TopicPartition, OffsetAndMetadata>) {
log.warn("onCommit: $offsets")
}
override fun onConsume(records: ConsumerRecords<String, String>): ConsumerRecords<String, String> {
log.warn("onConsume: $records")
return records
}
}
应用程序配置:
spring:
kafka:
consumer:
enable-auto-commit: false
auto-offset-reset: earliest
properties:
"interceptor.classes": com.example.Interceptor
group-id: test-group
listener:
ack-mode: record
在使用@EmbeddedKafka
的测试中:
@Test
fun sendMessage() {
kafkaTemplate.send("test", "id", "sent message").get() // block so we don't end before the consumer gets the message
}
这将输出我预期的结果:
onConsume: org.apache.kafka.clients.consumer.ConsumerRecords@6a646f3c
onMessage: sent message
onCommit: {test-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}
然而,当我通过提供transaction-id-prefix
来启用事务时,Interceptor
的onCommit
不再被调用.
我的更新配置仅添加了:
spring:
kafka:
producer:
transaction-id-prefix: tx-id-
并且该测试被更新以在事务中包装send
:
@Test
fun sendMessage() {
kafkaTemplate.executeInTransaction {
kafkaTemplate.send("test", "a", "sent message").get()
}
}
进行此更改后,我的日志(log)输出现在仅
onConsume: org.apache.kafka.clients.consumer.ConsumerRecords@738b5968
onMessage: sent message
调用Interceptor
的onConsume
方法,并且@KafkaListener
接收消息,但从未调用onCommit
.
有人碰巧知道这里发生了什么事吗?我对我应该在这里看到的东西的期望是不是不正确?