Using Flink: PyFlink

Flink Usage Series: Documentation Index

Preface

In addition to Java and Scala, Flink jobs can also be written in Python. PyFlink is the Python API of Apache Flink. This article walks through setting up PyFlink for both a development environment and a production deployment.

Environment

  • Fedora 42
  • Flink 1.17.2
  • Python 3.10 (versions 3.7 to 3.10 are supported)
  • JDK 1.8

Development Environment Setup and Running

Install Python 3.10:

dnf install python3.10 python3.10-devel

Set up the virtual environment and dependencies:

mkdir pyflink-demo
cd pyflink-demo

uv python pin 3.10
uv init
uv add apache-flink==1.17.2 -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
uv add setuptools -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple

Configure JAVA_HOME:

export JAVA_HOME=/path/to/openjdk-1.8

Create main.py with the following content:

import argparse
import logging
import sys

from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat, FileSink, OutputFileConfig, RollingPolicy


word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    # write all the data to one file
    env.set_parallelism(1)

    # define the source
    if input_path is not None:
        ds = env.from_source(
            source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                       input_path)
                             .process_static_file_set().build(),
            watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
            source_name="file_source"
        )
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        ds = env.from_collection(word_count_data)

    def split(line):
        yield from line.split()

    # compute word count
    ds = ds.flat_map(split) \
           .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
           .key_by(lambda i: i[0]) \
           .reduce(lambda i, j: (i[0], i[1] + j[1]))

    # define the sink
    if output_path is not None:
        ds.sink_to(
            sink=FileSink.for_row_format(
                base_path=output_path,
                encoder=Encoder.simple_string_encoder())
            .with_output_file_config(
                OutputFileConfig.builder()
                .with_part_prefix("prefix")
                .with_part_suffix(".ext")
                .build())
            .with_rolling_policy(RollingPolicy.default_rolling_policy())
            .build()
        )
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        ds.print()

    # submit for execution
    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input, known_args.output)
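
As a sanity check of the pipeline logic above (flat_map splits each line into words, map emits a (word, 1) tuple, and key_by/reduce sums the counts per word), the same aggregation can be sketched in plain Python without Flink:

```python
from collections import Counter

def word_count_local(lines):
    """Mirror the Flink pipeline: flat_map(split) -> map((w, 1)) -> key_by -> reduce(sum)."""
    counter = Counter()
    for line in lines:
        for word in line.split():   # flat_map: one line fans out to many words
            counter[word] += 1      # key_by + reduce: sum the 1s per word
    return dict(counter)

print(word_count_local(["to be or not to be"]))
# {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```

The Flink version distributes the same computation: key_by partitions words across parallel tasks, and reduce merges the partial counts per key.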

Test run:

uv run main.py

Submitting PyFlink Jobs to a YARN Cluster

This section uses an older CentOS 7 cluster as an example to show how to submit a PyFlink job to run on a cluster.

Environment:

  • CentOS 7.4
  • Hadoop 3.1.1
  • Flink 1.17.2
  • OpenJDK 1.8

Flink 1.17.2 supports Python 3.7 to 3.10; this guide uses Python 3.10.x. CentOS 7 does not ship this version, so it must be compiled from source and installed on every YARN NodeManager node. See the appendix for the build steps.

Submitting with yarn-per-job mode

After the build is installed (with the Python 3.10 binary at /usr/local/python310/bin/python3.10), run:

/usr/local/python310/bin/python3.10 -m venv python310
source python310/bin/activate

pip3 install apache-flink==1.17.2 -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple

Then copy the newly created virtual environment to the same directory on every YARN NodeManager node.

Command to submit the bundled WordCount example job:

# The virtual environment must be activated first
source python310/bin/activate
cd $FLINK_HOME
export HADOOP_CLASSPATH=`hadoop classpath`

./bin/flink run --target yarn-per-job --python examples/python/table/word_count.py -pyexec /opt/python310/bin/python

Note: when using a Python interpreter from a virtual environment, its executable path must be passed explicitly via the -pyexec option.

According to the official documentation, the -pyclientexec option specifies the Python executable used on the client side to compile the job's execution plan before it is submitted to the cluster. The example above happens to work without it, but it is recommended to set both -pyclientexec and -pyexec as the documentation advises.
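
To keep the two interpreter flags consistent, a small helper can assemble the submit command, defaulting the client-side interpreter to the cluster-side one. The helper itself and the paths below are illustrative, not part of Flink:

```python
def build_submit_cmd(job, pyexec, pyclientexec=None):
    """Assemble a `flink run` command line, defaulting -pyclientexec to -pyexec
    so plan compilation (client) and execution (cluster) use the same Python."""
    parts = ["./bin/flink", "run", "--target", "yarn-per-job",
             "--python", job,
             "-pyexec", pyexec,
             "-pyclientexec", pyclientexec or pyexec]
    return " ".join(parts)

print(build_submit_cmd("examples/python/table/word_count.py",
                       "/opt/python310/bin/python"))
```

The printed command matches the submission shown above, with -pyclientexec added as the documentation recommends.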

If the job runs successfully, view the output with yarn logs -applicationId application_xxxxxx (where application_xxxxxx is the YARN application ID of the Flink job).

For a more detailed description of job submission, see the official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/deployment/cli/#submitting-pyflink-jobs

Possible errors

Error 1:

ModuleNotFoundError: No module named '_ctypes'

Fix:

yum install libffi-devel

Error 2:

ModuleNotFoundError: No module named '_lzma'

Fix:

yum install xz-devel

If Python 3.10 was compiled from source, it must be recompiled after installing these packages for the fixes to take effect.

Submitting with yarn-application mode

Prerequisite: Python 3.10 must be compiled and installed on every YARN NodeManager node. Otherwise, if a TaskManager is scheduled onto a node that lacks it, the job fails.

An example of the resulting error:

Python path configuration:
  PYTHONHOME = (not set)
  PYTHONPATH = '/tmp/pyflink/b1443a72-7f19-48a2-8a20-688e6f906ab1/88edab8b-7484-47fe-b4d4-708b506d1221/shipfiles'
  program name = 'venv.zip/python310/bin/python'
  isolated = 0
  environment = 1
  user site = 1
  import site = 1
  sys._base_executable = '/tmp/pyflink/b1443a72-7f19-48a2-8a20-688e6f906ab1/python-archives/venv.zip/python310/bin/python'
  sys.base_prefix = '/usr/local/python310'
  sys.base_exec_prefix = '/usr/local/python310'
  sys.platlibdir = 'lib'
  sys.executable = '/tmp/pyflink/b1443a72-7f19-48a2-8a20-688e6f906ab1/python-archives/venv.zip/python310/bin/python'
  sys.prefix = '/usr/local/python310'
  sys.exec_prefix = '/usr/local/python310'
  sys.path = [
    '/tmp/pyflink/b1443a72-7f19-48a2-8a20-688e6f906ab1/88edab8b-7484-47fe-b4d4-708b506d1221/shipfiles',
    '/usr/local/python310/lib/python310.zip',
    '/usr/local/python310/lib/python3.10',
    '/usr/local/python310/lib/lib-dynload',
  ]
Fatal Python error: init_fs_encoding: failed to get the Python codec of the filesystem encoding
Python runtime state: core initialized
ModuleNotFoundError: No module named 'encodings'

The problem occurs because /usr/local/python310/lib/python3.10 does not exist on the target machine.
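
The root cause is that a venv is not self-contained: its pyvenv.cfg records a home key pointing at the base interpreter, which must exist at the same path on every node the venv is shipped to. A minimal sketch of reading that key (the sample file contents below are illustrative):

```python
def base_home(pyvenv_cfg_text):
    """Extract the `home` key from a venv's pyvenv.cfg: the base interpreter
    directory that the venv still depends on at runtime."""
    for line in pyvenv_cfg_text.splitlines():
        key, _, value = line.partition("=")
        if key.strip() == "home":
            return value.strip()
    return None

cfg = ("home = /usr/local/python310/bin\n"
       "include-system-site-packages = false\n"
       "version = 3.10.9\n")
print(base_home(cfg))  # /usr/local/python310/bin
```

If the directory returned here is missing on a node, the interpreter cannot locate its standard library, which produces the "No module named 'encodings'" failure shown above.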

/opt/python310创建虚拟环境并安装依赖:

cd /opt
/usr/local/python310/bin/python3.10 -m venv python310
source python310/bin/activate

pip3 install apache-flink==1.17.2 -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple

Prepare the shipfile directory. Compress the /opt/python310 virtual environment prepared above into a zip:

zip -r venv.zip python310

Then create a shipfiles directory and put venv.zip and the job code (e.g. hudi-demo.py) into it.

The final directory layout looks like this:

[root@master opt]# ll shipfiles
total 716868
-rw-r--r-- 1 root root      1005 Aug 20 10:55 hudi-demo.py
-rw-r--r-- 1 root root 734068339 Aug 20 10:54 venv.zip

From the Flink installation directory, run the submit command (unlike per-job mode above, there is no need to activate the virtual environment first):

export HADOOP_CLASSPATH=`hadoop classpath`

# This submission mode does not require an activated virtual environment

./bin/flink run-application -t yarn-application \
      -Dyarn.ship-files=/opt/shipfiles \
      -pyarch shipfiles/venv.zip \
      -pyclientexec venv.zip/python310/bin/python \
      -pyexec venv.zip/python310/bin/python \
      -pyfs shipfiles \
      -pym hudi-demo

Where:

  • venv.zip/python310/bin/python is the path of the Python executable inside venv.zip.
  • -Dyarn.ship-files specifies the absolute path of the shipfiles directory.
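
For the venv.zip/python310/bin/python path to resolve, the archive must contain the python310 directory at its top level. A quick stdlib-only sketch for checking an archive's layout before submitting (the expected entry path is an assumption matching the command above):

```python
import io
import zipfile

def check_ship_archive(names, interpreter="python310/bin/python"):
    """True if the archive member list contains the interpreter at the
    expected top-level path, so -pyexec venv.zip/python310/bin/python resolves."""
    return interpreter in names

# Demo with an in-memory archive laid out like venv.zip:
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("python310/bin/python", "")  # stand-in for the interpreter binary
with zipfile.ZipFile(buf) as zf:
    print(check_ship_archive(zf.namelist()))  # True
```

A common mistake is zipping the contents of the venv directory instead of the directory itself, which puts bin/python at the top level and breaks the path.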

The job's print output can be viewed with yarn logs -applicationId application_xxxxxx.

Possible errors

2025-08-20 09:48:09,640 WARN  org.apache.flink.client.python.PythonEnvUtils                [] - Create symbol link from /tmp/pyflink/e84c1886-ef4b-4a48-b658-27562a4b0bba/591c7db3-8541-48ed-b98f-512a03134ae4/shipfiles to /hadoop/yarn/local/usercache/root/appcache/application_1754873268771_0082/container_e08_1754873268771_0082_01_000001/shipfiles failed and copy instead.
java.nio.file.NoSuchFileException: /tmp/pyflink/e84c1886-ef4b-4a48-b658-27562a4b0bba/591c7db3-8541-48ed-b98f-512a03134ae4/shipfiles
        at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) ~[?:1.8.0_121]
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) ~[?:1.8.0_121]
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) ~[?:1.8.0_121]
        at sun.nio.fs.UnixFileSystemProvider.createSymbolicLink(UnixFileSystemProvider.java:457) ~[?:1.8.0_121]
        at java.nio.file.Files.createSymbolicLink(Files.java:1043) ~[?:1.8.0_121]
        at org.apache.flink.client.python.PythonEnvUtils.createSymbolicLink(PythonEnvUtils.java:230) ~[flink-python-1.17.2.jar:1.17.2]
        at org.apache.flink.client.python.PythonEnvUtils.addToPythonPath(PythonEnvUtils.java:284) ~[flink-python-1.17.2.jar:1.17.2]
        at org.apache.flink.client.python.PythonEnvUtils.preparePythonEnvironment(PythonEnvUtils.java:146) ~[flink-python-1.17.2.jar:1.17.2]
        at org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient(PythonEnvUtils.java:477) ~[flink-python-1.17.2.jar:1.17.2]
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:92) ~[flink-python-1.17.2.jar:1.17.2]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_121]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_121]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_121]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_121]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.17.2.jar:1.17.2]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.2.jar:1.17.2]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.2.jar:1.17.2]

This means the /tmp/pyflink directory on the target YARN NodeManager is not accessible. Run:

chown yarn:hadoop /tmp/pyflink/

so that the yarn user can access it.

Example: Reading a Hudi Table

Write hudi-demo.py with the following content:

from pyflink.table import TableEnvironment, EnvironmentSettings


def basic_operations():
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    create_stmt = """
        CREATE TABLE hudi_student (
        id int,
        name string,
        tel int,
        PRIMARY KEY(id) NOT ENFORCED
    ) with (
        'connector' = 'hudi',
        'path' = 'hdfs:///hudi_student_1',
        'table.type' = 'COPY_ON_WRITE',
        'hoodie.datasource.write.operation' = 'bulk_insert',
        'hoodie.datasource.write.recordkey.field' = 'id'
    )
    """
    t_env.execute_sql(create_stmt)
#    t_env.execute_sql("insert into hudi_student values(1, 'Paul', 123456)")
    t_env.sql_query("select * from hudi_student").execute().print()


if __name__ == '__main__':
    basic_operations()

Submit command:

./bin/flink run --target yarn-per-job --python /path/to/hudi-demo.py -pyexec /opt/python310/bin/python --jarfile /path/to/hudi-flink-xxx.jar

Note: if the Hudi Flink connector jar is already in Flink's lib directory, --jarfile is not needed.

The console will print the result shortly, for example:

+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         tel |
+----+-------------+--------------------------------+-------------+
| +I |           1 |                           Paul |      111111 |
| +I |           2 |                           Kate |      222222 |
| +I |           3 |                          Peter |      333333 |
| +I |           4 |                           Jane |      444444 |
| +I |           5 |                           John |      555555 |
| +I |           6 |                           Mary |      666666 |
| +I |           7 |                          Emily |      777777 |
| +I |          10 |                            Jim |      999999 |
+----+-------------+--------------------------------+-------------+
8 rows in set

Appendix

Compiling and Installing Python 3.10 on CentOS 7

Python 3.10 requires OpenSSL 1.1.1 or later. If the system does not have it, build and install openssl 1.1.1l first:

wget http://www.openssl.org/source/openssl-1.1.1l.tar.gz
# Extract the archive and enter the source directory

mkdir /usr/local/ssl
./config --prefix=/usr/local/ssl --openssldir=/usr/local/ssl no-zlib
make && make install
echo "/usr/local/ssl/lib" >> /etc/ld.so.conf

Next, compile and install Python 3.10.

Install the required build dependencies first:

yum install zlib-devel bzip2-devel ncurses-devel sqlite-devel readline-devel tk-devel
yum install libffi-devel xz-devel
# If openssl was compiled and installed in the previous step, openssl-devel is not needed
yum install openssl-devel

Download the source and configure the build:

wget https://www.python.org/ftp/python/3.10.9/Python-3.10.9.tgz
# Extract the source package and enter the directory

./configure --prefix=/usr/local/python310

Next, enable the SSL module. Edit the Modules/Setup file, then find and uncomment the following lines:

# Socket module helper for socket(2)
_socket socketmodule.c

# Socket module helper for SSL support; you must comment out the other
# socket line above, and edit the OPENSSL variable:
OPENSSL=/usr/local/ssl
_ssl _ssl.c \
    -I$(OPENSSL)/include -L$(OPENSSL)/lib \
    -lssl -lcrypto

Set OPENSSL= to the prefix used when configuring openssl earlier, i.e. /usr/local/ssl.

Finally, run the build:

make && make install

The compiled Python 3.10 is installed under /usr/local/python310.

The following Python code can be used to test whether the required modules are present:

import ssl
import lzma

If no errors are raised, no modules are missing.
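
To check several modules at once and see exactly which ones are broken, a small helper like the following can be used (the module list is illustrative; these are the modules most often broken by missing -devel packages on CentOS 7):

```python
import importlib

def missing_modules(names):
    """Return the subset of module names that fail to import, e.g. because the
    matching -devel package was absent when Python was compiled."""
    missing = []
    for name in names:
        try:
            importlib.import_module(name)
        except ImportError:
            missing.append(name)
    return missing

print(missing_modules(["ssl", "lzma", "ctypes", "sqlite3", "readline"]))
```

An empty list means the build is complete; any names printed indicate which dependency (e.g. xz-devel for lzma, libffi-devel for ctypes) must be installed before recompiling.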

Reference Links

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/datastream_tutorial/

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/table/table_environment/#tablesql-%E6%93%8D%E4%BD%9C

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/deployment/cli/#submitting-pyflink-jobs
