A Kafka Guide (Getting Started)

1. Environment Setup

Kafka depends on ZooKeeper, so we configure ZooKeeper first (conf/zoo.cfg):

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/home/mq/zkdata
# the port at which the clients will connect
clientPort=2181

server.1=172.18.182.32:2888:3888
server.2=172.18.182.33:2888:3888
server.3=172.18.182.31:2888:3888
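
The server.N lines above need one step the config file itself does not cover: each node must have a myid file in dataDir whose content matches its server number, or the ensemble will not form.

# On 172.18.182.32 (server.1):
echo 1 > /home/mq/zkdata/myid
# On 172.18.182.33 (server.2):
echo 2 > /home/mq/zkdata/myid
# On 172.18.182.31 (server.3):
echo 3 > /home/mq/zkdata/myid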

Once ZooKeeper is up, check its status with zkServer.sh status.
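
In a healthy three-node ensemble, running the status command on every machine should report exactly one leader:

# Expect "Mode: leader" on one node and "Mode: follower" on the other two
zkServer.sh status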
Then configure Kafka's server.properties file.

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
delete.topic.enable=true
############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://172.18.182.32:9092

# Note: port and host.name are long deprecated; listeners/advertised.listeners
# above supersede them (see section 5 below).
port=9092
host.name=172.18.182.32

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=172.18.182.32:2181,172.18.182.33:2181,172.18.182.31:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

Then start Kafka on all three machines:

kafka_2.12-2.4.0/bin/kafka-server-start.sh -daemon kafka_2.12-2.4.0/config/server.properties

If jps shows no Kafka process, start the broker in the foreground to see the error output, or check the logs under kafka/logs.
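
A quick sanity check that all three brokers registered with ZooKeeper is to list their ids with the zookeeper-shell.sh tool that ships with Kafka; with broker.id set to 0, 1, and 2 you should see [0, 1, 2]:

bin/zookeeper-shell.sh 172.18.182.32:2181 ls /brokers/ids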

2. Common Commands

# Start Kafka
bin/kafka-server-start.sh -daemon config/server.properties
# Create a topic (--bootstrap-server takes broker addresses on port 9092, not the ZooKeeper port)
bin/kafka-topics.sh --create --bootstrap-server 172.18.182.32:9092,172.18.182.33:9092,172.18.182.31:9092 --replication-factor 2 --partitions 2 --topic queuing-user-create
# List topics
bin/kafka-topics.sh --list --zookeeper 172.18.182.32:2181,172.18.182.33:2181,172.18.182.31:2181
# Describe topics; add --topic to inspect a single one.
# PartitionCount: number of partitions. ReplicationFactor: number of replicas.
# Partition: partition id. Leader: broker id of the replica that handles reads and writes ("none" if no live leader exists).
# Replicas: broker ids holding a replica of this partition. Isr: the replicas currently in sync.
bin/kafka-topics.sh --describe --zookeeper 172.18.182.32:2181,172.18.182.33:2181,172.18.182.31:2181
Topic: queuing-survey-answer-status     PartitionCount: 2   ReplicationFactor: 2    Configs: 
    Topic: queuing-survey-answer-status Partition: 0    Leader: 1   Replicas: 1,2   Isr: 1
    Topic: queuing-survey-answer-status Partition: 1    Leader: 0   Replicas: 2,0   Isr: 0
Topic: queuing-survey-anwser    PartitionCount: 1   ReplicationFactor: 1    Configs: 
    Topic: queuing-survey-anwser    Partition: 0    Leader: none    Replicas: 2 Isr: 2
# Start a console producer for debugging (--broker-list also takes broker addresses on port 9092)
bin/kafka-console-producer.sh --broker-list 172.18.182.32:9092,172.18.182.33:9092,172.18.182.31:9092 --topic queuing-survey-anwser
# Start a console consumer for debugging
bin/kafka-console-consumer.sh --bootstrap-server 172.18.182.32:9092,172.18.182.33:9092,172.18.182.31:9092 --topic queuing-user-create --from-beginning
# Dump a log segment from disk
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/queuing-survey-answer-status-1/00000000000000000000.log --print-data-log
# Describe a consumer group
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.182.32:9092,172.18.182.33:9092,172.18.182.31:9092 --group UserCenterProd --describe
# Manually reset offsets
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.182.32:9092,172.18.182.33:9092,172.18.182.31:9092 --group UserCenterProd --reset-offsets --topic queuing-user-add --to-offset 20 --execute
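
Offset resets are destructive, so it is worth previewing the change first: kafka-consumer-groups.sh accepts --dry-run, which prints the offsets that would be set without applying them.

# Same reset as above, but only printed, not applied
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.182.32:9092,172.18.182.33:9092,172.18.182.31:9092 --group UserCenterProd --reset-offsets --topic queuing-user-add --to-offset 20 --dry-run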

3. Basic Concepts

  • broker: a single Kafka node.
  • partition: messages are routed to partitions by key and consumed by different consumers (or the same one; assignment is automatic). See the console example after this list.
    --replication-factor: number of replicas; it should normally be less than or equal to the number of brokers.
  • ISR: the leader tracks how far each follower in the ISR lags behind. A follower that falls too far behind (measured both in message count and in time, configurable via replica.lag.time.max.ms and replica.lag.max.messages) is removed from the ISR by the leader; a removed replica keeps trying to catch up. The leader does not commit a write immediately: it commits only once every follower in the ISR has replicated it, which is why evicting laggards from the ISR matters for keeping write latency down. The ISR also drives failover: when a broker dies, the new partition leader is elected from the ISR. (Adapted from https://blog.csdn.net/dshf_1/article/details/82467558)
  • consumer group: within one group, each consumer is responsible for a subset of the topic's partitions; different groups keep independent offsets, so one topic can be consumed multiple times.
  • zookeeper: a distributed coordination framework that manages and stores Kafka's metadata, such as which brokers are running, which topics exist, what partitions they have, and where each leader is.
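
To see key-based routing and consumer-group behavior in action, the console tools accept a few standard properties (the key user42 and the group name demo-group below are just examples):

# Producer: read "key:value" lines from stdin; records with the same key
# always land in the same partition.
bin/kafka-console-producer.sh --broker-list 172.18.182.32:9092 --topic queuing-user-create --property parse.key=true --property key.separator=:
# then type lines such as:  user42:created

# Consumer: print keys to make the routing visible. Run this in two
# terminals with the same --group and each consumer gets one of the two
# partitions; a different group name consumes the topic independently.
bin/kafka-console-consumer.sh --bootstrap-server 172.18.182.32:9092 --topic queuing-user-create --from-beginning --property print.key=true --group demo-group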

4. Cluster Environment Recommendations

Factor           | Consideration                                    | Recommendation
Operating system | I/O model of the OS                              | Linux
Disk             | Disk I/O performance                             | Ordinary mechanical disks; RAID is not required
Disk capacity    | Estimate from message volume and retention time  | Reserve an extra 30% of space
Bandwidth        | Depends on the workload                          | On a gigabit network, plan capacity assuming about 700 Mbps

5. The Most Important Cluster Parameters

1) Broker-side parameters

Parameter                       | Suggested setting                  | Notes
log.dir                         | leave unset                        | normally you configure log.dirs instead
log.dirs                        | /home/kafka1,/home/kafka2          | directories on separate physical disks improve read/write throughput and provide failover
zookeeper.connect               | zk1:2181,zk2:2181,zk3:2181/kafka1  | a chroot suffix lets several Kafka clusters share one ZooKeeper ensemble
listeners                       | protocol://HOST_NAME:9092          | the protocol, host name, and port external clients use to reach this broker
advertised.listeners            | protocol://HOST_NAME:9092          | the listeners the broker advertises to clients; stored in ZooKeeper
host.name / port                | do not set                         | deprecated long ago
auto.create.topics.enable       | false                              | disable automatic topic creation
unclean.leader.election.enable  | false                              | forbid unclean leader election
auto.leader.rebalance.enable    | false                              | disable periodic leader rebalancing
log.retention.hours             | 168                                | keep data for 7 days
log.retention.bytes             | size to the free disk space        | keeps the disk from filling up
message.max.bytes               | 1000120                            | maximum message size Kafka accepts; the default is 1000012
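
Collected into a server.properties fragment, the recommendations above would look like this (zk1–zk3 are placeholder hostnames):

# Broker-side overrides following the table above
log.dirs=/home/kafka1,/home/kafka2
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka1
auto.create.topics.enable=false
unclean.leader.election.enable=false
auto.leader.rebalance.enable=false
log.retention.hours=168
message.max.bytes=1000120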

2) Topic-level parameters

Parameter               | Suggested setting | Notes
retention.ms            |                   | how long this topic's data is kept
retention.bytes         |                   | disk space reserved for this topic
max.message.bytes       | 1000120           | maximum message size for this topic; the default is 1000012
replica.fetch.max.bytes | 1000120           | broker-side setting; keep it >= max.message.bytes so large messages can still be replicated
fetch.message.max.bytes | 1000120           | (old) consumer-side setting; keep it >= max.message.bytes so large messages can still be consumed
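
Topic-level overrides can be changed at runtime with kafka-configs.sh (the topic name and the one-day retention below are only examples):

# Keep queuing-user-create data for 24 hours and allow ~1 MB messages
bin/kafka-configs.sh --zookeeper 172.18.182.32:2181 --alter --entity-type topics --entity-name queuing-user-create --add-config retention.ms=86400000,max.message.bytes=1000120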

3) JVM parameters

Configure the JVM heap size before starting Kafka:

$> export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
$> export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
$> bin/kafka-server-start.sh config/server.properties

4) Operating-system parameters

ulimit -n 1000000

XFS is a good file-system choice.
Set vm.swappiness to 1.
The dirty-page flush interval can be extended somewhat, as sketched below.
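
As shell commands, these suggestions map to roughly the following (the dirty_expire value is only an illustration of "extending the flush interval", not a value from this article):

# Raise the open-file limit (persist it in /etc/security/limits.conf)
ulimit -n 1000000
# Prefer the page cache over swap
sysctl -w vm.swappiness=1
# Illustrative only: let dirty pages age 30 s before flushing
sysctl -w vm.dirty_expire_centisecs=3000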
