kafka官网上介绍kafka是一个分布式流处理平台。
那什么是流处理平台呢,流处理平台有以下三种特性:
- 可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。
- 可以储存流式的记录,并且有较好的容错性。
- 可以在流式记录产生时就进行处理。
第一个特性很好理解,我们可以用kafka去发消息和接受消息,做一个广播,这个很多工具都可以做到,redis也支持,自己实现也可以,但是kafka强大在他的高可用高性能和可靠性。
第二点,kafka他自己有个参数,log.retention.hours,日志删除的时间阈值(小时为单位),默认是168小时,也就是七天,这七天内的消息,你都可以重新消费到,也可以确定从何处开始消费。
第三点,kafka利用Kafka Streams,我们可以对kafka消息流进行处理,比如有一些要对消息进行特殊格式化或者过滤的场景,利用kafka的库类可以轻松实现。go也有goka这个包支持流式操作。
而分布式,Kafka作为一个集群,运行在一台或者多台服务器上.
Kafka适合什么样的场景?
它可以用于两大类别的应用:
- 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。 (相当于message queue)
- 构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过kafka stream topic和topic之间内部进行变化)
关于数据
- Kafka 通过 topic 对存储的流数据进行分类。
- 每条记录中包含一个key,一个value和一个timestamp(时间戳)。
所以说起来kafka是一个时序数据库,作为一个时序数据库,则存在时序数据的优化方案。
kafka概念
Topics
Topic 就是数据主题,是数据记录发布的地方,可以用来区分业务系统。Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。
然后在kafka内部中,每个topic写成多个分区(partition),多个分区的内容都是同一个topic。
每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commit log文件。分区中的每一个记录都会分配一个id号来表示顺序,为offset,offset用来唯一的标识分区中每一条记录。
Kafka 集群保留所有发布的记录,无论他们是否已被消费,并通过一个可配置的参数—log.retention.hours来控制保存时长。 举个例子, 如果保留策略设置为2天,一条记录发布后2天内,可以随时被消费,2天过后这条记录会被抛弃并释放磁盘空间。Kafka的性能和数据大小无关,所以长时间存储数据没有什么问题(如果磁盘允许的话)。
在golang的sarama-cluster包中,我们可以设定config对象来确定从最新消费或者是把保存的消息全部消费。
import "github.com/bsm/sarama-cluster"
。。。
config := cluster.NewConfig()
config.Consumer.Offsets.Initial = sarama.OffsetNewest
cg, err := cluster.NewConsumer(zookeeperStrings, group, topicString, config)
。。。
而kafka对于每个消费者,唯一保存的数据只有一个,消费者消费到哪里了,也就是消费者的offset。从下面这种图可以看出每个消费者之间是不会相互影响的。
分布式
日志的分区partition (分布)在Kafka集群的服务器上。每个服务器在处理数据和请求时,共享这些分区。每一个分区都会在已配置的服务器上进行备份,确保容错性.
每个分区都有一台 server 作为 “leader”,零台或者多台server作为 follwers 。leader server 处理一切对 partition (分区)的读写请求,而follwers只需被动的同步leader上的数据。当leader宕机了,followers 中的一台服务器会自动成为新的 leader。每台 server 都会成为某些分区的 leader 和某些分区的 follower,因此集群的负载是平衡的。
kafka和zookeeper
说到了leader和follower,那他们的关系是什么确定的呢,是通过经常看到的选举模式吗?并不是,我们在部署kafka的时候,我们需要一个zookeeper,他们的主从关系是通过zookeeper去确定的,zookeeper 也保存着kafka集群的全部信息(所以我们在代码初始化kafka的时候,只需要填入zookeeper集群的ip和端口即可)
所有的Kafka Broker节点一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为leader,其他的Kafka broker成为了 follower。
生产者
生产者可以将数据发布到所选择的topic(主题)中。生产者负责将记录分配到topic的哪一个 partition(分区)中。可以使用循环的方式来简单地实现负载均衡,也可以根据某些语义分区函数(例如:hash记录中的key%节点数)来完成。
消费者
消费者使用一个消费组名称来进行标识,发布到topic中的每条记录被分配给订阅消费组中的一个消费者实例。消费者实例可以分布在多个进程中或者多个机器上。
如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例,每个消费组只收到一次消息。
如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程。
在golang的sarama-cluster包中,我们可以设定group来确定消费组。
import "github.com/bsm/sarama-cluster"
。。。
config := cluster.NewConfig()
config.ClientID = group
cg, err := cluster.NewConsumer(zookeeperStrings, group, topicString, config)
。。。
kafka优势
传统的消息系统有两个模块: 队列和发布-订阅
在队列中,消息被消费就没有了,我们经常用redis去实现一些异步操作,这种的就算是队列消息。
在发布订阅中,依然以redis为例,但是redis并不能针对消息去做操作,只能广播(虽然新版本的redis支持了)。
kafka通过消息组,可以多用户广播,也可以对消息进行处理。
分区保证了消息的消费更加平衡,但是消费者组中的消费者实例个数不能超过分区的数量。
在我们的使用经验来讲,kafka做得最多的就是用户行为跟踪广播,虽然也可以做监控,但是我们监控是用prometheus去实现的,更加专业和舒服。有兴趣的同学也可以去了解一下。