我在一个Spring Boot应用程序中使用了Spring Kafka.我正在try 使用卡夫卡ConsumerInterceptor来拦截偏移量promise 的时间.

这似乎是工作生产者事务未启用,但事务已打开,拦截者:拦截者.:onCommit不再被称为.

以下是最小的示例,一切都按预期运行:

@SpringBootApplication
@EnableKafka
class Application {
    @KafkaListener(topics = ["test"])
    fun onMessage(message: String) {
        log.warn("onMessage: $message")
    }

拦截者:拦截者.

class Interceptor : ConsumerInterceptor<String, String> {
    override fun onCommit(offsets: MutableMap<TopicPartition, OffsetAndMetadata>) {
        log.warn("onCommit: $offsets")
    }

    override fun onConsume(records: ConsumerRecords<String, String>): ConsumerRecords<String, String> {
        log.warn("onConsume: $records")
        return records
    }
}

应用程序配置:

spring:
  kafka:
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
      properties:
        "interceptor.classes": com.example.Interceptor
      group-id: test-group
    listener:
      ack-mode: record

在使用@EmbeddedKafka的测试中:

    @Test
    fun sendMessage() {
        kafkaTemplate.send("test", "id", "sent message").get() // block so we don't end before the consumer gets the message
    }

这将输出我预期的结果:

onConsume: org.apache.kafka.clients.consumer.ConsumerRecords@6a646f3c
onMessage: sent message
onCommit: {test-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}

然而,当我通过提供transaction-id-prefix来启用事务时,InterceptoronCommit不再被调用.

我的更新配置仅添加了:

spring:
  kafka:
    producer:
      transaction-id-prefix: tx-id-

并且该测试被更新以在事务中包装send:

    @Test
    fun sendMessage() {
        kafkaTemplate.executeInTransaction {
            kafkaTemplate.send("test", "a", "sent message").get()
        }
    }

进行此更改后,我的日志(log)输出现在仅

onConsume: org.apache.kafka.clients.consumer.ConsumerRecords@738b5968
onMessage: sent message

调用InterceptoronConsume方法,并且@KafkaListener接收消息,但从未调用onCommit.

有人碰巧知道这里发生了什么事吗?我对我应该在这里看到的东西的期望是不是不正确?

推荐答案

在使用事务时,偏移量不会通过消费者提交(只有一次语义).相反,偏移量是通过制作人提交的.

KafkaProducer...

/**
 * Sends a list of specified offsets to the consumer group coordinator, and also marks
 * those offsets as part of the current transaction. These offsets will be considered
 * committed only if the transaction is committed successfully. The committed offset should
 * be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
 * <p>
 * This method should be used when you need to batch consumed and produced messages
 * together, typically in a consume-transform-produce pattern. Thus, the specified
 * {@code groupMetadata} should be extracted from the used {@link KafkaConsumer consumer} via
 * {@link KafkaConsumer#groupMetadata()} to leverage consumer group metadata. This will provide
 * stronger fencing than just supplying the {@code consumerGroupId} and passing in {@code new ConsumerGroupMetadata(consumerGroupId)},
 * however note that the full set of consumer group metadata returned by {@link KafkaConsumer#groupMetadata()}
 * requires the brokers to be on version 2.5 or newer to understand.
 *
 * <p>
 * Note, that the consumer should have {@code enable.auto.commit=false} and should
 * also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or
 * {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits).
 * This method will raise {@link TimeoutException} if the producer cannot send offsets before expiration of {@code max.block.ms}.
 * Additionally, it will raise {@link InterruptException} if interrupted.
 *
 * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started.
 * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
 * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
 *         does not support transactions (i.e. if its version is lower than 0.11.0.0) or
 *         the broker doesn't support latest version of transactional API with all consumer group metadata
 *         (i.e. if its version is lower than 2.5.0).
 * @throws org.apache.kafka.common.errors.UnsupportedForMessageFormatException fatal error indicating the message
 *         format used for the offsets topic on the broker does not support transactions
 * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
 *         transactional.id is not authorized, or the consumer group id is not authorized.
 * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried
 *         (e.g. if the consumer has been kicked out of the group). Users should handle this by aborting the transaction.
 * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this producer instance gets fenced by broker due to a
 *                                                                  mis-configured consumer instance id within group metadata.
 * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch
 *         to the partition leader. See the exception for more details
 * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
 *         other unexpected error
 * @throws TimeoutException if the time taken for sending offsets has surpassed max.block.ms.
 * @throws InterruptException if the thread is interrupted while blocked
 */
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                     ConsumerGroupMetadata groupMetadata) throws ProducerFencedException {

Kotlin相关问答推荐

Compose:LaunchedEffect在密钥更改后不会重新启动

Jetpack Compose Material3和Material2 Slider onValueChangeFinded()的行为不同

可选的.在kotlin中不使用泛型参数

如何检测一个值是否是Kotlin中的枚举实例?

限制通用Kotlin枚举为特定类型

init中的NPE抽象函数变量

compareBy 如何使用布尔表达式在 kotlin 中工作

.indices 在 kotlin 中的含义是什么?

runBlocking 中的 deferred.await() 抛出的异常即使在被捕获后也被视为未处理

如何禁用智能投射突出显示 Kotlin?

在 Kotlin 中通过反射获取 Enum 值

如何在顶级函数中使用 koin 注入依赖项

Kotlin 具体化的泛型不会按计划保持类型

将协同路由调用放在存储库或ViewModel中哪个更好?

如何在Spring Boot应用程序上启用承载身份验证?

Kotlin suspend fun

参数不匹配;SimpleXML

Jetpack Compose-居中文本

内联 onFocusChange kotlin

使用 Kotlin 按字母对数组进行排序