1. Introduction
Flink is a unified batch and stream data processing framework with excellent performance; many large companies use it in production.
Due to time constraints, this post is only a quick first look; deeper analysis and classic case studies will follow in later posts.
- Video tutorials from the Flink open-source community, highly recommended:
https://space.bilibili.com/33807709
2. Install pyflink
- Create a virtual environment and install pyflink:
$ conda create -n py36 python=3.6
$ conda activate py36
$ conda install Cython
$ python -m pip install apache-flink==1.10.1
- The runtime needs a JDK; download JDK 10, extract it, and configure the environment (dt.jar and tools.jar were removed in JDK 9, so the legacy CLASSPATH entry is no longer needed):
export JAVA_HOME=/home/xxx/java/jdk-10.0.2
export PATH=$JAVA_HOME/bin:$PATH
- Flink 1.10 supports up to Python 3.7; Python 3.8 is not yet supported.
- The latest pyflink can also be installed by cloning the repo and building from source.
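- A quick sanity check after installing: creating a table environment from Python forces pyflink to launch the JVM through py4j, so it also verifies the JAVA_HOME setup. A minimal sketch (the file name check_pyflink.py is illustrative):

# check_pyflink.py -- creating a table environment starts the JVM bridge,
# so success here means both pyflink and the JDK are wired up correctly
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, TableConfig

env = ExecutionEnvironment.get_execution_environment()
t_env = BatchTableEnvironment.create(env, TableConfig())
print("pyflink OK, default parallelism =", env.get_parallelism())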
3. Run the server
- Create a docker-compose file to start a standalone Flink cluster:
$ cat docker-compose.yml
version: "2.1"
services:
  jobmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
- Start Flink and confirm the services are up:
$ docker-compose up -d
$ docker-compose scale taskmanager=3
$ docker-compose ps
Name Command State Ports
-----------------------------------------------------------------------------------------------------
flink_jobmanager_1 /docker-entrypoint.sh jobm ... Up 6123/tcp, 0.0.0.0:8081->8081/tcp
flink_taskmanager_1 /docker-entrypoint.sh task ... Up 6121/tcp, 6122/tcp, 6123/tcp, 8081/tcp
flink_taskmanager_2 /docker-entrypoint.sh task ... Up 6121/tcp, 6122/tcp, 6123/tcp, 8081/tcp
flink_taskmanager_3 /docker-entrypoint.sh task ... Up 6121/tcp, 6122/tcp, 6123/tcp, 8081/tcp
- Flink dashboard:
http://10.30.4.21:8081/#/overview
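- Besides the dashboard, the JobManager REST API can confirm the cluster state; a quick sketch, assuming port 8081 is reachable from the host and the requests package is installed:

import requests

# /overview is a standard Flink REST endpoint; it reports the number of
# registered taskmanagers, available slots, the Flink version, etc.
overview = requests.get("http://localhost:8081/overview").json()
print("flink version:", overview["flink-version"])
print("taskmanagers: ", overview["taskmanagers"])
print("total slots:  ", overview["slots-total"])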
4. Job development and testing
- Test example (note: pyflink launches a local mini cluster here; it does not use the Flink cluster we started above)
cd to the pyflink installation directory, e.g.: ~/soft/miniconda3/envs/py36/lib/python3.6/site-packages/pyflink
Run word_count.py (the hello-world of big data):
$ python examples/python/table/batch/word_count.py
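For reference, the shipped example differs in details, but a batch word count with the 1.10 Table API has roughly this shape (a condensed sketch; the /tmp paths are illustrative):

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, TableConfig, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem

env = ExecutionEnvironment.get_execution_environment()
t_env = BatchTableEnvironment.create(env, TableConfig())

# register a CSV source and sink, then count occurrences per word
t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv().field('word', DataTypes.STRING())) \
    .with_schema(Schema().field('word', DataTypes.STRING())) \
    .register_table_source('mySource')
t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .register_table_sink('mySink')

t_env.scan('mySource') \
    .group_by('word') \
    .select('word, count(1)') \
    .insert_into('mySink')
t_env.execute('word_count')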
- pyflink shell, streaming API (the shell predefines s_env / st_env as the streaming execution and table environments):
$ ./bin/pyflink-shell.sh remote localhost 8081
# enter the streaming demo code
import tempfile
import os
import shutil
sink_path = tempfile.gettempdir() + '/streaming.csv'
if os.path.exists(sink_path):
    if os.path.isfile(sink_path):
        os.remove(sink_path)
    else:
        shutil.rmtree(sink_path)
s_env.set_parallelism(1)
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
st_env.connect(FileSystem().path(sink_path)) \
    .with_format(OldCsv()
                 .field_delimiter(',')
                 .field("a", DataTypes.BIGINT())
                 .field("b", DataTypes.STRING())
                 .field("c", DataTypes.STRING())) \
    .with_schema(Schema()
                 .field("a", DataTypes.BIGINT())
                 .field("b", DataTypes.STRING())
                 .field("c", DataTypes.STRING())) \
    .register_table_sink("stream_sink")
t.select("a + 1, b, c").insert_into("stream_sink")
st_env.execute("stream_job")
- Exec into a taskmanager container and check the result file; the output is as expected:
# cat /tmp/streaming.csv
2,hi,hello
3,hi,hello
- Check the job's execution status on the Flink dashboard
- Other ways to submit jobs
Submit a Python script via the flink CLI, or submit Flink SQL directly (a SQL sketch follows the command below):
$ ./bin/flink run -py examples/python/table/batch/word_count.py
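For the SQL route, one option inside a PyFlink program is the Table API's SQL entry points. A minimal sketch reusing the shell session above (st_env, t, and the stream_sink sink from the streaming demo; the view name src is illustrative):

# register the earlier Table as a queryable view, then express the same
# transformation in SQL instead of the Table API
st_env.register_table("src", t)
result = st_env.sql_query("SELECT a + 1 AS a, b, c FROM src")
result.insert_into("stream_sink")
st_env.execute("sql_job")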