节点数 | |
---|---|
zookeeper | 1或3 |
kafka | 1+ |
k8s-zookeeper
docker pull zookeeper:3.6.2
mkdir /data/nfs-volume/zookeeper
# 这个是单机,打开注释replicas 3集群
cat >zookeeper-cluster.yaml<<\EOF
apiVersion: v1
kind: ConfigMap
metadata:
name: zookeeper
namespace: efk
data:
zoo.cfg: |
tickTime=2000
initLimit=5
syncLimit=2
maxClientCnxns=120
dataDir=/data
log_Dir=/data
server.1=zookeeper-0:2888:3888;2181
# server.2=zookeeper-1:2888:3888;2181
# server.3=zookeeper-2:2888:3888;2181
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
labels:
name: zookeeper
name: zookeeper
namespace: efk
spec:
replicas: 1
serviceName: zookeeper
revisionHistoryLimit: 10
selector:
matchLabels:
name: zookeeper
template:
metadata:
labels:
name: zookeeper
spec:
nodeSelector:
efk: "true" ## 指定部署在哪个节点。需根据环境来修改
initContainers:
- name: fix-permissions
image: alpine:3.6
securityContext:
privileged: true
env:
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
command: [ "sh", "-c", "mkdir -p /mnt/${NAMESPACE}/${POD_NAME}/data && echo ${POD_NAME} |awk -F '-' '{print $2+1}' >/mnt/${NAMESPACE}/${POD_NAME}/myid && chown -R 1000:1000 /mnt"]
imagePullPolicy: IfNotPresent
volumeMounts:
- name: zookeeper-data
mountPath: /mnt
containers:
- name: zookeeper
image: zookeeper:3.6.2
securityContext:
runAsUser: 1000
runAsGroup: 1000
env:
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: JVMFLAGS
value: "-Xms512m -Xmx512m"
resources:
limits:
cpu: 1000m
memory: 1Gi
requests:
cpu: 500m
memory: 600Mi
ports:
- containerPort: 2181
protocol: TCP
- containerPort: 2888
protocol: TCP
- containerPort: 3888
protocol: TCP
volumeMounts:
- name: zkp-config
mountPath: /conf/zoo.cfg
subPath: zoo.cfg
- name: zookeeper-data
mountPath: /data
subPathExpr: $(NAMESPACE)/$(POD_NAME)
volumes:
- name: zkp-config
configMap:
name: zookeeper
- name: zookeeper-data
nfs:
server: 10.0.0.2
path: /data/nfs-volume/zookeeper/
---
apiVersion: v1
kind: Service
metadata:
name: zookeeper
namespace: efk
spec:
ports:
- name: client
port: 2181
protocol: TCP
targetPort: 2181
- name: server
port: 2888
protocol: TCP
targetPort: 2888
- name: leade
port: 3888
protocol: TCP
targetPort: 3888
selector:
name: zookeeper
clusterIP: None
EOF
# 登陆pod验证
zkServer.sh status
Mode: xxx
k8s-kafka
cat >Dockerfile<<\EOF
FROM infra/jre8:8u271
ARG KAFKA_USER=kafka
ADD kafka_2.13-2.6.0.tgz /opt
RUN ln -ns /opt/kafka_2.13-2.6.0/ /opt/kafka \
&& useradd $KAFKA_USER \
&& [ `id -u $KAFKA_USER` -eq 1000 ] \
&& [ `id -g $KAFKA_USER` -eq 1000 ] \
&& chown -R $KAFKA_USER:$KAFKA_USER /opt/kafka*
USER kafka
WORKDIR /opt/kafka
EOF
docker build . -t infra/kafka:2.6.0
mkdir /data/nfs-volume/kafka
# 参考 https://github.com/cuishuaigit/k8s-kafka/blob/master/kafka.yaml
# 更改 replicas: 进行集群
cat >kafka.yaml<<\EOF
apiVersion: v1
kind: Service
metadata:
name: kafka
namespace: efk
spec:
ports:
- name: kafka
port: 9092
targetPort: 9092
clusterIP: None
selector:
name: kafka
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
labels:
name: kafka
name: kafka
namespace: efk
spec:
replicas: 1
serviceName: kafka
revisionHistoryLimit: 10
selector:
matchLabels:
name: kafka
template:
metadata:
labels:
name: kafka
spec:
# nodeSelector:
# elasticsearch: "true" ## 指定部署在哪个节点。需根据环境来修改
securityContext:
runAsUser: 1000
containers:
- name: kafka
image: infra/kafka:2.6.0
resources:
requests:
memory: 800Mi
cpu: 500m
ports:
- containerPort: 9092
protocol: TCP
command:
- sh
- -c
- "exec bin/kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
--override listeners=PLAINTEXT://:9092 \
--override zookeeper.connect=zookeeper.efk.svc.cluster.local \
--override log.dir=/var/lib/kafka \
--override auto.create.topics.enable=true \
--override auto.leader.rebalance.enable=true \
--override delete.topic.enable=true \
--override log.retention.hours=72 \
--override socket.request.max.bytes=104857600 \
--override socket.send.buffer.bytes=102400 \
--override unclean.leader.election.enable=true \
--override zookeeper.session.timeout.ms=6000 \
--override max.request.size=5242880 \
--override message.max.bytes=6291456 \
--override fetch.max.bytes=7340032"
env:
- name: KAFKA_HEAP_OPTS
value : "-Xmx512M -Xms512M"
- name: KAFKA_OPTS
value: "-Dlogging.level=INFO"
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
readinessProbe:
tcpSocket:
port: 9092
timeoutSeconds: 1
initialDelaySeconds: 5
volumeMounts:
- name: kafka-data
mountPath: /var/lib/kafka
subPathExpr: $(NAMESPACE)/$(POD_NAME)
volumes:
- name: kafka-data
nfs:
server: 10.0.0.2
path: /data/nfs-volume/kafka/
EOF
log.retention.hours
优化参考: https://zhuanlan.zhihu.com/p/137720038
num.partitions=3
# 更具节点数量来看
#指定创建topic的默认分区数量,该值默认为1,建议根据具体的情况进行设定,越多的分区对于海量数据来说可以提高吞吐,但是对于>少量数据来说,也可能增加网络消耗
#注意:分区数一旦指定,只能增加,不能减少
# 自动创建topics,例如,当收到消息时,topics未创建,配置elk,不然filebeat无法自动创建topice
auto.create.topics.enable
# 是否开启leader自动平衡
auto.leader.rebalance.enable
# 对应影响的其他两个参数
# leader.imbalance.per.broker.percentage : 每个broker允许leader不平衡比例(如果每个broker上超过了这个值,controller将会>执行分区再平衡),默认值10.
# leader.imbalance.check.interval.seconds: 主分区再平衡的频率,默认值为300s
# 是否允许删除topic,通过管理工具删除topic仅为标记删除,建议设由集群管理员定期统一的进行删除和管理
delete.topic.enable
# 默认副本因子,适用于自动创建的主题的默认复制因子,默认1,建议2
default.replication.factor
min.insync.replicas=2
# 为false,就只从ISR中获取leader保证了数据的可靠性,,true则从replica中获取,则可用性增强
unclean.leader.election.enable=false
登陆pod验证
# 创建主题
bin/kafka-topics.sh \
--zookeeper zookeeper.efk.svc.cluster.local:2181 \
--create \
--topic test1 \
--partitions 2 \
--replication-factor 1
# 展示主题
bin/kafka-topics.sh \
--zookeeper zookeeper.efk.svc.cluster.local:2181 \
--list
# 查看主题详情
bin/kafka-topics.sh \
--zookeeper zookeeper.efk.svc.cluster.local:2181 \
--describe --topic test1
#--topic 指定所创建主题名称
#--partitions 指定分区个数
#--replication-factor 指定分区副本因子
k8s-kafkamanager
# docker pull kafkamanager/kafka-manager:3.0.0.4
cat >kafkamanager.yaml<<EOF
apiVersion: v1
kind: Service
metadata:
namespace: efk
name: kafka-manager
labels:
app: kafka-manager
spec:
type: ClusterIP
ports:
- name: kafka-manager
port: 9000
targetPort: 9000
selector:
app: kafka-manager
---
apiVersion: apps/v1
kind: Deployment
metadata:
namespace: efk
name: kafka-manager
labels:
app: kafka-manager
spec:
replicas: 1
selector:
matchLabels:
app: kafka-manager
template:
metadata:
labels:
app: kafka-manager
spec:
containers:
- name: kafka-manager
image: kafkamanager/kafka-manager:3.0.0.4
imagePullPolicy: IfNotPresent
ports:
- name: kafka-manager
containerPort: 9000
protocol: TCP
env:
- name: ZK_HOSTS
value: "zookeeper.efk.svc.cluster.local:2181"
# readinessProbe:
# httpGet:
# path: /api/health
# port: kafka-manager
resources:
limits:
cpu: 500m
memory: 512Mi
requests:
cpu: 250m
memory: 256Mi
EOF
验证
# 访问 主机:30900
# Add Cluster
Cluster Zookeeper Hosts: zookeeper.efk.svc.cluster.local
# 也可以添加ingress
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: kafka
namespace: efk
annotations:
nginx.ingress.kubernetes.io/backend-protocol: "HTTP"
spec:
rules:
- host: kafka.zs.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: kafka-manager
port:
number: 9000