请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

关于经典模式的疑问

老师您好,经典模式是创建多个消费者对象来进行消费,我对多线程这块了解不多,就打印了consumer的对象,但我发现所有的消息都是被同一个对象给消费掉了,好像没有创建多个对象来消费呀

https://img1.sycdn.imooc.com//szimg/5e97fef509c41b3106340149.jpg

https://img1.sycdn.imooc.com//szimg/5e97ff1a0956f30604180956.jpg

截图只是一部分,其实全都是同一个对象

正在回答

2回答

把多线程的代码截图一下呗

1 回复 有任何疑惑可以回复我~
  • 提问者 乃好 #1
    老师您好,已经发了代码,其实就是直接拿的老师git上的。
    回复 有任何疑惑可以回复我~ 2020-04-16 15:55:52
  • weixin_慕仰1047700 回复 提问者 乃好 #2
    这个问题解决了吗,我也很好奇
    回复 有任何疑惑可以回复我~ 2020-05-03 12:28:50
  • 提问者 乃好 回复 weixin_慕仰1047700 #3
    还没有
    回复 有任何疑惑可以回复我~ 2020-05-03 13:03:08
提问者 乃好 2020-04-16 15:56:17
package com.gx.kafkastudy.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class ConsumerThreadSample {
    private final static String TOPIC_NAME="jiangzh-topic";

    /*
        这种类型是经典模式,每一个线程单独创建一个KafkaConsumer,用于保证线程安全
     */
    public static void main(String[] args) throws InterruptedException {
        KafkaConsumerRunner r1 = new KafkaConsumerRunner();
        Thread t1 = new Thread(r1);

        t1.start();

        Thread.sleep(15000);

        r1.shutdown();
    }

    public static class KafkaConsumerRunner implements Runnable{
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer consumer;

        public KafkaConsumerRunner() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.1.197:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "false");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            consumer = new KafkaConsumer<>(props);

            TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
            TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);

            consumer.assign(Arrays.asList(p0,p1));
        }


        public void run() {
            try {
                while(!closed.get()) {
                    //处理消息
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));

                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                        // 处理每个分区的消息
                        for (ConsumerRecord<String, String> record : pRecord) {
                            System.out.println(consumer.toString());
                            System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
                                    record.partition(),record.offset(), record.key(), record.value());
                        }

                        // 返回去告诉kafka新的offset
                        long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                        // 注意加1
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                    }

                }
            }catch(WakeupException e) {
                if(!closed.get()) {
                    throw e;
                }
            }finally {
                consumer.close();
            }
        }

        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }

}


1 回复 有任何疑惑可以回复我~
  • Allen #1
    这不是就一个线程跑么
    回复 有任何疑惑可以回复我~ 2020-05-03 23:32:58
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信