storm小结

1、动态调整线程数:

storm rebalance topotest -w 10 -n 1 -e mybolt=2 -e myspout=1

可以提高任务平行度 ,从而提高CPU核利用率。

2、两个重要的分组策略:

shuffleGrouping 

fieldsGrouping

运用:词频统计,每个bolt开启三个线程

3、storm的drpc的运用:

说明:内置的spout发射的数据:0表示id号,1表示数据

自定义bolt发射数据通关过collector,发射数据有要求:第一个位置必须是id号(因为最后一个bolt是内置bolt,有规定),后面的参数是我们真正要发射的数据

出错:必须明确定义下一个bolt字段,即declare方法必须定义。

1)本地:直接在eclipse中运行。

2)远程:drpc host:在集群中配置drpc server主机地址,drpc port:默认是3773 ,并使用命令storm drpc开启drpc服务,之后去web页面访问。

4、storm的消息可靠性的实现

ack线程和fail

5、storm的消息可靠性的原理

6、storm与Trident

代码

第一个案例:文本行数统计

spout的任务:从kafka中接收文本数据,一条一条发射给bolt

bolt的任务:每接收一条数据,统计次数+1

这里指定bolt为两个线程,因此每个线程统计 了一半的数据,最终的数据是两个线程统计结果之和。

datacount结果图

第二个案例:词频统计

spout的任务:从kafka中接收文本数据,文本格式不一定,采取shuffleGrouping的策略一条一条发射给splitbolt

splitbolt的任务:将一行文本拆分成一个个单词,采取fieldsGrouping策略将数据分发给每一个countbolt线程,保证相同单词一定在同一个线程

countbolt的任务:统计每个单词的出现次数(提示:用hashMap),最终将结果打印在控制台。


wordcount结果图

注意:countbolt中的一个execute方法每次只会接收一条数据,因此hashmap集合必须定义在外面;

在execute方法中统计一个单词出现的次数,通过tuple获取单词,然后判断这个单词在map集合中是否出现过,没有出现,则count复制为0;然后将这个单词和它的count扔进map集合里,并进行累加。

如果要提交到集群上面,spout获取文件的路径必须是Linux系统路径

第三个案例:客户端传递参数spark,通过stormDRPC输出hello spark

bolt的任务:继承BaseRichBolt,execute方法中写需求

main方法:DRPC的实例和代理类等等。

第四个案例:storm第二天作业:统计每天的pv和UV,在此基础上实现消息的可靠性

思路和wordcount类似,关键是谁是Word,因为统计的是uv,因此最后一个字段是Word,统计不同Word的个数

每天pv:在datebolt中按照fieldsGrouping分离数据,这个field字段是日期字段,将日期相同的分到一组,然后在pvbolt中统计这个字段出现的次数,即pv,可以把这个结果写出到一个pv统计文件中;

每天uv,在每天pv的基础上统计uv,这一次的Word变成了最后一个字段,统计不同单词出现的次数,用Treeset,最后遍历set集合统计次数。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • storm的集群提交方式 StormSubmitter.subnitTopology()方法 问题一、如何把sto...
    夙夜M阅读 564评论 0 0
  • 目录 场景假设 调优步骤和方法 Storm 的部分特性 Storm 并行度 Storm 消息机制 Storm UI...
    mtide阅读 17,201评论 30 60
  • Clojure实战(5):Storm实时计算框架 | Ji ZHANG's Bloghttp://shzhangj...
    葡萄喃喃呓语阅读 1,324评论 0 2
  • 一. wordCount Topology开发: 1.spout数据收集器(SentenceSpout类): 有...
    奉先阅读 1,223评论 0 0
  • “大道合乎自然。”做人存在一定的通则和规律,那就是“物贵天然、人贵自然”。真诚不作秀、诚信不作假、崇高不媚俗、包容...
    漫游家阅读 783评论 2 6