我正在try 通过将目标时间戳提供为UTC Instant("2023-05-31T00:00:00.00Z")来重新读取直到特定时间点的消息.然而, Select 性重置不起作用,我总是得到主题的第一个偏移量(0)的返回.根据Javadoc(https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html-offsetsForTimes),我应该收到时间戳等于或大于指定时间戳的第一条消息的偏移量:
公共Map<;TopicPartition,OffsetAndTimestamp>;OffsetsForTimes(Map<;TopicPartition,Long>;TimestStamp ToSearch) 按时间戳查找给定分区的偏移量.每个分区返回的偏移量是时间戳大于或等于对应分区中给定时间戳的最早的偏移量.这是一个阻止呼叫.
然而,我得到的是一个比指定时间戳更早的时间戳以及偏移量0.
我有点不愿意考虑apache Kafka客户端的错误,所以我想要求一些额外的眼睛来验证我在这里没有try 一些彻头彻尾的胡说八道.
以下是我对代码片段所做的简短总结. Main方法重置记录的偏移量和轮询(并将它们打印到控制台):
resetOffsets(consumer, resetTime);
while (true) {
records = consumer.poll(timeout);
Reset Offsets方法首先将时间戳转换为纪元秒,并将其添加到每个分配的TopicPartiton的映射中(我在测试中只使用了一个单分区主题):
private static void resetOffsets(Consumer<Object, Object> consumer, String resetTime) {
Long resetTimeEpoch = Long.valueOf(Instant.parse(resetTime).getEpochSecond());
Map<TopicPartition, Long> partitionTimestamps = consumer.assignment().stream().collect(Collectors.toMap(tp -> tp, tp -> resetTimeEpoch));
然后,我调用offsetsForTimes来获取每个分区的偏移量,并将参数时间戳和结果映射打印到控制台以进行调试:
Map<TopicPartition, OffsetAndTimestamp> resetOffsets = consumer.offsetsForTimes(partitionTimestamps);
System.out.println(partitionTimestamps);
System.out.println(resetOffsets);
之后,我将寻求该偏移量,并在主循环中处理消息.
该话题目前有八条信息,五条来自昨天(31.05.2023),三条来自前天(30.05.2023).因此,我希望在重置为"2023-05-31T00:00:00.00Z"时处理五条较新的消息,但我再次得到了所有八条消息.
System.out.println(partitionTimestamps);
的控制台输出为{replicated-topic-0=1685491200}
,System.out.println(resetOffsets);
的控制台输出为{replicated-topic-0=(timestamp=1685455042425, leaderEpoch=0, offset=0)}
,这表明我使用正确的时间戳调用了该方法.不过,我还是要买一辆旧的.
我对我可能做错了什么感到有点不知所措,并感谢您的任何意见.如果需要,我可以提供我的小测试程序的完整代码.