这里又碰到了一个问题,从consumer offsets到leader latest offsets中间延迟了很多消息,在下一次启动的时候,首个batch要处理大量的消息,会导致spark-submit设置的资源无法满足大量消息的处理而导致崩溃。因此在spark-submit启动的时候多加了一个配置:--conf spark.streaming.kafka.maxRatePerPartition=10000。限制每秒钟从topic的每个partition最多消费的消息条数,这样就把首个batch的大量的消息拆分到多个batch中去了,为了更快的消化掉delay的消息,可以调大计算资源和把这个参数调大。
zookeeper-client -server 10.20.8.18:2181
- 在没有kafka 指令脚本的机器上下载kafka执行代码,获得bin文件里的执行文件,注意服务器java版本
 
source /home/hbyang/opt/env.source
# 以stat_ads-log_show-click 为例
cd /home/hbyang/opt/kafka_2.10-0.10.1.1/bin
- 查看kafka中有什么topic
 
./kafka-topics.sh --list --zookeeper 10.20.8.18:2181,10.20.8.19:2181,10.20.8.20:2181
查看topic信息
bin/kafka-topics.sh --zookeeper 10.20.8.18:2181,10.20.8.19:2181,10.20.8.20:2181 --describe --topic t_cdr
创建topic
./kafka-topics.sh --create --zookeeper 10.20.8.18:2181,10.20.8.19:2181,10.20.8.20:2181 --replication-factor 2 --partitions 20 --topic ng_rtst.browser.showclick.ai
- 查看topic 样本数据
 
./kafka-console-consumer.sh --zookeeper 10.20.8.18:2181,10.20.8.19:2181,10.20.8.20:2181 --from-beginning --topic stat_ads-ai_request_ctr | more
- 查看zookeeper 中的groupid
 
zookeeper-client -server 10.20.8.18:2181
ls /consumers
rmr /consumers/test_ads_yhb_1
- 删除topic
 
./kafka-topics.sh --delete --zookeeper 10.20.8.18:2181,10.20.8.19:2181,10.20.8.20:2181 --topic stat_ads-user_ctr_features