老师好,想问一下关于offset的手动提交。
如果通过offset的手动提交的话,执行了consumer.commitAsync()
以后offset会被提交,下次就不会对这些数据进行处理了。
想问的是,如果我把consumer.commitAsync()
注释掉了,为什么在while
里通过consumer.poll
也不会获取到这些数据,只能停止当前进程后再启动才可以获取到这些rollback的数据。
我理解的是因为没有执行consumer.commitAsync()
而rollback了,那么在下一次while循环里应该可以通过consumer.poll
再获取到这些数据。
public static void commitedOffset() {
Properties properties = buildProperties();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("patition=%d, offset=%d, key=%s, value=%s%n", record.partition(), record.offset(), record.key(), record.value());
}
// consumer.commitAsync();
}
}