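Kubernetes: a container orchestration engine
Flink: a distributed stream dataflow engine
MinIO: an open-source object storage server (similar to Alibaba Cloud OSS) that is compatible with the Amazon S3 protocol and is well supported on Kubernetes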
1. Preparing the Minikube environment
# curl -Lo minikube https://kubernetes.oss-cn-hangzhou.aliyuncs.com/minikube/releases/v1.17.1/minikube-linux-amd64 && chmod +x minikube && sudo mv minikube /usr/local/bin/
minikube start --cpus=5 \
--memory=10G \
--disk-size=40g \
--vm-driver=virtualbox \
--cache-images=true \
--registry-mirror=https://82jqj6ii.mirror.aliyuncs.com
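The rest of this walkthrough uses short commands such as kga, kpf, kgcm, kgi and mk. They are not defined in the post; judging from their output they look like personal shell shorthands for kubectl and minikube. A minimal sketch, assuming these mappings, defines them as bash functions so the commands can be replayed in a plain bash shell (in zsh with existing aliases of the same names, keep the aliases instead):

```shell
# Assumed expansions of the shorthands used in this post (inferred from their output, not confirmed).
kga()  { kubectl get all "$@"; }        # pods, services, deployments, replicasets
kpf()  { kubectl port-forward "$@"; }   # e.g. kpf service/minio 3000:9000
kgcm() { kubectl get configmap "$@"; }  # e.g. kgcm flink-config-flink-session-cluster -o yaml
kgi()  { kubectl get ingress "$@"; }    # Ingress objects and their address
mk()   { minikube "$@"; }               # e.g. mk addons list
```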
2. Deploying MinIO
helm install minio minio \
--repo https://helm.min.io \
--values values-minio.yaml
values-minio.yaml:
accessKey: admin
secretKey: password
defaultBucket:
  enabled: true
  name: vvp
resources:
  requests:
    memory: 256Mi
    cpu: 250m
  limits:
    memory: 256Mi
    cpu: 1000m
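Before checking the resources, you can block until the MinIO Deployment has finished rolling out. A sketch with a hypothetical wait_minio helper; the deployment name minio is the one in the kga output below, and the 120s default timeout is an arbitrary assumption:

```shell
# Block until the minio Deployment created by the Helm release is fully rolled out.
# The 120s default timeout is an arbitrary choice; pass another value as $1.
wait_minio() {
  kubectl rollout status deployment/minio --timeout="${1:-120s}"
}
```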
Check the MinIO installation in the Minikube cluster:
➜ ~ kga # list cluster resources
NAME READY STATUS RESTARTS AGE
pod/minio-69b7b8789b-v49lt 1/1 Running 0 3h
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 46h
service/minio ClusterIP 10.110.43.169 <none> 9000/TCP 3h
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/minio 1/1 1 1 3h
➜ ~ kpf service/minio 3000:9000 # forward service port 9000 to local port 3000
Forwarding from 127.0.0.1:3000 -> 9000
Forwarding from [::1]:3000 -> 9000
Open http://localhost:3000 and log in with admin/password.
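To check MinIO from a script rather than the browser, MinIO exposes an unauthenticated liveness endpoint at /minio/health/live. A sketch, assuming the port-forward above is still running; minio_health_url and minio_is_live are hypothetical helper names:

```shell
# Build the MinIO liveness URL for a base address.
# The default base assumes the `kpf service/minio 3000:9000` forward shown above.
minio_health_url() {
  local base="${1:-http://localhost:3000}"
  printf '%s/minio/health/live\n' "${base%/}"   # drop one trailing slash, if any
}

# Succeed if MinIO answers the liveness probe (needs curl and a reachable endpoint).
minio_is_live() {
  curl -fsS -o /dev/null "$(minio_health_url "$1")"
}
```

Usage: minio_is_live && echo "MinIO is up". Once the Ingress from section 4 is in place, the same check works with minio_is_live http://minio.cluster.io.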
3. Starting and configuring the Flink cluster
3.1. Configuring Flink
# $FLINK_HOME/conf/flink-conf.yaml
# -------------- custom settings -----------------
kubernetes.cluster-id: flink-session-cluster
kubernetes.namespace: default
# enable the S3 filesystem plugin so that s3:// paths are supported
containerized.master.env.ENABLE_BUILT_IN_PLUGINS: flink-s3-fs-presto-1.12.2.jar
containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: flink-s3-fs-presto-1.12.2.jar
s3.endpoint: http://minio:9000
s3.path.style.access: true
s3.access-key: admin
s3.secret-key: password
# state.backend: filesystem
# state.checkpoints.dir: s3://vvp/checkpoints/
# --------------------------------------------
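The plugin jar named in ENABLE_BUILT_IN_PLUGINS carries the Flink version, so it has to match the Flink release in the image (1.12.2 in this post). A minimal sketch that prints the custom block above for a given version; render_flink_s3_conf is a hypothetical helper, and the endpoint and credentials are simply the values used here:

```shell
# Print the custom flink-conf.yaml block from this section for one Flink version.
# Defaults to 1.12.2, the version used in this post.
render_flink_s3_conf() {
  local version="${1:-1.12.2}"
  local jar="flink-s3-fs-presto-${version}.jar"
  cat <<EOF
kubernetes.cluster-id: flink-session-cluster
kubernetes.namespace: default
containerized.master.env.ENABLE_BUILT_IN_PLUGINS: ${jar}
containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: ${jar}
s3.endpoint: http://minio:9000
s3.path.style.access: true
s3.access-key: admin
s3.secret-key: password
EOF
}
```

Usage (run once, since it appends): render_flink_s3_conf 1.12.2 >> "$FLINK_HOME/conf/flink-conf.yaml".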
3.2. Starting Flink
cd $FLINK_HOME
./bin/kubernetes-session.sh
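The session script also accepts -D overrides on the command line, and the Flink 1.12 native Kubernetes docs describe stopping a session by piping stop into the same script with -Dexecution.attached=true. A small sketch with hypothetical wrapper names start_session and stop_session, assuming FLINK_HOME points at the Flink 1.12.2 distribution; passing the cluster id explicitly keeps start and stop aimed at the same cluster:

```shell
# Cluster id from flink-conf.yaml above; override by exporting CLUSTER_ID.
CLUSTER_ID="${CLUSTER_ID:-flink-session-cluster}"

# Start (or attach extra -D options to) the native Kubernetes session cluster.
start_session() {
  "$FLINK_HOME/bin/kubernetes-session.sh" -Dkubernetes.cluster-id="$CLUSTER_ID" "$@"
}

# Stop the session cluster by sending "stop" to an attached session client.
stop_session() {
  echo 'stop' | "$FLINK_HOME/bin/kubernetes-session.sh" \
    -Dkubernetes.cluster-id="$CLUSTER_ID" -Dexecution.attached=true
}
```

Usage: start_session -Dtaskmanager.numberOfTaskSlots=2, and later stop_session.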
3.3. Checking cluster resources
➜ ~ kga
NAME READY STATUS RESTARTS AGE
pod/flink-session-cluster-6f84d4cc9b-g4ctl 1/1 Running 0 29m
pod/minio-69b7b8789b-v49lt 1/1 Running 0 3h6m
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/flink-session-cluster ClusterIP None <none> 6123/TCP,6124/TCP 29m
service/flink-session-cluster-rest LoadBalancer 10.100.159.121 <pending> 8081:30165/TCP 29m
service/kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 46h
service/minio ClusterIP 10.110.43.169 <none> 9000/TCP 3h6m
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/flink-session-cluster 1/1 1 1 29m
deployment.apps/minio 1/1 1 1 3h6m
NAME DESIRED CURRENT READY AGE
replicaset.apps/flink-session-cluster-6f84d4cc9b 1 1 1 29m
replicaset.apps/minio-69b7b8789b 1 1 1 3h6m
➜ ~
Inspect the ConfigMaps:
➜ ~ kgcm
NAME DATA AGE
flink-config-flink-session-cluster 3 34m
kube-root-ca.crt 1 46h
minio 1 3h11m
➜ ~ kgcm flink-config-flink-session-cluster -o yaml
apiVersion: v1
data:
  flink-conf.yaml: |
    blob.server.port: 6124
    taskmanager.memory.process.size: 1728m
    kubernetes.internal.jobmanager.entrypoint.class: org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
    jobmanager.execution.failover-strategy: region
    jobmanager.rpc.address: flink-session-cluster.default
    containerized.master.env.ENABLE_BUILT_IN_PLUGINS: flink-s3-fs-presto-1.12.2.jar
    execution.target: kubernetes-session
    jobmanager.memory.process.size: 1600m
    kubernetes.cluster-id: flink-session-cluster
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    s3.access-key: admin
    s3.secret-key: password
    s3.endpoint: http://minio:9000
    internal.cluster.execution-mode: NORMAL
    kubernetes.namespace: default
    parallelism.default: 1
    taskmanager.numberOfTaskSlots: 1
    containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: flink-s3-fs-presto-1.12.2.jar
    s3.path.style.access: true
  log4j-console.properties: |
    monitorInterval=30
    ...
    logger.netty.level = OFF
  logback-console.xml: |
    <configuration>
    ...
    </configuration>
kind: ConfigMap
metadata:
  labels:
    app: flink-session-cluster
    type: flink-native-kubernetes
  name: flink-config-flink-session-cluster
  namespace: default
  ownerReferences:
  - apiVersion: apps/v1
    blockOwnerDeletion: true
    controller: true
    kind: Deployment
    name: flink-session-cluster
➜ ~
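To confirm that the S3 settings from the local flink-conf.yaml actually reached the cluster, you can pull only the flink-conf.yaml key out of the ConfigMap and filter it. A sketch with a hypothetical s3_settings filter; sorting makes the result easy to diff against the local file:

```shell
# Keep only the S3 and plugin settings from a flink-conf.yaml stream, unindented and sorted.
s3_settings() {
  grep -E '^[[:space:]]*(s3\.|containerized\.(master|taskmanager)\.env\.ENABLE_BUILT_IN_PLUGINS)' \
    | sed 's/^[[:space:]]*//' \
    | sort
}
```

Usage (the backslash escapes the dot inside the key name): kubectl get configmap flink-config-flink-session-cluster -o jsonpath='{.data.flink-conf\.yaml}' | s3_settings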
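4. Exposing the services with an Ingress
4.1. Enabling the add-on
Instead of running kubectl port-forward ${SVC_NAME} ${host_port}:${target_port} for each service,
we can expose our services directly by configuring an Ingress.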
➜ ~ mk addons list
|-----------------------------|----------|--------------|
| ADDON NAME | PROFILE | STATUS |
|-----------------------------|----------|--------------|
| ambassador | minikube | disabled |
| csi-hostpath-driver | minikube | disabled |
| dashboard | minikube | disabled |
| default-storageclass | minikube | enabled ✅ |
| efk | minikube | disabled |
| freshpod | minikube | disabled |
| gcp-auth | minikube | disabled |
| gvisor | minikube | disabled |
| helm-tiller | minikube | disabled |
| ingress | minikube | enabled ✅ |
| ingress-dns | minikube | disabled |
| istio | minikube | disabled |
| istio-provisioner | minikube | disabled |
| kubevirt | minikube | disabled |
| logviewer | minikube | disabled |
| metallb | minikube | disabled |
| metrics-server | minikube | disabled |
| nvidia-driver-installer | minikube | disabled |
| nvidia-gpu-device-plugin | minikube | disabled |
| olm | minikube | disabled |
| pod-security-policy | minikube | disabled |
| registry | minikube | disabled |
| registry-aliases | minikube | disabled |
| registry-creds | minikube | disabled |
| storage-provisioner | minikube | enabled ✅ |
| storage-provisioner-gluster | minikube | disabled |
| volumesnapshots | minikube | disabled |
|-----------------------------|----------|--------------|
➜ ~ mk addons enable ingress # already enabled in my environment (see the list above), so I don't run it again here
4.2. Configuring the Ingress
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
  name: ingress-svc
  namespace: default
spec:
  rules:
  - host: minio.cluster.io
    http:
      paths:
      - backend:
          serviceName: minio
          servicePort: 9000
        path: /
  - host: flink.cluster.io
    http:
      paths:
      - backend:
          serviceName: flink-session-cluster-rest
          servicePort: 8081
        path: /
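The manifest above uses networking.k8s.io/v1beta1, which the cluster in this post still accepts (the kgi output below shows the object was created), but that version was removed in Kubernetes 1.22. For newer clusters, here is a sketch of the same rules in the networking.k8s.io/v1 schema, where the backend moves under service.name and service.port.number and pathType becomes mandatory; render_ingress_v1 is a hypothetical helper:

```shell
# Emit the same two host rules as above using the networking.k8s.io/v1 Ingress schema.
render_ingress_v1() {
  cat <<'EOF'
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: ingress-svc
  namespace: default
spec:
  rules:
  - host: minio.cluster.io
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: minio
            port:
              number: 9000
  - host: flink.cluster.io
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: flink-session-cluster-rest
            port:
              number: 8081
EOF
}
```

Usage: render_ingress_v1 | kubectl apply -f -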
Check the address the Ingress is exposed on:
➜ ~ kgi
NAME CLASS HOSTS ADDRESS PORTS AGE
ingress-svc <none> flink.cluster.io,minio.cluster.io 192.168.99.102 80 27h
➜ ~
4.3. Adding local hosts entries
# MiniKube
192.168.99.102 flink.cluster.io
192.168.99.102 minio.cluster.io
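The hosts entries can also be generated instead of typed. With the VirtualBox driver the Ingress address reported by kgi is normally the Minikube VM's IP, which minikube ip also prints; treat that as an assumption and pass the address explicitly if they differ. hosts_entries is a hypothetical helper:

```shell
# Print /etc/hosts lines mapping both Ingress hostnames to one IP.
# Falls back to `minikube ip` only when no address is given.
hosts_entries() {
  local ip="${1:-$(minikube ip)}"
  local host
  for host in flink.cluster.io minio.cluster.io; do
    printf '%s %s\n' "$ip" "$host"
  done
}
```

Usage: hosts_entries 192.168.99.102 | sudo tee -a /etc/hosts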
5. Verifying the integration
5.1. Accessing MinIO
Open http://minio.cluster.io and log in with admin/password.
Add bucket/input/test.txt (used as input by the Flink job below).
5.2. Accessing Flink
Open http://flink.cluster.io to reach the Flink web UI.
5.3. Submitting a job
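- In the left-hand menu, open Submit New Job and click Add New
- Select the local jar ($FLINK_HOME/examples/batch/WordCount.jar) and upload it
- Click the newly uploaded jar; the job parameter form appears
- Under Program Arguments, enter --input s3://input/input.txt --output s3://output/output.txt and submit the job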
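The same job can be submitted from the command line; the Flink 1.12 native Kubernetes docs use flink run with --target kubernetes-session and the cluster id. A sketch with a hypothetical submit_wordcount helper whose paths default to the Program Arguments above. One caveat, stated as an assumption: the REST service here is a LoadBalancer whose external IP stays pending on Minikube, so the client may not reach the JobManager unless something like minikube tunnel assigns that IP.

```shell
# Submit the bundled WordCount example to the running session cluster.
# INPUT/OUTPUT default to the Program Arguments used in the web UI above.
submit_wordcount() {
  local input="${INPUT:-s3://input/input.txt}"
  local output="${OUTPUT:-s3://output/output.txt}"
  "$FLINK_HOME/bin/flink" run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=flink-session-cluster \
    "$FLINK_HOME/examples/batch/WordCount.jar" \
    --input "$input" --output "$output"
}
```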