请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

自定义watermark生成器如何使用

/**
 * 该 watermark 生成器可以覆盖的场景是:数据源在一定程度上乱序。
 * 即某个最新到达的时间戳为 t 的元素将在最早到达的时间戳为 t 的元素之后最多 n 毫秒到达。
 */
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxOutOfOrderness = 3500L // 3.5 秒

    var currentMaxTimestamp: Long = _

    override def onEvent(element: MyEvent, eventTimestamp: Long): Unit = {
        currentMaxTimestamp = max(eventTimestamp, currentMaxTimestamp)
    }

    override def onPeriodicEmit(): Unit = {
        // 发出的 watermark = 当前最大时间戳 - 最大乱序时间
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }
}

官方文档中给出了一个自定义watermark生成器的方法,请问这个要怎么调用呢?

input.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessGenerator)

会报错Cannot resolve overloaded method ‘assignTimestampsAndWatermarks’

正在回答 回答被采纳积分+3

1回答

Michael_PK 2021-04-30 00:36:41

你要看看入参传什么呢,你的代码里面java new东西都不需要括号的吗?官网上是给参考的,具体的还是要适当调整的

0 回复 有任何疑惑可以回复我~
  • 提问者 bking3629688 #1
    就是官网给了个自定义watermark生成器,但是不知道该怎么调用- -,搜了一遍都没找到合适的答案
    回复 有任何疑惑可以回复我~ 2021-04-30 01:00:02
  • Michael_PK 回复 提问者 bking3629688 #2
    你这代码idea不报错吗
    回复 有任何疑惑可以回复我~ 2021-04-30 01:03:47
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信