Kafka:
http://www.cnblogs.com/cyfonly/p/5954614.html
http://blog.csdn.net/u013256816/article/details/71091774
https://tech.meituan.com/kafka-fs-design-theory.html
1、内部存储结构
2、消息的写入与消费
消息写入:
producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)
producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition
写入流程:
1)producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader
2)producer 将消息发送给该 leader
3)leader 将消息写入本地 log
4)followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
5)leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
2、leader选举
选举算法的实现类似于微软的PacificA
思路:
最简单最直观的方案是,所有Follower都在ZooKeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeral znode会自动删除,此时所有Follower都尝试创建该节点。
而创建成功者(ZooKeeper保证只有一个能创建成功)即是新的Leader,其它Replica即为Follower。
问题:partition很多时,会造成大量Watch事件被触发,Zookeeper负载会过重。
优化:
它在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。
controller会将Leader的改变直接通过RPC的方式(比ZooKeeper Queue的方式更高效)通知需为为此作为响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。
leader failover,选举流程:
1)controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
2)controller 从 /brokers/ids 节点读取可用broker
3)controller决定set_p,该集合包含宕机 broker 上的所有 partition(即获取broker上的所有partition)
4)对 set_p 中的每一个 partition(即读取ISR,并从ISR中任选一个可用的broker作为新的leader,并写回只state节点)
4.1 从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR
4.2 决定新 leader(从ISR任选一个可用的 broker 作为新的 leader)
4.3 将新 leader、ISR、controller_epoch 和 leader_epoch 等信息写入 state 节点
5)通过 RPC 向相关 broker 发送 leaderAndISRRequest 命令
当所有 replica 都不工作时,有两种可行的方案:
1)等待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。
2)选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短。
3、可靠性:
1)producer生产数据
min.insync.replicas ISR中的最小副本数
replication.factor 副本数
request.required.acks=0/1/-1:producer无需等待直接返回/leader接收成功并确认/需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成。
producer.type=sync
写入高可靠:
topic的配置:replication.factor>=3,即副本数至少是3个;2<=min.insync.replicas<=replication.factor
broker的配置:leader的选举条件unclean.leader.election.enable=false
producer的配置:request.required.acks=-1(all),producer.type=sync
2)consumer消费数据:
auto.commit.enable=false,手动提交,读完消息先处理再commit。
消息去重:建议业务消息本身具备幂等性。
消息顺序性:
Kafka中:发送1条消息的时候,可以指定(topic, partition, key) 3个参数。partiton和key是可选的。
如果你指定了partition,那就是所有消息发往同1个partition,就是有序的。并且在消费端,Kafka保证,1个partition只能被1个consumer消费。
或者你指定key(比如order id),具有同1个key的所有消息,会发往同1个partition。也是有序的。