Kakfa介绍
Kafka是什么
Kafka最初是LinkedIn的内部内部基础设施系统。它被认为是一个流平台,在Kafka上可以发布和订阅流数据,并把它们保存起来、进行处理。但是我们在使用Kafka中,最多的就是将它作为一个消息系统使用,类似于ActiveMQ、RabbitMQ等。但是Kafka与这些传统的消息系统又有着许多的不同点,这些差异使它又不同于消息系统。
- Kafka是一个分布式系统,以集群(支持自由伸缩)的方式运行。(所以我们总称为分布式消息队列)
- Kafka可以用来存储数据,数据存储的时间长短由你自己定义(以容错持久化的方式存储)。并且只要数据还存储在Kafka中,你可以重复读取。
- 流式处理将数据处理的层次提升到了新高度。
而传统的消息系统,只会用来传递消息。
Kafka也可以看成是实时版的Hadoop(这也是设计Kafka的初衷之一)。Hadoop可以存储和定期处理大量的数据文件,而Kafka而可以存储和持续型的处理大型的数据流。Hadoop主要应用于数据分析上,而Kafka因其低延迟的特点更合适应用于核心业务上,业务事件发生时,Kafka能够针对这些事件及时做出相应。同时kafka也可以和ETL进行比较,因为它们擅长移动数据。
Kafka属于消息系统中的发布-订阅模式消息系统。消息发送者不会将消息直接发送到消息接受者里,而是将消息首先进行分类(topic),然后将消息发布到消息系统中。消息接受者选择需要订阅的消息类型(topic),然后就能够从消息系统中接收所订阅的消息了。
Kafka中的消息和批次
Kafka中的数据单元称为消息,消息可以看成关系型数据库中的“数据行”或“记录”。消息是由键值对组成,其中键称之为元数据,是可选的。消息中的键值对是由字节数组组成,所以消息里的数据没有特别格式或含义(schema)。键主要用来分区写,比如通过键生成一个一致性散列值,然后使用散列值对分区取模,为消息选取分区,保证了相同类型键的消息都写入到了相同分区内。
为了提高消息写入效率,消息被分批次写入Kafka中。批次就是一组消息,这些消息属于同一topic下的同一分区。这样减少了网络开销,但是这需要在时间延迟和吞吐量之间作出平衡。批次的数据会被压缩,这样提升了数据的传输和存储能力,但同样做了更多的计算(这也是Kafka对CPU性能的要求点)。
Kafka中的主题和分区
Kafka的消息通过主题(topic)进行分类,主题类似关系型数据库中的表,或者文件系统中的文件夹。一个主题可以被分为若干个分区(partition),一个分区就是一个提交日志。消息以追加的方式写入分区,然后以先进先出的顺序读取。因为一个topic一般由多个partition组成,所以Kafka不能保证主题范围内的消息顺序,但是能够保证单个分区的消息顺序。如果要保证整个主题的有序性,就只能一个主题只有一个分区。Kafka通过分区来完成消息的冗余和伸缩性,分区可以分布在不同的服务器上,这样比单个服务器具有更高的性能。
每个分区都是一个有序、不可变的记录序列,新提交的记录会不断的追加到分区中。分区中的每条记录都会被分配一个连续的序列号叫做offset(偏移),用于唯一标识分区中的每个记录。
在一个可配置的保留周期内(保存时间或保存大小),Kafka集群会持久化所有发布的记录,无论这个记录是否被消费过。比如,我们将保存周期设置为2天,则记录在发布的两天内都可以重复被使用,当过了两天后,这条记录就会被丢弃以释放空间。Kafka的性能是与数据大小无关的常量,所以数据存储多长时间都没有问题。
为什么要进行日志分区
- 使得每个topic日志不会太大,以便单个服务能够保存。
- 每个分区能够单独发布和消费,为并发消费topic提供一种可能。(也是最重要的)
Kafka客户端
Kafka的客户端就是Kafka的系统用户,它们被分为两种基本类型:生产者和消费者。除了这些基础API之外,Kafka还提供了一些高级API,比如用于数据集成的Kafka Connect API,用于流式处理的Kafka Streams和用于管理Kafka的AdminClient。
- Producer API(http://kafka.apache.org/documentation.html#producerapi):用于应用程序将数据流发布到一个或多个topic上。
- Consumer API(http://kafka.apache.org/documentation.html#consumerapi):用于应用程序订阅一个或多个topic,并处理这些流记录。
- Streams API(http://kafka.apache.org/documentation/streams):用于流式处理,消费来自一个或多个topic的输入流,并生成一个输出流到一个或多个topic上,输入输出都是kafka。
- Connector API(http://kafka.apache.org/documentation.html#connect):用于Kafka topic与现有的应用程序或数据系统集成的API。
Kafka生产者
Kafka中的生产者是用于写入消息的,一般一个消息会被写入到一个指定的主题内。生产者默认会将消息均衡地分布到主题的所有分区上。但是我们可以通过消息键或者分区器来将消息分类,将同类数据写入到同一个分区内。
Kafka消费者
Kafka中的消费者是用于读取消息的,消费者会订阅一个或多个主题,并且按照消息的生成顺序读取它们。消费者会通过消息的“偏移量”来记录已经读取的位置,偏移量是一种元数据,它是一个不断自增的整数值。在消息写入到分区内时,Kafka会为该条消息生成所在分区内的唯一数值。消费者会把最后读取消息所在的偏移量保存到Zookeeper或Kafka中,如果消费者关闭或重启,则会重新读取该偏移量。
在Kafka中消费者是消费者群组的一部分,即一个群组可能有多个消费者共同读取一个主题。但是群组能够保证每个分区内的消息只能被消费者群组中的一个消费者消费。
消费者与分区之间的关系称为消费者对分区的所有权。当一个消费者挂掉后,同一群组的消费者可以接管失效消费者的工作。
消息的有序性
相比传统的消息系统,Kafka可以很好的保证有序性。
传统消息队列在服务器上保存有序消息,但是当多个Consumer消费队列中的数据时,由于消息被异步发送到不同的Consumer上,所以消息到达时可能已经失去了原来的顺序。通常这种情况如果需要强顺序读取,则只能有一个Consumer消费消息。这样也就失去了并发性。
Kafka由于使用了分区概念,可以在多个Consumer组并发的情况下提供较好的有序性和负载均衡。将每个分区只发给一个Consumer,这样一个分区就只被一个Consumer消费了,就可以顺序消费这个分区的消息了,由于一个topic有多个partition,所以可以使用多个Consumer消费,来实现负载均衡。但是Kafka只能保证一个分区的消息是有序的,如果需要topic所有消息都有序,那只能一个topic只有一个分区,也就只能有一个Consumer消费。
Kafka集群
在多台机器上分别部署Kafka,就会组成一个Kafka集群。每台机器运行的Kafka服务称为broker,broker用于接收生产者消息,为消息设置偏移量,并且将消息保存到磁盘中。broker还为消费者提供读取消息服务,向消费者返回已经提交到磁盘中的消息。单个broker可以轻松处理数千分区以及每秒百万级消息量(依赖于具体机器性能)。
在broker集群中,会选举出一个leader,作为集群控制器的角色。leader控制器负责管理工作,比如将分区分配给broker和监控broker。在broker集群中,一个分区隶属于一个broker,这个broker称为分区的leader。一个分区可以分配到多个broker上,而这些其它broker上的分区数据是分区leader的复制数据,当分区leader挂掉后,其它broker可以接管领导权,但是这时候相关的消费者和生产者会连接到新的分区leader上。这种分区复制的机制为kafka提供消息冗余,保证了kafka的容错性和负载均衡。
broker集群中的消息会持久化到磁盘上,这是kafka的一个重要特性。Kafka broker默认的消息保留策略有两种:保留到指定的时间和保留到消息到达一定的字节数。当达到上限时,就消息就会被删除。
Kafka集群搭建
对于Kafka集群来说,单个节点broker和多个节点的broker并没有任何区别。多broker节点只是在集群启动过程中,每个broker节点都需要启动。
Kafka安装包下载(2.0.0版本)
下载路径:
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.0.0/kafka_2.11-2.0.0.tgz
tar -zxvf kafka_2.11-2.0.0.tgz
cd kafka_2.11-2.0.0
安装Zookeeper
Kafka是使用Zookeeper来保存集群元数据信息和消费者信息。虽然Kafka发行版已经自带了Zookeeper,可以通过脚本直接启动,但仍然建议安装一个完整版的Zookeeper。
Zookeeper部署:http://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html
注意:
1)、在部署Zookeeper时,应该使用Linux监督(supervision)。因为Zookeeper遇到任何失败情况,都会快速失败并且退出线程,查看:http://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html#sc_supervision。
2)、部署完Zookeeper应该配置一个cron来定时压缩zk的数据和日志,因为zk并不会做这些事。如果我们不设置cron,系统磁盘有可能会被zk打满。
https://www.cnblogs.com/fesh/p/3900253.htmlhttps://blog.csdn.net/qq_37716485/article/details/71786894
Kafka配置
Kafka的配置文件在${KAFKA_HOME}/config/server.properties目录,我们只需要简单进行配置下:
broker.id=1 #当前broker在集群中的唯一标识,类似zk中的myid
log.dir=/opt/yangjianzhang/kafka/log #消息日志输出目录
zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2281 #使用的zk集群
分发安装并启动
将Kafka安装包分发到其它机器上:
scp -r kafka_2.11-2.0.0 root@192.168.0.1:/opt/yangjianzhang/kafka/
#启动集群中每台机器的Kafka服务
bin/kafka-server-start.sh -daemon config/server.properties #需要指定启动配置文件
创建topic
#创建test topic,只有一个分区、三个副本
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test
#查看topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
test
查看创建的topic信息
[root@yjz01 kafka_2.11-2.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo
Topic:demo PartitionCount:1 ReplicationFactor:3 Configs:
Topic: demo Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
第一行输出是对所有分区的一个描述,然后每个分区会有一行输出。
leader:当前分区所在的leader节点,负责处理消息的读和写,leader是从所有分区所在broker中随机选择出来的。
replicas:列出了所有副本节点(包含了leader节点),无论该节点当前是否存活。
isr:分区副本所在节点,并且该节点正常运行服务。
当前分区leader是broker 3,我们kill 掉broker 3中的kafka服务,然后再看分区信息:
[root@yjz01 kafka_2.11-2.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo
Topic:demo PartitionCount:1 ReplicationFactor:3 Configs:
Topic: demo Partition: 0 Leader: 1 Replicas: 3,1,2 Isr: 1,2
leader重新进行了选举,并且当前服务节点isr中已经把3剔除。
使用kafka命令行发送和消费消息
Kafka附带了一个命令行客户端,允许读取文件或标准输入发送到Kafka集群中,默认情况下每行作为一条消息发送。
[root@yjz01 kafka_2.11-2.0.0]# bin/kafka-console-producer.sh -broker-list localhost:2181 --topic demo
>hello world
hello kafka
使用命令行consumer读数据:
bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic demo --from-beginning
>hello world
hello kafka
关注我
欢迎关注我的公众号,会定期推送优质技术文章,让我们一起进步、一起成长!
公众号搜索:data_tc
或直接扫码:🔽