采纳答案成功!
向帮助你的同学说点啥吧!感谢那些助人为乐的人
[main] DEBUG org.apache.kafka.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-1] Queueing Call(callName=createTopics, deadlineMs=1587828993633) with a timeout 120000 ms from now.
就报这个问题,但是topic就是一直无法创建,请问老师是什么问题?
1.get的代码可以参考一下我这个
/**
* 创建Topic
*/
public
static
void
createTopic()
throws
Exception {
AdminClient adminClient = adminClient();
// 副本因子
short
replicationFactor =
1
;
NewTopic newTopic =
new
NewTopic(TOPIC_NAME,
, replicationFactor);
CreateTopicsResult result = adminClient.createTopics(Arrays.asList(newTopic));
System.out.println(
"CreateTopicResult : "
+ result.all().get());
}
2.如果你用的是虚拟机,确保你的2181和9092端口是开放的,或者可以先直接把防火墙关了
我看你说能查看到topic信息,那估计就是因为result是异步返回的,子线程还没执行完创建topic的操作主线程的退出了
System.out.println("CreateTopicsResult : "+ topics.all().get()); 这个不会超时,能有效创建topic
我也出现了类似的问题,是因为客户端还没来得及向Kafka发送创建Topic的信息就已经结束了,添加一行Thread.sleep后,问题解决:
* 创建topic
ExecutionException, InterruptedException {
// topic的名称
String name =
"MyTopic3"
// partition数量
int
numPartitions =
// 副本数量
NewTopic topic =
NewTopic(name, numPartitions, replicationFactor);
CreateTopicsResult result = adminClient.createTopics(List.of(topic));
// 避免客户端连接太快断开而导致Topic没有创建成功
Thread.sleep(
500
);
// 获取topic设置的partition数量
System.out.println(result.numPartitions(name).get());
你这个sleep仍然有超时风险,所以并不是最佳方案。 System.out.println("CreateTopicsResult : "+ topics.all().get());这个是最佳方案,会阻塞等待,确保不管是延迟多久,都会创建topic成功。
我也遇到同样问题:可以获取到adminClient,但是无法创建topic,无法获取topic列表,加了get()一直timeout。应该是跟虚拟机里的kafka配置有关,同样的代码我换成自己申请的kafka云服务器就没问题。
是配置的问题吗?
看起来是创建超时或者链接超时, 确认下链接成功没有
但是能获取topic的信息,这个算是连接上了吗?
老师能否将详细一些,这不是个例,应该是大部分都有的问题。我看你在别的回复里面说get一下,不知道能否讲详细一些,或者把代码在git上更新一下,要不然我们这些刚入门的实在搞不明白,而且一直卡在这里也不是办法,谢谢老师了!
如果能获取列表就说明连接是没问题的, 你创建都是超过120秒以后才超时么?
登录后可查看更多问答,登录/注册
系统讲解Kafka,实战结合,让你成为使用Kafka的高手
1.2k 2
1.0k 13
1.2k 10
2.3k 10
2.1k 8
购课补贴联系客服咨询优惠详情
慕课网APP您的移动学习伙伴
扫描二维码关注慕课网微信公众号