007-docker部署Flink测试PyFlink提交任务

docker部署 和 Pyflink环境准备

创建网络
FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
docker network create flink-network
启动一个 job_manager 和一个 task_manager
docker run \
    --rm \
    --name=jobmanager \
    --network flink-network \
    --publish 8099:8081 \
    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
    flink:latest jobmanager
    
docker run \
    -d \
    --name=taskmanager \
    --network flink-network \
    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
    flink:latest taskmanager
访问

访问 ip:8099 查看 Web UI

Pyflink环境
  • jdk11
  • python3.7
  • 安装 pyflink
pip3 install apache-flink -i https://pypi.tuna.tsinghua.edu.cn/simple some-package

实例

pyflink代码
from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

df = pd.DataFrame() # 一个pandas.DataFrame
table = table_env.from_pandas(df)


query = ''' select * from %s '''
table_out = table_env.sql_query(query % (table,table))
pdf = table_out.to_pandas()
print(pdf)
本地测试
  • 直接执行即可

提交到flink集群

  • 下载 flink (用于执行命令行)
wget https://dlcdn.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz --no-check-certificate
tar -zxvf flink-1.14.3-bin-scala_2.12.tgz -C ./
  • 修改配置 指定python所在位置
    flink/conf/flink-conf.yaml
python.client.executable: /usr/local/python3/bin/python3
  • 提交任务
flink run --jobmanager ip:8099 -Dexecution.runtime-mode=BATCH --python flink_demo.py
  • 在 WebUI 查看任务
    ip:8099
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容