Flink on k8s配置样例

# Configuration and service definition
$ kubectl create -f flink-configuration-configmap.yaml
$ kubectl create -f jobmanager-service.yaml
# Create the deployments for the cluster
$ kubectl create -f jobmanager-session-deployment.yaml
$ kubectl create -f taskmanager-session-deployment.yaml

Flink高可用会话模式集群配置文件




flink服务账户
---------------------------------------------------------------------------------------------------
flink-service-account.yaml
---------------------------------------------------------------------------------------------------
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-service-account
  namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: default
  name: flink-role
rules:
- apiGroups: [""]
  resources: ["pods", "services", "configmaps"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: ["networking.k8s.io"]
  resources: ["ingresses"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-rolebinding
  namespace: default
subjects:
- kind: ServiceAccount
  name: flink-service-account
  namespace: default
roleRef:
  kind: Role
  name: flink-role
  apiGroup: rbac.authorization.k8s.io
---------------------------------------------------------------------------------------------------




配置Flink在k8s上的运行环境,用于存储配置信息
---------------------------------------------------------------------------------------------------
flink-configuration-configmap.yaml
---------------------------------------------------------------------------------------------------
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: [flink-jobmanager]            # JobManager 的 RPC 地址,这里是 flink-jobmanager,Service 的名字
    taskmanager.numberOfTaskSlots: [2]                    # 每个 TaskManager 的任务槽数量
    blob.server.port: [6124]                            # 指定 BLOB(Binary Large Object)服务器的端口
    jobmanager.rpc.port: [6123]                            # JobManager 的 RPC 服务端口
    taskmanager.rpc.port: [6122]                        # TaskManager 的 RPC 服务端口
    queryable-state.proxy.ports: [6125]                    # Queryable State 代理的端口
    jobmanager.memory.process.size: 1600m                # JobManager 的内存大小
    taskmanager.memory.process.size: 1728m                # TaskManager 的内存大小
    parallelism.default: [2]                            # 默认的并行度
    kubernetes.cluster-id: <cluster-id>                    # 指定 Flink 集群在 Kubernetes 中的唯一标识符                                        >>>HA集群所需参数
    high-availability: kubernetes                        # 启用 Flink 的高可用性模式,并指定使用 Kubernetes 作为高可用性服务的后端                >>>HA集群所需参数
    high-availability.storageDir: hdfs:///flink/recovery    # 指定高可用性状态存储的目录, 根据实际情况更改为其他存储路径,如 S3、GCS 或 NFS            >>>HA集群所需参数
    restart-strategy: fixed-delay                        # 指定 Flink 作业的重启策略                                                        >>>HA集群所需参数
    restart-strategy.fixed-delay.attempts: 10            # 指定固定延迟重启策略下的重启尝试次数                                                >>>HA集群所需参数
  log4j-console.properties: |+
    rootLogger.level = INFO                                # 设置根日志记录器的日志级别为 INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender    # 将日志输出到控制台
    rootLogger.appenderRef.rolling.ref = RollingFileAppender    # 将日志输出到滚动文件

    # 为一些常见的库(如 Akka、Kafka、Hadoop、Zookeeper)单独设置日志级别为 INFO
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
---------------------------------------------------------------------------------------------------




jobmanager服务配置
---------------------------------------------------------------------------------------------------
jobmanager-service.yaml
---------------------------------------------------------------------------------------------------
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
---------------------------------------------------------------------------------------------------



将 jobmanager 端口公开为公共 Kubernetes 节点的端口
---------------------------------------------------------------------------------------------------
jobmanager-rest-service.yaml
---------------------------------------------------------------------------------------------------
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
---------------------------------------------------------------------------------------------------



启用可查询状态
---------------------------------------------------------------------------------------------------
taskmanager-query-state-service.yaml
---------------------------------------------------------------------------------------------------
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
---------------------------------------------------------------------------------------------------



---------------------------------------------------------------------------------------------------
jobmanager-session-deployment-ha.yaml
---------------------------------------------------------------------------------------------------
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 2                                     # 在高可用配置中,JobManager 通常需要多个副本
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      serviceAccountName: flink-service-account
      containers:
      - name: jobmanager
        image: apache/flink:1.16.2-scala_2.12
        env:
        - name: POD_IP                            # 获取 Pod 的 IP 地址
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        args: ["jobmanager", "$(POD_IP)"]        # 指定启动 JobManager 并传递 Pod IP
        ports:
        - containerPort: 6123                     # RPC 端口
          name: rpc
        - containerPort: 6124                    # BLOB 服务器端口
          name: blob-server
        - containerPort: 8081                    # Web UI 端口
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:                            # 定义挂载卷
        - name: flink-config-volume                # 卷名称
          mountPath: /opt/flink/conf            # 挂载路径
        securityContext:                        # 安全上下文
          runAsUser: 9999                        # 以指定用户 ID 运行容器
      serviceAccountName: flink-service-account    # 指定使用的服务账户
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
---------------------------------------------------------------------------------------------------





---------------------------------------------------------------------------------------------------
taskmanager-session-deployment.yaml
---------------------------------------------------------------------------------------------------
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      serviceAccountName: flink-service-account
      containers:
      - name: taskmanager
        image: apache/flink:1.16.2-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122                    # RPC 端口
          name: rpc
        - containerPort: 6125                    # 查询状态端口
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999                        # 以指定用户 ID 运行容器
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
---------------------------------------------------------------------------------------------------

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,496评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,407评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,632评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,180评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,198评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,165评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,052评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,910评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,324评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,542评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,711评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,424评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,017评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,668评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,823评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,722评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,611评论 2 353

推荐阅读更多精彩内容