使用了1.9版本的Flink模板来创建项目的。
根据本课学习的内容,运行代码如下:
public static void main(String[] args) throws Exception{
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> input = env.readTextFile("file:///Users/jeff/projects/study/flink-train-java/test.txt");
input.print();
input.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] tokens = s.toLowerCase().split(" ");
for (String token: tokens){
if (token.length() > 0) {
collector.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}).groupBy(0).sum(1).print();
env.execute("Flink Batch Java API Skeleton");
}
执行该Main函数时,并没有输出想要的结果,并且程序不会自动停止,而是处于一个运行着的状态。
翻了一下日志,有报一个错:
00:42:34,972 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource (at main(BatchWCJavaApp.java:12) (org.apache.flink.api.java.io.TextInputFormat)) (1/8) (98a5cbecb493ed76e173902afdfaad42) switched from CREATED to SCHEDULED.
00:42:34,980 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{858aa7d798c6f34199c63394476feacd}]
如何解决这个问題?