/**
* 该 watermark 生成器可以覆盖的场景是:数据源在一定程度上乱序。
* 即某个最新到达的时间戳为 t 的元素将在最早到达的时间戳为 t 的元素之后最多 n 毫秒到达。
*/
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxOutOfOrderness = 3500L // 3.5 秒
var currentMaxTimestamp: Long = _
override def onEvent(element: MyEvent, eventTimestamp: Long): Unit = {
currentMaxTimestamp = max(eventTimestamp, currentMaxTimestamp)
}
override def onPeriodicEmit(): Unit = {
// 发出的 watermark = 当前最大时间戳 - 最大乱序时间
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
}
}
官方文档中给出了一个自定义watermark生成器的方法,请问这个要怎么调用呢?
input.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessGenerator)
会报错Cannot resolve overloaded method ‘assignTimestampsAndWatermarks’