Why is there such a long delay after Flume and Kafka receive data?

Below is my Flume conf file:

agent1.sources = avro-source
agent1.channels = logger-channel
agent1.sinks = kafka-sink

# define source
# agent1.sources.avro-source.type = avro
agent1.sources.avro-source.type = netcat
agent1.sources.avro-source.bind = 0.0.0.0
agent1.sources.avro-source.port = 41414

# define channel
agent1.channels.logger-channel.type = memory

# define sink
agent1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = access-topic-prod
agent1.sinks.kafka-sink.brokerList = 192.168.189.142:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20

agent1.sources.avro-source.channels = logger-channel
agent1.sinks.kafka-sink.channel = logger-channel

While running the StreamingApp class, I continuously send logs generated by the logGenerator class to port 41414 over a TCP connection.

https://img1.sycdn.imooc.com/szimg/5ffcf4a20961c02009620229.jpg

But after Flume receives the logs, it seems to take a very, very long time before it sends the data on to Kafka (it sent once or twice, and then on the third attempt, after waiting too long, it simply stopped sending!).

https://img1.sycdn.imooc.com//szimg/5ffcf4d7096fee4519030882.jpg

What could be causing this?

Also, isn't batch processing supposed to handle a large batch of data in one go? Why do I get an error when I send in a batch of identically formatted, newline-terminated records all at once? (Below is the batch data I sent via telnet.)

1610413639729 6000 222.24.223.48 6 69 user12 5678 1.1.3 Android
1610413639729 6000 222.24.223.48 6 69 user13 5678 1.1.3 Android
1610413639729 6000 222.24.223.48 6 69 user14 5678 1.1.3 Android
1610413639729 6000 222.24.223.48 6 69 user15 5678 1.1.3 Android
1610413639729 6000 222.24.223.48 6 69 user16 5678 1.1.3 Android
1610413639729 6000 222.24.223.48 6 69 user17 5678 1.1.3 Android
1610413639729 6000 222.24.223.48 6 69 user18 5678 1.1.3 Android

https://img1.sycdn.imooc.com//szimg/5ffcfc0809a047fa14320393.jpg

Console error output from the streaming job:

Connected to the target VM, address: '127.0.0.1:50939', transport: 'socket'
ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NumberFormatException: For input string: "1610413639729 6000 222.24.223.48 6 69 user12 5678 1.1.3 Android"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at scala.collection.immutable.StringLike.toLong(StringLike.scala:309)
    at scala.collection.immutable.StringLike.toLong$(StringLike.scala:309)
    at scala.collection.immutable.StringOps.toLong(StringOps.scala:33)
    at com.imooc.bigdata.project.utils.DateUtils$.parseToHour(DateUtils.scala:12)
    at com.imooc.bigdata.project.stream.StreamingApp$.$anonfun$main$1(StreamingApp.scala:55)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
ERROR JobScheduler: Error running job streaming job 1610415110000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, MSI, executor driver): java.lang.NumberFormatException: For input string: "1610413639729 6000 222.24.223.48 6 69 user12 5678 1.1.3 Android"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at scala.collection.immutable.StringLike.toLong(StringLike.scala:309)
    at scala.collection.immutable.StringLike.toLong$(StringLike.scala:309)
    at scala.collection.immutable.StringOps.toLong(StringOps.scala:33)
    at com.imooc.bigdata.project.utils.DateUtils$.parseToHour(DateUtils.scala:12)
    at com.imooc.bigdata.project.stream.StreamingApp$.$anonfun$main$1(StreamingApp.scala:55)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
 
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:994)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:992)
    at com.imooc.bigdata.project.stream.StreamingApp$.$anonfun$main$4(StreamingApp.scala:75)
    at com.imooc.bigdata.project.stream.StreamingApp$.$anonfun$main$4$adapted(StreamingApp.scala:68)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:629)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:629)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NumberFormatException: For input string: "1610413639729 6000 222.24.223.48 6 69 user12 5678 1.1.3 Android"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at scala.collection.immutable.StringLike.toLong(StringLike.scala:309)
    at scala.collection.immutable.StringLike.toLong$(StringLike.scala:309)
    at scala.collection.immutable.StringOps.toLong(StringOps.scala:33)
    at com.imooc.bigdata.project.utils.DateUtils$.parseToHour(DateUtils.scala:12)
    at com.imooc.bigdata.project.stream.StreamingApp$.$anonfun$main$1(StreamingApp.scala:55)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    ... 3 more
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, MSI, executor driver): java.lang.NumberFormatException: For input string: "1610413639729 6000 222.24.223.48 6 69 user12 5678 1.1.3 Android"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at scala.collection.immutable.StringLike.toLong(StringLike.scala:309)
    at scala.collection.immutable.StringLike.toLong$(StringLike.scala:309)
    at scala.collection.immutable.StringOps.toLong(StringOps.scala:33)
    at com.imooc.bigdata.project.utils.DateUtils$.parseToHour(DateUtils.scala:12)
    at com.imooc.bigdata.project.stream.StreamingApp$.$anonfun$main$1(StreamingApp.scala:55)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
 
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:994)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:992)
    at com.imooc.bigdata.project.stream.StreamingApp$.$anonfun$main$4(StreamingApp.scala:75)
    at com.imooc.bigdata.project.stream.StreamingApp$.$anonfun$main$4$adapted(StreamingApp.scala:68)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:629)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:629)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NumberFormatException: For input string: "1610413639729 6000 222.24.223.48 6 69 user12 5678 1.1.3 Android"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at scala.collection.immutable.StringLike.toLong(StringLike.scala:309)
    at scala.collection.immutable.StringLike.toLong$(StringLike.scala:309)
    at scala.collection.immutable.StringOps.toLong(StringOps.scala:33)
    at com.imooc.bigdata.project.utils.DateUtils$.parseToHour(DateUtils.scala:12)
    at com.imooc.bigdata.project.stream.StreamingApp$.$anonfun$main$1(StreamingApp.scala:55)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    ... 3 more
ERROR Inbox: Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@63160b6d rejected from java.util.concurrent.ThreadPoolExecutor@63a9d3ab[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at org.apache.spark.executor.Executor.launchTask(Executor.scala:228)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:68)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Exception in thread "streaming-job-executor-0" java.lang.Error: java.lang.InterruptedException
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1148)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:187)
    at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:335)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:743)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:994)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:992)
    at com.imooc.bigdata.project.stream.StreamingApp$.$anonfun$main$9(StreamingApp.scala:101)
    at com.imooc.bigdata.project.stream.StreamingApp$.$anonfun$main$9$adapted(StreamingApp.scala:100)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:629)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:629)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    ... 2 more
Disconnected from the target VM, address: '127.0.0.1:50939', transport: 'socket'
 
Process finished with exit code 1



1 Answer

Michael_PK 2021-01-13 12:12:14

For the first issue: check the ack setting on your Flume-to-Kafka sink. Beyond that, this probably also has something to do with your machine's current network conditions or load.
For the second issue, java.lang.NumberFormatException: For input string: "1610413639729 6000 222.24.223.48 6 69 user12 5678 1.1.3 Android": your code expects a numeric value at some point, but the data has not been fully cleaned and the field is still the whole string, so the string-to-number conversion throws. You can catch it with a try/catch and handle bad records there.
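
To make the try/catch suggestion concrete, here is a minimal defensive-parsing sketch in Scala. It assumes each record is a whitespace-separated line with nine fields, the first of which is a millisecond timestamp; the AccessLog case class and parseLine helper are illustrative stand-ins, not the course project's actual code.

import scala.util.Try

// Illustrative record type; the field names are guesses based on the
// sample lines above, not the project's real schema.
case class AccessLog(time: Long, user: String, version: String, platform: String)

def parseLine(line: String): Option[AccessLog] = {
  val fields = line.trim.split("\\s+")
  if (fields.length < 9) None // skip lines missing expected fields
  else
    // toLong throws NumberFormatException on bad input; Try turns that
    // into None, so one malformed line cannot abort the whole Spark task.
    Try(fields(0).toLong).toOption.map { ts =>
      AccessLog(ts, fields(5), fields(7), fields(8))
    }
}

// In the streaming job, filter instead of crashing:
// lines.flatMap(line => parseLine(line))

Note that the stack trace shows the entire line being handed to toLong, which suggests the real code may be splitting on a delimiter that does not match what telnet actually sends (for example, a tab versus spaces); splitting on \s+ as above sidesteps that.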

  • OP BillyWebber #1
    I put the Flume and Kafka machines on the same virtual machine, so in theory it should be fast. Where do I look at the ack acknowledgement mechanism? What would happen if I turned ack acknowledgement off? And how can I speed things up?
    2021-01-13 17:00:47
  • Michael_PK replied to OP BillyWebber #2
    Without acks you don't confirm whether anything was lost, so it will be a bit faster. You can already see the ack setting in your config file. True, it's all on your local machine, but that still doesn't guarantee your machine's throughput can keep up. For a single machine my advice is that getting the pipeline working end to end is enough; if you want to test across multiple machines, just rent some pay-as-you-go instances from a cloud provider.
    2021-01-13 18:00:37
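
For reference on the setting discussed above: in Flume's Kafka sink, requiredAcks = 1 (as in the config at the top) makes the producer wait for the partition leader's acknowledgement, 0 skips acknowledgement entirely (faster, but loss goes unnoticed), and -1 waits for all in-sync replicas. One way to check whether the delay is on the Flume side or the broker side is to write to the topic directly with a plain Kafka producer and time the acknowledged send. A minimal Scala sketch, reusing the broker address and topic from the config above and assuming the kafka-clients dependency is on the classpath:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object ProducerCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.142:9092")
    // Same semantics as the Flume sink's requiredAcks: "0", "1", or "all".
    props.put(ProducerConfig.ACKS_CONFIG, "1")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String](
      "access-topic-prod",
      "1610413639729 6000 222.24.223.48 6 69 user12 5678 1.1.3 Android")
    // send() is asynchronous; get() blocks until the broker acknowledges,
    // so the time this call takes reflects broker-side latency directly.
    val meta = producer.send(record).get()
    println(s"written to ${meta.topic()}-${meta.partition()}@${meta.offset()}")
    producer.close()
  }
}

If this round-trips quickly while the Flume path stays slow, the bottleneck is in the Flume agent rather than the broker.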