1.基本概念:scala编写、LinkedIn转交给Apache、版本格式kafka_<scala版本>-<kafka版本>、不属于CPU密集型,多核优于高时钟频率
1)基本命令:
zookeepler启动:bin/zookeeper-server-start.sh config/zookeeper.properties(端口2181)
kafka启动:bin/kafka-server-start.sh -daemon config/kafka.properties(端口9092)
创建Topic:bin/kafka-topic.sh --create --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1(查看状态--describe)
发送消息:bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test hello world(回车发送)
消费消息:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
吞吐测试脚本:kafka-producer/consumer-perf-test.sh
2)Topic:每个Topic有若干个partition组成,每个partition有若干个副本,Kafka只能保证分区有序而不是topic有序
3)消息:每个消息由Key和Value组成,支持4种压缩(0:无压缩;1:GZIP;2:Snappy;3:LZ4)
4)副本:leader replica和follower replica,follower replica不提供服务只追随leader replica获取数据;AR(Assigned Replicas)、ISR(In-Sync Replicas)、OSR(Out-of Sync Replicas)
ISR(in-sync replica):与leader副本保持同步的动态replica集合
2.概要设计:
1)高吞吐/低延时:写操作(写入页缓存、io交给操作系统、追加顺序写、微批次),读操作(零拷贝)
大量使用操作系统页缓存,内存操作速度快和命中率高
不直接参与IO操作,交给操作系统
采用追加写入方式
使用以sendfile为代表的零拷贝技术加强网络间的数据传输效率
2)消息持久化:所有消息立即持久化
3)负载均衡和故障转移:负载均衡通过智能化的分区领导选举来实现;故障转移通过心跳和会话的机制来实现
4)伸缩性:启动新kafka服务器即可
3.Producer开发:
1)基本工作流程:消息、分区器(partitioner)、key、分区leader
producer首先使用线程(用户主线程)将待发送的消息封装进一个ProducerRecord实例,然后序列化后发送给partitioner,再由partitioner确定了目标分区后一同发送到位于producer程序中的一块内存缓存(RecordAccumulator)中,而producer的另外一个工作线程(I/O发送线程,也称sender线程)则负责实时的从缓冲区中提取准备就绪的消息封装到一个批次中,统一发送到对应的broker
备注:如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaPro ducer 的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms 的配置,此参数的默认值为60000,即60 秒
2)基本要素(Properties、KafkaProducer、send(ProducerRecord(topic,key,value))、close)
3)发送消息(同步、异步(Callback(onCompletion)))、异常(可重试异常RetriableException和不可重试异常)、重试次数异常
4)主要参数:
基础:bootstrap.servers、key.serializer、value.serializer;
基础准备:buffer.memory=32M;
消息准备:batch.size=16k、linger.ms=0ms;
发送准备:compression.type=0、max.request.size=1M、reties=0、retry.backoff.ms=100ms;
结果:arks(0,1,all)=1、request.session.timeout=30s
5)分区策略:murmurHash2(key)
6)Producer拦截器:ProducerInterceptor(onSend、onAcknowledgement、close)
7)多线程处理:KafkaProducer线程安全,多线程单KafkaProducer实例,多线程多KafkaProducer实例
4.Consumer开发:
1)基础概念:消费组(consumer group)消费组使用一个消费者组名来标识自己,topic的每条消息都只会发送到每个订阅它的消费者组的一个消费者实例上;分区分配策略(partition.assignment.strategy=range、round-robin、sticky);2种消息引擎模型;位移提交(topic _consumer_offsets(group.id+topic+分区号));消费者组重平衡;多线程(消费线程和心跳线程)
2)基本要素(Properties、KafkaConsumer、subscribe、poll、ConsumerRecord)
3)主要参数:
基础:bootstrap.servers、key.serializer、value.serializer、group.id
接受处理数据:max.poll.records=500、fetch.max.bytes、fetch.max.wait.ms (一次拉取等待的最大时间)=500ms、max.poll.interval.ms
处理完以后:enable.auto.commit、connection.max.idle.ms=9分
其他:sesstion.timeout.ms(检查组员崩溃时间)=10s、heartbeats.interval.ms、auto.offset.reset(earliest、latest、none)
4)poll返回条件:足够数据或超时
5)中断方式:一种设置volatile变量,一种wakeup抛出异常
6)提交方式:手动提交(同步异步),自动提交(每隔5秒)
7)重平衡(rebalance):本质是一组协议,规定了一个consumer group是如何达成一致来分配订阅topic的所有分区的
重平衡触发条件:组员发生变更、组订阅topic分区发生变更、组订阅topic数发生变更
rebalance generation
重平衡流程:consumer发起joingroup,coordinate收集,选择leader consumer,lc重新分配,发给coordinater,下发
8)多线程消费:KafkaConsumer非线程安全,一种每个线程维护一个KafkaConsumer,一种单kafkaConsumer+多worker线程
9)consumer拦截器:ConsumerInterceptor(onConsume、onCommit)
5.Kafka设计原理
1)副本和ISR设计:ISR引入是防止落后leader副本进度太多的follower副本被选为leader(参数replica.lag.time.max.ms=10s)
副本不同步原因:请求速度跟不上、进程卡住、新创建的副本
2)LEO更新机制:
follower副本LEO更新:FETCH请求获取数据写log后自动更新LEO
leader副本LEO更新:写log后自动更新LEO
leader副本内follower副本LEO更新:follower副本进行FETCH时上报自己LEO
3)HW更新:
follower副本HW更新:follower副本更新LEO后,比较获取到的leader副本HW和自身当前LEO大小,取较小值
leader副本更新HW更新:4种情况下尝试更新(producer向leader副本写入消息、leader副本处理follower副本FETCH请求、副本成为leader时、被踢出ISR时),通过比较自身包冲follower副本LEO取最小值更新
HW缺陷:数据丢失、数据不一致或离散,通过引入leader epoch解决
4)kafka日志:按照时间顺序在日志尾部追加记录,每个分区都有自己的日志(日志段和日志段索引(位移索引和时间戳索引))
日志留存:基于时间(log.retention=7天),基于大小(log.retention.bytes=-1)
5)精确语义(EOS):
生产端:幂等性producer(enable.idempotence=true(默认为false))和事务支持
幂等性:通过每批次消息赋予序列号,同时producer使用producer id形成Map<(PID,分区号),value>,发送序列号小于等于broker端保存的序列号则抛弃,只能保证单个producer实例的EOS
事务支持:可跨分区保证EOS
消费端:事务支持
6.kafka监控
1)常用工具:kafka-manager,kafka-monitor,cruiseControl(超大规模Kafka集群)