请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

自定义partition调用异步阻塞这样的方式

老师请问下,如果自定义partition,调用异步阻塞这样的方式,就会出错,会出现死循环;如果是异步+callback则不会,请问下这里面的逻辑是为什么呢?

正在回答 回答被采纳积分+3

2回答

小麻雀呀 2020-04-30 15:54:03

老哥,代码贴出来看看

1 回复 有任何疑惑可以回复我~
  • 提问者 weixin_慕仰1047700 #1
    就是producer异步阻塞这么一个调用,代码贴在前面了
    回复 有任何疑惑可以回复我~ 2020-04-30 16:00:50
  • 提问者 weixin_慕仰1047700 #2
    如果producer.send()是异步+callback(如下)就没问题,但我想不通为什么异步阻塞就会出问题。
    
    producer.send(record, new Callback() {                                                                          
        @Override                                                                                                   
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {                                      
            System.out.println("partition: "+ recordMetadata.partition()+" , offset: "+ recordMetadata.offset());   
        }                                                                                                           
    });
    回复 有任何疑惑可以回复我~ 2020-04-30 16:02:22
  • 小麻雀呀 回复 提问者 weixin_慕仰1047700 #3
    com.kafka.demo.producer.PartitionSample 这个类看下,为什么会出现死循环,老师视频也出现了死循环,因为他自定义的Partition 类的逻辑处理本身就是死循环,kafka在对Partition进行处理会调用这个类
    回复 有任何疑惑可以回复我~ 2020-04-30 19:30:40
提问者 weixin_慕仰1047700 2020-04-30 16:00:00

public static void asynchronousSendWithCustomizedPartition() throws ExecutionException, InterruptedException {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.151.80:9092");
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
    properties.put(ProducerConfig.RETRIES_CONFIG, "0");
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
    properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, " 33554432");

    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.kafka.demo.producer.PartitionSample");

    //main object
    Producer<String, String> producer = new KafkaProducer<String, String>(properties);

    //message object - ProducerRecord
    for(int i=0; i<10; i++){
        ProducerRecord<String, String> record
                = new ProducerRecord<>(TOPIC_NAME, "key-"+i, "value-"+i);

        Future<RecordMetadata> send = producer.send(record);
        RecordMetadata recordMetadata = send.get();
        System.out.println("key-"+i+", "+"partition: "+ recordMetadata.partition()+" , offset: "+             recordMetadata.offset());     }

    //close producer
    producer.close();
}

0 回复 有任何疑惑可以回复我~
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信