前言
storm是分布式实时大数据处理框架(流计算),此文是学习过程中的笔记,分享出来供有缘人参考~
一、基础知识
- 与Hadoop的对比
主节点 | 工作节点 | 作业 | |
---|---|---|---|
Storm | Nimbus | Supervisor | topology |
Hadoop | JobTracker | TaskTracker | Mapper和Reducer |
- 应用场景不同,storm强调实时性,hadoop强调批处理,常用于对已经存在的大量数据进行数据挖掘、分析
- MapReduce的设计目标是服务于那些只需要数分钟或者数小时即可以完成的作业,hadoop高延迟
- hadoop着重于计算和数据存储,storm只着重于计算
- MapReduce任务执行完成就结束,storm的任务会一直运行下去,不断的处理最新的记录
- 术语
- topology:一个任务,一个Topology运行起来就不能停止会无限的运行下去,除非手动干预或意外故障
- Nimbus:主节点上的守护进程,负责在集群中分发代码,为工作节点分配任务,并监控故障
- Supervisor:工作节点
- Spout:任务组件,跟mapper功能类似,用于获取数据,消息生产的源头
- Bolt:任务组件,跟reducer功能类似,用于数据处理,Bolt可以接收一个或多个Spout的Tuple消息,也可以来自多个其他的Bolt的Tuple消息,也可能是Spout和其他Bolt组合发送的Tuple消息
- Stream Grouping:用来定义各个计算组件之间流的连接、分组、分发关系。流的分组策略
* Shuffle Grouping(随机分组):随机分发tuple到Bolt的任务,保证每个任务获得相等数量的tuple
* Fields Grouping(按字段分组):根据指定的字段分割数据流并分组
* All Grouping(广播分组)
* Global Grouping(全局分组)
* Non Grouping(不分组)
* Direct Grouping(直接分组)
* Local or Shuffle Grouping(本地/随机分组) - 并发
-
Workers:工作进程
config.setNumWorkers(6);
-
Executor:工作线程 (parallelism_hint 代表executor的数量)
builder.setSpout("kafkaSpout", kafkaSpout,2).setNumTasks(3);
表示两个executor,3个task
Task:每个组件需要执行的任务数,执行数据处理的最小工作单元,并不是线程
-
关系图
-
- 特性
- 容错性高:
- 可扩展
- 可靠性高:Storm保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息
- 快速
- 事务性
二、Topology组件
-
简单示例
- 一个topology中,必须同时存在Spout和Bolt,Spout和Bolt的数据是可以任意的
- Spout是从外部数据源中获取数据,以一定的格式将数据传递给Bolt处理
- Spout需要不断的检测外部数据源有没有最新的数据
-
复杂示例
三、安装
-
下载,官网
http://mirror.bit.edu.cn/apache/storm/apache-storm-1.1.1/apache-storm-1.1.1.tar.gz
-
配置文件,查看默认配置文件
storm.zookeeper.servers: - "10.1.0.221" - "10.1.0.222" - "10.1.0.223" nimbus.seeds: ["SMILE-DEV3", "SMILE-DEV4"] storm.local.dir: "/home/storm_data" storm.log.dir: "/home/storm_data/logs" #可用端口号配置,每个对应一个worker,最好设置成OS核数的整数倍 #Slot数不要大于每台物理机可运行Slot个数:(物理内存-虚拟内存)/单个Java进程最大可占用内存数 supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703
-
启动
bin/storm nimbus > /dev/null 2>&1 & bin/storm supervisor > /dev/null 2>&1 & bin/storm ui > /dev/null 2>&1 & #或者使用以下命令 bin/storm nimbus & bin/storm supervisor & bin/storm ui &
四、开发
-
构建maven
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.1.1</version> <scope>provided</scope> </dependency>
spout
//Spout被实例化时会调用一次,一般作一些初始化操作
//conf:保存配置信息及一些默认参数
//context:保存上下文信息
//collector:Spout输出采集器,emit方法将数据以Tuple的形式发送给Bolt
void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {}
void nextTuple() {}
//声明由这个Spout输出的流中的每一个Tuple,包含哪些字段。
void declareOutputFields(OutputFieldsDeclarer declarer) {}
-
bolt
//处理自己业务逻辑的方法 execute()
-
设置并发
conf.setNumWorkers() 配置 worker 的数量 builder.setBolt("NAME", new Bolt(), 并行度) 设置 executor 数量 spout/bolt.setNumTask() 设置 spout/bolt 的 task 数量
默认情况下,一个 worker 分配 768M 的内存,外加 64M 给 logwriter 进程;因此一个 worker 会耗费 832M 内存;
-
提交Topology
storm jar *.jar com.yjq.*
单机模式下UI界面中Topology Summary无信息,只有提交distributed mode才能显示
-
Storm UI 常用参数
- execute latency:消息的平均处理时间
- process latency:消息从收到 到 被ack掉所花费的时间
- capacity:处理能力是否已饱和,数据代表占用资源的百分比
-
storm reblance 重新设置并发,每个设置可以单独使用设置
重新配置拓扑 “mytopology” 使用5个worker进程。
spout “blue-spout” 使用3个executor
bolt “yellow-bolt” 使用10个executorstorm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
-
Config
//Storm处理消息的超时时间 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS //设置acker的数量,也占有线程数 Config.TOPOLOGY_ACKER_EXECUTORS
五、参考网站
最后
求赞~