Flink Usage Series: Index of Related Articles
Preface
Besides Java and Scala, Flink jobs can also be written in Python. PyFlink is the Python API of Apache Flink. This article walks through setting up PyFlink in both a development environment and a production (YARN) environment.
Environment
- Fedora 42
- Flink 1.17.2
- Python 3.10 (versions 3.7 - 3.10 are supported; see the version-guard sketch after this list)
- JDK 1.8
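Since PyFlink 1.17 only works with Python 3.7 - 3.10, a small guard like the following sketch, run at the top of a job script, catches an unsupported interpreter before any PyFlink import:
import sys

# PyFlink 1.17 supports Python 3.7 - 3.10 only
if not ((3, 7) <= sys.version_info[:2] <= (3, 10)):
    raise RuntimeError(f"Unsupported Python {sys.version.split()[0]}; PyFlink 1.17 needs 3.7 - 3.10")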
Development Environment Setup and Execution
Install Python 3.10:
dnf install python3.10 python3.10-devel
Create a virtual environment and install the dependencies:
mkdir pyflink-demo
cd pyflink-demo
uv python pin 3.10
uv init
uv add apache-flink==1.17.2 -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
uv add setuptools -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
Configure JAVA_HOME:
export JAVA_HOME=/path/to/openjdk-1.8
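Once JAVA_HOME is set, a quick sanity check (a minimal sketch) verifies that PyFlink can start its embedded JVM:
from pyflink.table import EnvironmentSettings, TableEnvironment

# creating a TableEnvironment launches a JVM via py4j, so this line
# fails immediately if JAVA_HOME does not point at a working JDK
t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
print("PyFlink environment is ready")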
Create main.py with the following content:
import argparse
import logging
import sys
from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat, FileSink, OutputFileConfig, RollingPolicy
word_count_data = [
    "To be, or not to be,--that is the question:--",
    "Whether 'tis nobler in the mind to suffer",
    "The slings and arrows of outrageous fortune",
    "Or to take arms against a sea of troubles,",
    "And by opposing end them?--To die,--to sleep,--",
    "No more; and by a sleep to say we end",
    "The heartache, and the thousand natural shocks",
    "That flesh is heir to,--'tis a consummation",
    "Devoutly to be wish'd. To die,--to sleep;--",
    "To sleep! perchance to dream:--ay, there's the rub;",
    "For in that sleep of death what dreams may come,",
    "When we have shuffled off this mortal coil,",
    "Must give us pause: there's the respect",
    "That makes calamity of so long life;",
    "For who would bear the whips and scorns of time,",
    "The oppressor's wrong, the proud man's contumely,",
    "The pangs of despis'd love, the law's delay,",
    "The insolence of office, and the spurns",
    "That patient merit of the unworthy takes,",
    "When he himself might his quietus make",
    "With a bare bodkin? who would these fardels bear,",
    "To grunt and sweat under a weary life,",
    "But that the dread of something after death,--",
    "The undiscover'd country, from whose bourn",
    "No traveller returns,--puzzles the will,",
    "And makes us rather bear those ills we have",
    "Than fly to others that we know not of?",
    "Thus conscience does make cowards of us all;",
    "And thus the native hue of resolution",
    "Is sicklied o'er with the pale cast of thought;",
    "And enterprises of great pith and moment,",
    "With this regard, their currents turn awry,",
    "And lose the name of action.--Soft you now!",
    "The fair Ophelia!--Nymph, in thy orisons",
    "Be all my sins remember'd."
]
def word_count(input_path, output_path):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    # write all the data to one file
    env.set_parallelism(1)

    # define the source
    if input_path is not None:
        ds = env.from_source(
            source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                       input_path)
                             .process_static_file_set().build(),
            watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
            source_name="file_source"
        )
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        ds = env.from_collection(word_count_data)

    def split(line):
        yield from line.split()

    # compute word count
    ds = ds.flat_map(split) \
           .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
           .key_by(lambda i: i[0]) \
           .reduce(lambda i, j: (i[0], i[1] + j[1]))

    # define the sink
    if output_path is not None:
        ds.sink_to(
            sink=FileSink.for_row_format(
                base_path=output_path,
                encoder=Encoder.simple_string_encoder())
            .with_output_file_config(
                OutputFileConfig.builder()
                .with_part_prefix("prefix")
                .with_part_suffix(".ext")
                .build())
            .with_rolling_policy(RollingPolicy.default_rolling_policy())
            .build()
        )
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        ds.print()

    # submit for execution
    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input, known_args.output)
Test run (without arguments the job uses the built-in sample data and prints to stdout; pass --input and --output to read from and write to files):
uv run main.py
Submitting PyFlink Jobs to a YARN Cluster
This section uses an older CentOS 7 cluster as an example to show how to submit PyFlink jobs to run on a cluster.
Environment:
- CentOS 7.4
- Hadoop 3.1.1
- Flink 1.17.2
- OpenJDK 1.8
Flink 1.17.2 is used here with Python 3.10.x. CentOS 7 does not provide this version, so it has to be compiled and installed from source on every YARN NodeManager node. See the appendix for the build steps.
Submitting jobs in yarn-per-job mode
After the build completes (the Python 3.10 binary is at /usr/local/python310/bin/python3.10), create a virtual environment under /opt, matching the -pyexec path used below, and install PyFlink:
cd /opt
/usr/local/python310/bin/python3.10 -m venv python310
source python310/bin/activate
pip3 install apache-flink==1.17.2 -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
Then copy the created virtual environment to the same directory on every YARN NodeManager node.
Command to submit the example WordCount job:
# the virtual environment must be activated first
source python310/bin/activate
cd $FLINK_HOME
export HADOOP_CLASSPATH=`hadoop classpath`
./bin/flink run --target yarn-per-job --python examples/python/table/word_count.py -pyexec /opt/python310/bin/python
Note: when the job runs on a Python interpreter inside a virtual environment, the path to that Python executable must be passed explicitly via the -pyexec option.
According to the official documentation, the -pyclientexec option specifies the Python executable used by the client-side process that builds the execution plan before the job is submitted to the cluster. The example above works without it, but following the official documentation and setting both -pyclientexec and -pyexec is recommended. The equivalent configuration keys can also be set in code, as sketched below.
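A minimal Table API sketch using the documented configuration keys python.executable and python.client.executable (the interpreter paths assume the /opt/python310 virtual environment from this article):
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# equivalent of -pyexec: interpreter used by the Python workers on the cluster
t_env.get_config().set("python.executable", "/opt/python310/bin/python")
# equivalent of -pyclientexec: interpreter used to compile the job on the client side
t_env.get_config().set("python.client.executable", "/opt/python310/bin/python")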
If the job runs successfully, view its output with yarn logs -applicationId application_xxxxxx (where application_xxxxxx is the YARN application ID of the Flink job).
For more detailed job-submission options, see the official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/deployment/cli/#submitting-pyflink-jobs
Possible errors
Error 1:
ModuleNotFoundError: No module named '_ctypes'
Fix:
yum install libffi-devel
Error 2:
ModuleNotFoundError: No module named '_lzma'
Fix:
yum install xz-devel
If Python 3.10 was built from source, it must be recompiled after installing the packages above for the fix to take effect.
Submitting jobs in yarn-application mode
Requirement: Python 3.10 must be compiled and installed on every YARN NodeManager node. Otherwise, a TaskManager scheduled onto a node without Python 3.10 fails with an error like:
Python path configuration:
PYTHONHOME = (not set)
PYTHONPATH = '/tmp/pyflink/b1443a72-7f19-48a2-8a20-688e6f906ab1/88edab8b-7484-47fe-b4d4-708b506d1221/shipfiles'
program name = 'venv.zip/python310/bin/python'
isolated = 0
environment = 1
user site = 1
import site = 1
sys._base_executable = '/tmp/pyflink/b1443a72-7f19-48a2-8a20-688e6f906ab1/python-archives/venv.zip/python310/bin/python'
sys.base_prefix = '/usr/local/python310'
sys.base_exec_prefix = '/usr/local/python310'
sys.platlibdir = 'lib'
sys.executable = '/tmp/pyflink/b1443a72-7f19-48a2-8a20-688e6f906ab1/python-archives/venv.zip/python310/bin/python'
sys.prefix = '/usr/local/python310'
sys.exec_prefix = '/usr/local/python310'
sys.path = [
'/tmp/pyflink/b1443a72-7f19-48a2-8a20-688e6f906ab1/88edab8b-7484-47fe-b4d4-708b506d1221/shipfiles',
'/usr/local/python310/lib/python310.zip',
'/usr/local/python310/lib/python3.10',
'/usr/local/python310/lib/lib-dynload',
]
Fatal Python error: init_fs_encoding: failed to get the Python codec of the filesystem encoding
Python runtime state: core initialized
ModuleNotFoundError: No module named 'encodings'
The failure occurs because /usr/local/python310/lib/python3.10 does not exist on the target machine: a virtual environment created with python -m venv carries no standard library of its own and still resolves it from the base installation (note sys.base_prefix in the log above), so the base Python must exist at the same path on every node. The quick check below illustrates this.
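Run inside the activated virtual environment, a short check (sketch) shows where the venv expects its base installation:
import sys

# a venv made with `python -m venv` has no standard library of its own:
# sys.base_prefix must exist at the same path on every NodeManager node
print(sys.base_prefix)   # e.g. /usr/local/python310
print(sys.executable)    # the venv's own interpreter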
Create a virtual environment at /opt/python310 and install the dependencies:
cd /opt
/usr/local/python310/bin/python3.10 -m venv python310
source python310/bin/activate
pip3 install apache-flink==1.17.2 -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
Prepare the shipfiles directory. First compress the /opt/python310 virtual environment created above into a zip (run from /opt):
zip -r venv.zip python310
Then create a shipfiles directory and place venv.zip and the user job code (e.g. hudi-demo.py) into it.
The final directory layout looks like this:
[root@master opt]# ll shipfiles
total 716868
-rw-r--r-- 1 root root 1005 Aug 20 10:55 hudi-demo.py
-rw-r--r-- 1 root root 734068339 Aug 20 10:54 venv.zip
Go to the Flink installation directory and run the submit command (unlike the per-job mode above, there is no need to activate the virtual environment before running it):
export HADOOP_CLASSPATH=`hadoop classpath`
# activating the virtual environment is not required in this mode
./bin/flink run-application -t yarn-application \
-Dyarn.ship-files=/opt/shipfiles \
-pyarch shipfiles/venv.zip \
-pyclientexec venv.zip/python310/bin/python \
-pyexec venv.zip/python310/bin/python \
-pyfs shipfiles \
-pym hudi-demo
Where:
- venv.zip/python310/bin/python is the path of the Python executable inside venv.zip, used for both -pyclientexec and -pyexec.
- -Dyarn.ship-files specifies the absolute path of the shipfiles directory, whose contents YARN ships to the cluster alongside the application.
- -pyarch registers the zipped virtual environment as a Python archive, -pyfs adds the shipfiles directory to the Python path, and -pym names the entry module (hudi-demo, i.e. hudi-demo.py without the suffix).
The same resources can also be declared in code, as sketched below.
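For reference, the archive and interpreter can be wired up programmatically with the documented DataStream API methods add_python_archive and set_python_executable; a sketch assuming the shipfiles layout above:
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# equivalent of -pyarch: ship the zipped venv with the job
env.add_python_archive("/opt/shipfiles/venv.zip")
# equivalent of -pyexec: the interpreter path inside the extracted archive
env.set_python_executable("venv.zip/python310/bin/python")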
The job's print output can be viewed with yarn logs -applicationId application_xxxxxx.
Possible errors
2025-08-20 09:48:09,640 WARN org.apache.flink.client.python.PythonEnvUtils [] - Create symbol link from /tmp/pyflink/e84c1886-ef4b-4a48-b658-27562a4b0bba/591c7db3-8541-48ed-b98f-512a03134ae4/shipfiles to /hadoop/yarn/local/usercache/root/appcache/application_1754873268771_0082/container_e08_1754873268771_0082_01_000001/shipfiles failed and copy instead.
java.nio.file.NoSuchFileException: /tmp/pyflink/e84c1886-ef4b-4a48-b658-27562a4b0bba/591c7db3-8541-48ed-b98f-512a03134ae4/shipfiles
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) ~[?:1.8.0_121]
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) ~[?:1.8.0_121]
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) ~[?:1.8.0_121]
at sun.nio.fs.UnixFileSystemProvider.createSymbolicLink(UnixFileSystemProvider.java:457) ~[?:1.8.0_121]
at java.nio.file.Files.createSymbolicLink(Files.java:1043) ~[?:1.8.0_121]
at org.apache.flink.client.python.PythonEnvUtils.createSymbolicLink(PythonEnvUtils.java:230) ~[flink-python-1.17.2.jar:1.17.2]
at org.apache.flink.client.python.PythonEnvUtils.addToPythonPath(PythonEnvUtils.java:284) ~[flink-python-1.17.2.jar:1.17.2]
at org.apache.flink.client.python.PythonEnvUtils.preparePythonEnvironment(PythonEnvUtils.java:146) ~[flink-python-1.17.2.jar:1.17.2]
at org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient(PythonEnvUtils.java:477) ~[flink-python-1.17.2.jar:1.17.2]
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:92) ~[flink-python-1.17.2.jar:1.17.2]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_121]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_121]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_121]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_121]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.2.jar:1.17.2]
This means the /tmp/pyflink directory on the target YARN NodeManager is not accessible. Run:
chown yarn:hadoop /tmp/pyflink/
so that the yarn user can access it.
Example: Reading a Hudi Table
Write hudi-demo.py with the following content:
from pyflink.table import TableEnvironment, EnvironmentSettings


def basic_operations():
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    create_stmt = """
    CREATE TABLE hudi_student (
        id int,
        name string,
        tel int,
        PRIMARY KEY(id) NOT ENFORCED
    ) with (
        'connector' = 'hudi',
        'path' = 'hdfs:///hudi_student_1',
        'table.type' = 'COPY_ON_WRITE',
        'hoodie.datasource.write.operation' = 'bulk_insert',
        'hoodie.datasource.write.recordkey.field' = 'id'
    )
    """
    t_env.execute_sql(create_stmt)
    # t_env.execute_sql("insert into hudi_student values(1, 'Paul', 123456)")
    t_env.sql_query("select * from hudi_student").execute().print()


if __name__ == '__main__':
    basic_operations()
Submit command:
./bin/flink run --target yarn-per-job --python /path/to/hudi-demo.py -pyexec /opt/python310/bin/python --jarfile /path/to/hudi-flink-xxx.jar
Note: if the Hudi Flink bundle jar is already in Flink's lib directory, --jarfile does not need to be specified. The jar can also be attached from within the job code, as sketched below.
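A minimal sketch using the documented pipeline.jars configuration key (the jar path is the same placeholder as in the command above):
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# equivalent of --jarfile: ship the Hudi bundle with the job (placeholder path)
t_env.get_config().set("pipeline.jars", "file:///path/to/hudi-flink-xxx.jar")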
After a short while the console prints the query result, for example:
+----+-------------+--------------------------------+-------------+
| op | id | name | tel |
+----+-------------+--------------------------------+-------------+
| +I | 1 | Paul | 111111 |
| +I | 2 | Kate | 222222 |
| +I | 3 | Peter | 333333 |
| +I | 4 | Jane | 444444 |
| +I | 5 | John | 555555 |
| +I | 6 | Mary | 666666 |
| +I | 7 | Emily | 777777 |
| +I | 10 | Jim | 999999 |
+----+-------------+--------------------------------+-------------+
8 rows in set
Appendix
Compiling and Installing Python 3.10 on CentOS 7
References:
- Compiling and installing Python 3.10: https://www.cnblogs.com/ding2016/p/16973626.html
- Fixing the pip3 SSL-module-unavailable issue: https://blog.csdn.net/qq_39719415/article/details/121361399
Python 3.10 requires OpenSSL 1.1.1 or later. If the system does not provide it, compile and install openssl 1.1.1l first. The steps are as follows:
wget http://www.openssl.org/source/openssl-1.1.1l.tar.gz
# extract the archive and enter the source directory
mkdir /usr/local/ssl
./config --prefix=/usr/local/ssl --openssldir=/usr/local/ssl no-zlib
make && make install
echo "/usr/local/ssl/lib" >> /etc/ld.so.conf
# refresh the shared library cache so the new OpenSSL is found
ldconfig
Next, compile and install Python 3.10 itself.
Install the required build dependencies first:
yum install zlib-devel bzip2-devel ncurses-devel sqlite-devel readline-devel tk-devel
yum install libffi-devel xz-devel
# if OpenSSL was built in the previous step, installing openssl-devel is unnecessary
yum install openssl-devel
Download the source and configure the build:
wget https://www.python.org/ftp/python/3.10.9/Python-3.10.9.tgz
# extract the source package and enter the directory
./configure --prefix=/usr/local/python310
Next, enable the SSL module. Edit the Modules/Setup file, then find and uncomment the following lines:
# Socket module helper for socket(2)
_socket socketmodule.c

# Socket module helper for SSL support; you must comment out the other
# socket line above, and edit the OPENSSL variable:
OPENSSL=/usr/local/ssl
_ssl _ssl.c \
    -I$(OPENSSL)/include -L$(OPENSSL)/lib \
    -lssl -lcrypto
Set OPENSSL= to the prefix passed to config when building OpenSSL earlier, i.e. /usr/local/ssl.
Finally, build and install:
make && make install
The compiled Python 3.10 lives under /usr/local/python310.
You can check that the required modules are all present with the following Python code:
import ssl
import lzma
import ctypes
print(ssl.OPENSSL_VERSION)  # confirms which OpenSSL the build linked against
If these imports raise no errors, no required modules are missing.
Reference Links
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/datastream_tutorial/