Deploying PyFlink on a Cluster

Preface

As mentioned in earlier posts, Flink computes user profiles in real time and needs to call an algorithm library written in Python. The two previous approaches both had various problems, so we finally decided to develop the job with PyFlink.

Introducing a new technology inevitably means running into pitfalls. Many of the commands were copied straight from the official documentation, yet they still failed, and there was nothing to do but work through the errors one by one. This post summarizes the problems encountered while deploying PyFlink over the past two days.

Deployment Script

run.sh

#!/usr/bin/env bash

unset PYTHONPATH

export PYTHONPATH="/home/work/python3.7.1"
export FLINK_HOME="/home/work/flink-1.15.3"

if [ ! -f realtime_calc_label.zip ];then
    zip -q -r ./realtime_calc_label.zip ./*
fi

# Without this option, the alias defined below would not take effect inside the script
shopt -s expand_aliases

alias python=/home/work/python3.7.1/bin/python3

/home/work/flink-1.15.3/bin/flink run \
--detached \
-t yarn-per-job \
-Dyarn.application.name=flink_user_profile \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=3096m \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dyarn.application.queue=t0 \
-Dpython.systemenv.enabled=false \
-p 24 \
-pyarch /home/work/python3.7.1/python3.7.1.zip,./realtime_calc_label.zip \
-pyclientexec ./python3.7.1.zip/bin/python3 \
-pyexec ./python3.7.1.zip/bin/python3 \
-pyfs ./realtime_calc_label.zip \
--python ./label_calc_stream.py \
--jarfile jars/flink-sql-connector-kafka-1.15.2.jar
  • In the same directory as run.sh are the zip package of the whole project, realtime_calc_label.zip, the PyFlink entry point label_calc_stream.py, and the jars folder containing the Java jar dependencies
  • -pyclientexec and -pyexec both use relative paths, because Flink copies all required resources into a temporary directory when the job is submitted; a programmatic equivalent of these flags is sketched right after this list
  • The YARN cluster already runs many PySpark jobs, and now PyFlink jobs need to run on it as well, so supporting multiple Python versions is unavoidable. It is therefore best to package our own Python environment and submit it together with the job: run cd /home/work/python3.7.1 && zip -q -r ./python3.7.1.zip ./* to package the environment
  • A python2.x package on PYTHONPATH conflicts with a package inside our python3.7.1.zip, and the released Flink 1.15.2 has a bug in handling this situation, so Flink has to be upgraded from 1.15.2 to 1.15.3. After checking with the Flink community, 1.15.3 would not be released for at least another two weeks, so we downloaded the release-1.15 branch source from GitHub and compiled and installed it ourselves
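
The same Python-environment settings can also be configured from inside the job code instead of on the flink run command line. The following is only a minimal sketch using the example paths from run.sh (the jar path is a placeholder): add_python_archive, set_python_executable, add_python_file and add_jars are the StreamExecutionEnvironment counterparts of -pyarch, -pyexec, -pyfs and --jarfile.

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# ship the packaged Python environment with the job (counterpart of -pyarch)
env.add_python_archive("/home/work/python3.7.1/python3.7.1.zip")
# run the Python workers with the interpreter inside that archive (counterpart of -pyexec)
env.set_python_executable("python3.7.1.zip/bin/python3")
# ship the project code (counterpart of -pyfs)
env.add_python_file("./realtime_calc_label.zip")
# add the Kafka connector jar (counterpart of --jarfile); adjust the placeholder path
env.add_jars("file:///path/to/jars/flink-sql-connector-kafka-1.15.2.jar")

The -pyclientexec flag has no method on StreamExecutionEnvironment; on the client side it corresponds to the python.client.executable option (or the PYFLINK_CLIENT_EXECUTABLE environment variable).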

Building Python

As mentioned above, when submitting PyFlink code it is best to ship the required Python dependencies along with the job. To keep the package as small as possible, we build a clean Python environment ourselves with the following commands:

cd /home/work

mkdir python3.9.0
# Note: in a private-cloud environment without direct internet access, download the tarball manually from the official site and upload it with a file-transfer tool
wget https://www.python.org/ftp/python/3.9.0/Python-3.9.0.tgz

tar -xzf Python-3.9.0.tgz

cd Python-3.9.0

./configure --prefix=/home/work/python3.9.0  --with-ssl

make

make install
  • Install the dependencies
# Upgrade pip
/home/work/python3.7.1/bin/python3 -m pip install --upgrade --force pip

/home/work/python3.7.1/bin/python3 -m pip install apache-flink==1.15.2
# Used by the business code to read configuration files
/home/work/python3.7.1/bin/python3 -m pip install configparser==5.0.0

/home/work/python3.7.1/bin/python3 -m pip install protobuf
  • Package the environment
# Run inside the Python install directory to create the zip package
zip -q -r ./python3.7.1.zip ./*

# A tar.gz package also works
tar -czvf ./python3.7.1.tgz ./*

Building Flink

  • Shell environment setup on macOS: edit the ~/.zshrc file
JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_241.jdk/Contents/Home
SCALA_HOME=/Users/david.dong/soft/scala-2.12.13
MAVEN_HOME=/Users/david.dong/soft/apache-maven-3.6.3

export PATH=$JAVA_HOME/bin:$SCALA_HOME/bin:$MAVEN_HOME/bin:$PATH:.
  • Check which Java runtime the current macOS environment is using
# Run on the command line: /usr/libexec/java_home
david.dong@MacBook-Pro ~ % /usr/libexec/java_home
/Users/david.dong/Library/Java/JavaVirtualMachines/corretto-11.0.14.1/Contents/Home

# Note that this differs from what the java -version command reports
david.dong@MacBook-Pro ~ % java -version
java version "1.8.0_241"
Java(TM) SE Runtime Environment (build 1.8.0_241-b07)
Java HotSpot(TM) 64-Bit Server VM (build 25.241-b07, mixed mode)
  • Build the Flink source
# Clone the source
git clone https://github.com/apache/flink.git
# The released 1.15.2 has the bug, and the fix is already merged for 1.15.3, so switch to the release-1.15 branch
git checkout release-1.15
# Comment out the test-related modules in the source, then run the build command; the built distribution is placed under flink/flink-dist/target/
mvn clean install -DskipTests -Drat.skip=true

PyFlink Code

  • Project structure


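The original post shows a screenshot of the project structure here. Roughly, the layout implied by run.sh and the imports below looks like this (package __init__.py files omitted; treat it as a reconstruction, not the exact tree):

realtime_calc_label/
└── src/
    ├── run.sh
    ├── label_calc_stream.py
    ├── jars/
    │   └── flink-sql-connector-kafka-1.15.2.jar
    ├── udf_lib/
    │   └── calc_label_func.py
    ├── calc_lib/
    │   └── online_calc.py
    └── db_utils/
        └── mysql_util.py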
  • Main logic (label_calc_stream.py)

import json
import sys

# make sure the working directory is importable before loading the project modules
sys.path.append(".")

from pyflink.common import Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
from udf_lib.calc_label_func import CalcLabelFunc


def label_calc(profile):
    env = StreamExecutionEnvironment.get_execution_environment()
    # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
    # env.add_jars(
    #     "file:///Users/dengpengfei/PycharmProjects/realtime_calc_label/src/jars/flink-sql-connector-kafka-1.15.2.jar")

    # expose the run environment to the UDFs on the TaskManagers via global job parameters
    job_conf = {'env': profile}
    env.get_config().set_global_job_parameters(job_conf)

    kafka_consumer = FlinkKafkaConsumer(
        topics='change_user_preview',
        deserialization_schema=SimpleStringSchema(),
        properties={'bootstrap.servers': '127.0.0.1:9092',
                    'group.id': 'offline',
                    'auto.offset.reset': 'earliest'})

    source_ds = env.add_source(kafka_consumer).name('source_kafka')

    map_df = source_ds.map(lambda row: json.loads(row))

    process_df = map_df.key_by(lambda row: row['unique_id']).flat_map(CalcLabelFunc(), output_type=Types.STRING()).name('flat_map_calc')

    kafka_producer = FlinkKafkaProducer(
        topic='change_label_preview',
        serialization_schema=SimpleStringSchema(),
        producer_config={'bootstrap.servers': '127.0.0.1:9092'})

    process_df.add_sink(kafka_producer).name('sink_kafka')

    env.execute('label_calc')


if __name__ == '__main__':
    run_env = 'preview'
    if len(sys.argv) > 1:
        run_env = sys.argv[1]
    label_calc(run_env)
  • The UDF (udf_lib/calc_label_func.py)
import json
import sys
import time
from pyflink.datastream import RuntimeContext, FlatMapFunction
from pymongo import MongoClient
from calc_lib.online_calc import OnlineCalc
from db_utils.mysql_util import get_sql_conn, get_dict_data_sql

sys.path.append(".")


class CalcLabelFunc(FlatMapFunction):

    def __init__(self):
        self.env = None
        self.mongo = None
        self.mysql_conf = None
        self.online_calc = None
        self.cur_time_stamp = None

    def open(self, runtime_context: RuntimeContext):
        # read the run environment passed in by the main program via set_global_job_parameters
        self.env = runtime_context.get_job_parameter('env', 'preview')

        self.mongo = MongoClient('mongodb://localhost:27017')
        self.mysql_conf = {
            'host': '127.0.0.1',
            'username': 'root',
            'password': '123456',
            'db': 'user_profile'
        }
        self.cur_time_stamp = 0

    def flat_map(self, value):
        # update conf
        if time.time() - self.cur_time_stamp > 60 * 3:
            self.cur_time_stamp = time.time()
            self.update_conf()

        unique_id = value['unique_id']
        entity_type = value['entity_type']
        version = value['version']

        pp_db = 'pp_{}_{}_{}'.format(entity_type, version, self.env)
        pp_doc = self.mongo[pp_db]['pp_table'].find_one({'unique_id': unique_id})

        profile_db = 'profile_{}_{}_{}'.format(entity_type, version, self.env)
        profile_doc = self.mongo[profile_db]['profile_table'].find_one({'unique_id': unique_id})

        if pp_doc:
            if not profile_doc:
                profile_doc = {}
            profile_new = self.online_calc.calc(pp_doc, profile_doc)
            # upsert the recalculated profile document back into Mongo
            self.mongo[profile_db]['profile_table'].replace_one({'unique_id': unique_id}, profile_new, upsert=True)
            profile_new['entity_type'] = entity_type
            return [json.dumps(profile_new)]
        else:
            return []

    def close(self):
        self.mongo.close()

    def update_conf(self):
        con, cursor = get_sql_conn(self.mysql_conf)
        strategy_data = get_dict_data_sql(cursor, 'SELECT * FROM calc_strategy')
        relation_data = get_dict_data_sql(cursor, 'SELECT * FROM table_relation')
        con.close()

        self.online_calc = OnlineCalc(strategy_data, relation_data)

        print('update config ...')
  • MySQL utility (db_utils/mysql_util.py)
import pymysql


def get_sql_conn(conf):
    """
    获取数据库连接
    """
    conn = pymysql.connect(host=conf['host'], user=conf['username'], password=conf['password'], db=conf['db'])
    cursor = conn.cursor()
    return conn, cursor


def get_index_dict(cursor):
    """
    获取数据库对应表中的字段名
    """
    index_dict = dict()
    index = 0
    for desc in cursor.description:
        index_dict[desc[0]] = index
        index = index + 1
    return index_dict


def get_dict_data_sql(cursor, sql):
    """
    运行sql语句,获取结果,并根据表中字段名,转化成dict格式(默认是tuple格式)
    """
    cursor.execute(sql)
    data = cursor.fetchall()
    index_dict = get_index_dict(cursor)
    res = []
    for datai in data:
        resi = dict()
        for indexi in index_dict:
            resi[indexi] = datai[index_dict[indexi]]
        res.append(resi)
    return res
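
A minimal usage sketch for the helpers above, assuming the user_profile database configured in the UDF is reachable; the query is the same one issued in update_conf():

# connection values are the example values used earlier in this post
conf = {'host': '127.0.0.1', 'username': 'root', 'password': '123456', 'db': 'user_profile'}
conn, cursor = get_sql_conn(conf)
rows = get_dict_data_sql(cursor, 'SELECT * FROM calc_strategy')
# each row is a dict keyed by column name, e.g. rows[0]['strategy_id'] if such a column exists (hypothetical name)
conn.close()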

The main problems in this deployment were caused by the new Flink version, whose submit command has changed, and by a bug we found in Flink 1.15.2. The fix was to upgrade the version and add the configuration python.systemenv.enabled=false; see:

https://issues.apache.org/jira/browse/FLINK-29479

https://github.com/apache/flink/pull/21110

