麻烦老师帮忙看一下,谢谢。
我用的虚拟机,kafka在虚拟机里面,检查防火墙都已经关闭,本机用SecureCRT可以连接。在本机用 telnet 192.168.45.140 9092 命令测试时进入黑屏(说明 9092 端口已连通),网络应该没有问题。在shell里执行消费,可以接收到生产者的消息。在IDEA里run时报错如下。
18/07/18 20:02:52 INFO VerifiableProperties: Property zookeeper.connect is overridden to
18/07/18 20:02:53 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([app_topic,0])
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:385)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:385)
at scala.util.Either.fold(Either.scala:98)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:384)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at Log_From_kafka$.main(Log_From_kafka.scala:17)
at Log_From_kafka.main(Log_From_kafka.scala)
scala代码如下
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Consumes messages from Kafka through Spark Streaming's direct
 * (receiver-less) API and prints each record's value every 5 seconds.
 *
 * Usage: Log_From_kafka [brokerList] [topicList]
 *   brokerList - comma-separated host:port pairs (default "192.168.45.140:9092")
 *   topicList  - comma-separated topic names     (default "app_topic")
 *
 * NOTE(review): the reported ClosedChannelException / "Couldn't find leader
 * offsets" error usually means the broker advertises a hostname the driver
 * cannot resolve — check `advertised.listeners` (or `advertised.host.name`
 * on Kafka 0.8/0.9) in server.properties; it is typically not a bug in this
 * code. — TODO confirm against the broker config.
 */
object Log_From_kafka {
  // Original hard-coded connection settings, kept as fallbacks so running
  // with no arguments behaves exactly as before.
  private val DefaultBrokers = "192.168.45.140:9092"
  private val DefaultTopics  = "app_topic"

  def main(args: Array[String]): Unit = {
    // Allow brokers/topics to be overridden from the command line;
    // args.lift avoids an IndexOutOfBoundsException on missing arguments.
    val brokers = args.lift(0).getOrElse(DefaultBrokers)
    val topics  = args.lift(1).getOrElse(DefaultTopics)

    val sparkConf = new SparkConf().setAppName("Log_From_kafka").setMaster("local[2]")
    val ssc       = new StreamingContext(sparkConf, Seconds(5))

    val topicsSet = topics.split(",").toSet
    // The direct stream talks to the brokers themselves (no ZooKeeper
    // consumer path), so only the broker list is required here.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    val messages =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)

    // Each record arrives as (key, value); only the value payload is printed.
    messages.map(_._2).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
pom.xml 内容如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.newtouch.jianx</groupId>
<artifactId>Logcj</artifactId>
<version>1.0</version>
<properties>
<!-- NOTE(review): scala.version and kafka.version are declared but never
     referenced by any dependency below — either wire them in or remove
     them to avoid confusion. The Scala binary version is instead fixed by
     the "_2.11" artifact suffixes. -->
<scala.version>2.11.8</scala.version>
<kafka.version>0.9.0.0</kafka.version>
<spark.version>2.2.0</spark.version>
</properties>
<dependencies>
<!-- Core Spark Streaming; pulls in spark-core transitively. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Kafka 0.8 direct-stream connector (provides
     KafkaUtils.createDirectStream and kafka.serializer.StringDecoder).
     It bundles its own Kafka client, so kafka.version above is unused. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
</project>