第0讲 课程大纲
1、掌握开源大数据技术的基础理论
- 文件系统:HDFS
- 资源管理框架:YARN
- 计算框架:MapReduce、Spark Core • ETL工具:Sqoop、Flume、Kafka
- 数仓和SQL引擎:Hive
- NoSQL数据库:HBase
- 全文搜索引擎:ElasticSearch
- 大数据平台组件:inceptor slipstream&search
第1讲 大数据技术综述
什么是大数据 - 4v特征
- 数据规模巨大(Volume)
传统数据 GB->TB
大数据 TB->PB
- 数据类型多样(Variety)
传统数据:结构化数据
大数据:结构化、半结构化、非结构化数据
- 生成和处理速度极快(Velocity)
传统数据:数据量稳定,增长不快
大数据:实时产生处理、年增长率超60%
- 价值巨大但密度较低(value)
传统数据:统计报表
大数据:机器学习,深度学习
什么是大数据技术
以Hadoop/类Hadoop为代表的分布式技术体系
通过一系列大规模分布式集群技术,实现大数据处理的每个
环节(采集->存储->管理->计算->分析)
• 集群之间精密分工、高度协同
• 大数据技术体系的核心和基础
大数据 vs 传统数据技术
- 大数据技术:以Hadoop/类Hadoop为代表的分布式技术体系
- 传统数据技术:RDB+MPP
RDB(relative database) 关系型数据库
MPP(Massively Parallel Processor)
~ 并行关系数据库(大规模并行处理)
~ 技术基因与Hadoop有本质不同
~ GreenPlum、Teradata、Vertica、Postgre-XL
大数据的发展趋势
- 核心特征:AI + BigData + Cloud 走向深度融合 -->智能化 + 云服务化 + 融合化
- 智能化:分布式人工智能算法;机器学习全流程支持
- 云服务化:容器化的弹性资源管理和调度,为大数据上云奠定了基础;在云端提供完整的大数据产品线
- 融合化:架构、平台融合,统一了数据湖、数据仓库和数据集市;服务融合,分析及服务,统一弹性的分析服务调度和管理;管理融合,统一的数据、模型和应用管理;开发方式融合,SQL + R/Python。。。
第2讲 分布式文件系统HDFS
什么是HDFS
Hadoop分布式文件系统(Hadoop Distributed File System)
HDFS设计目标
• 运行在大量廉价商用机器上:硬件错误是常态,提供容错机制
• 简单一致性模型:一次写入多次读取,支持追加,不允许修改,保证数据一致性
• 流式数据访问:批量读而非随机读,关注吞吐量而非时间
• 存储大规模数据集:典型文件大小GB~TB,关注横向线性扩展
(大文件而不是小文件)
优缺点
优点
• 高容错、高可用、高扩展 -数据冗余,多Block多副本,副本丢失后自动恢复 -NameNode HA、安全模式(核心管理结点放在内存里)
-10K节点规模
• 海量数据存储(大文件可以使磁盘寻道时间不超过读取时间)
-典型文件大小GB~TB,百万以上文件数量, PB以上数据规模
• 构建成本低、安全可靠
-构建在廉价的商用服务器上
-提供了容错和恢复机制
• 适合大规模离线批处理
-流式数据访问
-数据位置暴露给计算框架
缺点
• 不适合低延迟数据访问
• 不适合大量小文件存储
-元数据占用NameNode大量内存空间
1.每个文件、目录和Block的元数据都要占
用150Byte
2.存储1亿个元素,大约需要20GB内存
3.如果一个文件为10KB,1亿个文件大小仅
有1TB,却要消耗掉20GB内存
-磁盘寻道时间超过读取时间
• 不支持并发写入
-一个文件同时只能有一个写入者
• 不支持文件随机修改
-仅支持追加写入
(一个节点一个块,不会把相同的block放在同一个的上面)
基本组成及概念
Active NameNode(AN)(一般只有一个,但可能会存在两个,脑裂)
• 活动Master管理节点(集群中唯一)
• 管理命名空间
• 管理元数据:文件的位置、所有者、权限、数据块等 • 管理Block副本策略:默认3个副本(元数据没有了就是没有了)
• 处理客户端读写请求,为DataNode分配任务
Standby NameNode(SN)
• 热备Master管理节点(Active NameNode的热备节点)
-Hadoop 3.0允许配置多个Standby NameNode
• Active NameNode宕机后,快速升级为新的Active
• 同步元数据,即周期性下载edits编辑日志,生成fsimage镜像检查点文件 (从AN备份)
NameNode元数据文件
- edits(编辑日志文件):保存了自最新检查点(Checkpoint)之后的所有文件更新操作
- fsimage(元数据检查点镜像文件):保存了文件系统中所有的目录和文件信息,如:某个目
录下有哪些子目录和文件,以及文件名、文件副本数、文件由哪些Block组成等- Active NameNode内存中有一份最新的元数据(= fsimage + edits)
- Standby NameNode在检查点定期将内存中的元数据保存到fsimage文件中
DataNode
• Slave工作节点(可大规模扩展)
• 存储Block和数据校验和
• 执行客户端发送的读写操作
• 通过心跳机制定期(默认3秒)向NameNode汇报运行状态和Block列表信息 • 集群启动时,DataNode向NameNode提供Block列表信息
Block数据块
• HDFS最小存储单元
• 文件写入HDFS会被切分成若干个Block
• Block大小固定,默认为128MB,可自定义
• 若一个Block的大小小于设定值,不会占用整个块空间 • 默认情况下每个Block有3个副本
Client
• 将文件切分为Block
• 与NameNode交互,获取文件访问计划和相关元数据 • 与DataNode交互,读取或写入数据
• 管理HDFS
Block存储
• Block是HDFS的最小存储单元
• 如何设置Block大小 -目标:最小化寻址开销,降到1%以下 -默认大小:128M -块太小:寻址时间占比过高 -块太大:Map任务数太少,作业执行速度变慢
• Block和元数据分开存储:Block存储于DataNode,元数据存储于NameNode
• Block多副本
-以DataNode节点为备份对象 -机架感知:将副本存储到不同的机架上,实现数据的高容错 -副本均匀分布:提高访问带宽和读取性能,实现负载均衡
Block副本放置策略
• 副本1:放在Client所在节点 -对于远程Client,系统会随机选择节点
- 副本2:放在不同的机架节点上
- 副本3:放在与第二个副本同一机架的不同
节点上- 副本N:随机选择
- 节点选择:同等条件下优先选择空闲节点
元数据的两种存储形式
• 内存元数据(NameNode)
• 文件元数据(edits + fsimage)
edits(编辑日志文件)
- Client请求变更操作时,操作首先被写入edits,
再写入内存- edits文件名通过前/后缀记录当前操作的Transaction Id
fsimage(元数据镜像检查点文件)
- 不会为文件系统的每个更新操作进行持久化,
因为写fsimage的速度非常慢- fsimage文件名会标记对应的Transaction Id
(先写数据再写日志!!!)
什么是安全模式
- 安全模式是HDFS的一种特殊状态,在这种状态下,HDFS只接收读数据请求,而不接收写入、
删除、修改等变更请求- 安全模式是HDFS确保Block数据安全的一种保护机制
- Active NameNode启动时,HDFS会进入安全模式,DataNode主动向NameNode汇报可用Block
列表等信息,在系统达到安全标准前,HDFS一直处于“只读”状态 "
何时正常离开安全模式
• Block上报率:DataNode上报的可用Block个数 / NameNode元数据记录的Block个数 • 当Block上报率 >= 阈值时,HDFS才能离开安全模式,默认阈值为0.999
• 不建议手动强制退出安全模式
触发安全模式的原因
• NameNode重启
• NameNode磁盘空间不足
• Block上报率低于阈值
• DataNode无法正常启动
• 日志中出现严重异常
• 用户操作不当,如:强制关机(特别注意!)
故障排查
• 找到DataNode不能正常启动的原因,重启DataNode • 清理NameNode磁盘
(法定人数机制,上课重点讲解了!!)
Active NN与Standby NN的主备切换
利用QJM实现元数据高可用
• QJM机制(Quorum Journal Manager) -只要保证Quorum(法定人数)数量的操作成功,就认为这是一次最终成功的操作
• QJM共享存储系统
-部署奇数(2N+1)个JournalNode
-JournalNode负责存储edits编辑日志 -写edits的时候,只要超过半数(>=N+1)的JournalNode返回成功,就代表本次写入成功
-最多可容忍N个JournalNode宕机
-基于Paxos算法实现
利用ZooKeeper实现Active节点选举
第三讲
MapReduce缺陷
• 身兼两职:计算框架 + 资源管理系统
• JobTracker
既做资源管理,又做任务调度 、任务太重,开销过大 、存在单点故障
• 资源描述模型过于简单,资源利用率较低
仅把Task数量看作资源,没有考虑CPU和内存 、强制把资源分成Map Task Slot和Reduce Task Slot
• 扩展性较差,集群规模上限4K
• 源码难于理解,升级维护困难
所以设计了yarn去分担任务?yarn(另一种资源管理)
让mapreduce和spark一样只计算框架?
设计目标:聚焦资源管理、通用(适用各种计算框架)、高可用、高扩展
- yarn也是master/slave架构(计算跟着数据走,每个datanode都会跟着一个yarn,两个进程是呆在一块的)
- 将JobTracker的资源管理、任务调度功能分离
(Job Tracker是Map-reduce框架的中心)- 三种角色:ResourceManager(Master)(相当于namenode?)、NodeManager(Slave)(datanode?)、ApplicationMaster(新角色)(管理应用程序实例, 包括任务调度和资 源申请)
三种角色概述
ResourceManager(RM)
• 主要功能
-统一管理集群的所有资源
-将资源按照一定策略分配给各个应用(ApplicationMaster) -接收NodeManager的资源上报信息
• 核心组件
-用户交互服务(User Service)
-NodeManager管理
-ApplicationMaster管理
-Application管理
-安全管理
-资源管理
NodeManager(NM)
• 主要功能
-管理单个节点的资源
-向ResourceManager汇报节点资源使用情况
-管理Container的生命周期
• 核心组件
-NodeStatusUpdater
-ContainerManager
-ContainerExecutor
-NodeHealthCheckerService
-Security
-WebServer
ApplicationMaster(AM)
• 主要功能
-管理应用程序实例
-向ResourceManager申请任务执行所需的资源
-任务调度和监管
• 实现方式
-需要为每个应用开发一个AM组件
-YARN提供MapReduce的ApplicationMaster实现
-采用基于事件驱动的异步编程模型,由中央事件调度器统一管理所有事件
-每种组件都是一种事件处理器,在中央事件调度器中注册
ps: Container封装了节点上进程的相关资源,是YARN中资源的抽象
它分为运行ApplicationMaster的Container 、运行应用任务的Container
ResourceManager高可用
• 1个Active RM、多个Standby RM
• 宕机后自动实现主备切换
• ZooKeeper的核心作用
-Active节点选举
-恢复Active RM的原有状态信息
• 重启AM,杀死所有运行中的Container
• 切换方式:手动、自动
YARN资源调度策略
- FIFO Scheduler(先进先出调度器)
- Capacity Scheduler(容量调度器)(重点???)
• 核心思想:提前做预算,在预算指导下分享集群资源
• 调度策略
-集群资源由多个队列分享
-每个队列都要预设资源分配的比例(提前做预算)
-空闲资源优先分配给“实际资源/预算资源”比值最低的队列
-队列内部采用FIFO调度策略
•特点
-层次化的队列设计:子队列可使用父队列资源
-容量保证:每个队列都要预设资源占比,防止资源独占
-弹性分配:空闲资源可以分配给任何队列,当多个队列争用时,会按比例进行平衡
-支持动态管理:可以动态调整队列的容量、权限等参数,也可动态增加、暂停队列
-访问控制:用户只能向自己的队列中提交任务,不能访问其他队列 -多租户:多用户共享集群资源
- Fair Scheduler(公平调度器)
• 调度策略
-多队列公平共享集群资源
-通过平分的方式,动态分配资源,无需预先设定资源分配比例
-队列内部可配置调度策略:FIFO、Fair(默认)
• 资源抢占 -终止其他队列的任务,使其让出所占资源,然后将资源分配给占用资源量少于最小资源量限制的队列
• 队列权重
-当队列中有任务等待,并且集群中有空闲资源时,每个队列可 以根据权重获得不同比例的空闲资源
(见面分一半)
第4讲 分布式计算框架(mapreduce spark)
MapReduce
概念
• 面向批处理的分布式计算框架
• 一种编程模型:MapReduce程序被分为Map(映射)阶段和Reduce(化简)阶段
核心思想
• 分而治之,并行计算
• 移动计算,而非移动数据
特点
• 计算跟着数据走
• 良好的扩展性:计算能力随着节点数增加,近似线性递增 • 高容错
• 状态监控
• 适合海量数据的离线批处理
• 降低了分布式编程的门槛
适用场景
• 数据统计,如:网站的PV、UV统计 • 搜索引擎构建索引
• 海量数据查询
• 复杂数据分析算法实现
不适用场景
• OLAP -要求毫秒或秒级返回结果
• 流计算 -流计算的输入数据集是动态的,而MapReduce是静态的
• DAG计算
-多个任务之间存在依赖关系,后一个的输入是前一个的输出,构成有向无环图DAG
-每个MapReduce作业的输出结果都会落盘,造成大量磁盘IO,导致性能非常低下
基本概念(也可以理解为上图的步骤解释)
Job&Task(input)
• 作业是客户端请求执行的一个工作单元 -包括输入数据、MapReduce程序、配置信息
• 任务是将作业分解后得到的细分工作单元 -分为Map任务和Reduce任务
Spilt(切片)
• 输入数据被划分成等长的小数据块,称为输入切片(Input Split),简称切片
• Split是逻辑概念,仅包含元数据信息,如:数据的起始位置、长度、所在节点等 • 每个Split交给一个Map任务处理,Split的数量决定Map任务的数量
• Split的大小默认等于Block大小
• Split的划分方式由程序设定,Split与HDFS Block没有严格的对应关系
• Split越小,负载越均衡,但集群开销越大;Split越大,Map任务数少,集群的计算并发度降低
Map阶段(mapping)
• 由若干Map任务组成,任务数量由Split数量决定
• 输入:Split切片(key-value),输出:中间计算结果(key-value)
reduce阶段(化简)
• 由若干Reduce任务组成,任务数量由程序指定
• 输入:Map阶段输出的中间结果(key-value),输出:最终结果(key-value)
Shuffle(洗牌)(重点!!!)
• Map、Reduce阶段的中间环节,是虚拟阶段
• 负责执行Partition(分区)、Sort(排序)、Spill(溢写)、Merge(合并)、 Fetch(抓取)
等工作
• Partition决定了Map任务输出的每条数据放入哪个分区,交给哪个Reduce任务处理
• Reduce任务的数量决定了Partition数量
• Partition编号 = Reduce任务编号 =“key hashcode % reduce task number”(%为取模/取余数) • 哈希取模的作用:将一个数据集随机均匀分成若干个子集
• 避免和减少Shuffle是MapReduce程序调优的重点
shuffle详解
• Map端
-Map任务将中间结果写入专用内存缓冲区Buffer(默认100M),同时进行Partition和Sort(先按“key
hashcode % reduce task number”对数据进行分区,分区内再按key排序)
-当Buffer的数据量达到阈值(默认80%)时,将数据溢写(Spill)到磁盘的一个临时文件中,文件内
数据先分区后排序
-Map任务结束前,将多个临时文件合并(Merge)为一个Map输出文件,文件内数据先分区后排序
• Reduce端
-Reduce任务从多个Map输出文件中主动抓取(Fetch)属于自己的分区数据,先写入Buffer,数据量达到阈值后,溢写到磁盘的一个临时文件中
-数据抓取完成后,将多个临时文件合并为一个Reduce输入文件,文件内数据按key排序
意思是shuffle在map和reduce两端都有作用,要记住它是一个虚拟过程??双向箭头
JobTracker/TaskTracker模式
在没有yarn的hadoop时代,job Tracker居然是master,task tracker居然是slave (主从模式again,hadoop都是master/slave?)
• JobTracker节点(Master)
调度任务在TaskTracker上运行
-若任务失败,指定新TaskTracker重新运行
• TaskTracker节点(Slave) -执行任务,发送进度报告
• 存在的问题
-JobTracker存在单点故障
-JobTracker负载太重(上限4000节点)
-JobTracker缺少对资源的全面管理 -TaskTracker对资源的描述过于简单 -源码很难理解
spark
产生背景
1、MapReduce有较大的局限性
• 仅支持Map、Reduce两种语义操作
• 执行效率低,时间开销大
• 主要用于大规模离线批处理
• 不适合迭代计算、交互式计算、实时流处理等场景
2、计算框架种类多,选型难,学习成本高
• 批处理:MapReduce
• 流处理:Storm、Flink
• 交互式计算:Impala、Presto
• 机器学习:Mahout
3、统一计算框架,简化技术选型
• 在一个统一框架下,实现批处理、流处理、交互式计算、机器学习
特点(高吞吐、低延时、通用易扩展、高容错)
1、计算高效
• 利用内存计算、Cache缓存机制,支持迭代计算和数据共享,减少数据读取的IO开销 • 利用DAG引擎,减少中间计算结果写入HDFS的开销
• 利用多线程池模型,减少任务启动开销,避免Shuffle中不必要的排序和磁盘IO操作
2、通用易用
• 适用于批处理、流处理、交互式计算、机器学习算法等场景
• 提供了丰富的开发API,支持Scala、Java、Python、R等
3、运行模式多样
• Local模式
• 单机运行,通常用于测试
• Spark程序以多线程方式直接运行在本地
• Standalone模式
• Spark集群独立运行,不依赖于第三方资源
管理系统,如:YARN、Mesos
• 采用Master/Slave架构
• Driver在Worker中运行,Master只负责集群
管理
• ZooKeeper负责Master HA,避免单点故障
• 适用于集群规模不大,数据量不大的情况
• YARN/Mesos模式
• YARN-Client模式:适用于交互和调试
• YARN-Cluster模式:适用于生产环境
编程模型(重点!!!)
弹性分布式数据集(Resilient Distributed Datesets)(RDD)(spark是基于RDD进行计算的)
- 分布在集群中的只读对象集合
- 由多个Partition组成
- 通过转换操作构造
- 失效后自动重构(弹性) -存储在内存或磁盘中
RDD操作
• Transformation(转换)
-将Scala集合或Hadoop输入数据构造成一个新RDD -通过已有的RDD产生新RDD
-惰性执行:只记录转换关系,不触发计算
-例如:map、filter、flatmap、union、distinct、sortbykey
• Action(动作)
-通过RDD计算得到一个值或一组值
-真正触发计算
-例如:first、count、collect、foreach、saveAsTextFile
RDD依赖
• 窄依赖(Narrow Dependency)(一对一)
-父RDD中的分区最多只能被一个子RDD的一个 分区使用
-子RDD如果有部分分区数据丢失或损坏,只需 从对应的父RDD重新计算恢复
-例如:map、filter、union
• 宽依赖(Wide/Shuffle Dependency )(多对多)
-子RDD分区依赖父RDD的所有分区
-子RDD的部分或全部分区数据丢失或损坏,从所有父RDD分区重新计算,必须进行Shuffle
-相对于窄依赖,宽依赖付出的代价要高很多,
尽量避免使用
-例如:groupByKey、reduceByKey、sortByKey
抽象模式
• Driver
-一个Spark程序有一个Driver,一个Driver创建一个SparkContext,程序的main函数运行在Driver中
-负责解析Spark程序、划分Stage、调度任务到Executor上执行
• SparkContext
-负责加载配置信息,初始化运行环境,创建DAGScheduler和TaskScheduler
• Executor
-负责执行Driver分发的任务,一个节点可以启动多个Executor,每个Executor通过多线程运行多个任务
• Task
-Spark运行的基本单位,一个Task负责处理若干RDD分区的计算逻辑
DAG任务规划与调度
DAG(Directed Acyclic Graph)
• 有向无环图:一个有向图无法从任意顶点出发经过若干条边回到该点
• 受制于某些任务必须比另一些任务较早执行的约束,可排序为一个队列的任务集合,该队列
可由一个DAG图呈现
• Spark程序的内部执行逻辑可由DAG描述,顶点代表任务,边代表任务间的依赖约束
DAGScheduler
• 根据任务的依赖关系建立DAG
• 根据依赖关系是否为宽依赖,即是否存在Shuffle,将DAG划分为不同的阶段(Stage)
• 将各阶段中的Task组成的TaskSet提交到TaskScheduler
TaskScheduler
• 负责Application的任务调度
• 重新提交失败的Task
• 为执行速度慢的Task启动备用Task