kafka配置


producer


bootstrap.servers

kafka server地址,设置一个节点虽然仍然可以拿到集群信息但是避免当前节点挂掉,建议设置多个.

建议: 当设置一个host时只要这个host不挂是不影响使用的,但是为了高可用建议把所有server都配置上.

batch.size

批量发送到服务端的字节数大小,批量发送可以提高整体的吞吐量,但是如果设置过大会导致内存浪费.

根据应用实际情况设置,如果要求高吞吐量建议开启.默认关闭.

acks

server端返回ack的个数.

  • =0那么不需要任何确认,添加到socket buffer中后就当作成功处理,这时retries设置就会无效了因为默认就发送成功不存在重试情况;
  • =1那么只需要该partition的master确认后才算发送成功.存在的问题是当写入master后并未同步到ISR中的follower那么该条消息会丢失;
  • =ALL || =-1需要ISR中的所有节点都确认收到才算发送成功.安全级别最高;

默认值=1master确认后才算发送成功,满足大部分需求,如果消息量不大或者对消息准确性要求高建议=ALL

linger.ms

发送消息的延迟时间,比如设置5ms那么消息会逗留5ms来等待更多消息批量发送,和上面batch.size组合使用,当满足batch.sizelinger.ms条件之一就会发送,默认值是0,未设置的情况下消息直接发送出去了.

默认值=0,直接发送.根据应用实际情况调节.收益类似batch.size.

client.id

应用标识,消息溯源时可以使用,设置后server端的log中会同时记录,host/ip/clientid.

建议添加该配置.

send.buffer.bytes

设置tcp send buffer,如果设置为-1则使用操作系统默认值.建议跟随系统默认值,错误设置会导致TCP协议层出现问题.

默认128K,原理不清楚不建议调整.

receive.buffer.bytes

设置tcp receive buffer,如果设置为-1则使用操作系统默认值.建议跟随系统默认值,错误设置会导致TCP协议层出现问题.

默认32K,原理不清楚不建议调整.

max.request.size

单次请求最大字节数的限制,影响batch.size配置,另外这个参数服务端也有自己的配置,存在服务端与客户端不同的情况.

默认1Mb, 根据应用实际情况调整.如果开启批量可能需要调整此参数.

reconnect.backoff.ms

连接在重试之前等待时间,避免短时间频繁的连接服务端.

reconnect.backoff.max.ms

重试连接的最大等待时间,和上一个参数相关联,如果设置了这个参数,那么当连接失败后时间会成指数增加避免connection storm,比如第一次失败,重连等待1s,又失败了重连等待3s(只是举例,具体值不是这个).

max.block.ms

当缓冲区满了或者metadata获取不到时,调用KafkaProducer.send()KafkaProducer.partitionsFor()阻塞的最大时间.

默认50ms,不建议调整.

buffer.memory

设置缓冲区大小,如果消息产生速度大于发送到broker的速度,缓冲区用量就会增加如果满了就会阻塞max.block.ms设置的时间,阻塞后缓冲区仍然是满的将会抛异常.

默认32Mb,可以通过线上JMX MBean观察,如果消息量比较小一般用不了这么大.

retry.backoff.ms

设置请求失败重试延后时间,同样避免短时内大量重试的问题.

默认100ms,不建议修改

compression.type

压缩类型,默认没有压缩.提供none, gzip, snappy, lz4压缩算法.

默认无压缩,消息量大建议压缩.节省空间,节省网络传输(带宽是稀缺资源).留心压缩带来的CPU消耗.

metrics.sample.window.ms

指标计算的时间窗口

metrics.num.samples

计算指标抽样的数量

metrics.recording.level

记录指标的级别

metric.reporters

设置reporter类型,需要实现org.apache.kafka.common.metrics.MetricsReporter,如果有事件发生则会触发对应的调用.

max.in.flight.requests.per.connection

一个connection上未ack的请求数,如果这个值大于1,重试时就会导致顺序是乱的.

retries

当发送错误时重试次数,max.in.flight.requests.per.connection与这个参数有关系,如果max.in.flight.requests.per.connection大于1则存在乱序的可能.举例:两批消息,第一批发送失败,第二批发送成功,第一批重试.那么第二批反而在前面.

key.serializer

序列化方式,需要实现org.apache.kafka.common.serialization.Serializer接口.

value.serializer

序列化方式,需要实现org.apache.kafka.common.serialization.Serializer接口.

connections.max.idle.ms

空闲多久之后会关闭这个connection.

partitioner.class

需要实现org.apache.kafka.clients.producer.Partitioner这个接口,作topic partition的路由规则.

request.timeout.ms

针对请求的响应时间超时时间设置,如果超过了这个时间则进行重试,如果重试次数到了上限则失败.该参数的值应该大于broker端的配置replica.lag.time.max.ms, 避免因为数据备份导致的重试.

interceptor.classes

拦截器配置,需要实现org.apache.kafka.clients.producer.ProducerInterceptor这个接口,默认无.

enable.idempotence

开启幂等,新功能0.11版本引入的.如果设置为true则保证每条消息只被写入一次.并且要求max.in.flight.requests.per.connection<=5,retries>0,acks=all.如果这些参数没有明确设置那么将自适应,如果配置了但是参数之间不兼容则会抛ConfigException异常. 幂等只对单partition生效.

transaction.timeout.ms

transaction coordinator等待事务状态更新的超时时间,如果改值比broker中的transaction.max.timeout.ms大,那么请求将发生InvalidTransactionTimeout错误.

transactional.id

