采纳答案成功!
向帮助你的同学说点啥吧!感谢那些助人为乐的人
老师您好,经典模式是创建多个消费者对象来进行消费,我对多线程这块了解不多,就打印了consumer的对象,但我发现所有的消息都是被同一个对象给消费掉了,好像没有创建多个对象来消费呀
截图只是一部分,其实全都是同一个对象
把多线程的代码截图一下呗
老师您好,已经发了代码,其实就是直接拿的老师git上的。
这个问题解决了吗,我也很好奇
还没有
package com.gx.kafkastudy.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; public class ConsumerThreadSample { private final static String TOPIC_NAME="jiangzh-topic"; /* 这种类型是经典模式,每一个线程单独创建一个KafkaConsumer,用于保证线程安全 */ public static void main(String[] args) throws InterruptedException { KafkaConsumerRunner r1 = new KafkaConsumerRunner(); Thread t1 = new Thread(r1); t1.start(); Thread.sleep(15000); r1.shutdown(); } public static class KafkaConsumerRunner implements Runnable{ private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; public KafkaConsumerRunner() { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.1.197:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<>(props); TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0); TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1); consumer.assign(Arrays.asList(p0,p1)); } public void run() { try { while(!closed.get()) { //处理消息 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000)); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> pRecord = records.records(partition); // 处理每个分区的消息 for (ConsumerRecord<String, String> record : pRecord) { System.out.println(consumer.toString()); System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), record.value()); } // 返回去告诉kafka新的offset long lastOffset = pRecord.get(pRecord.size() - 1).offset(); // 注意加1 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } }catch(WakeupException e) { if(!closed.get()) { throw e; } }finally { consumer.close(); } } public void shutdown() { closed.set(true); consumer.wakeup(); } } }
这不是就一个线程跑么
登录后可查看更多问答,登录/注册
系统讲解Kafka,实战结合,让你成为使用Kafka的高手
1.1k 2
979 13
1.1k 10
2.2k 10
2.0k 8