消息中间件的定义
消息中间件是指利用高效可靠的消息传输机制进行平台无关的数据交流,并且基于数据通信来进行分布式系统的集成。
消息中间件能做什么
消息中间件主要解决的就是分布式系统之间消息传递的问题,它能够屏蔽各种平台以及协议之间的特性,实现应用程序之间的协同。
比如以下的场景
没有消息中间件场景一:
有消息中间件场景二:
通过上述的2个场景,很容易发现一个直观的问题,就是同步调用 到 异步调用 的过程的转变,这样使得我们的程序拥有了效率更高,解耦等等优点
常见的消息中间件
kafka、ActiveMQ、RabbitMQ、RocketMQ等等,那么这次的主角是 kafka
Kafka的定义
kafka是一款分布式消息发布和订阅系统,它的特点是高性能、高吞吐量。 最早设计的目的是作为LinkedIn的活动流和运营数据的处理管道。这些数据主要是用来对用户做用户画像分析以及服务器性能数据的一些监控,所以kafka一开始设计的目标就是作为一个分布式、高吞吐量的消息系统,所以适合运用在大数据传输场景。
Kafka的应用场景
由于kafka具有更好的吞吐量、内置分区、冗余及容错性的优点(kafka每秒可以处理几十万消息),让 kafka成为了一个很好的大规模消息处理应用的解决方案。所以在企业级应用上,主要会应用于如下几个方面:
行为跟踪:kafka可以用于跟踪用户浏览页面、搜索及其他行为。通过发布-订阅模式实时记录到对应的 topic中,通过后端大数据平台接入处理分析,并做更进一步的实时处理和监控
-
日志收集:日志收集方面,有很多比较优秀的产品,比如Apache Flume,很多公司使用kafka代理日志聚合。日志聚合表示从服务器上收集日志文件,然后放到一个集中的平台(文件服务器)进行处理。在实际应用开发中,我们应用程序的log都会输出到本地的磁盘上,排查问题的话通过linux命令来搞定, 如果应用程序组成了负载均衡集群,并且集群的机器有几十台以上,那么想通过日志快速定位到问题, 就是很麻烦的事情了。所以一般都会做一个日志统一收集平台管理log日志用来快速查询重要应用的问 题。所以很多公司的套路都是把应用日志集中到kafka上,然后分别导入到es和hdfs上,用来做实时检 索分析和离线统计数据备份等。而另一方面,kafka本身又提供了很好的api来集成日志并且做日志收集
Kafka的架构
一个典型的kafka集群包含若干Producer(可以是应用节点产生的消息,也可以是通过Flume收集日志产生的事件),若干个Broker(kafka支持水平扩展)、若干个Consumer Group,以及一个zookeeper集群(在最近的2.8版本中,使用Kafka内部的仲裁(Quorum)控制器来取代ZooKeeper)。kafka通过zookeeper管理集群配置及服务协同。Producer使用push模式将消息发布到broker,Consumer通过监听使用pull模式从broker订阅并消费消息。多个broker协同工作,Producer和Consumer部署在各个业务逻辑中。三者通过zookeeper管理协调请求和转发。这样就组成了一个高性能的分布式消息发布和订阅系统。
图上有一个细节是和其他mq中间件不同的点,producer 发送消息到broker的过程是push,而 consumer从broker消费消息的过程是pull,主动去拉数据。而不是broker把数据主动发送给consumer
kafka中的角色
Broker
kafka集群包含一个或多个服务器,这种服务器被称为broker。broker端不维护数据的消费状态,提升了性能。直接使用磁盘进行存储,线性读写,速度快:避免了数据在JVM内存和系统内存之间的复制(零拷贝的概念), 减少耗性能的创建对象和垃圾回收。
Producer
消息生产者,负责发布消息到kafka broker。
Consumer
消息消费者,从kafka broker读取消息的客户端,consumer从broker拉取(pull)数据并进行处理。
Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定 group name则属于默认的group)
Topic
每条发布到kafka集群的消息都有一个类别,这个类别被称为topic(主题)。topic是一个虚拟的概念,并不是真实存在的,可以认为是一个消息集合。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)每个topic可以有多个生产者向它发送消息,也可以有多个消费者去消费其中的消息。
Partition
分区,Parition是物理上的概念,每个topic包含一个或多个Partition.(可以说是topic的具体表现),同一topic下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka通过offset保证消息在分区内的顺序,offset的顺序不跨分区,即kafka只保证在同一个分区内的消息是有序的。
下图中,对于名字为test的topic,做了3个分区,分别是p0、p1、p2.
每一条消息发送到broker时,会根据partition的规则选择存储到哪一个partition。如果partition规则设置合理,那么所有的消息会均匀的分布在不同的partition中,这样就有点类似数据库的分库分表的概念,把数据做了分片处理。
Replication
- 副本,Replication是Kafka架构中一个比较重要的概念,是系统高可用的一种保障。Replication逻辑上是作用于Topic的,但实际上是体现在每一个Partition上。
offset
- 每个消息在被添加到分区时,都会被分配一个offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka通过offset保证消息在分区内的顺序,offset的顺序不跨分区,即kafka只保证在同一个分区内的消息是有序的
Topic & Partition
- topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得kafka的吞吐率可以线性提高,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件。若创建topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹.
Partition是以文件的形式存储在文件系统中,比如创建一个名为firstTopic的topic,其中有3个partition,那么在kafka的数据目录(/tmp/kafka-log)中就有3个目录,firstTopic-0~3, 命名规则是 <topic_name>-<partition_id>
Kafka的参数配置
Producer端
batch.size
生产者发送多个消息到broker上的同一个分区时,为了减少网络请求带来的性能开销,通过批量的方式来提交消息,可以通过这个参数来控制批量提交的字节数大小,默认大小是16384byte,也就是16kb, 意味着当一批消息大小达到指定的batch.size的时候会统一发送.(中心思想是 容量上限提交概念)
linger.ms
Producer默认会把两次发送时间间隔内收集到的所有Requests进行一次聚合然后再发送,以此提高吞吐量,而linger.ms就是为每次发送到broker的请求增加一些delay,以此来聚合更多的Message请求。 这个有点想TCP里面的Nagle算法,在TCP协议的传输中,为了减少大量小数据包的发送,采用了Nagle 算法,也就是基于小包的等-停协议。(中心思想是 时间间隔提交概念)
batch.size和linger.ms这两个参数是kafka性能优化的关键参数,很多同学会发现batch.size和 linger.ms这两者的作用是一样的,如果两个都配置了,那么怎么工作的呢?实际上,当二者都配置的时候,只要满足其中一个要求,就会发送请求到broker上
Consumer端
group.id
consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费.如下图所示,分别有三个消费者,属于两个不同的group,那么对于firstTopic这个topic来说,这两个组的消费者都能同时消费这个topic中的消息,对于此时的架构来说,这个firstTopic就类似于ActiveMQ中的topic概念。如下图所示,如果3个消费者都属于同一个group,那么此事firstTopic就是一个Queue的概念
这边需要理解 每个分区只能由同一个消费组内的一个consumer来消费 这句话, 例如:firstTopic中有 p0,p1,p2 3个分区, 这边的消费group不变 那么对于group=1来说,很直接他消费3个分区,那么对于group=2来说,第一个Consumer会消费2个分区,Consumer只会消费1个分区,这是有规则的,后续会展开说,具体的例图如下:
enable.auto.commit
消费者消费消息以后自动提交,只有当消息提交以后,该消息才不会被再次接收到,还可以配合auto.commit.interval.ms
控制自动提交的频率。当然,我们也可以通过consumer.commitSync()
的方式实现手动提交
auto.offset.reset
这个参数是针对新的groupid中的消费者而言的,当有新groupid的消费者来消费指定的topic时,对于该参数的配置,会有不同的语义
auto.offset.reset=latest情况下,新的消费者将会从其他消费者最后消费的offset处开始消费topic下的消息
auto.offset.reset=earliest情况下,新的消费者会从该topic最早的消息开始消费
auto.offset.reset=none情况下,新的消费者加入以后,由于之前不存在offset,则会直接抛出异常。
max.poll.records
此设置限制每次调用poll返回的消息数,这样可以更容易的预测每次poll间隔要处理的最大值。通过调整此值,可以减少poll间隔