我正在try 通过将目标时间戳提供为UTC Instant("2023-05-31T00:00:00.00Z")来重新读取直到特定时间点的消息.然而, Select 性重置不起作用,我总是得到主题的第一个偏移量(0)的返回.根据Javadoc(https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html-offsetsForTimes),我应该收到时间戳等于或大于指定时间戳的第一条消息的偏移量:

公共Map<TopicPartition,OffsetAndTimestamp>OffsetsForTimes(Map<TopicPartition,Long>TimestStamp ToSearch) 按时间戳查找给定分区的偏移量.每个分区返回的偏移量是时间戳大于或等于对应分区中给定时间戳的最早的偏移量.这是一个阻止呼叫.

然而,我得到的是一个比指定时间戳更早的时间戳以及偏移量0.

我有点不愿意考虑apache Kafka客户端的错误,所以我想要求一些额外的眼睛来验证我在这里没有try 一些彻头彻尾的胡说八道.

以下是我对代码片段所做的简短总结. Main方法重置记录的偏移量和轮询(并将它们打印到控制台):

 resetOffsets(consumer, resetTime);

        while (true) {
            records = consumer.poll(timeout);

Reset Offsets方法首先将时间戳转换为纪元秒,并将其添加到每个分配的TopicPartiton的映射中(我在测试中只使用了一个单分区主题):

private static void resetOffsets(Consumer<Object, Object> consumer, String resetTime) {
        Long resetTimeEpoch = Long.valueOf(Instant.parse(resetTime).getEpochSecond());
        Map<TopicPartition, Long> partitionTimestamps = consumer.assignment().stream().collect(Collectors.toMap(tp -> tp, tp -> resetTimeEpoch));

然后,我调用offsetsForTimes来获取每个分区的偏移量,并将参数时间戳和结果映射打印到控制台以进行调试:

Map<TopicPartition, OffsetAndTimestamp> resetOffsets = consumer.offsetsForTimes(partitionTimestamps);
        
        System.out.println(partitionTimestamps);
        System.out.println(resetOffsets);

之后,我将寻求该偏移量,并在主循环中处理消息.

该话题目前有八条信息,五条来自昨天(31.05.2023),三条来自前天(30.05.2023).因此,我希望在重置为"2023-05-31T00:00:00.00Z"时处理五条较新的消息,但我再次得到了所有八条消息. System.out.println(partitionTimestamps);的控制台输出为{replicated-topic-0=1685491200},System.out.println(resetOffsets);的控制台输出为{replicated-topic-0=(timestamp=1685455042425, leaderEpoch=0, offset=0)},这表明我使用正确的时间戳调用了该方法.不过,我还是要买一辆旧的.

我对我可能做错了什么感到有点不知所措,并感谢您的任何意见.如果需要,我可以提供我的小测试程序的完整代码.

推荐答案

为了证明我不是在胡说八道

确实发生了这种事.问题越奇怪,由自己造成的可能性就越大.

问题是请求的时间戳仅以纪元秒为单位,返回的时间戳以毫秒为单位->;,因此后者当然更大,从而导致问题.请原谅我浪费了时间.

Java相关问答推荐

Junit with Mockito for java

SpringBootreact 式Web应用程序的Spring Cloud Configer服务器中的资源控制器损坏

JavaFX如何在MeshView中修复多个立方体?

测试何时使用Mockito强制转换对象会导致ClassCastException

虚拟线程应该很快消亡吗?

将ByteBuffer异步写入InputStream或Channel或类似对象

如何配置空手道以使用FeignClient或RestTemplate代替ApacheHttpClient

如何使用Criteria Builder处理一对多关系中的空值?

无法将GSON导入到我的JavaFX Maven项目

在Spring Boot中使用咖啡因进行缓存

Java中的一个错误';s stdlib SocksSocketImpl?

由于版本不匹配,从Java 8迁移到Java 17和Spring 6 JUnit4失败

设置背景时缺少Android编辑文本下划线

Springboot应用程序无法识别任何@RestController或@Service,我认为@Repository也无法识别

如何转换Vector<;对象>;转换为int?

更新不可变的深层嵌套字段

为什么 Random() 的行为不符合预期?

我真的能很好地解释同步计数器和 AtomicInteger 的 JMH 结果吗

Java gRPC,嵌套类不生成

java.time - 如何解析各种模式的日期时间格式(具有所有可选时间组件,除了年份)?