采纳答案成功!
向帮助你的同学说点啥吧!感谢那些助人为乐的人
[main] DEBUG org.apache.kafka.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-1] Queueing Call(callName=createTopics, deadlineMs=1587828993633) with a timeout 120000 ms from now.
就报这个问题,但是topic就是一直无法创建,请问老师是什么问题?
1.get的代码可以参考一下我这个
/** * 创建Topic */ public static void createTopic() throws Exception { AdminClient adminClient = adminClient(); // 副本因子 short replicationFactor = 1; NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, replicationFactor); CreateTopicsResult result = adminClient.createTopics(Arrays.asList(newTopic)); System.out.println("CreateTopicResult : " + result.all().get()); }
2.如果你用的是虚拟机,确保你的2181和9092端口是开放的,或者可以先直接把防火墙关了
我看你说能查看到topic信息,那估计就是因为result是异步返回的,子线程还没执行完创建topic的操作主线程的退出了
System.out.println("CreateTopicsResult : "+ topics.all().get()); 这个不会超时,能有效创建topic
我也出现了类似的问题,是因为客户端还没来得及向Kafka发送创建Topic的信息就已经结束了,添加一行Thread.sleep后,问题解决:
/** * 创建topic */ public static void createTopic() throws ExecutionException, InterruptedException { AdminClient adminClient = adminClient(); // topic的名称 String name = "MyTopic3"; // partition数量 int numPartitions = 1; // 副本数量 short replicationFactor = 1; NewTopic topic = new NewTopic(name, numPartitions, replicationFactor); CreateTopicsResult result = adminClient.createTopics(List.of(topic)); // 避免客户端连接太快断开而导致Topic没有创建成功 Thread.sleep(500); // 获取topic设置的partition数量 System.out.println(result.numPartitions(name).get()); }
你这个sleep仍然有超时风险,所以并不是最佳方案。 System.out.println("CreateTopicsResult : "+ topics.all().get());这个是最佳方案,会阻塞等待,确保不管是延迟多久,都会创建topic成功。
我也遇到同样问题:可以获取到adminClient,但是无法创建topic,无法获取topic列表,加了get()一直timeout。应该是跟虚拟机里的kafka配置有关,同样的代码我换成自己申请的kafka云服务器就没问题。
是配置的问题吗?
看起来是创建超时或者链接超时, 确认下链接成功没有
但是能获取topic的信息,这个算是连接上了吗?
老师能否将详细一些,这不是个例,应该是大部分都有的问题。我看你在别的回复里面说get一下,不知道能否讲详细一些,或者把代码在git上更新一下,要不然我们这些刚入门的实在搞不明白,而且一直卡在这里也不是办法,谢谢老师了!
如果能获取列表就说明连接是没问题的, 你创建都是超过120秒以后才超时么?
登录后可查看更多问答,登录/注册
系统讲解Kafka,实战结合,让你成为使用Kafka的高手
1.2k 2
983 13
1.1k 10
2.2k 10
2.0k 8