# Configuration and service definition
$ kubectl create -f flink-configuration-configmap.yaml
$ kubectl create -f jobmanager-service.yaml
# Create the deployments for the cluster
$ kubectl create -f jobmanager-session-deployment.yaml
$ kubectl create -f taskmanager-session-deployment.yaml
Flink高可用会话模式集群配置文件
flink服务账户
---------------------------------------------------------------------------------------------------
flink-service-account.yaml
---------------------------------------------------------------------------------------------------
apiVersion: v1
kind: ServiceAccount
metadata:
name: flink-service-account
namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: default
name: flink-role
rules:
- apiGroups: [""]
resources: ["pods", "services", "configmaps"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: ["networking.k8s.io"]
resources: ["ingresses"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: flink-rolebinding
namespace: default
subjects:
- kind: ServiceAccount
name: flink-service-account
namespace: default
roleRef:
kind: Role
name: flink-role
apiGroup: rbac.authorization.k8s.io
---------------------------------------------------------------------------------------------------
配置Flink在k8s上的运行环境,用于存储配置信息
---------------------------------------------------------------------------------------------------
flink-configuration-configmap.yaml
---------------------------------------------------------------------------------------------------
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
labels:
app: flink
data:
flink-conf.yaml: |+
jobmanager.rpc.address: [flink-jobmanager] # JobManager 的 RPC 地址,这里是 flink-jobmanager,Service 的名字
taskmanager.numberOfTaskSlots: [2] # 每个 TaskManager 的任务槽数量
blob.server.port: [6124] # 指定 BLOB(Binary Large Object)服务器的端口
jobmanager.rpc.port: [6123] # JobManager 的 RPC 服务端口
taskmanager.rpc.port: [6122] # TaskManager 的 RPC 服务端口
queryable-state.proxy.ports: [6125] # Queryable State 代理的端口
jobmanager.memory.process.size: 1600m # JobManager 的内存大小
taskmanager.memory.process.size: 1728m # TaskManager 的内存大小
parallelism.default: [2] # 默认的并行度
kubernetes.cluster-id: <cluster-id> # 指定 Flink 集群在 Kubernetes 中的唯一标识符 >>>HA集群所需参数
high-availability: kubernetes # 启用 Flink 的高可用性模式,并指定使用 Kubernetes 作为高可用性服务的后端 >>>HA集群所需参数
high-availability.storageDir: hdfs:///flink/recovery # 指定高可用性状态存储的目录, 根据实际情况更改为其他存储路径,如 S3、GCS 或 NFS >>>HA集群所需参数
restart-strategy: fixed-delay # 指定 Flink 作业的重启策略 >>>HA集群所需参数
restart-strategy.fixed-delay.attempts: 10 # 指定固定延迟重启策略下的重启尝试次数 >>>HA集群所需参数
log4j-console.properties: |+
rootLogger.level = INFO # 设置根日志记录器的日志级别为 INFO
rootLogger.appenderRef.console.ref = ConsoleAppender # 将日志输出到控制台
rootLogger.appenderRef.rolling.ref = RollingFileAppender # 将日志输出到滚动文件
# 为一些常见的库(如 Akka、Kafka、Hadoop、Zookeeper)单独设置日志级别为 INFO
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10
---------------------------------------------------------------------------------------------------
jobmanager服务配置
---------------------------------------------------------------------------------------------------
jobmanager-service.yaml
---------------------------------------------------------------------------------------------------
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager
spec:
type: ClusterIP
ports:
- name: rpc
port: 6123
- name: blob-server
port: 6124
- name: webui
port: 8081
selector:
app: flink
component: jobmanager
---------------------------------------------------------------------------------------------------
将 jobmanager 端口公开为公共 Kubernetes 节点的端口
---------------------------------------------------------------------------------------------------
jobmanager-rest-service.yaml
---------------------------------------------------------------------------------------------------
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager-rest
spec:
type: NodePort
ports:
- name: rest
port: 8081
targetPort: 8081
nodePort: 30081
selector:
app: flink
component: jobmanager
---------------------------------------------------------------------------------------------------
启用可查询状态
---------------------------------------------------------------------------------------------------
taskmanager-query-state-service.yaml
---------------------------------------------------------------------------------------------------
apiVersion: v1
kind: Service
metadata:
name: flink-taskmanager-query-state
spec:
type: NodePort
ports:
- name: query-state
port: 6125
targetPort: 6125
nodePort: 30025
selector:
app: flink
component: taskmanager
---------------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------------------
jobmanager-session-deployment-ha.yaml
---------------------------------------------------------------------------------------------------
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-jobmanager
spec:
replicas: 2 # 在高可用配置中,JobManager 通常需要多个副本
selector:
matchLabels:
app: flink
component: jobmanager
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
serviceAccountName: flink-service-account
containers:
- name: jobmanager
image: apache/flink:1.16.2-scala_2.12
env:
- name: POD_IP # 获取 Pod 的 IP 地址
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
args: ["jobmanager", "$(POD_IP)"] # 指定启动 JobManager 并传递 Pod IP
ports:
- containerPort: 6123 # RPC 端口
name: rpc
- containerPort: 6124 # BLOB 服务器端口
name: blob-server
- containerPort: 8081 # Web UI 端口
name: webui
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts: # 定义挂载卷
- name: flink-config-volume # 卷名称
mountPath: /opt/flink/conf # 挂载路径
securityContext: # 安全上下文
runAsUser: 9999 # 以指定用户 ID 运行容器
serviceAccountName: flink-service-account # 指定使用的服务账户
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
---------------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------------------
taskmanager-session-deployment.yaml
---------------------------------------------------------------------------------------------------
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-taskmanager
spec:
replicas: 2
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
serviceAccountName: flink-service-account
containers:
- name: taskmanager
image: apache/flink:1.16.2-scala_2.12
args: ["taskmanager"]
ports:
- containerPort: 6122 # RPC 端口
name: rpc
- containerPort: 6125 # 查询状态端口
name: query-state
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf/
securityContext:
runAsUser: 9999 # 以指定用户 ID 运行容器
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
---------------------------------------------------------------------------------------------------
Flink on k8s配置样例
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 1.高可用K8S集群在硬件资源不足条件下的临时处理 资源不足的情况下,直接开两台master节点;⇒由于etcd集...
- 简介: 本文主要介绍 Flink on Yarn/K8s 的原理及应用实践,文章将从 Flink 架构、Flink...
- 本文基于 Flink-1.13 介绍 Flink on Kubernetes 的部署模式,重点讲述 Session...