请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

IDEA运行时报错 Reconnect due to socket error:

麻烦老师帮忙看一下,谢谢。

我用的虚拟机,kafka在虚拟机里面,检查防火墙都已经关闭,本机用SecureCRT可以连接。在本机用 telnet 192.168.45.140 9092  命令测试时黑屏,网络应该没有问题。在shell里执行消费,可以接收到生产者的消息。在IDEA里run时报错如下。


18/07/18 20:02:52 INFO VerifiableProperties: Property zookeeper.connect is overridden to 

18/07/18 20:02:53 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException

Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException

org.apache.spark.SparkException: Couldn't find leader offsets for Set([app_topic,0])

at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:385)

at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:385)

at scala.util.Either.fold(Either.scala:98)

at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:384)

at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)

at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)

at Log_From_kafka$.main(Log_From_kafka.scala:17)

at Log_From_kafka.main(Log_From_kafka.scala)



scala代码如下

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Log_From_kafka {
 def main(args: Array[String]): Unit = {
   val brokers="192.168.45.140:9092"
   val topics="app_topic"

   val sparkConf = new SparkConf().setAppName("Log_From_kafka").setMaster("local[2]")
   val ssc = new StreamingContext(sparkConf,Seconds(5))

   val topicsSet = topics.split(",").toSet
   val kafkaParams = Map[String,String]("metadata.broker.list"->brokers)

   val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicsSet)

   messages.map(_._2).print()

   ssc.start()
   ssc.awaitTermination()
 }

}



pom.xml 内容如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>

   <groupId>com.newtouch.jianx</groupId>
   <artifactId>Logcj</artifactId>
   <version>1.0</version>

   <properties>
       <scala.version>2.11.8</scala.version>
       <kafka.version>0.9.0.0</kafka.version>
       <spark.version>2.2.0</spark.version>
   </properties>

   <dependencies>
       <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-streaming_2.11</artifactId>
           <version>${spark.version}</version>
       </dependency>
       <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
           <version>${spark.version}</version>
       </dependency>
   </dependencies>
</project>

正在回答 回答被采纳积分+3

1回答

Michael_PK 2018-07-18 21:18:56

Couldn't find leader offsets for Set([app_topic,0]),看似应该kafka有问题,leader找不到,你重新创建一个topic再进行测试下

0 回复 有任何疑惑可以回复我~
  • 提问者 糊涂一次 #1
    谢谢老师。我已经找到问题了。在本机hosts文件加上 192.168.45.140 zhanghjhost.com.cn    之前加的是 192.168.45.140 zhanghjhost   不行。必须要有.com.cn 。已经反复测试重现问题了。
    回复 有任何疑惑可以回复我~ 2018-07-18 22:11:09
  • Michael_PK 回复 提问者 糊涂一次 #2
    对,必须一致
    回复 有任何疑惑可以回复我~ 2018-07-18 22:55:07
  • 提问者 糊涂一次 #3
    恩   反复测试。服务器上hosts文件加的  192.168.45.140 zhanghjhost   在本地hosts里也要一样加上。两边都不加.com.cn 也可以。
    回复 有任何疑惑可以回复我~ 2018-07-18 23:17:31
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信