Kafka发行包里自带的样本配置可以用来安装单机服务,但并不能满足大多数安装场景的要求。Kafka有很多配置选项,涉及安装和调优的方方面面。不过大多数调优选项可以使用默认配置,除非对调优有特别需求。有一些配置选项,在单机安装时可以直接使用默认配置,下面看一些常规配置。
常规配置
broker.id
每个broker都需要有一个标识符,使用broker.id来表示。它的默认值是0,也可以被设置成其它任意整数,这个值在整个kafka集群里必须是唯一的。这个值可以任意选定,如果出于维护的需要,可以在服务器节点间交换使用这些ID。建议把它们设置成与机器名具有相关性的整数,这样在进行维护时,将ID号映射到机器名就没有那么麻烦了。例如,如果机器名包含唯一性的数字(例如name1.example.com,name2.example.com),那么用这些数字来设置broker.id就再好不过了。
port
如果使用配置样本来启动Kafka,他会监听9092端口。修改port配置参数可以把它设置成其他任意可以用的端口。要注意,如果使用1024以下的端口,需要使用root权限启动kafka,不过不建议这么做。
zookeeper.connect
用于保存broker元数据的zookeeper地址是通过zookeeper.connect来指定的。localhost:2181表示这个zookeeper是运行在本地的2181端口上。该配置参数是用冒号分割的一组hostname:port/path列表,每一部分的含义如下:
hostname是zookeeper服务器的机器名或ip地址
port是zookeeper的客户端连接端口
/path是可以选的zookeeper路径,作为kafka集群的chroot环境,如果不指定,默认使用根路径
如果指定的chroot路径不存在,broker会在启动的的时候创建它。
为什么使用chroot路径?在kafka集群里使用chroot路径是一种最佳实践。zookeeper集群可以共享给其他应用程序,即使还有其他kafka集群存在,也不会产生冲突。最好是在配置文件里指定一组zookeeper服务器,用分号把它们分隔开。一旦有一个zookeeper服务器宕机,broker可以连接到zookeeper群组的另一个节点上。
log.dirs
kafka把所有消息都保存在磁盘上,存放这些日志片段的目录是通过log.dirs指定的。它是一组用逗号分隔的本地文件系统路径。它是一组用逗号分隔的本地文件系统路径。如果指定了多个路径,那么broker会根据“最少使用”原则,把同一个分区的日志片段保存到同一个路径下。要注意,broker会往拥有最少数目分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。
num.recovery.threads.per.data.dir
对于如下3种情况,kafka会使用可以配置的线程池来处理日志片段:
服务器正常启动,用于打开每个分区的日志片段;
服务器崩溃后重启,用于检查和截短每个分区的日志片段;
服务器正常关闭,用于关闭日志片段
默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到并行操作的目的。特别是对于包含大量分区的服务器来说,一旦发生崩溃,在进行恢复时使用并行操作可能会省下数小时的时间。设置此参数时需要注意,所配置的数字对应的是log.dirs指定的单个日志目录。也就是说,如果num.recovery.threads.per.data.dir被设置为8,并且log.dir指定了3个路径,那么总共需要24个线程。
auto.create.topics.enable
默认情况下,kafka会在如下几种情况下自动创建主题:
当一个生产者开始往主题写入消息时
当一个消费者开始从主题读取消息时
当任意一个客户端向主题发送元数据请求时
很多时候,这种行为都是非预期的。而且,根据kafka协议,如果一个主题不先被创建,根本无法知道它是否已经存在。如果显式的创建主题,不管是手动创建还是通过其他系统创建,都可以把auto.create.topics.enable设置为false。
主题配置
kafka为新创建的主题提供了很多默认配置参数。可以通过管理工具为每个主题单独配置一部分数据,比如分区个数和数据保留策略。服务器提供的默认配置可以作为基准,它们适用于大部分主题。
注意,新版本要对主题配置进行覆盖,需要使用管理工具。
num.partitions
num.partitions参数指定了新创建的主题将包含多少个分区。如果启用了主题自动创建功能(该功能默认启动),主题分区个数就该参数指定的值。该参数的默认值是1。要注意,主题分区的个数可以增加,但是不能减少。所以,如果要让一个主题的分区个数小于num.partitions制定的值,需要手动创建该主题。
前面提过Kafka集群通过分区对主题进行横向扩展,所以当有新的broker加入集群时,可以通过分区个数来实现集群的负载均衡。当然这并不是说,在存在多个主题的情况下(它们分布在多个broker上),为了能让分区分布到所有broker上,主题分区的个数必须要大于broker个数。不过,拥有大量消息的主题如果要进行负载分散,就需要大量的分区。
如何选定分区的数量?为主题选定分区数量并不是一件可有可无的事,在进行数量选择时需要考虑以下几个因素:
1 主题需要达到多大的吞吐量?是希望每秒写入100K还是1G?
2 从单个分区读取数据的最大吞吐量是多少?每个分区一般都会有一个消费者,如果消费者写入数据库的速度不会超过每秒50M,那么从一个分区读取数据的吞吐量也不需要超过每秒50M
3 生产者的速度一般比消费者快得多,所以最好为生产者多估算一些吞吐量
4 每个broker包含的分区数,可用的磁盘空间,网络带宽都是考虑因素
5 如果消息按照不同的键写入分区的,那么为已有的主题新增分区就会很困难
6 单个broker对分区个数是有限制的,因为分区越多占用内存越多,完成首领选举需要的时间也越长
综合以上考虑,分区个数要适量的多,但是不能太多,可以根据估算出的吞吐量,用主题吞吐量除以消费者吞吐量算出分区的个数。也就是说,如果每秒要从主题上读写1G数据,并且每个消费者每秒可以处理50M,那么至少需要20个分区,这样就可以让20个消费者同时读取这些分区,从而达到预期吞吐量。如果信息不明,根据经验,把分区的大小限制在25GB以内可以得到比较理想的效果。
log.retention.ms
kafka通常根据时间来决定数据被保留多久。默认使用log.retention.hours参数来配置时间,默认值为168小时,也就是一周。此外,还有两个参数log.retention.minutes和log.retentions.ms。这三个参数的作用是一样的,都是设置消息多久后被删除,推荐使用log.retentions.ms。如果制定了多个,会优先使用单位最小的。
根据时间保留数据是通过检查磁盘上日志片段文件的最后修改时间来实现的。通常最后修改时间指的就是日志片段的关闭时间。也即是文件里最后一个消息的时间戳。不过,如果使用管理工具在服务器间移动分区,最后修改时间就不准确了,时间误差可能导致这些分区过多的保留数据。
log.retention.bytes
通过保留的消息字节数也可以判断消息是否过期,它的值通过参数log.retention.bytes来指定,作用在每一个分区上,也就是说,如果有一个包含8个分区的主题,并且log.retention.bytes被指定为1G,那么此主题最多可保留8G数据,所以,当主题的分区数增加时,整个主题可以保留的数据也随之增加。同时指定了保留时间和保留字节数,只要任意 一个条件达到指定值,消息就会被删除。
log.segment.bytes
以上设置都是作用在日志片段而不是单个消息上。当消息到达broker时,它们被追加到分区的当前日志片段上。当日志片段大小到达log.segment.bytes指定的上限(默认1G)时,当前日志片段就会被关闭,一个新的日志片段就会被打开。如果一个日志片段被关闭,就开始等待过期。这个参数值越小,就会越频繁的关闭和分配新文件,会降低磁盘写入的整体效率。
如果主题的消息量不大,那么如何调整这个参数大小尤为重要。例如一个主题每天只接收100M消息,而log.segment.bytes使用默认值,那么需要10天才能写满一个日志片段,因为日志片段被关闭前消息是不会过期的,所以如果log.retention.ms被设置为604 800 000(也就是一周),那么日志片段最多需要17天才会过期,这是因为关闭需要10天,过期需要七天。
还可以使用时间戳获取偏移量。日志片段的大小会影响使用时间戳获取偏移量。在使用时间戳获取日志偏移量时,Kafka会检查分区里最后修改时间大于指定时间戳的日志片段(已经被关闭的),该日志片段的前一个文件的最后修改时间小于指定时间戳,然后Kafka返回该日志片段(也就是文件名)开头的偏移量。对于使用时间戳获取偏移量来说,日志片段越小结果越准确。
log.segment.ms
另一个可以控制日志片段关闭的参数是log.segment.ms。它指定了多长时间之后,日志片段会被关闭。就像log.retention.bytes和log.retention.ms这两个参数一样,log.segment.bytes和log.segment.ms也可以共同使用,日志片段会在大小或时间达到上限时被关闭,就看哪个条件先满足。默认log.segment.ms没有默认值,所以只根据大小关闭。
在使用基于时间的日志片段时,要着重考虑并行关闭多个日志片段对磁盘性能的影响,如果多个分区的日志片段永远不能达到大小上限,就会发生这种情况,因为broker在启动之后就开始计算日志片段的过期时间,对于那些数据量小的分区来说,日志片段的关闭操作总是同时发生。
message.max.bytes
broker通过设置message.max.bytes参数限定单个消息的大小,默认值时1 000 000。也即是1M。如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到broker返回的错误信息。跟其他与字节相关的配置参数一样,该参数指定的时压缩后的消息大小,也就是说,只要压缩后消息小于message.max.bytes指定的值,消息的实际大小可以远大于这个值。
这个值对性能有显著的影响。值越大,负责处理网络连接和请求的线程就需要花更多的时间处理。还会增加磁盘写入块的大小,从而影响IO吞吐量。
消费者客户端设置的fetch.message.max.bytes必须与服务端设置的消息大小进行协调。如果这个值比message.max.bytes小,消费者就无法读取比较大的消息,导致出现消费者被阻塞的情况。在为集群里的broker设置replica.fetch.max.bytes时,也遵循同样原则。