type mismatch;
found : org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
required: org.apache.flink.streaming.api.windowing.assigners.WindowAssigner[_ >: Int, ?]
Note: Object <: Any (and org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows <: org.apache.flink.streaming.api.windowing.assigners.WindowAssigner[Object,org.apache.flink.streaming.api.windowing.windows.TimeWindow]), but Java-defined class WindowAssigner is invariant in type T.
You may wish to investigate a wildcard type such as _ <: Any. (SLS 3.2.10)
}).windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
package com.hnyd.flink.window
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows, WindowAssigner}
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * Small Flink streaming job: reads integers (one per line) from a local socket
 * and prints their sum for every 5-second tumbling processing-time window.
 */
object WindowApp {

  /** Creates a local execution environment, wires the pipeline and runs the job. */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()
    test01(env)
    env.execute("windowapp")
  }

  /**
   * Builds the windowed-sum pipeline on `env`.
   *
   * Reads text lines from `localhost:9527`, parses each line as an integer,
   * groups all elements into 5-second tumbling processing-time windows, sums
   * each window and prints the result.
   *
   * A line that is not a valid integer makes the parse throw
   * `NumberFormatException`, which fails the job.
   *
   * @param env the streaming environment the pipeline is registered on
   */
  def test01(env: StreamExecutionEnvironment): Unit = {
    // Legacy equivalent (older API): the time characteristic had to be set
    // explicitly, otherwise an error was raised:
    //   env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    //   source.map(...).timeWindowAll(Time.seconds(5)).sum(0).print()
    val source: DataStream[String] = env.socketTextStream("localhost", 9527)

    source
      // Emit java.lang.Integer rather than Scala Int: the window assigner is a
      // Java-defined, invariant WindowAssigner[Object, TimeWindow], and the
      // bound `WindowAssigner[_ >: T, W]` on windowAll can only be satisfied
      // when T is a reference type. Scala's Int is an AnyVal, not an Object.
      .map(new MapFunction[String, Integer] {
        override def map(value: String): Integer = Integer.valueOf(value)
      })
      // Single-argument form: no offset. The previous offset of 5s was equal
      // to the window size, which the assigner rejects (offset must be smaller
      // than the size).
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      // Aggregate and emit, otherwise the windowed stream produces nothing.
      .sum(0)
      .print()
  }
}
已经在做大数据,Flink助力轻松提薪;尚未入行,让你弯道超车
了解课程