Hi teacher, I'm on Spark 2.4.4 with Scala 2.12. The earlier networkWordCount example runs fine, but once I move on to the stateful word count (updateStateByKey) I hit a problem.
The code is:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UpdateByKeyWC {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UpdateByKeyWC").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    val ds = ssc.socketTextStream("localhost", 6789)
    val value = ds.flatMap(_.split(" ")).map((_, 1))
    // this is the line the exception points to
    val wc = value.updateStateByKey[Int](updateFunction _)
    wc.print()

    ssc.start()
    ssc.awaitTermination()
  }

  // add this batch's counts to the running total for the key
  def updateFunction(currValues: Seq[Int], oldValues: Option[Int]): Option[Int] = {
    val currCount = currValues.sum
    val oldCount = oldValues.getOrElse(0)
    Some(currCount + oldCount)
  }
}
The line flagged in the error is the `val wc` line. Searching online, the Task not serializable cases I found were all about SparkContext, and adding the @transient annotation to it fixed them. I tried adding that annotation everywhere it could go, but it didn't help. And my error seems to say it's my own class (the object itself) that isn't serializable.
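Here is roughly one of the placements I tried (just a sketch of the attempt, not my exact code; everything else in main stays the same as above):

    // one of the @transient placements I tried inside main (sketch);
    // the same NotSerializableException still comes from the updateStateByKey line
    @transient val conf = new SparkConf().setAppName("UpdateByKeyWC").setMaster("local[2]")
    @transient val ssc = new StreamingContext(conf, Seconds(10))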
My code hasn't reached the checkpoint step yet.
The exception is:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.streaming.dstream.PairDStreamFunctions.$anonfun$updateStateByKey$3(PairDStreamFunctions.scala:436)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:699)
at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:265)
at org.apache.spark.streaming.dstream.PairDStreamFunctions.updateStateByKey(PairDStreamFunctions.scala:435)
at org.apache.spark.streaming.dstream.PairDStreamFunctions.$anonfun$updateStateByKey$1(PairDStreamFunctions.scala:401)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:699)
at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:265)
at org.apache.spark.streaming.dstream.PairDStreamFunctions.updateStateByKey(PairDStreamFunctions.scala:401)
at com.lb.spark.UpdateByKeyWC$.main(UpdateByKeyWC.scala:23)
at com.lb.spark.UpdateByKeyWC.main(UpdateByKeyWC.scala)
Caused by: java.io.NotSerializableException: com.lb.spark.UpdateByKeyWC$
Serialization stack:
- object not serializable (class: com.lb.spark.UpdateByKeyWC$, value: com.lb.spark.UpdateByKeyWC$@70abf9b0)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.lb.spark.UpdateByKeyWC$, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic com/lb/spark/UpdateByKeyWC$.$anonfun$main$3:(Lcom/lb/spark/UpdateByKeyWC$;Lscala/collection/Seq;Lscala/Option;)Lscala/Option;, instantiatedMethodType=(Lscala/collection/Seq;Lscala/Option;)Lscala/Option;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class com.lb.spark.UpdateByKeyWC$$$Lambda$568/78377968, com.lb.spark.UpdateByKeyWC$$$Lambda$568/78377968@398474a2)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 17 more