kafka shell Spark

这里又碰到了一个问题,从consumer offsets到leader latest offsets中间延迟了很多消息,在下一次启动的时候,首个batch要处理大量的消息,会导致spark-submit设置的资源无法满足大量消息的处理而导致崩溃。因此在spark-submit启动的时候多加了一个配置:--conf spark.streaming.kafka.maxRatePerPartition=10000。限制每秒钟从topic的每个partition最多消费的消息条数,这样就把首个batch的大量的消息拆分到多个batch中去了,为了更快的消化掉delay的消息,可以调大计算资源和把这个参数调大。
zookeeper-client -server 10.20.8.18:2181

  1. 在没有kafka 指令脚本的机器上下载kafka执行代码,获得bin文件里的执行文件,注意服务器java版本
source /home/hbyang/opt/env.source
# 以stat_ads-log_show-click 为例
cd /home/hbyang/opt/kafka_2.10-0.10.1.1/bin
  1. 查看kafka中有什么topic
./kafka-topics.sh --list --zookeeper 10.20.8.18:2181,10.20.8.19:2181,10.20.8.20:2181

查看topic信息
bin/kafka-topics.sh --zookeeper 10.20.8.18:2181,10.20.8.19:2181,10.20.8.20:2181 --describe --topic t_cdr
创建topic
./kafka-topics.sh --create --zookeeper 10.20.8.18:2181,10.20.8.19:2181,10.20.8.20:2181 --replication-factor 2 --partitions 20 --topic ng_rtst.browser.showclick.ai

  1. 查看topic 样本数据
./kafka-console-consumer.sh --zookeeper 10.20.8.18:2181,10.20.8.19:2181,10.20.8.20:2181 --from-beginning --topic stat_ads-ai_request_ctr | more
  1. 查看zookeeper 中的groupid
zookeeper-client -server 10.20.8.18:2181
ls /consumers
rmr /consumers/test_ads_yhb_1
  1. 删除topic
./kafka-topics.sh --delete --zookeeper 10.20.8.18:2181,10.20.8.19:2181,10.20.8.20:2181 --topic stat_ads-user_ctr_features
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容