事务ID,需要enable.idempotence=true,如果未设置则不开启事务功能

使用producerkafka时建议都添加callback,即便什么都不做也要打印行日志.可以参考:org.apache.kafka.clients.producer.internals.ErrorLoggingCallback


Consumer


group.id

标识当前消费者属于哪个消费组

建议配置,管理offset的必要参数

max.poll.records

调用一次poll()返回的记录数量

建议设置,和其他参数有关联,如果单次拉取条数过多导致消费时间过长未commit,可能会导致该consumer被判定为down掉了,从而触发rebalance后其他consumer会重复消费.

max.poll.interval.ms

调用poll()方法的最大间隔时间,如果在间隔时间内没有调用poll()那么这个consumer被判定为失败,触发rebalance.

session.timeout.ms

该参数用来判定consumer是否失效,consumer会周期性的发送心跳到broker,如果在这个配置的周期内并未发送心跳那么判定该consumer失效,触发rebalance.

这个参数配置一定要在broker两个配置数值之间group.min.session.timeout.msgroup.max.session.timeout.ms.

heartbeat.interval.ms

两次心跳的间隔时间,心跳的作用有两方面:

  1. 确保kafka知道当前的consumer是存活的
  2. 当consumer group发生变化时(新增或者减少),能确保触发rebalance

建议配置的值要小于session.timeout.ms/3

bootstrap.servers

kafka server地址,设置一个节点虽然仍然可以拿到集群信息但是避免当前节点挂掉,建议设置多个.

建议: 当设置一个host时只要这个host不挂是不影响使用的,但是为了高可用建议把所有server都配置上.

enable.auto.commit

如果设置成true,那么将异步周期性的提交offset

尽量设置成false,由自动变手动更加能控制消息的消费情况.

auto.commit.interval.ms

自动提交的周期时间. 开启enable.auto.commit后有效

partition.assignment.strategy

partition的分配关系,也就是确定partition与consumer之间关系的方式

默认org.apache.kafka.clients.consumer.RangeAssignor

auto.offset.reset

offset重置的规则,如果新的topic kafka没有对应的offset信息,或者当前的offset无效了(历史数据被删除),那么需要指定新的offset是什么,提供几种类型:

  1. earliest: 队列中能找到的最早的offset
  2. latest: 加入时最新的offset
  3. none: 找不到就报错
  4. anything else: 直接报错

默认latest

fetch.min.bytes

fetch一次返回的最小字节数,如果不够这个字节数就等待.

默认值为1字节.调大这个参数可以增加server端的吞吐量

fetch.max.bytes

fetch一次返回的最大字节数,如果第一条消息的大小超过了这个限制仍然会继续拉取保证consumer的正常运行.因此并不是一个绝对的配置,消息的大小还需要受到broker的message.max.bytes限制,以及topic的max.message.bytes的限制.

默认值是50Mb,

fetch.max.wait.ms

如果没达到fetch.min.bytes配置的值,fetch请求阻塞的最长时间

默认500ms

metadata.max.age.ms

周期性的拉取metadata,即便服务端没发生变化

默认3分钟

max.partition.fetch.bytes

fetch.max.bytes类似,只不过这个是限制单partition

默认值1Mb

send.buffer.bytes/receive.buffer.bytes/client.id/reconnect.backoff.ms/reconnect.backoff.max.ms/retry.backoff.ms/metrics.sample.window.ms/metrics.num.samples/metrics.recording.level/metric.reporters

同producer

check.crcs

使用crc32的方式校验消息是否准确,避免磁盘等其他原因导致的消息错误,该功能有一定的性能损失,追求极致的性能需要关闭

默认true

key.deserializer/value.deserializer

同producer

request.timeout.ms

同producer

default.api.timeout.ms

指定consumer所有操作的超时时间

interceptor.classes

consumer拦截器,需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor .

默认为空

exclude.internal.topics

是否排除kafka内部队列,比如offset队列.

默认值true

internal.leave.group.on.close

当consumer关闭时是否离开group, 如果设置为false,则不离开组直到时间超过session.timeout.ms导致rebalance.

isolation.level

消息的隔离级别,类似mysql, 如果设置的是read_uncommitted那么调用consumer.poll()将读取到所有消息,如果设置了read_committed那么调用consumer.poll()将只能读取到已经commit的消息.

使用kafkaconsumer时建议commit offset的操作都修改成手动提交,来控制消息消费的情况.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,542评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,822评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,912评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,449评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,500评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,370评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,193评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,074评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,505评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,722评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,841评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,569评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,168评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,783评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,918评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,962评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,781评论 2 354

推荐阅读更多精彩内容

  • ############################# System ####################...
    snail_knight阅读 1,736评论 0 0
  • 1 生产者配置参数:kafka.producer.retries 当生产者发送失败的时候重试的次数,大多数情况下,...
    哈哈_dfde阅读 10,938评论 0 4
  • 不管是把 Kafka 作为消息队列、消息、总线还是数据存储平台来使用 ,总是需要有一个可以往 Kafka 写入数据...
    消失er阅读 11,243评论 1 5
  • 都说深夜是自我的独白,今夜喝茶喝多了的我有点难以入眠。 关灯后,我闭眼几分钟又爬起来开了床头小灯。心里不知名的思绪...
    零兰阅读 444评论 3 4
  • 放假抽空和同学打了一哈上海大学生的ctf,题目质量一般,脑洞太大,不过有两道代码审计题目还是不错的,同时自己也有好...
    yangc随想阅读 939评论 0 0