Description:
I started a console consumer:
[hadoop@spark000 ~]$ kafka-console-consumer.sh --bootstrap-server spark000:9092,spark000:9093,spark000:9094 --topic zhang-replicated-topic
Then I started the Flume agent:
[hadoop@spark000 config]$ flume-ng agent \
> --name a1 \
> --conf $FLUME_HOME/conf \
> --conf-file $FLUME_HOME/config/access-kafka.conf \
> -Dflume.root.logger=INFO,console
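(For context, access-kafka.conf follows the usual source, channel, Kafka-sink layout. This is a generic sketch with a placeholder source and paths, not my exact file; upstream Flume 1.6's KafkaSink takes brokerList/topic, while newer builds use kafka.bootstrap.servers/kafka.topic:)

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Placeholder source: tailing an access log (the real source may differ)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/access.log
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

# Kafka sink, property names per upstream Flume 1.6
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = zhang-replicated-topic
a1.sinks.k1.brokerList = spark000:9092,spark000:9093,spark000:9094
a1.sinks.k1.channel = c1

jps -m confirms everything is running: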
[hadoop@spark000 ~]$ jps -m
9408 Kafka /home/hadoop/app/kafka_2.12-2.5.0/config/server-zhang1.properties
11712 Jps -m
9829 Kafka /home/hadoop/app/kafka_2.12-2.5.0/config/server-zhang2.properties
7656 NodeManager
9928 Master --host spark000 --port 7077 --webui-port 8080
6795 NameNode
9003 Kafka /home/hadoop/app/kafka_2.12-2.5.0/config/server-zhang0.properties
10795 Application --name a1 --conf-file /home/hadoop/app/apache-flume-1.6.0-cdh5.16.2-bin/config/access-kafka.conf
6956 DataNode
7342 ResourceManager
8366 jar
7151 SecondaryNameNode
11288 ConsoleConsumer --bootstrap-server spark000:9092,spark000:9093,spark000:9094 --topic zhang-replicated-topic
8633 QuorumPeerMain /home/hadoop/app/zookeeper-3.4.5-cdh5.16.2/bin/../conf/zoo.cfg
The console consumer receives the data without any problem.
SSSApp.scala, however, fails with the following error:
Driver stacktrace:
22/04/02 15:02:19 INFO DAGScheduler: Job 1 failed: start at SSSApp.scala:40, took 0.414385 s
22/04/02 15:02:19 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@36dbeac9 is aborting.
22/04/02 15:02:19 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@36dbeac9 aborted.
22/04/02 15:02:19 ERROR MicroBatchExecution: Query [id = 45823615-2b27-4ba1-8325-015d1e45ac8d, runId = 0625451e-0790-4865-b999-b46fd8c7434f] terminated with error
org.apache.spark.SparkException: Writing job aborted.
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, 192.168.131.1, executor driver): java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.String
Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.String
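For reference, this is the minimal shape of a Kafka read in Structured Streaming as I understand it (a generic sketch, not my actual SSSApp.scala; the console sink and the cast are illustrative). The Kafka source exposes its timestamp column as a TimestampType, and treating that column as a String is exactly the kind of thing that raises this ClassCastException:

import org.apache.spark.sql.SparkSession

object SSSApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SSSApp")
      .master("local[2]")
      .getOrCreate()

    // Columns exposed by the Kafka source: key/value (binary), topic (string),
    // partition (int), offset (long), timestamp (timestamp), timestampType (int).
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "spark000:9092,spark000:9093,spark000:9094")
      .option("subscribe", "zhang-replicated-topic")
      .load()

    // Cast explicitly. Reading the `timestamp` column into a String field
    // (e.g. via a typed Dataset or a String-typed UDF) fails at runtime with
    // "java.sql.Timestamp cannot be cast to java.lang.String".
    val lines = df.selectExpr("CAST(value AS STRING) AS value")

    val query = lines.writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}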
My guess is that the Kafka data is not being delivered into the Structured Streaming job, but I can't see what's missing.
Is it that the console consumer should not be running at the same time?
But if I don't start a consumer, does Structured Streaming fetch the data from Kafka directly by itself?
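From what I have read, consumers in different consumer groups each receive the full stream independently, and Spark's Kafka source joins with its own auto-generated group (ids prefixed spark-kafka-source-, if I understand the docs correctly), so the console consumer should not be taking data away from the streaming query. Listing the groups should show both:

kafka-consumer-groups.sh --bootstrap-server spark000:9092 --list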