介绍
Kafka 简介
Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和 Java 编写。它是一个分布式、支持分区的的、多副本,基于 zookeeper 协调的分布式消息系统。它最大特性是可以实时的处理大量数据以满足各种需求场景,比如基于 Hadoop 的批处理系统、低延时的实时系统、storm/Spark 流式处理引擎,web/nginx 日志,访问日志,消息服务等等。
Zookeeper 简介
ZooKeeper 是一种分布式协调服务,用于管理大型主机。在分布式环境中协调和管理服务是一个复杂的过程。ZooKeeper 通过其简单的架构和 API 解决了这个问题。 ZooKeeper 允许开发人员专注于核心应用程序逻辑,而不必担心应用程序的分布式特性。
Kafka 中主要利用 zookeeper 解决分布式一致性问题。Kafka 使用 Zookeeper 的分布式协调服务将生产者,消费者,消息储存结合在一起。同时借助 Zookeeper,Kafka 能够将生产者、消费者和 Broker 在内的所有组件在无状态的条件下建立起生产者和消费者的订阅关系,实现生产者的负载均衡。
Kafka Manager 简介
Kafka Manager 是目前最受欢迎的 Kafka 集群管理工具,最早由雅虎开源,用户可以在 Web 界面执行一些简单的集群管理操作。
支持以下功能:
- 管理 Kafka 集群
- 方便集群状态监控 (包括topics, consumers, offsets, brokers, replica distribution, partition distribution)
- 方便选择分区副本
- 配置分区任务,包括选择使用哪些 Brokers
- 可以对分区任务重分配
- 提供不同的选项来创建及删除 Topic
- Topic List 会指明哪些topic被删除
- 批量产生分区任务并且和多个topic和brokers关联
- 批量运行多个主题对应的多个分区
- 向已经存在的主题中添加分区
- 对已经存在的 Topic 修改配置
- 可以在 Broker Level 和 Topic Level 的度量中启用 JMX Polling 功能
- 可以过滤在 ZooKeeper 上没有 ids/owners/offsets/directories 的 consumer
此环境不建议线上使用!!!
部署过程
这个流程需要部署三个组件,分别为 Zookeeper、Kafka、Kafka Manager:
(1)、Zookeeper: 首先部署 Zookeeper,方便后续部署 Kafka 节点注册到 Zookeeper,用 StatefulSet 方式部署三个节点。
(2)、Kafka: 第二个部署的是 Kafka,设置环境变量来指定 Zookeeper 地址,用 StatefulSet 方式部署。
(3)、Kafka Manager: 最后部署的是 Kafka Manager,用 Deployment 方式部署,然后打开 Web UI 界面来管理、监控 Kafka。
Kubernetes 部署 Zookeeper & Kafka & Kafka Manager
1、创建 StorageClass
由于都是使用 StatefulSet 方式部署的有状态服务,所以 Kubernetes 集群需要提前设置一个 StorageClass 方便后续部署时指定存储分配(如果想指定为已经存在的 StorageClass 创建 PV 则跳过此步骤)。
此处用的是 NFS 存储驱动,如果是其它存储需要提前设置好相关配置
创建 StorageClass 部署文件
nfs-storage.yaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: nfs-storage
provisioner: qgg-nfs-storage #动态卷分配服务指定的名称
parameters:
archiveOnDelete: "true" #设置为"false"时删除PVC不会保留数据,"true"则保留数据
mountOptions:
- hard #指定为硬挂载方式
- nfsvers=4 #指定NFS版本
部署 StorageClass
kubectl apply -f nfs-storage.yaml
2、Kubernetes 部署 Zookeeper
创建 Zookeeper 部署文件
zookeeper.yaml
#部署 Service Headless,用于Zookeeper间相互通信
apiVersion: v1
kind: Service
metadata:
name: zookeeper-headless
labels:
app: zookeeper
spec:
type: ClusterIP
clusterIP: None
publishNotReadyAddresses: true
ports:
- name: client
port: 2181
targetPort: client
- name: follower
port: 2888
targetPort: follower
- name: election
port: 3888
targetPort: election
selector:
app: zookeeper
---
#部署 Service,用于外部访问 Zookeeper
apiVersion: v1
kind: Service
metadata:
name: zookeeper
labels:
app: zookeeper
spec:
type: ClusterIP
ports:
- name: client
port: 2181
targetPort: client
- name: follower
port: 2888
targetPort: follower
- name: election
port: 3888
targetPort: election
selector:
app: zookeeper
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: zookeeper
labels:
app: zookeeper
spec:
serviceName: zookeeper-headless
replicas: 3
podManagementPolicy: Parallel
updateStrategy:
type: RollingUpdate
selector:
matchLabels:
app: zookeeper
template:
metadata:
name: zookeeper
labels:
app: zookeeper
spec:
securityContext:
fsGroup: 1001
containers:
- name: zookeeper
image: docker.io/bitnami/zookeeper:3.4.14-debian-9-r25
imagePullPolicy: IfNotPresent
securityContext:
runAsUser: 1001
command:
- bash
- -ec
- |
# Execute entrypoint as usual after obtaining ZOO_SERVER_ID based on POD hostname
HOSTNAME=`hostname -s`
if [[ $HOSTNAME =~ (.*)-([0-9]+)$ ]]; then
ORD=${BASH_REMATCH[2]}
export ZOO_SERVER_ID=$((ORD+1))
else
echo "Failed to get index from hostname $HOST"
exit 1
fi
. /opt/bitnami/base/functions
. /opt/bitnami/base/helpers
print_welcome_page
. /init.sh
nami_initialize zookeeper
exec tini -- /run.sh
env:
- name: ZOO_PORT_NUMBER
value: "2181"
- name: ZOO_TICK_TIME
value: "2000"
- name: ZOO_INIT_LIMIT
value: "10"
- name: ZOO_SYNC_LIMIT
value: "5"
- name: ZOO_MAX_CLIENT_CNXNS
value: "60"
- name: ZOO_SERVERS
value: "
zookeeper-0.zookeeper-headless:2888:3888,
zookeeper-1.zookeeper-headless:2888:3888,
zookeeper-2.zookeeper-headless:2888:3888
"
- name: ZOO_ENABLE_AUTH
value: "no"
- name: ZOO_HEAP_SIZE
value: "1024"
- name: ZOO_LOG_LEVEL
value: "ERROR"
- name: ALLOW_ANONYMOUS_LOGIN
value: "yes"
ports:
- name: client
containerPort: 2181
- name: follower
containerPort: 2888
- name: election
containerPort: 3888
livenessProbe:
tcpSocket:
port: client
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
successThreshold: 1
failureThreshold: 6
readinessProbe:
tcpSocket:
port: client
initialDelaySeconds: 5
periodSeconds: 10
timeoutSeconds: 5
successThreshold: 1
failureThreshold: 6
volumeMounts:
- name: data
mountPath: /bitnami/zookeeper
volumeClaimTemplates:
- metadata:
name: data
annotations:
spec:
storageClassName: managed-nfs-storage #指定为上面创建的 storageclass
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 5Gi
部署 Zookeeper
-n:指定应用启动的 Namespace,替换自己集群的 Namespace
kubectl apply -f zookeeper.yaml -n kafka
这些是zookeeper的myid,根据pod数目自增的。
3、Kubernetes 部署 Kafka
创建 Kafka 部署文件
kafka.yaml
#部署 Service Headless,用于Kafka间相互通信
apiVersion: v1
kind: Service
metadata:
name: kafka-headless
labels:
app: kafka
spec:
type: ClusterIP
clusterIP: None
ports:
- name: kafka
port: 9092
targetPort: kafka
selector:
app: kafka
---
#部署 Service,用于外部访问 Kafka
apiVersion: v1
kind: Service
metadata:
name: kafka
labels:
app: kafka
spec:
type: ClusterIP
ports:
- name: kafka
port: 9092
targetPort: kafka
selector:
app: kafka
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: "kafka"
labels:
app: kafka
spec:
selector:
matchLabels:
app: kafka
serviceName: kafka-headless
podManagementPolicy: "Parallel"
replicas: 3
updateStrategy:
type: "RollingUpdate"
template:
metadata:
name: "kafka"
labels:
app: kafka
spec:
securityContext:
fsGroup: 1001
runAsUser: 1001
containers:
- name: kafka
image: "docker.io/bitnami/kafka:2.3.0-debian-9-r4"
imagePullPolicy: "IfNotPresent"
env:
- name: MY_POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: MY_POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: KAFKA_CFG_ZOOKEEPER_CONNECT
value: "zookeeper" #Zookeeper Service 名称
- name: KAFKA_PORT_NUMBER
value: "9092"
- name: KAFKA_CFG_LISTENERS
value: "PLAINTEXT://:$(KAFKA_PORT_NUMBER)"
- name: KAFKA_CFG_ADVERTISED_LISTENERS
value: 'PLAINTEXT://$(MY_POD_NAME).kafka-headless:$(KAFKA_PORT_NUMBER)'
- name: ALLOW_PLAINTEXT_LISTENER
value: "yes"
- name: KAFKA_HEAP_OPTS
value: "-Xmx512m -Xms512m"
- name: KAFKA_CFG_LOGS_DIRS
value: /opt/bitnami/kafka/data
- name: JMX_PORT
value: "9988"
ports:
- name: kafka
containerPort: 9092
livenessProbe:
tcpSocket:
port: kafka
initialDelaySeconds: 10
periodSeconds: 10
timeoutSeconds: 5
successThreshold: 1
failureThreshold: 2
readinessProbe:
tcpSocket:
port: kafka
initialDelaySeconds: 5
periodSeconds: 10
timeoutSeconds: 5
successThreshold: 1
failureThreshold: 6
volumeMounts:
- name: data
mountPath: /bitnami/kafka
volumeClaimTemplates:
- metadata:
name: data
spec:
storageClassName: managed-nfs-storage #指定为上面创建的 storageclass
accessModes:
- "ReadWriteOnce"
resources:
requests:
storage: 5Gi
部署 Kafka
-n:指定应用启动的 Namespace,替换自己集群的 Namespace
kubectl apply -f kafka.yaml -n kafka
这些是kafka的myid,根据pod数目自增的。
4、Kubernetes 部署 Kafka Manager
创建 Kafka Manager 部署文件
kafka-manager.yaml
apiVersion: v1
kind: Service
metadata:
name: kafka-manager
labels:
app: kafka-manager
spec:
type: NodePort
ports:
- name: kafka
port: 9000
targetPort: 9000
nodePort: 30900
selector:
app: kafka-manager
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-manager
labels:
app: kafka-manager
spec:
replicas: 1
selector:
matchLabels:
app: kafka-manager
template:
metadata:
labels:
app: kafka-manager
spec:
containers:
- name: kafka-manager
image: zenko/kafka-manager:1.3.3.22
imagePullPolicy: IfNotPresent
ports:
- name: kafka-manager
containerPort: 9000
protocol: TCP
env:
- name: ZK_HOSTS
value: "zookeeper:2181"
livenessProbe:
httpGet:
path: /api/health
port: kafka-manager
readinessProbe:
httpGet:
path: /api/health
port: kafka-manager
部署 Kafka Manager
-n:指定应用启动的 Namespace,替换自己集群的 Namespace
kubectl apply -f kafka-manager.yaml -n kafka
四、进入 Kafka Manager 管理 Kafka 集群
这里的 Kubernetes 集群地址为:192.168.0.197,并且在上面设置 Kafka-Manager 网络策略为 NodePort 方式,且设置端口为 30900,这里输入地址:http://192.168.0.197:30900 访问 Kafka Manager。
进入后先配置 Kafka Manager,增加一个 Zookeeper 地址。
配置三个必填参数:
Cluster Name:自定义一个名称,任意输入即可。
Zookeeper Hosts:输入 Zookeeper 地址,这里设置为 Zookeeper 服务名+端口。
Kafka Version:选择 kafka 版本。