PyFlink job runs fine locally but fails when submitted to the cluster: how do I submit it to run on the cluster?

2023-08-09

Main error:
ClassNotFoundException: org.apache.kafka.clients.consumer.OffsetResetStrategy

Flink version: 1.17.1
Python version: 3.10

Demo code:

import json

from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer, FlinkKafkaProducer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.common.typeinfo import Types


if __name__ == '__main__':
    brokers = '172.18.98.96:9092'
    source_topic = "test1"  # source topic (input data)
    sink_topic = "test3"  # sink topic (results)

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.AUTOMATIC)

    # Connector jar as a file:// URL; the path is resolved on the machine running this script
    env.add_jars("file:///home/demo/jar/flink-sql-connector-kafka-1.17.1.jar")
    # env.add_jars("file:///usr/local/lib/python3.10/dist-packages/lib/flink-sql-connector-kafka-1.17.1.jar")

    source = KafkaSource.builder() \
        .set_bootstrap_servers(brokers) \
        .set_topics(source_topic) \
        .set_group_id("demo") \
        .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()

    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")

    def str_to_dict(data):
        # Parse one JSON message into (name, 1 if action == 'click' else 0)
        json_data = json.loads(data)
        action = json_data.get('action')
        is_click = 1 if action == 'click' else 0
        return json_data.get('name'), is_click

    def format_json(data):
        # Serialize (name, running click count) back into a JSON string
        return json.dumps({'name': data[0], 'click_num': data[1]}, ensure_ascii=False)


    # Per-name running total of clicks, written out as JSON strings
    ds = ds.map(str_to_dict, output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
    ds = ds.key_by(lambda x: x[0]).sum(1).map(format_json, output_type=Types.STRING())

    serialization_schema = SimpleStringSchema()
    kafka_producer = FlinkKafkaProducer(
        topic=sink_topic,
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': brokers, 'group.id': 'my-group'})

    ds.add_sink(kafka_producer)

    env.execute('demo')
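To see what the job is expected to write to test3 without a cluster, the map, key_by, sum, map chain can be replayed in plain Python. This is a minimal sketch: the simulate helper and the sample messages are hypothetical, and Flink's parallelism, state handling and Kafka I/O are ignored; it only reproduces the per-name running click count.

```python
import json
from collections import defaultdict


def str_to_dict(data):
    # Same parsing as the job: (name, 1 if action == 'click' else 0)
    json_data = json.loads(data)
    is_click = 1 if json_data.get('action') == 'click' else 0
    return json_data.get('name'), is_click


def format_json(data):
    return json.dumps({'name': data[0], 'click_num': data[1]}, ensure_ascii=False)


def simulate(records):
    """Hypothetical helper mimicking key_by(name).sum(1): emit the running total after each record."""
    totals = defaultdict(int)
    out = []
    for raw in records:
        name, is_click = str_to_dict(raw)
        totals[name] += is_click
        out.append(format_json((name, totals[name])))
    return out


if __name__ == '__main__':
    sample = [  # hypothetical messages on topic test1
        '{"name": "alice", "action": "click"}',
        '{"name": "bob", "action": "view"}',
        '{"name": "alice", "action": "click"}',
    ]
    for line in simulate(sample):
        print(line)
```

Every input record produces one output record carrying the updated total for its key, which is what a rolling sum on a keyed stream emits.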

Running it locally with python demo.py works fine.
To submit it to Flink, I ran:

flink run -m 172.19.98.96:8081 -py demo.py --jarfile /home/demo/jar/flink-sql-connector-kafka-1.17.1.jar
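If the job is submitted repeatedly, the same command can be scripted. Below is a minimal Python sketch, assuming the flink CLI is on PATH; build_flink_run_cmd and submit are hypothetical helpers, and only the -m, -py and --jarfile options already used in the command are passed.

```python
import shlex
import subprocess
from pathlib import Path


def build_flink_run_cmd(jobmanager: str, script: str, jar: str) -> list[str]:
    """Hypothetical helper: assemble `flink run -m ... -py ... --jarfile ...`."""
    jar_path = Path(jar)
    # Fail early on the client if the connector jar path is wrong.
    if not jar_path.is_file():
        raise FileNotFoundError(f"connector jar not found: {jar_path}")
    return ["flink", "run", "-m", jobmanager, "-py", script, "--jarfile", str(jar_path)]


def submit(cmd: list[str], dry_run: bool = True) -> None:
    # Print the exact command line that would be executed.
    print(shlex.join(cmd))
    if not dry_run:
        # Requires the flink CLI on PATH; raises CalledProcessError on a non-zero exit code.
        subprocess.run(cmd, check=True)
```

For example, submit(build_flink_run_cmd("172.19.98.96:8081", "demo.py", "/home/demo/jar/flink-sql-connector-kafka-1.17.1.jar")) only prints the command; pass dry_run=False to run it. The path check only catches a wrong jar path on the client; it cannot detect classes missing from the cluster's classpath.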

Error:

root@a68045bb7b7a:/home/demo# flink run -m 172.19.98.96:8081 -py demo.py --jarfile /home/demo/jar/flink-sql-connector-kafka-1.17.1.jar
Traceback (most recent call last):
  File "/home/demo/demo.py", line 22, in <module>
    source = KafkaSource.builder() \
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/connectors/kafka.py", line 387, in builder
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/connectors/kafka.py", line 430, in __init__
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.connector.kafka.source.KafkaSource.builder.
: java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/OffsetResetStrategy
        at org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.earliest(OffsetsInitializer.java:147)
        at org.apache.flink.connector.kafka.source.KafkaSourceBuilder.<init>(KafkaSourceBuilder.java:106)
        at org.apache.flink.connector.kafka.source.KafkaSource.builder(KafkaSource.java:123)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.OffsetResetStrategy
        at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 14 more

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
        ... 14 more

Update: added the JobManager's libraries (lib directory).

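First, the likely cause of the error, based on what I found while reproducing it:
The class org.apache.kafka.clients.consumer.OffsetResetStrategy cannot be found, so my guess is that your Flink cluster is missing the corresponding dependency. The stack trace supports this: KafkaSource itself was loaded, but the lookup of the Kafka client class failed in sun.misc.Launcher$AppClassLoader, which suggests that a Kafka connector jar is on Flink's own classpath (normally the lib directory) without the kafka-clients classes it depends on.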
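To check this on your own machine, the jars in lib can be searched for the two classes named in the trace. Jars are ordinary zip archives, so Python's zipfile module is enough. This is a sketch assuming the install path /opt/flink/lib used in the cp command; class_entry and find_class_in_jars are hypothetical helpers. Keep in mind that the fat flink-sql-connector-kafka jar may bundle the Kafka client classes under a relocated package name, so it can legitimately show up only for KafkaSource.

```python
import zipfile
from pathlib import Path


def class_entry(class_name: str) -> str:
    """'a.b.C' -> 'a/b/C.class', the path of a class inside a jar."""
    return class_name.replace(".", "/") + ".class"


def find_class_in_jars(lib_dir: str, class_name: str) -> list[str]:
    """Return the names of the jars in lib_dir that contain class_name."""
    entry = class_entry(class_name)
    hits = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                if entry in zf.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            continue  # skip corrupt or non-zip files
    return hits


if __name__ == "__main__":
    lib = "/opt/flink/lib"  # assumed install path
    for cls in (
        "org.apache.flink.connector.kafka.source.KafkaSource",
        "org.apache.kafka.clients.consumer.OffsetResetStrategy",
    ):
        print(cls, "->", find_class_in_jars(lib, cls) or "NOT FOUND")
```

A result where KafkaSource is found in some jar but OffsetResetStrategy is NOT FOUND matches the error in the question.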
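My suggestion is to put flink-sql-connector-kafka-1.17.1.jar into the lib directory of the Flink installation, both on the machine where you run flink run and on every JobManager and TaskManager node, with this command: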

cp /home/demo/jar/flink-sql-connector-kafka-1.17.1.jar /opt/flink/lib/

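Once it is in Flink's lib directory, restart the cluster, and this environment problem should be resolved. If lib also contains a plain flink-connector-kafka jar without a matching kafka-clients jar, remove it (or add kafka-clients alongside it), since it may still be loaded instead of the bundled SQL connector.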
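The copy step and the check for a leftover thin connector jar can be combined in a small script to run on each node before the restart. This is only a sketch: install_connector and conflicting_jars are hypothetical helpers, and matching on the file-name pattern flink-connector-kafka-*.jar is an assumed heuristic, not something Flink itself checks.

```python
import shutil
from pathlib import Path


def install_connector(jar: str, lib_dir: str) -> Path:
    """Copy the connector jar into Flink's lib directory (same effect as the cp command)."""
    src = Path(jar)
    dest = Path(lib_dir) / src.name
    shutil.copy2(src, dest)  # overwrites a file with the same name, keeps timestamps
    return dest


def conflicting_jars(lib_dir: str) -> list[str]:
    """List thin flink-connector-kafka jars that may be loaded instead of the SQL connector.

    Heuristic (assumption): match by file name only; does not match flink-sql-connector-kafka-*.
    """
    return sorted(p.name for p in Path(lib_dir).glob("flink-connector-kafka-*.jar"))
```

For example, call install_connector("/home/demo/jar/flink-sql-connector-kafka-1.17.1.jar", "/opt/flink/lib"), then inspect conflicting_jars("/opt/flink/lib"); an empty list means no thin connector jar is left to shadow the fat one.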

Original post: https://www.jiaruvip.com/2451.html
