请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

调用2次foreach方法分别保存到redis的2个key时,只有第2次保存成功

pk老师您好,调用2次foreach方法,分别保存offsets和统计结果表到redis时,第1次foreach并没有将结果保存的到redis中,编译和运行时都不报错。是不是方法用的方式不正确?还是foreach不支持同时保存2张表的操作,必须用foreachbatch来实现?
代码如下:

object SSSOffsetsApp {

def main(args: Array[String]): Unit = {
Logger.getLogger(this.getClass.getSimpleName).setLevel(Level.ERROR)
val spark = SparkSession.builder()
.master(“local[4]”)
.appName(this.getClass.getName)
.config(“spark.sql.shuffle.partitions”,“10”)
.getOrCreate()

	val topic = "access-topic-prod"
val groupid = ""
val startingOffsets: String = RedisUtils.getJsonOffsets(topic, groupid)


import spark.implicits._
val frame: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "MacBookAir.local:9092,MacBookAir.local:9093,MacBookAir.local:9094")
  .option("subscribe", "access-topic-prod")
  .option("startingOffsets", startingOffsets)
  .load()
  
  storeOffsets(frame, spark)
  save2DB(frame, spark)

}

/**

  • 保存offsets
  • @param dataFrame
  • @param spark
    */
    def storeOffsets(dataFrame:DataFrame,spark:SparkSession){
    import spark.implicits._
    dataFrame.selectExpr(“topic”,“partition”,“offset”)//注释掉该行,获取包含offset的全部信息,根据已有offset,合理设置startingOffsets中的值。
    .as[(String,Int,Long)]
    .map(x => {
    val topic = x._1
    val partition = x._2
    val offset = x._3
    //topic,partition,offset
    (topic,partition,offset)
    })
    .toDF(“topic”,“partition”,“offset”)
    .writeStream
    /*输出到redis/
    .outputMode(OutputMode.Update())
    .foreach(new RedisOffsetsForeachWriter())
    .start()
    .awaitTermination()

}

/**

  • 保存统计结果
  • @param dataFrame
  • @param spark
    */
    def save2DB(dataFrame:DataFrame,spark:SparkSession){
    import spark.implicits._
    dataFrame.selectExpr(“cast (value as String)”)
    .as[String]
    .map(x => {
    val splits = x.split("\t")
    val time = splits(0)
    val ip = splits(2)
    //eventTime,日期,省份
    (DateUtils.parseToTimestamp(time),DateUtils.parseToDay(time),IPUtils.parseIP(ip))
    })
    .toDF(“ts”,“day”,“province”)
    .withWatermark(“ts” ,“10 minutes”)
    .groupBy(“day”,“province”)
    .count()
    .writeStream
    /*输出到redis/
    .outputMode(OutputMode.Update())
    .foreach(new RedisDBForeachWriter())
    .start()
    .awaitTermination()
    }
    }

class RedisOffsetsForeachWriter extends ForeachWriter[Row] {
var client:Jedis = _
override def open(partitionId: Long, epochId: Long): Boolean = {
println(s"打开connection:partitionId,partitionId,partitionId,epochId")
client = RedisUtils.getJedisClient()
client != null
}
override def process(value: Row): Unit = {
//从Row中获取offsets,然后保存到redis中
val topic = value.getAsString
val partition = value.getAsInt
val offset = value.getAsLong
client.hset(topic,partition+"",offset+"")
println(s"topic:topic,partition:topic,partition:topic,partition:partition,offset:$offset")
}
override def close(errorOrNull: Throwable): Unit = {
if(null != client){
RedisUtils.returnResource(client)
}
}
}

class RedisDBForeachWriter extends ForeachWriter[Row] {
var client:Jedis = _
override def open(partitionId: Long, epochId: Long): Boolean = {
client = RedisUtils.getJedisClient()
client != null
}
override def process(value: Row): Unit = {
val day = value.getString(0)
val province = value.getString(1)
val cnts = value.getLong(2)
client.hset(“day-province-cnts-”+day, province,cnts+"")
println(s"day:day,province:day,province:day,province:province,cnts:$cnts")
}
override def close(errorOrNull: Throwable): Unit = {
if(null != client){
RedisUtils.returnResource(client)
}
}
}

正在回答 回答被采纳积分+3

1回答

Michael_PK 2020-10-29 12:52:13

第一次报什么错?Redis是覆盖的,第二次会覆盖前面的吧

0 回复 有任何疑惑可以回复我~
  • 提问者 qq_梦也_1 #1
    第1次并没有报错信息。我在每次foreach后都调用了一次start,感觉好像不对。
    回复 有任何疑惑可以回复我~ 2020-10-29 12:55:21
  • Michael_PK 回复 提问者 qq_梦也_1 #2
    很简单的测试方案,第二次的屏蔽掉,你就知道第一次是否OK了,如果OK了,你再打开第二个,直接debug走一遍,确定数据还不是被覆盖就知道了
    回复 有任何疑惑可以回复我~ 2020-10-29 12:57:35
  • 提问者 qq_梦也_1 回复 Michael_PK #3
    单独执行,2个都没问题。单个成功后才整合到一起运行的。
    回复 有任何疑惑可以回复我~ 2020-10-29 13:00:29
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信