请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

使用foreach方法保存offset相关信息时,offset相关信息必须作为groupby参数传递,统计结果表不能聚合

pk哥你好,
期望的结果如下:
①统计结果表含有 日期,省份,数量 3列
②offset单独保存到redis中。

按照视频中老师的讲解,试着保存offset信息。
grouby(日期,省份,数量)
使用foreach方法,将offset保存的redis过程中,ForeachWriter的process(value: Row)方法中的value只能获取到grouby的3个参数列。如果获取offset相关信息,需要把topic,partition,offset 作为参数传入到grouby中,即grouby(日期,省份,数量,topic,partition,offset),这样就导致统计结果不能按日期,省份聚合了。我试了一下,foreach方式好像不支持多个sink(本想在grouby(日期,省份,数量) 之前用1个writeStream输出offset信息,再用1个writeStream输出统计结果信息,但编译时就报错了)。所以想问下老师,offset相关的信息,是否能不作为grouby的参数,传到ForeachWriter的process(value: Row)方法中的? 或是此处需要使用ForeachBatch方法来实现?希望老师指点一下。

正在回答 回答被采纳积分+3

2回答

夜愿小夜 2022-01-14 14:11:32

同学这个问题解决了嘛,同样问题碰到了!

0 回复 有任何疑惑可以回复我~
  • 从其他同学那边看到了解决办法:
    frame做两次foreach,最后一次foreach之后调用awaitTermination,前一次不要调用,避免阻止。实际操作可行。
    回复 有任何疑惑可以回复我~ 2022-01-14 14:34:59
Michael_PK 2020-10-28 22:54:58

不太对啊,offset没有聚合一说的。总体的思路是,offset获得之后和业务结果都要进行存储。两者要么是事务性的,要么结果是幂等性才能保证结果正确性

0 回复 有任何疑惑可以回复我~
  • 提问者 qq_梦也_1 #1
    主要代码如下,老师看看哪儿的问题:
      .option("startingOffsets", """{"access-topic-prod":{"0":149}}""")
    	      .load()
    	      .selectExpr("cast (value as String)","topic","partition","offset")
    	      .as[(String,String,Int,Long)]
    	      .map(x => {
    	        val splits = x._1.split("\t")
    	        val time = splits(0)
    	        val ip = splits(2)
    	        val topic = x._2
    	        val partition = x._3
    	        val offset = x._4
    	        //eventTime,日期,省份,topic,partition,offset
    	    (DateUtils.parseToTimestamp(time),DateUtils.parseToDay(time),IPUtils.parseIP(ip),topic,partition,offset)
    	      })
    	      .toDF("ts","day","province","topic","partition","offset")
    	      .withWatermark("ts"  ,"10 minutes")
    	      .groupBy("day","province","topic","partition","offset")
    	      .count()
    	      .writeStream
    	      .outputMode(OutputMode.Update())
    	      .foreach(new RedisOffsetsForeachWriter())
    回复 有任何疑惑可以回复我~ 2020-10-28 23:11:44
  • 提问者 qq_梦也_1 #2
    class RedisOffsetsForeachWriter extends ForeachWriter[Row] {
    ...
     override def process(value: Row): Unit = {
        val day = value.getString(0)
        val province = value.getString(1)
        val cnts = value.getLong(2)
        //从Row中获取offsets,然后保存到redis中
         val topic = value.getAs[String]("topic")
         val partition = value.getAs[Int]("partition")
         val offset = value.getAs[Long]("offset")
        client.hset("day-province-cnts-"+day, province,cnts+"")
        client.hset(topic,partition+"",offset+"")
      }
    ...
    }
    回复 有任何疑惑可以回复我~ 2020-10-28 23:13:14
  • Michael_PK 回复 提问者 qq_梦也_1 #3
    这段代码没看出来你的offset存储到哪里去了?
    回复 有任何疑惑可以回复我~ 2020-10-28 23:13:27
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信