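import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWCJavaApp {
    public static void main(String[] args) throws Exception {
        // file:// URI, i.e. a path on the local filesystem
        String input = "file:///home/input";
        // step1: get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // step2: read data
        DataSource<String> text = env.readTextFile(input);
        // step3: transform (split each line on tabs, emit (word, 1), then sum per word)
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = value.toLowerCase().split("\t");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).groupBy(0).sum(1).print();
    }
}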
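For this demo, I put a file under /home/input on the server, packaged the job, uploaded the jar to the server, and ran it with java -jar flink-train-java-1.0.jar. That works.
Then I submitted it with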
flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 1024 /opt/cloudera/parcels/FLINK/lib/flink/examples/streaming/flink-train-java-1.0.jar
and it fails with the following error:
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File file:/home/input does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:270)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:907)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:230)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
at org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
... 10 more
Caused by: java.io.FileNotFoundException: File file:/home/input does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:588)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:256)
Could the instructor please take a look? Thanks.
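The trace shows the failure comes from LocalFileSystem.getFileStatus, and the message names the user 'yarn'. So one thing that may be worth checking is how /home/input looks to that user on each cluster node. Below is a minimal sketch of such a check. It uses plain Java with no Flink dependency. The class name InputPathCheck and the status strings are made up for illustration. It assumes, without confirmation from the log, that under yarn-cluster mode the path may be resolved on a different node than the one where the file was placed.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper (not part of Flink): reports how a local path
// looks to the OS user that runs this program on this host.
public class InputPathCheck {

    static String describe(String localPath) {
        Path p = Paths.get(localPath);
        // Files.exists also returns false when existence cannot be
        // determined, e.g. no permission on a parent directory.
        if (!Files.exists(p)) {
            return "MISSING_OR_UNREACHABLE";
        }
        if (!Files.isReadable(p)) {
            return "NOT_READABLE";
        }
        return Files.isDirectory(p) ? "READABLE_DIRECTORY" : "READABLE_FILE";
    }

    public static void main(String[] args) {
        // Defaults to the path from the job above; pass another path as args[0].
        String path = args.length > 0 ? args[0] : "/home/input";
        System.out.println(System.getProperty("user.name")
                + " sees " + path + " as " + describe(path));
    }
}
```

Running this as the 'yarn' user on each node would show whether the file is absent there or only unreadable. If it is absent on some nodes, that would be consistent with java -jar succeeding on the single server while the YARN submission fails.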