请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

checkpoint问题

PK哥,我想做一个能记住上次结果的wordcount统计,比如不断的输入单词”a“,会出现(a,1), (a,2), (a,3) …,但是,当我关闭再重启它又从头统计了,又出现了(a,1),我想要接着关闭之前统计。我已经设置了checkpoint和state。代码如下:
val env = StreamExecutionEnvironment.getExecutionEnvironment

val topic = "test"
val properties = new Properties()
properties.setProperty(“bootstrap.servers”, “localhost:9092”)
properties.setProperty(“zookeeper.connect”, “localhost:2181”)
properties.setProperty(“group.id”, “test”)
val kafkaSource = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), properties)

val data = env.addSource(kafkaSource)

env.setStateBackend(new RocksDBStateBackend(“file:///Users/kewei.mao/app/tmp/checkpoint”))
env.enableCheckpointing(1000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
env.getCheckpointConfig.setCheckpointTimeout(60000)
env.getCheckpointConfig.setFailOnCheckpointingErrors(true)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

val counts: DataStream[(String, Int)] = data
.flatMap(.toLowerCase.split(“W+”))
.filter(
.nonEmpty)
.map((_, 1))
.keyBy(0)
.sum(1)

env.execute(“Streaming WordCount”)

正在回答 回答被采纳积分+3

2回答

liangyuchong_712 2019-11-26 17:47:42

楼主,想知道你重启job的时候也是用上面的代码吗?还是说使用flink run -s 这样的命令来重启作业,如果是在idea中直接重启的代码的话?有没有使用什么特殊的方法来指定重启的时候使用哪个checkpoint文件呀

0 回复 有任何疑惑可以回复我~
Michael_PK 2019-07-27 00:08:00

这种建议每次处理完保存起来,后续的可以进行累加

0 回复 有任何疑惑可以回复我~
问题已解决,确定采纳
还有疑问,暂不采纳
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号