Flink cannot consume Kafka data

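  1. Hello teacher, after finishing this lesson I pulled the code from our gitee repository and ran it locally. After producer.py produces data, flink_kafka.py never receives any of it, although execution does reach print_schema(). Both files are identical to the versions on gitee, and I set up the environment exactly as specified in requirements.txt.
  2. I also tried receiving the data from producer.py without FlinkKafkaConsumer (that is, with KafkaConsumer, as in Chapter 3). That ran without errors and printed the output normally, so the local Kafka setup is probably not the problem. I suspect the issue lies in Flink or in FlinkKafkaConsumer.
  3. The overall run result is shown below (it only gets as far as printing the schema):

[Image: screenshot of the run output]

  4. The error that follows is: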
Traceback (most recent call last):
  File "flink_kafka.py", line 77, in <module>
  File "flink_kafka.py", line 64, in datastream_api_demo
    user_id = result[1]
  File "/Users/macbookpro/PycharmProjects/movierecSys/venv/lib/python3.8/site-packages/pyflink/table/table_result.py", line 236, in __next__
    if not self._j_closeable_iterator.hasNext():
  File "/Users/macbookpro/PycharmProjects/movierecSys/venv/lib/python3.8/site-packages/py4j/java_gateway.py", line 1285, in __call__
    return_value = get_return_value(
  File "/Users/macbookpro/PycharmProjects/movierecSys/venv/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/Users/macbookpro/PycharmProjects/movierecSys/venv/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o100.hasNext.
: java.lang.RuntimeException: Failed to fetch next result
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
        at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Failed to fetch job execution result
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
        ... 13 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
        ... 15 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
        at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
        at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174)
        ... 15 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
        at sun.reflect.GeneratedMethodAccessor33.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:537)
        at akka.actor.Actor.aroundReceive$(Actor.scala:535)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
        at akka.actor.ActorCell.invoke(ActorCell.scala:548)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
        at akka.dispatch.Mailbox.run(Mailbox.scala:231)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: java.lang.NoSuchMethodError: org.apache.flink.api.common.functions.RuntimeContext.getMetricGroup()Lorg/apache/flink/metrics/MetricGroup;
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:762)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)

1 answer

好帮手慕小李 2025-02-20 17:37:26

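Based on the error output you posted and on search results, the core of the problem is java.lang.NoSuchMethodError: org.apache.flink.api.common.functions.RuntimeContext.getMetricGroup(). This usually means the Flink version and the Kafka connector version are incompatible. The method descriptor in the error expects getMetricGroup() to return org.apache.flink.metrics.MetricGroup, so the connector jar was compiled against a Flink API with that signature, while the Flink runtime that was actually loaded does not provide it.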

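Possible causes

  1. Flink and Kafka connector version mismatch: the Flink Kafka connector must be a release built for the Flink version you are running. For example, Flink 1.19.0 needs a flink-connector-kafka release that targets Flink 1.19.

  2. Dependency conflict: several versions of the Flink or Kafka dependencies may be present at once, so the wrong class gets loaded at runtime.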
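As a rough illustration of cause 1, the sketch below compares the Flink version of the installed PyFlink package with the Flink version encoded in a connector jar's file name. It assumes the jar follows one of two common naming schemes (`...-<flink version>.jar` for connectors released together with Flink, or `...-<connector version>-<flink major.minor>.jar` for separately released connectors). The helper names and the example version numbers are hypothetical and are not taken from the course repository.

```python
import re
from importlib import metadata
from typing import Optional

# Assumed naming schemes (heuristics, not an official specification):
#   flink-connector-kafka-3.2.0-1.19.jar      -> built for Flink 1.19
#   flink-sql-connector-kafka_2.11-1.13.0.jar -> built for Flink 1.13
_EXTERNALIZED = re.compile(r"-\d+\.\d+\.\d+-(\d+\.\d+)\.jar$")
_BUNDLED = re.compile(r"-(\d+\.\d+)\.\d+\.jar$")


def flink_version_of_jar(jar_name: str) -> Optional[str]:
    """Return the Flink major.minor version a connector jar targets, if recognizable."""
    for pattern in (_EXTERNALIZED, _BUNDLED):
        match = pattern.search(jar_name)
        if match:
            return match.group(1)
    return None


def installed_pyflink_version() -> Optional[str]:
    """Return the version of the installed apache-flink package, or None if absent."""
    try:
        return metadata.version("apache-flink")
    except metadata.PackageNotFoundError:
        return None


def versions_match(pyflink_version: str, jar_name: str) -> bool:
    """True only if the jar targets the same Flink major.minor as the runtime."""
    runtime = ".".join(pyflink_version.split(".")[:2])
    return flink_version_of_jar(jar_name) == runtime


# Hypothetical combination: a 1.13 connector jar with a 1.14 runtime.
print(versions_match("1.14.4", "flink-sql-connector-kafka_2.11-1.13.0.jar"))  # False
```

In a real environment, `installed_pyflink_version()` would supply the first argument, and the jar name would be whichever connector jar the job actually loads.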

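Solutions

  1. Check the Flink and Kafka connector versions

    • Make sure the flink-connector-kafka version you use is compatible with your Flink version. For example, if you use Flink 1.19.0, you need a connector release built for Flink 1.19 that also supports your Kafka version (for example Kafka 2.6.3).

    • If you are unsure about compatibility, consult the official Flink documentation or the connector's release notes.

  2. Clean up dependency conflicts

    • If you build with Maven or Gradle, inspect the dependency versions in the project with mvn dependency:tree or a similar tool and make sure there are no conflicts.

    • If you find a conflict, resolve it by excluding the conflicting dependency or by aligning the versions.

  3. Update Flink and the Kafka connector

    • If your current Flink or Kafka connector version is old, consider updating to a newer release (for example Flink 1.19.2), and make sure the Kafka connector version matches it.

  4. Check the code logic

    • Make sure the code does not misuse the Flink or Kafka APIs, especially version-specific features.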
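A PyFlink project has no Maven dependency tree to inspect, so the conflict check in step 2 can be approximated by looking at the jar files the job loads. The sketch below groups jar file names by artifact and reports any artifact present in more than one version. It assumes all relevant jars sit in one directory that you pass in; the function names and the file-name heuristic are hypothetical.

```python
import re
from collections import defaultdict
from pathlib import Path
from typing import Dict, Iterable, List

# Heuristic: "<artifact>[_<scala version>]-<version>.jar", where the version
# starts at the first "-<digit>". Unusual jar names may be split incorrectly.
_JAR = re.compile(r"^(?P<artifact>.+?)(?:_\d+\.\d+)?-(?P<version>\d[\w.\-]*)\.jar$")


def jars_in(directory: str) -> List[str]:
    """List the jar file names found directly inside a directory."""
    return sorted(path.name for path in Path(directory).glob("*.jar"))


def find_conflicting_jars(jar_names: Iterable[str]) -> Dict[str, List[str]]:
    """Map each artifact that appears in several versions to those versions."""
    versions = defaultdict(set)
    for name in jar_names:
        match = _JAR.match(name)
        if match:
            versions[match.group("artifact")].add(match.group("version"))
    return {
        artifact: sorted(found)
        for artifact, found in versions.items()
        if len(found) > 1
    }


# Hypothetical jar listing with two copies of the Kafka connector.
example = [
    "flink-connector-kafka_2.11-1.13.6.jar",
    "flink-connector-kafka-1.14.4.jar",
    "kafka-clients-2.4.1.jar",
]
print(find_conflicting_jars(example))
```

An empty result does not prove the classpath is clean, because jars can also be added in code or through configuration; it only rules out duplicates in the directory that was scanned.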

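Verification steps

  1. After changing the dependencies, run the project again and check whether the NoSuchMethodError still occurs.

  2. If the problem persists, try running a simple Flink-Kafka example program locally to verify whether the environment is at fault.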
