Kafka是由美国的职业网站LinkedIn创造,作为一个社交企业,LinkedIn 有非常多的IT 系统而且日活量非常大, 由此产生的数据被许多系统产生及使用。刚开始,LinkedIn使用ActiveMQ作为数据通道去处理这些数据,但是后来发现经常会出现阻塞及服务不可用,然后LinkedIn就开发了kafka。由于Kafka解决的是生产环境中上下游系统的耦合问题,所以kafka不仅是一个消息中间件,还是一个数据引擎,或者分布式实时流处理平台。
使用场景
1. 消息传递
消息传递就是发送消息,可以把kafka作为一个MQ(ActiveMQ、RabbitMQ...),实现异步、解耦、削峰,而且吞吐量比这些消息中间件更大。
2. 网站活动追踪
可以把用户活动信息,比如登录、点击、浏览等各种行为进行监控、追踪,然后分析并发送到下游系统,给用户提供更加精确的内容推荐。
3. 日志聚合
Kafka可以实现日志聚合,这样就不用把日志记录到磁盘或者数据库,实现分布式的日志聚合。
4. 应用指标监控
还可以用来做运维系统监控。比如监控交易系统的订单信息,用户的年龄分布,的确分布,购买偏好等。或者监控应用服务器的内存、CPU、磁盘、网络等的使用情况,并进行紧急情况提醒。
5. 数据集成+流计算
kafka 内置的kafka Streams 可以更加方便的进行数据流的处理,把数据导入的离线数据库比如Hadoop、Hbased等,实现数据分析。所以Kafka不仅仅是一个MQ中间件,还是一个流处理平台。在kafka中,消息就是日志。日志就是消息的数据文件。
简单使用
首先需要搭建一套Kafka的环境,在这就不过多介绍了,网上一大堆教程。但是要说明一点,kafka需要ZK的服务,Zookeeper做了什么?因为Zookeeper的有序节点、临时节点和监听机制,所以ZK帮kafka做了这些事情:配置中心(管理Broker、Topic、Partition、Consumer的信息,包括元数据的变动)、负载均衡、集群管理和选举、分布式锁等。
1. Jave Basic Demo。
导入Kafka的Maven依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
创建一个Counsumer类,先初始化Consumer需要的配置,然后创建一个KafkaConsumer对象并订阅MyTopic
主题, 然后隔一段时间从Kafka中拉取消息。
public class Consumer {
public static void main(String[] args) {
Properties props = new Properties();
// 绑定KafkaServer
props.put("bootstrap.servers","192.168.1.91:9092");
// 绑定一个Consumer Group
props.put("group.id","yq-test-group");
// 消费Messgae之后自动提交到Kafka
props.put("enable.auto.commit","true");
// 消费Messgae之后自动提交的时间间隔
props.put("auto.commit.interval.ms","1000");
// 新来的消费者从哪里开始消费,值有:earlist / lastest / none
props.put("auto.offset.reset","earliest");
// key 和 value的序列化工具
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// 创建一个Consumer对象
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Arrays.asList("MyTopic"));
try {
while (true){
// 从服务器隔一段时间去拉取最新消息
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String,String> record : records) {
System.out.printf("offset = %d, key = %s, value= %s, partition = %s\n",
record.offset(),record.key(),record.value(),record.partition());
}
}
}finally {
// 关闭消费者
consumer.close();
}
}
}
然后创建一个Producer类,初始化producer配置,并创建一个KafkaProducer对象,并发送MyTopic主题的消息.
public class Producer {
public static void main(String[] args) {
Properties props = new Properties();
// 绑定服务器
props.put("bootstrap.servers","192.168.1.91:9092");
// 重发次数
props.put("ack","1");
// 批量发送的大小
props.put("batch.size",16384);
// 发送
props.put("group.id","yq-test-group");
props.put("buffer.memory",33554432); props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i <= 100; i++) {
producer.send(new ProducerRecord<String, String>("MyTopic",Integer.toString(i),Integer.toString(i)));
}
producer.close();
}
}
先运行Consumer类,Consumer开始监听服务器,然后运行Producer类,然后在Consumer的Console就可以看到类似下面的log:
offset = 592, key = 88, value= 88, partition = 0
offset = 592, key = 88, value= 88, partition = 0
offset = 593, key = 89, value= 89, partition = 0
offset = 594, key = 90, value= 90, partition = 0
offset = 595, key = 91, value= 91, partition = 0
offset = 596, key = 92, value= 92, partition = 0
offset = 597, key = 93, value= 93, partition = 0
offset = 598, key = 94, value= 94, partition = 0
offset = 599, key = 95, value= 95, partition = 0
offset = 600, key = 96, value= 96, partition = 0
offset = 601, key = 97, value= 97, partition = 0
offset = 602, key = 98, value= 98, partition = 0
offset = 603, key = 99, value= 99, partition = 0
2. SpringBoot Demo
创建一个SpringBoot Project,同上先导入Maven依赖,然后再application.properties中添加如下配置
server.port=7271
spring.kafka.bootstrap-servers=192.168.1.91:9092
spring.kafka.group.id=yq-test-group
spring.kafka.enable.auto.commit=true
spring.kafka.auto.commit.interval.ms=1000
spring.kafka.auto.offset.reset=earliest
spring.kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.ack=1
spring.kafka.retries=1
spring.kafka.batch.size=16384
spring.kafka.linger.ms=5
spring.kafka.buffer.memory=33554432
spring.kafka.max.block.ms=3000
spring.kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
Consumer类:
@Component
public class Consumer {
@KafkaListener(topics = "SpringBootTopics",groupId = "SpringBootTopic-group")
public void onMessage(String msg){
System.out.println("------ 收到消息 : " + msg);
}
}
Producer类
@RestController
public class Producer {
@Autowired
private KafkaTemplate template;
@GetMapping("/send")
public String sent(@RequestParam("msg") String msg) {
template.send("SpringBootTopics", msg);
return "Ok";
}
}
再浏览器打开如下链接:http://localhost:7271/send?msg=12313,在编辑器控制台就可以看到如下log:
2021-03-02 21:24:44.378 INFO 62448 --- [nio-7271-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2021-03-02 21:24:44.378 INFO 62448 --- [nio-7271-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1614691484378
2021-03-02 21:24:44.386 INFO 62448 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: i815xO16TgeNlXHhQPfEzA
------ 收到消息 : 12313
上面就是Kafka的简单使用。
Kafka架构
这个经典的图是kafka的data flow,下面就来分析一下这个图中的信息。
- 有2个生产者,Producer0生产主题为
Topic0
的消息 ,Producer1生产Topic0
和Topic1
两种主题的消息。 - 有3个
Broker
,可以认为是有三台服务器。 -
Topic0
有2个partition,3个replica(副本),Topic1
有一个partition,也有3个replica。红色字体代表是该分区的leader节点,剩下两个代表该主题的follower节点。生产者向leader节点中写入数据,follower节点向leader节点同步最新数据。 - 有2个Consumer Group,
Consumer group0
中的Consumer 0
消费Topic0
;Consumer group1
中的Consumer 0
消费Topic0
和Topic1
,Consumer 1
消费Topic0
,Consumer 2
什么也不消费。
1. Producer
Producer就是生产者,就是发送消息的一方。Kafka发消息不是一条一条发送的,是批量发送的,这样就能够提高发送效率。
由这个字段决定:
props.put("batch.size",16384);
2. Consumer
Consumer就是消费者,就是消费消息的一方。Kafka使用pull来获取消息,这样消费者就可以控制消费的速率,就不会出现消息太多消费不了的情况。
3. Broker
Broker就是kafka的服务。生产者和消费者都需要连接broker才能实现消息的转发。
4. Message
客户端之间传输的数据就叫做message,也叫做record。生产者中为ProducerRecord,消费者中为ConsumerRecord。
消息在传输过程中需要进行序列化,可以使用kafka内置的几种序列化工具,不满足需求也可以创建自定义的序列化工具。
5. Topic
生产者和消费者需要一个队列才能关联起来,生产者要把消息发送到某个主题的队列中,消费者接受消息,也要接收这个队列的消息。这个队列在kafka中叫做Topic。一个生产者可以生产多种不同topic的消息,一个消费这个也可以消费不同topic的消息。
如果要删除一个topic,就要把auto.create.topics.enable 设置为false,默认为true。
6. Partition
如果一个topic中消息过多,就会产生下面的问题:
并发或者负载的问题,如果客户端的操作都指向一个topic,在高并发情况下性能就会下降。
还有不方便横向扩展,如果想把数据分散到不同的机器上做集群,而不是升级硬件,这个topic就无法在物理上拆分到各个机器上。
为了解决这些问题kafka就提出了Partition的概念,其实就是把Topic分割为不同的分区(分片思想,类似数据库中的分库分表)。
举个例子, Topic有3个Partition,发送了9条消息,第一个分区存储1、4、7,第二个分区存储2、5、8,第三个分区存储3、6、9。这样就实现了负载均衡。
在服务器中的/tmp/kafka-logs/下面就可以看到每个Partition的数据,而且kafka的数据文件是顺序追加写入的,这也是为啥kafka吞吐量很大的一个原因。
7. Replica机制
如果Partition的数据只有一份,如果这个broker 挂掉了,就会出现服务不可用,甚至数据丢失不可恢复。这个副本机制就是为了提高kafka的容错率而设计的。
8. Segment
kafka的消息是放在.log文件中,如果一个Partition只有一个log文件,如果写入很多数据后,检索效率也会变得很差,因此这时候kafka就会重新拆分Partition文件,切分处理后的每一个.log文件就是一个segment。segment的大小可以由log.segment.bytes
控制。
比如这个图中就有2个segment, 每一个segment都有2个索引文件和一个数据文件。
xxx.index:记录的Consumer当前读取消息的偏移量
xxx.log: 数据文件
xxx.timeindex : 记录的是数据创建或者写入的时间
9. Consumer group
Consumer group是为了确定这个组内的消费者是不是消费同一个topci然后引入的。消费同一topic的消费者不一定在同一个组。只有group id相同才是同一个消费者组。
注意:同一group的消费者不能同时消费相同的Partition,这样会造成重复消费的问题,这也是为什么上面那个data flow图中 Consumer 3 没有消费任何topic的原因,因为只有2个Partition,但是有3个Consumer,而最多能让2个消费者去消费。
10. Consumer offset
因为kafka是顺序写入的,如果Consumer挂掉了,后来起来后我么如何知道它上一次消费到哪里了呢? kafka对消息者进行了编号,而且存储了每个消费者消费每个partition的offset, 这个offset就存储在/tmp/kafka-logs/下的类似这样__consumer_offsets-46的folder中。总共有50个,会根据cousumer id的hash code 与50取模,然后放到相应的Consumer 索引文件中,这样即使服务挂掉,只要这个文件不丢失,依然可以确定上次消费到那个offset。
进阶功能
1. 消息幂等性
如果生产者无法确定消费者有没有收到或者消费消息,如果要重发消息,这就可能造成消息重复消费的情况。消息重复消费要在消费者端去处理,kafka干脆在broker端实现了消息的重复问题处理,这样就大大减轻了消费者的工作。
在Producer端设置enable.idempotence 为true后,这个producer就是幂等性的producer,kafka会自动去重。主要是通过这两个值去确定唯一消息。
1. PID(Producer ID, 幂等性的每个客户端都有一个唯一的ID
2. Sequence Number, 幂等性的每个生产者发送的消息都有一个唯一的Seq number
Broker 就是根据上面这两个值来确保数据是否重复。但是这也只能保证在单partition上幂等性或者单一回话上的幂等性,即重启producer后的幂等性就不能保证了。如果想要保证全局数据的幂等性,就要用到kafka的事物机制了。
2. 生产者事务处理
通过Kafka的事物机制就可以保证全局消息的幂等性。在下面这几种情况就需要使用事务来处理了。
- 只有1个broker, 1个topic 1个副本,发送一组消息,要么全部失败,要么全部成功。
- 如果发送到多个partition或者多个topic,它们可能分布在不同的服务器上,需要它们要么全部失败,要么全部成功。
- 消费者后生产者在同一块代码中(Consume-Process-Produce),即先消费然后再作为生产者发送给下游系统,就需要保证接受消息和发送消息全部成功。
使用方式:
// 开启事务一定要设置transaction.id
props.put("transactional.id", UUID.randomUUID().toString());
Producer<String,String> producer = new KafkaProducer<String,String>(props);
// 初始化事务
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<String,String>("transaction-test","1","1"));
producer.send(new ProducerRecord<String,String>("transaction-test","2","2"));
// Integer i = 1/0;
producer.send(new ProducerRecord<String,String>("transaction-test","3","3"));
// 提交事务
producer.commitTransaction();
} catch (KafkaException e) {
// 中止事务
producer.abortTransaction();
}
因为消息可能跨服务器,所以这里的事务就是分布式事务。采用的是2PC(2阶段提交)。大家都可以commit才commit,否则就abort。kafka会场景一个Coordinator,并把事务的日志用topic_transaction_state记录下来,就跟消费的offset一样。这样生产者挂了之后下次重启后还能根据这个offset继续上次的事务。
总结
kafka作为一个流处理平台,有许多优点。
- 高吞吐、低延迟:kakfa的优点就是收发消息特别快,每秒可以处理几十万条消息。
- 高伸缩性:通过分区partition的机制,不同的分区可以处在不同的broker中。然后通过ZK去管理,实现负载。
- 持久性、可靠性:通过把消息持久化在磁盘中,并在不同broker中的replica机制,保证数据可靠性。
- 容错性:集群中的某个节点挂掉,整体服务也可用。
- 高并发:同时支持数千个客户端同时读写。