安装与配置
- linux系统,通过虚拟机安装centos或者win10 系统自己挂载的ubuntu系统都可以。
- 首先需要安装java环境,同时配置环境变量,步骤如下:
- 官网下载jdk:https://www.oracle.com/technetwork/java/javase/dowloads/jdk12-dowloads-5295953.html
- 选择Linux的jdk,版本为12.0.2.
- 将其解压缩放到centos7的虚拟机上/root。
- 需要配置环境变量,在/etc/profile文件中配置变量,如下:
export JAVA_HOME=/opt/java/jdk-12.0.1
export JRE_HOME=${JAVA_HOME}/jre
# export MAVEN_HOME=/opt/maven/apache-maven-3.5.0
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
# export PATH=${JAVA_HOME}/Bin:${MAVEN_HOME}/bin:$PATH
export PATH=${JAVA_HOME}/bin:$PATH
- java -version 查看版本
zookeepe 安装
- zookeeper是安装kafka的必要组件,kafka是通过zookeeper來实施对元数据信息的管理,包括集群、主题、分区等内容。
- 同样在官网下载安装包指定目录解压,步骤如下:
- 官网:https://zookeeper.apache.org/
下载并安装 - 修改zookeeper的配置参数,首先进入安装目录conf目录,并将zoo_sample.cfg文件修改为zoo.cfg,并对核心参数进行配置。
- zk服务器的心跳时间
tickTime=2000
- 投票选举新Leader的初始化时间
initLimit=10
syncLimit=5
dataDir=temp/zookeeper/data
dataLogDir=temp/zookeeper/log
clientPort=2181
- 启动zookeeper命令,
bin/zkServer.sh start
ps -ef | grep zookeeper
查看是否启动
jps -l
kafka安装与部署
- 官网下载并安装解压缩:https://kafka.apache.org/download
- 以2.3.0版本为例
- 启动命令
bin/kafka-server-start.sh config/server.properties
- server.properties 配置中需要关注以下几个参数:
broker.id=0 # 表示broker的编号,如果集群中有多个broker,则每个broker的编号需要设置的不同
listeners=PLAINTEXT://:9092 ##brokder对外提供的服务入口地址
log.dirs=/tmp/kafka/log #设置存放消息日志文件的地址
zookeeper.connect=localhost:2181
-
Kafka所需zookeeper的集群地址,教学中zookeeper和kafka都安装本机
Kafka测试消息生产与消费
- 首先创建一个主题
bin/kafka-topic.sh --zookeeper localhost:2181 --create --topic sword --partitions 2 --replication-factor1
--zookeeper,指定了所要创建的Zookeeper服务地址
--topic:指定了所要创建主题的名称
--partitions: 指定了分区个数
--replication-factor:指定了副本因子
--create创建主题的动作命令 - 展示所有主题
bin/kafka-topic.sh --zookeeper localhost:2181 --list
- 查看主题详情
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mia
- 启动消费端接收消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mia
--bootstrap-server 指定了连接kafka集群的地址
--topic指定了消费端订阅的主题 - 生产者发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mia
通过java程序来进行Kafka收发消息的测试
-
java的第一个程序 通过Kafka自身提供的java客户端来测试消息的收发,与Kafka的java客户端相关的Maven依赖如下:
消费者详解
- 消费者和消费组
-
Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息,假设有一个T1主题,该主题有4个分区,同时我们有一个消费组G1,这个消费组只有一个消费者C1,那么消费者C1将会收到这4服务分区的消息,如下:
主题
- 创建主题
bin/kafka-topics.sh-zookeeper localhost:2181 --create --topic sword --partitions 2 --replication-factor 1 localhost:2181
--replication-factor 用于设置主题副体数,每个副本分布在不通节点,不能超过总结点数。 - topic元数据信息保存在zookeeper节点中,
bin/zkCli.sh -server localhost:2181
get /brokers/topics/sword
- 修改主题
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic sword --config flush.messages=1
- 删除主题
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic sword --dele-config flush.messages=1
- 增加分区
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic sword --partitions 1
分区
- Kafka可以将主题划分为多个分区,会根据分区规则选择把消息存储到哪个分区,只要如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区,这样就实现了负载均衡和水平扩展。另外,多个订阅者可以从一个或者多个分区中同时消费数据,以支撑数据处理能力。简单说一句,由于消息是以追加到分区中的,多个分区顺序写磁盘的总效率要比随机写内存还要高(引用apache-A High Throughput Distributed Messaging System的观点),是Kafka高吞吐量的重要保证之一。
- 副本机制
- 由于Producer和Consumer
都只会与Leader角色的分区副本相连,所以Kafka需要以集群的组织形式提供主题下的消息可用。Kafka支持主备复制,所以消息具备高可用和持久性。 - 一个分区有多个副本,这些副本 保存在不同的broker上,每个分区的副本中都会有一个作为Leader,当一个broker失败时,Leader在这台broker上的分区都会变得不可用,Kafka会自动移除Leader,再其他副本中选一个作为新的Leader。
-
通常增加分区可以提高Kafka集群的吞吐量,然后也应注意总分区数或是单台服务器上的分区数过多,会增加不可用机延迟的风险。
- 由于Producer和Consumer
- 分区Leader选举
- 可以预见的是,如果某个分区的Leader挂了,那么其他跟随者将会进行选举产生一个新Leader,之后所有的读写就会转移到这个新的Leader上,在Kafka中,其不是采用常见的多数选举的方式进行副本的Leader选举,而是会在Zookeeper上针对每个topic维护一个成为ISR(in-sync replica,已同步的副本)的集合,显然还有一些副本没有来得及同步,只有这个ISR列表里面的才有资格成为Leader(先试用ISR里面的第一个,如果不行依次类推,因为ISR里面的是同步副本,消息是最完整且各个节点都是一样的)
- 分区重新分配 我们在已经部署好的Kafka集群里面添加机器是正常不过的需求,而且添加起来非常地方便,我们需要做的是从已经部署好的Kafkai节点中复制相应的配置文件,然后将里面的broker id修改为全局唯一的,最后启动这个节点即可将它加入到现有的Kafka集群中。
- AR(Assigned Replicas) 分区中所有的副本统称为AR
- ISR(In-Sync Replicas) 所有与Leader部分保持一定程度的副(包括Leader副本在内)本组成ISR
- OSR(Out-of-Sync Replicas) 与Leader副本同步之后过多的副本
- 具体步骤:
- 1.我们创建一个有三个节点的集群
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic sword-par --partitions 3 --replication-factor 3
- 复制了三个目录
cp -rf kafka_2.12-2.2.1 ./kafka-01
cp -rf kafka_2.12-2.2.1 ./kafka-02
cp -rf kafka_2.12-2.2.1 ./kafka-01
- 修改一下配置
cd kafka-01
vi config/Server.properties
- conf里修改broker-id必须是全局唯一的。
- zookeeper.connect=localhost:2181
- port 三个节点不同
- log.dirs=/opt/kafka/kafka-2/logs
bin/kafka-server-start.sh config/server.properties
bin/kafka-topic.sh --describe --zookeeper localhost:2181 --topic sword-par
查看分区信息 - 分区只能增加,不能减少
bin/kafka-topic.sh --alter --zookeeper localhost:2181 --topic sword-par --partitions 4
- 3.再添加一个broker节点
cp -rf kafka-01 ./kafka-04
新参加的主题不参与主题分配 - 4.重新分配 我们需要将原来分布在broker 1-3节点的分区重新分布到broker1-4节点上,借助kafka-reassign-partitions.sh工具生成reassign plan,不过我们先按照要求定一个文件,里面有说明哪些topic需要重新分区,文件内容如下