Flink on k8s配置样例

# Configuration and service definition
$ kubectl create -f flink-configuration-configmap.yaml
$ kubectl create -f jobmanager-service.yaml
# Create the deployments for the cluster
$ kubectl create -f jobmanager-session-deployment.yaml
$ kubectl create -f taskmanager-session-deployment.yaml

Flink高可用会话模式集群配置文件




flink服务账户
---------------------------------------------------------------------------------------------------
flink-service-account.yaml
---------------------------------------------------------------------------------------------------
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-service-account
  namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: default
  name: flink-role
rules:
- apiGroups: [""]
  resources: ["pods", "services", "configmaps"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: ["networking.k8s.io"]
  resources: ["ingresses"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-rolebinding
  namespace: default
subjects:
- kind: ServiceAccount
  name: flink-service-account
  namespace: default
roleRef:
  kind: Role
  name: flink-role
  apiGroup: rbac.authorization.k8s.io
---------------------------------------------------------------------------------------------------




配置Flink在k8s上的运行环境,用于存储配置信息
---------------------------------------------------------------------------------------------------
flink-configuration-configmap.yaml
---------------------------------------------------------------------------------------------------
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: [flink-jobmanager]            # JobManager 的 RPC 地址,这里是 flink-jobmanager,Service 的名字
    taskmanager.numberOfTaskSlots: [2]                    # 每个 TaskManager 的任务槽数量
    blob.server.port: [6124]                            # 指定 BLOB(Binary Large Object)服务器的端口
    jobmanager.rpc.port: [6123]                            # JobManager 的 RPC 服务端口
    taskmanager.rpc.port: [6122]                        # TaskManager 的 RPC 服务端口
    queryable-state.proxy.ports: [6125]                    # Queryable State 代理的端口
    jobmanager.memory.process.size: 1600m                # JobManager 的内存大小
    taskmanager.memory.process.size: 1728m                # TaskManager 的内存大小
    parallelism.default: [2]                            # 默认的并行度
    kubernetes.cluster-id: <cluster-id>                    # 指定 Flink 集群在 Kubernetes 中的唯一标识符                                        >>>HA集群所需参数
    high-availability: kubernetes                        # 启用 Flink 的高可用性模式,并指定使用 Kubernetes 作为高可用性服务的后端                >>>HA集群所需参数
    high-availability.storageDir: hdfs:///flink/recovery    # 指定高可用性状态存储的目录, 根据实际情况更改为其他存储路径,如 S3、GCS 或 NFS            >>>HA集群所需参数
    restart-strategy: fixed-delay                        # 指定 Flink 作业的重启策略                                                        >>>HA集群所需参数
    restart-strategy.fixed-delay.attempts: 10            # 指定固定延迟重启策略下的重启尝试次数                                                >>>HA集群所需参数
  log4j-console.properties: |+
    rootLogger.level = INFO                                # 设置根日志记录器的日志级别为 INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender    # 将日志输出到控制台
    rootLogger.appenderRef.rolling.ref = RollingFileAppender    # 将日志输出到滚动文件

    # 为一些常见的库(如 Akka、Kafka、Hadoop、Zookeeper)单独设置日志级别为 INFO
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
---------------------------------------------------------------------------------------------------




jobmanager服务配置
---------------------------------------------------------------------------------------------------
jobmanager-service.yaml
---------------------------------------------------------------------------------------------------
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
---------------------------------------------------------------------------------------------------



将 jobmanager 端口公开为公共 Kubernetes 节点的端口
---------------------------------------------------------------------------------------------------
jobmanager-rest-service.yaml
---------------------------------------------------------------------------------------------------
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
---------------------------------------------------------------------------------------------------



启用可查询状态
---------------------------------------------------------------------------------------------------
taskmanager-query-state-service.yaml
---------------------------------------------------------------------------------------------------
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
---------------------------------------------------------------------------------------------------



---------------------------------------------------------------------------------------------------
jobmanager-session-deployment-ha.yaml
---------------------------------------------------------------------------------------------------
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 2                                     # 在高可用配置中,JobManager 通常需要多个副本
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      serviceAccountName: flink-service-account
      containers:
      - name: jobmanager
        image: apache/flink:1.16.2-scala_2.12
        env:
        - name: POD_IP                            # 获取 Pod 的 IP 地址
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        args: ["jobmanager", "$(POD_IP)"]        # 指定启动 JobManager 并传递 Pod IP
        ports:
        - containerPort: 6123                     # RPC 端口
          name: rpc
        - containerPort: 6124                    # BLOB 服务器端口
          name: blob-server
        - containerPort: 8081                    # Web UI 端口
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:                            # 定义挂载卷
        - name: flink-config-volume                # 卷名称
          mountPath: /opt/flink/conf            # 挂载路径
        securityContext:                        # 安全上下文
          runAsUser: 9999                        # 以指定用户 ID 运行容器
      serviceAccountName: flink-service-account    # 指定使用的服务账户
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
---------------------------------------------------------------------------------------------------





---------------------------------------------------------------------------------------------------
taskmanager-session-deployment.yaml
---------------------------------------------------------------------------------------------------
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      serviceAccountName: flink-service-account
      containers:
      - name: taskmanager
        image: apache/flink:1.16.2-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122                    # RPC 端口
          name: rpc
        - containerPort: 6125                    # 查询状态端口
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999                        # 以指定用户 ID 运行容器
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
---------------------------------------------------------------------------------------------------

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容