Kafka producer 解析

前言

Kafka 作为一个消息系统,其中很大的一个用途就是作为业务上的解耦,而它实现的模式就是经典的生产者消费者模式。毫无疑问,就出现了producer、consumer。然后消息总得有地方存放啊,然后就有了具体的broker,那在broker上是如何进行组织和存放的,就出现了partition。对应的为保证消息不丢失,也就出现了消息备份组这样一个概念(ISR,in-sync replica)再加上消息的topic也就形成了,kafka的 topic-partition-message 的三级负载结构。到这里Kafka中比较核心的几个概念就都有了,下面开始详细介绍。

producer

producer也就是生产者,是kafka中消息的产生方,产生消息并提交给kafka集群完成消息的持久化,这个过程中主要涉及ProducerRecord对象的构建、分区选择、元数据的填充、ProducerRecord对象的序列化、进入消息缓冲池、完成消息的发送、接受broker的响应。
具体的流程是这样的:


image.png

1、确定topic信息
2、确定value信息
3、然后进行消息的序列化处理
4、由分区选择器确定对应的分区信息
5、将消息写入消息缓冲区
6、完成消息请求的发送
7、完成消息响应的处理

ProducerRecord:

ProducerRecord 对象比较核心的信息有:topic、partition(这个信息是根据分区选择器来确定的)、key、value、timestamp

PS:时间戳信息是默认当前时间的,但是用户可以指定时间戳信息,但是不推荐这么做,broker中大体有这么几种log也就是消息存放文件普通日志文件,时间索引文件,普通索引文件。如果强行指定时间戳很有可能导致时间索引失效。

元数据:

元数据信息主要包括offset消息在分区日志中的位移信息、timestamp、topic/partition topic及对应的分区信息、checksum 消息对应的CRC32码、serializedKeySize 序列化后的key的字节数、serializedValueSize 序列化后的Value的字节数

Partition:

分区选择器,默认是murmur2 对于key进行hash计算然后对于总分区数求模以此得到被发送的分区号,当然我们实现producer时可以自定义partition,或者指定特定分区。

serializer:

serializer是kafka实现的自己的序列化工具用于将消息对象序列化成字节序列,Kafka中提供了ByteArraySerializer、ByteBufferSerializer、BytesSerializer、Long(Double Integer String)Serializer等几种序列化方法,用户也可以使用自定义的或者第三方的序列化工具。只需要使用指定对应参数即可(切记Kafka中指定对应的工具类时都是使用权限定名称来做的)

序列化相关的参数有如下:

key.serializer 针对各个部分做序列化方式

key.deserializer key.serializer对应解序列化方式

value.serializer对value部分指定的序列化方式

value.deserializer value.serializer对应解序列化方式

可以简单的理解为key要比value的应用范围广。

batch:

buffer.memory 指定producer待发送消息缓冲区的内存大小,默认32m,如果需要更改就使用这个参数进行修改。这里需要注意的是当producer端写消息的速度超过了专属IO线程发送消息的速度,并且缓冲区的消息数量超过buffer.memory指定的大小时,producer会抛出异常通知用户介入处理,这个缓冲区的大小需要根据实际场景来确定。

batch.size 指一个batch的大小,它直接决定了一个batch中存在的消息数量,这个直接与producer的吞吐量及延时等直接相关,因为所谓的micr-batch 是指原本应该串行一条条发送的消息更改为缓存一部分消息,等达到对应的消息规模时一次性发送,也不会像批处理规模那么大(主要为了平衡延时与性能,这个会有专门的篇章来介绍micr-batch)

linger.size

producer端会专门划出一部分内存用于待发送消息的缓存,batch.size决定了发送消息数量,同时间接决定了消息缓存时存在的延时。linger.size 就是针对这一点设计出来的,它决定了消息被投放进缓冲区时是否立马被发送,默认参数是0(立即发送),这个大多数情况下是合理的,但是会很大程度上拉低kafka的吞吐量。具体要根据实际的使用场景来确定了。

通信协议:

kafka 并没有使用现有的http协议等,而是在TCP 协议之上实现了自己的通信协议。单个client会创建多个socket链接与多个broker进行交互,Kafka 原生Java client使用类似于epoll的方式在单个连接上不停的轮训传数据,但是每个broker上只需要维护一个Scoket链接,保证了消息的请求的顺序处理,所以很清晰的可以看到在client端就需要我们自己去维护这个顺序了。

整体来说Kafka 中大约有三类连接:client与broker之间消息传输、controller 与所有broker之间的交互、client 获取元数据&rebalance的通信过程。

同其他协议类似,Kafka的通信协议的请求和响应也都是格式化的。由 固定长度初始类型(int8、int16、int32、int64)、可变长度类型(bytes、string)、数组。请求头由 api_key(int16,请求类型)、api_version(int16,请求版本号)、correlation_id(int32,与请求响应的关联号,这个字段就是给响应用的)、client_id(client id)

经常接触到的Kafka请求类型有:PRODUCE请求(生产消息请求)、FETCH请求(服务于消费消息,并不一定是clients向broker拉消息,也可能是follower副本向leader副本索要消息)、METADATA请求(获取指定topic的元数据信息:[topics]+allow_auto_topic_creation)

PS:这里有一点需要说明,clients与broker是单向兼容的,这个在生产环境中如果不注意是格外容易发生问题的。这个兼容性是指,高版本broker可以兼容低版本clients,但是低版本broker无法兼容高版本clients,所以说升级clients版本,尤其是对接新的consumer时一定要格外注意。这个问题主要针对非Java client的,对于Java client来说,会自动判断连接的broker端所支持的client请求的最高版本。

producer interceptor

拦截器是新版本才出现的一个特性,并且是非必须的,interceptor 核心的函数有onSend(在消息序列化计算分区之前就被调用)、onAcknowleagement(被应答前或者说在发送失败时,这个方法是运行在producer的I/O线程中的,所以说如果存在很多重逻辑的话会导致严重影响处理消息的速率)、close。通常是通过为clients定制一部分通用且简单的逻辑时才会使用的。

压缩算法

Kafka支持的压缩算法还是很可观的:GZIP、Snappy、LZ4,默认情况下不进行消息压缩,毕竟会消耗很大一部分cpu时间,导致send方法处理时间变慢。启动LZ4 进行消息压缩的producer的吞吐量是最高的。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,496评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,407评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,632评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,180评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,198评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,165评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,052评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,910评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,324评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,542评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,711评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,424评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,017评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,668评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,823评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,722评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,611评论 2 353

推荐阅读更多精彩内容