PK 老师您好:我调用了两次 foreach 方法,分别把 offsets 和统计结果表保存到 Redis,但第一次 foreach 并没有把结果保存到 Redis 中,编译和运行时都不报错。是不是方法的使用方式不正确?还是 foreach 不支持同时保存两张表,必须用 foreachBatch 来实现?
代码如下:
/**
 * Entry point of the streaming job: opens a Kafka source with Structured Streaming and
 * hands the resulting DataFrame to `storeOffsets` and `save2DB`.
 */
object SSSOffsetsApp {

  /**
   * Builds a `local[4]` SparkSession, creates a Kafka streaming DataFrame that starts from
   * the offsets returned by `RedisUtils.getJsonOffsets`, then passes that DataFrame to
   * `storeOffsets` and `save2DB`.
   *
   * @param args command-line arguments (not read)
   */
  def main(args: Array[String]): Unit = {
    Logger.getLogger(this.getClass.getSimpleName).setLevel(Level.ERROR)

    val spark = SparkSession.builder()
      .master("local[4]")
      .appName(this.getClass.getName)
      .config("spark.sql.shuffle.partitions", "10")
      .getOrCreate()

    val topic = "access-topic-prod"
    val groupid = ""
    val startingOffsets: String = RedisUtils.getJsonOffsets(topic, groupid)

    import spark.implicits._
    val frame: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "MacBookAir.local:9092,MacBookAir.local:9093,MacBookAir.local:9094")
      // Subscribe to the same topic whose offsets were just looked up.
      .option("subscribe", topic)
      .option("startingOffsets", startingOffsets)
      .load()

    storeOffsets(frame, spark)
    save2DB(frame, spark)
  }

  // NOTE(review): the definitions of storeOffsets and save2DB are not present in this
  // excerpt (only a dangling comment opener was left at this position) and must be
  // restored here. Presumably each one starts a streaming query; if the first one blocks
  // on awaitTermination() the second is never started, so start both queries and then
  // wait with spark.streams.awaitAnyTermination() -- TODO confirm against the real bodies.
}
/**
 * ForeachWriter that writes the offset carried by each row into a Redis hash:
 * the hash key is the topic, the hash field is the partition and the value is the offset.
 */
class RedisOffsetsForeachWriter extends ForeachWriter[Row] {
  // Connection obtained in open() and handed back in close().
  var client: Jedis = _

  /**
   * Fetches a Jedis client from RedisUtils for this partition/epoch.
   *
   * @param partitionId id of the partition being written
   * @param epochId     id of the epoch being written
   * @return true when a client was obtained, false otherwise
   */
  override def open(partitionId: Long, epochId: Long): Boolean = {
    println(s"打开connection:$partitionId,$epochId")
    client = RedisUtils.getJedisClient()
    client != null
  }

  /**
   * Reads topic, partition and offset from the row and stores them with HSET.
   *
   * @param value one row of the streaming DataFrame
   */
  override def process(value: Row): Unit = {
    // Column names assumed to be those of the Kafka source schema
    // (topic: String, partition: Int, offset: Long) -- TODO confirm against the
    // DataFrame that storeOffsets actually passes to this writer.
    val topic = value.getAs[String]("topic")
    val partition = value.getAs[Int]("partition")
    val offset = value.getAs[Long]("offset")
    client.hset(topic, partition.toString, offset.toString)
    println(s"topic:$topic,partition:$partition,offset:$offset")
  }

  /**
   * Hands the client back through RedisUtils if one was obtained.
   *
   * @param errorOrNull the failure that ended processing, or null on success (not inspected)
   */
  override def close(errorOrNull: Throwable): Unit = {
    if (client != null) {
      RedisUtils.returnResource(client)
    }
  }
}
/**
 * ForeachWriter that stores per-day, per-province counts in Redis hashes:
 * the hash key is "day-province-cnts-" followed by the day, the hash field is the
 * province and the value is the count.
 */
class RedisDBForeachWriter extends ForeachWriter[Row] {
  // Connection obtained in open() and handed back in close().
  var client: Jedis = _

  /**
   * Fetches a Jedis client from RedisUtils for this partition/epoch.
   *
   * @param partitionId id of the partition being written (not used)
   * @param epochId     id of the epoch being written (not used)
   * @return true when a client was obtained, false otherwise
   */
  override def open(partitionId: Long, epochId: Long): Boolean = {
    client = RedisUtils.getJedisClient()
    client != null
  }

  /**
   * Reads (day, province, cnts) from columns 0, 1 and 2 of the row and stores them with HSET.
   *
   * @param value one row of the aggregated streaming result
   */
  override def process(value: Row): Unit = {
    val day = value.getString(0)
    val province = value.getString(1)
    val cnts = value.getLong(2)
    client.hset(s"day-province-cnts-$day", province, cnts.toString)
    println(s"day:$day,province:$province,cnts:$cnts")
  }

  /**
   * Hands the client back through RedisUtils if one was obtained.
   *
   * @param errorOrNull the failure that ended processing, or null on success (not inspected)
   */
  override def close(errorOrNull: Throwable): Unit = {
    if (client != null) {
      RedisUtils.returnResource(client)
    }
  }
}