目录:
- native flink on k8s 部署方式:session && application
- natvie flink on k8s高可用方式: k8s HA && ZK HA
一、部署
下载flink1.12版本,添加hadoop依赖包,flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar,commons-cli-1.4.jar
注:hadoop包可以网上找或者自己打包:mvn clean install -DskipTests -Dhadoop.version=3.1.1 ,找到对应flink-shaded-hadoop包即可。
二、执行
1.在k8s上创建serviceaccount:
kubectl create serviceaccount flink
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink
在有kubectl环境提交命令
a. application mode:
(1) dockerfile:
FROM flink:1.12.0-scala_2.12
RUN mkdir -p $FLINK_HOME/usrlib
COPY flink-clickhousetest-1.0-SNAPSHOT.jar $FLINK_HOME/usrlib/
注:目前只能使用本地包,不能使用外部hdfs之类的jar包路径,所以在打包的时候将执行的jar包打到镜像里。
(2)执行
./bin/flink run-application -p 16 -t kubernetes-application \
-c test.datawarehouse.DwSplitDataJob \
-Dkubernetes.jobmanager.service-account=flink \
-Dkubernetes.cluster-id=flink-k8s-application-cluster \
-Dtaskmanager.memory.process.size=6g \
-Dkubernetes.taskmanager.cpu=5 \
-Dtaskmanager.numberOfTaskSlots=16 \
-Dkubernetes.container.image=flink-test12:v1 \
-Dkubernetes.rest-service.exposed.type=NodePort \
local:///opt/flink/usrlib/flink-clickhousetest-1.0-SNAPSHOT.jar
b. session mode:
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=flink-k8s-session-cluster \
-Dkubernetes.jobmanager.service-account=flink \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dtaskmanager.numberOfTaskSlots=16 \
-Dtaskmanager.memory.process.size=6g \
-Dkubernetes.taskmanager.cpu=5
session mode任务提交和standalone模式相同
三、相关问题
hadoop依赖问题
参考:https://www.jianshu.com/p/cd5f8e2e2e9cjobmanager重启,日志报错Too old resource version
参考:https://www.jianshu.com/p/bddda290e0a8
四、JM高可用
背景
高可用目的:JobManager 对任务起到调度协调作用,他管理这任务调度和资源管理。默认是一个flink集群有单个JM,当JM挂了会导致单点失败,其他任务无法提交。JM高可用的目的是为了消除单点问题。
问题1:对比hdfs和flink高可用区别:
HDFS的HA切换,主要保证的是数据请求处理的正常服务。而Flink要让所有的失败任务能够快速恢复。我们可以从更高层面来理解这样的差异:一个是存储系统的HA实现,一个是计算框架的HA实现。
问题2:flink高可用需要什么?
FlinkJobMnager在服务发生切换的时候要及时地通知外界事物。这里的外界事物包括:
- JobManager管理的TaskManager
- 在跑的所有Job
- 在请求的JobClient客户端
然后这些Job,JobClient收到新的leader信息后,能够主动重新连接新的JobManager地址,保证任务的正常执行。
高可用实现方式
1. k8s HA service方式
a.配置高可用参数
kubernetes.cluster-id: <cluster-id>
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: hdfs:///flink/recovery
b. 提交任务执行
k8s HA service方式,是当JM挂了之后,会重新创建新的JM,从hdfs 数据恢复,重新执行任务。
实际存储:
1)blob:存储用户jar之类的
2)checkpoint:存储任务上次完成的checkpoint指针
3) jobgraph:存储任务jobgraph
测试现象:使用 k8s configmap实现高可用,当JM挂了之后,会自动新建一个JM,然后整体任务重启。
参考:https://zhuanlan.zhihu.com/p/89537466
2. ZK HA services
high-availability.storageDir: hdfs:///user/flink/zk/recovery
high-availability: zookeeper
high-availability.zookeeper.quorum: ip:2181
high-availability.zookeeper.path.root: /flink
测试现象:使用zk 实现高可用,当JM挂了之后,会自动新建JM同时新建对应TM,旧的TM会在任务正常后删除。
其他
flink 高可用局限:JM挂了之后任务需要重启而不是无缝连接
参考文章:https://zhuanlan.zhihu.com/p/89537466