请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

关于经典模式的疑问

老师您好,经典模式是创建多个消费者对象来进行消费,我对多线程这块了解不多,就打印了consumer的对象,但我发现所有的消息都是被同一个对象给消费掉了,好像没有创建多个对象来消费呀

https://img1.sycdn.imooc.com//szimg/5e97fef509c41b3106340149.jpg

https://img1.sycdn.imooc.com//szimg/5e97ff1a0956f30604180956.jpg

截图只是一部分,其实全都是同一个对象

正在回答

插入代码

2回答

把多线程的代码截图一下呗

1 回复 有任何疑惑可以回复我~
  • 提问者 乃好 #1
    老师您好,已经发了代码,其实就是直接拿的老师git上的。
    回复 有任何疑惑可以回复我~ 2020-04-16 15:55:52
  • weixin_慕仰1047700 回复 提问者 乃好 #2
    这个问题解决了吗,我也很好奇
    回复 有任何疑惑可以回复我~ 2020-05-03 12:28:50
  • 提问者 乃好 回复 weixin_慕仰1047700 #3
    还没有
    回复 有任何疑惑可以回复我~ 2020-05-03 13:03:08
提问者 乃好 2020-04-16 15:56:17
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package com.gx.kafkastudy.consumer;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
 
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
 
public class ConsumerThreadSample {
    private final static String TOPIC_NAME="jiangzh-topic";
 
    /*
        这种类型是经典模式,每一个线程单独创建一个KafkaConsumer,用于保证线程安全
     */
    public static void main(String[] args) throws InterruptedException {
        KafkaConsumerRunner r1 = new KafkaConsumerRunner();
        Thread t1 = new Thread(r1);
 
        t1.start();
 
        Thread.sleep(15000);
 
        r1.shutdown();
    }
 
    public static class KafkaConsumerRunner implements Runnable{
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer consumer;
 
        public KafkaConsumerRunner() {
            Properties props = new Properties();
            props.put("bootstrap.servers""192.168.1.197:9092");
            props.put("group.id""test");
            props.put("enable.auto.commit""false");
            props.put("auto.commit.interval.ms""1000");
            props.put("session.timeout.ms""30000");
            props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
 
            consumer = new KafkaConsumer<>(props);
 
            TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
            TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
 
            consumer.assign(Arrays.asList(p0,p1));
        }
 
 
        public void run() {
            try {
                while(!closed.get()) {
                    //处理消息
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
 
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                        // 处理每个分区的消息
                        for (ConsumerRecord<String, String> record : pRecord) {
                            System.out.println(consumer.toString());
                            System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
                                    record.partition(),record.offset(), record.key(), record.value());
                        }
 
                        // 返回去告诉kafka新的offset
                        long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                        // 注意加1
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                    }
 
                }
            }catch(WakeupException e) {
                if(!closed.get()) {
                    throw e;
                }
            }finally {
                consumer.close();
            }
        }
 
        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }
 
}


1 回复 有任何疑惑可以回复我~
  • Allen #1
    这不是就一个线程跑么
    回复 有任何疑惑可以回复我~ 2020-05-03 23:32:58
问题已解决,确定采纳
还有疑问,暂不采纳
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号