采纳答案成功!
向帮助你的同学说点啥吧!感谢那些助人为乐的人
老师请问下,如果自定义partition,调用异步阻塞这样的方式,就会出错,会出现死循环;如果是异步+callback则不会,请问下这里面的逻辑是为什么呢?
老哥,代码贴出来看看
就是producer异步阻塞这么一个调用,代码贴在前面了
如果producer.send()是异步+callback(如下)就没问题,但我想不通为什么异步阻塞就会出问题。 producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println("partition: "+ recordMetadata.partition()+" , offset: "+ recordMetadata.offset()); } });
com.kafka.demo.producer.PartitionSample 这个类看下,为什么会出现死循环,老师视频也出现了死循环,因为他自定义的Partition 类的逻辑处理本身就是死循环,kafka在对Partition进行处理会调用这个类
public static void asynchronousSendWithCustomizedPartition() throws ExecutionException, InterruptedException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.151.80:9092"); properties.put(ProducerConfig.ACKS_CONFIG, "all"); properties.put(ProducerConfig.RETRIES_CONFIG, "0"); properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); properties.put(ProducerConfig.LINGER_MS_CONFIG, "1"); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, " 33554432"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.kafka.demo.producer.PartitionSample"); //main object Producer<String, String> producer = new KafkaProducer<String, String>(properties); //message object - ProducerRecord for(int i=0; i<10; i++){ ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-"+i, "value-"+i); Future<RecordMetadata> send = producer.send(record); RecordMetadata recordMetadata = send.get(); System.out.println("key-"+i+", "+"partition: "+ recordMetadata.partition()+" , offset: "+ recordMetadata.offset()); } //close producer producer.close(); }
登录后可查看更多问答,登录/注册
系统讲解Kafka,实战结合,让你成为使用Kafka的高手
1.1k 2
979 13
1.1k 10
2.2k 10
2.0k 8