请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

offset回滚后无法重复消费数据

老师好,想问一下关于offset的手动提交。
如果通过offset的手动提交的话,执行了consumer.commitAsync()以后offset会被提交,下次就不会对这些数据进行处理了。
想问的是,如果我把consumer.commitAsync()注释掉了,为什么在while里通过consumer.poll也不会获取到这些数据,只能停止当前进程后再启动才可以获取到这些rollback的数据。
我理解的是因为没有执行consumer.commitAsync()而rollback了,那么在下一次while循环里应该可以通过consumer.poll再获取到这些数据。

public static void commitedOffset() {
    Properties properties = buildProperties();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("patition=%d, offset=%d, key=%s, value=%s%n", record.partition(), record.offset(), record.key(), record.value());
        }
//        consumer.commitAsync();
    }
}

正在回答 回答被采纳积分+3

1回答

Allen 2020-04-16 11:09:17

因为会有缓存呀, consumer比大家想象的还要智能一点的 哈哈

1 回复 有任何疑惑可以回复我~
  • 提问者 abulaka #1
    如果是缓存的话,那 rollback 的数据 consumer 会在什么时点被获取并处理呢。
    回复 有任何疑惑可以回复我~ 2020-04-21 08:55:18
  • Allen 回复 提问者 abulaka #2
    这个要看你自己的提交策略,课上有讲到
    回复 有任何疑惑可以回复我~ 2020-04-22 14:30:50
  • 老师好,我也碰到这个问题,总感觉这些消息莫名其妙被谁给消费了,自动提交关闭了也无法重复消费到消息。consumer的缓存在哪里清理?或者设置别缓存?
    回复 有任何疑惑可以回复我~ 2020-10-12 16:48:17
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信