1. 关于Hadoop YARN
1.1 Hadoop YARN基本情况
Hadoop YARN是一个分布式的资源管理和任务调度的框架,其架构如下图所示(图片来源:Apache Hadoop 3.3.1 – HDFS Architecture
):
YARN采用的是主从架构,RM为主节点(Master)、NM为从节点(Worker)。RM负责集群的组织协调,NM负责执行RM分配的工作。为了更好的理解YANR架构,我们通过下图的应用提交流程来理解各个组件的职责。
- 首先Client向RM发起应用提交请求
- RM审核通过之后,会基于调度策略协调相应的NM提供一个1号Contaienr启动运行该应用的AM
- AM启动后会向RM进行注册
- 在向AM注册的同时,AM还可以向RM申请额外的资源执行应用所需要执行的任务,比如Flink应用会向RM申请资源运行Flink的TaskManager,而AM则是作为Flink集群的JobManager。Spark应用则会向RM申请资源启动TaskExecutor进程。
- RM结合当前集群的资源使用情况,判定可以陪陪资源后,会同样基于调度策略协调相应的NM创建相应的Container运行AM请求需要执行的进程。
- 分配的应用Container会于AM建立通信
- Client可以直接同AM进行通信以获取状态、进度等信息
- 当应用执行完成之后,AM向RM发送停止应用的通知,则资源会被回首。
通过上述流程,可以总结各个组件的职责:
- RM:
- 接收Client的应用提交请求
- 协调 NM 创建AM容器进程
- 接收来自AM的资源分配请求
- 接收来自AM及NM的状态报告
- NM:
- 基于RM的指示提供Container运行应用
- 将NM当前的状态以心跳的方式发送给RM
- AM:
- 向RM申请资源执行并管理相应的Container
- 向RM汇报应用状态
- 与Client通信
- Container:
- 资源的提供形式,负责提供计算能力
- Client:
- 与RM通信,提交应用请求
基于职责单一的涉及原则,RM内部包含两个组件:ApplicationManager和Scheduler。ApplicationManager负责管理提交到Yarn的应用,并监控AM状态,如果AM挂掉,需要负责将其重新拉起(基于策略)。Scheduler则负责资源调度,YARN提供了多种类型的Scheduler,包括默认的CapacityScheduler和FairScheduler。
1.2 YARN相关执行命令
1.2.1 启停yarn集群
- 启动:
start-yarn.sh
- 停止:
stop-yarn.sh
1.2.2 启停RM
- 启动:
yarn --daemon start resourcemanager
- 停止:
yarn --daemon stop resourcemanager
1.2.3 启停NM
- 启动:
yarn --daemon start nodemanager
- 停止:
yarn --daemon stop nodemanager
1.2.4 启停timeline server
- 启动:
yarn --daemon start timelineserver
- 停止:
yarn --daemon stop timelineserver
1.3 YARN的关键配置
1.3.1 yarn-site.xml
HA相关配置
RM具有单点故障问题,可以通过开启HA配置,通过主备(热备)架构实现RM的HA。RM提供了基于ZK的异常恢复机制,在主节点发生故障之后,能够自动实现主到备的切换。
属性 | 值 | 说明 |
---|---|---|
yarn.resourcemanager.zk-address | <zk1-address>:<zk1-port>,<zk2-address>:<zk2-port>,... | 配置RM高可用ZK地址【已废弃】,现在应用core-site.xml中的hadoop.zk.address属性进行配置。 |
yarn.resourcemanager.ha | true | 开启RM高可用 |
yarn.resourcemanager.ha.automatic-failover.zk-base-path | /yarn/yarn-leader-election | 基于ZK的HA策略的ZNODE路径 |
yarn.resourcemanager.ha.rm-ids | rm1,rm2 | 开启RM高可用后,通过该参数配置RM节点的清单 |
yarn.resourcemanager.ha.id | 可选配置,声明每个RM节点的ID;如果不配置系统将默认按照hostname/address生成 | |
yarn.resourcemanager.hostname.{rm-id} | <rm-id.hostname> | 配置每个rm-id对应的RM节点的主机名 |
yarn.resourcemanager.address.{rm-id} | <rm-id.ip-address> | 配置每个rm-id对应的RM节点的IP地址,会覆盖yarn.resourcemanager.hostname.{rm-id}的值 |
yarn.resourcemanager.scheduler.address.{rm-id} | ${yarn.resourcemanager.hostname.{rm-id}}:8030 | 声明每个RM节点的scheduler服务地址,用于AM获取资源的通信 |
yarn.resourcemanager.resource-tracker.address.{rm-id} | ${yarn.resourcemanager.hostname.{rm-id}}:8031 | 声明每个RM节点的NM通信地址 |
yarn.resourcemanager.admin.address.{rm-id} | ${yarn.resourcemanager.hostname.{rm-id}}:8033 | 声明每个RM节点的管理命令连接地址 |
yarn.resourcemanager.webapp.address.{rm-id} | ${yarn.resourcemanager.hostname.{rm-id}}:8088 } 声明每个RM节点的web应用地址 | |
yarn.resourcemanager.webapp.https.address.{rm-id} | ${yarn.resourcemanager.hostname.{rm-id}}:8090 | 声明每个RM节点的HTTPS web应用访问地址 |
yarn.resourcemanager.ha.automatic-failover.enable | true | 开启自动错误恢复 |
yarn.resourcemanager.ha.automatic-failover.embeded | true | 开启内嵌的leader选举实现 |
yarn.resourcemanager.cluster-id | <cluster-id> | YARN集群ID |
RM Recovery
RM Restart机制能够实现RM的下线对客户无感,实现RM重新上线后,能够自动恢复在下线之前的应用的状态。
RM Restart的有两种策略:一种策略会将应用kill掉然后再拉起;另一种策略则不需要kill应用,而是通过重新获取NM及AM相关信息,从而重新构建RM的元数据。
属性 | 值 | 说明 |
---|---|---|
yarn.resourcemanager.recovery.enabled | true | 开启RM的Restart Recovery功能 |
yarn.resourcemanager.store.class | org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore | 配置RM状态存储实现类,ZKRMStateStore能够解决RM HA的脑裂问题 |
hadoop.zk.address | <zk-address>:<zk-port> | 这个是复用hadoop的core-site.xml中的配置 |
yarn.resourcemanager.zk-state-store.parent-path | /yarn/rmstore | 配置RM状态存储的ZK路径 |
yarn.resourcemanager.zk-state-store.root-node.acl | 无 | ZK节点访问控制配置 |
hadoop.zk.num-retries | 1000 | ZK连接尝试次数 |
hadoop.zk.retry-interval-ms | 1000 | 重新连接ZK的时间间隔 |
hadoop.zk.timeout-ms | 10000 | ZK会话超时时间。ZK的超时机制是由ZK Server出发的,当超过所配置时间的周期内没有接收到来自client的心跳时,即触发超时 |
yarn.resourcemanager.state-store.max-completed-applications | ${yarn.resourcemanager.max-completed-applications}(默认1000) | 配置在RM state中存储的最大完成app信息数量,该值需要小于等于${yarn.resourcemanager.max-completed-applications},以保证与YARN保持的应用状态一致。该值影响RM恢复的效率 |
yarn.app.attempt.diagnostics.limit.kc | 64 | 定义APP尝试诊断信息长度限制 |
yarn.resourcemanager.work-preserving-recovery.scheduling-wait-ms | 10000 | 设置RM在工作保存恢复上分配新容器之前的等待时间。在为应用程序分配新容器之前,这样的等待时间让RM有机会在恢复时与集群中的NMs重新同步。 |
资源容量配置
属性 | 值 | 说明 |
---|---|---|
yarn.nodemanager.resource.memory-mb | xxx | 配置NodeManager可分配的物理内存大小,如果配置为-1,同时${yarn.nodemanager.resource.detect-hardware-capabilities}为true,则YARN进行自动检测,默认默认为8G |
yarn.nodemanager.vmem-pmem-ratio | 2.1 | 配置虚拟内存与物理内存的占比,用于约束容器占用的虚拟内存的大小,即 |
yarn.nodemanager.container-executor.class | org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor | YARN支持三种内存控制模型:轮询机制(DefaultContainerExecutor)、通过CGroups进行严格内存控制以及通过CGoup进行弹性内存控制。其中轮询机制是一种遗留特性,是通过定期检视每个container的内存使用情况来决定是否执行kill操作的,这个官方文档说如果存在delay可能导致node挂掉;而第二种则是利用Linux的内核特性提供的OOM killer对超出内存限制的容器进行回收;第三种弹性内存机制则不会限制单个container的内存超用,只有当整个NM的内存使用情况超过了 |
yarn.nodemanager.resource.memory.enforced | true | 开启CGroups内存严格检查 |
yarn.nodemanager.pmem-check-enabled | true | 开启物理内存检查 |
yarn.nodemanager.resource.cpu-vcores | xxx | 配置NM能够使用的虚拟核数 |
资源调度配置
属性 | 值 | 说明 |
---|---|---|
yarn.resourcemanager.scheduler.class | org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler | 制定RM使用的调度器。YARN提供了两种调度器Capacity和Fair。他们的区别还没太搞清楚。所以暂时采用默认的。 |
yarn.resourcemanager.scheduler.address | ${yarn.resourcemanager.hostname}:8030 | 声明调度接口地址 |
yarn.resourcemanager.scheduler.client.thread-count | 50 | 声明处理调度请求的线程数量 |
yarn.scheduler.minimum-allocation-mb | 1024 | 声明每个Container请求的最小内存 |
yarn.scheduler.maximum-allocation-mb | 8192 | 声明每个Container请求的最大内存 |
yarn.scheduler.minimum-allocation-vcores | 1 | 声明每个Container请求的最小vcore数量 |
yarn.scheduler.maximum-allocation-vcores | 4 | 声明每个Container请求的最大vcore数量 |
超时时间控制
属性 | 值 | 说明 |
---|---|---|
yarn.am.liveness-monitor.expiry-interval-ms | 600000 | AM报告超时时间 |
yarn.resourcemanager.container.liveness-monitor.interval-ms | 600000 | 检查Container是否存活的间隔时间 |
yarn.nm.liveness-monitor.expiry-interval-ms | 600000 | NM监控超时时间 |
yarn.resourcemanager.rm.container-allocation.expiry-interval-ms | 600000 | 分配Container的超时时间 |
日志
YARN提供了日志聚合能力,能够将YARN上运行的Container产生的日志收集到指定位置,这样便于日志的查看。另外,YARN还提供了Timeline Server,提供了应用运行历史的查看及运行日志的链接。YARN提供了两个版本的Timeline Server。V1版本具有单点问题,V2解决了单点问题,但是需要依赖HBASE,而且依赖的HBase版本比较混乱,因此我们在实际验证中,没有进行验证,选择保留应用Timeline Server V1版本。虽然V1有单点问题,但是实际上应用运行的日志已经收集到了HDFS上,也就是日志数据是可靠的,即使Timeline Server V1服务挂掉了,我们依然可以基于一定规则找到相应的日志数据。
关于日志的配置如下。
属性 | 值 | 说明 |
---|---|---|
yarn.nodemanager.log-dirs | ${yarn.log.dir} | 指定NM的日志文件位置 |
yarn.log-aggregation-enabled | true | 开启日志聚合 |
yarn.log-aggregation.retain-seconds | 259200 | 配置聚合日志保留时长 |
yarn.nodemanager.remote-app-log-dir | hdfs://<hdfs-path> | 配置聚合日志存放位置 |
yarn.log.server.url | http://<ip>:<port>/applicationhistory/logs | 配置日志链接的地址 |
yarn.timeline-service.enabled | true | 开启timeline server服务 |
yarn.system-metrics-publisher.enabled | true | 开启发布系统指标 |
yarn.timeline-service.generic-application-history.enabled | true | 向客户端指定是否查询应用历史信息 |
yarn.timeline-service.version | 1.0f | 指定使用的timeline server版本号 |
yarn.timeline-service.hostname | 170.0.50.16 | 指定timeline服务地址 |
yarn.timeline-service.address | ${yarm.timeline-service.hostname}:10200 | 指定timeline server的RPC服务地址 |
yarn.timeline-service.webapp.address | ${yarn.timeline-service.hostname}:8188 | 配置Timeline Server Web应用地址 |
yarn.timeline-service.webapp.https.address | ${yarn.timeline-service.hostname}:8190 | 配置Timeline Server的HTTPS服务地址 |
yarn.timeline-service.bind-host | ${yarn.timeline-service.hostname} | 配置Timeline Server服务绑定地址【好像没啥用,需要验证】 |
yarn.timeline-service.store-class | org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore | 配置timeline server元数据存储实现类 |
yarn.timeline-service.leveldb-timeline-store.path | <file-path> | 配置timeline leveldb存储路径 |
yarn.timeline-service.recovery.enabled | true | 开启Timeline Server重启自动恢复 |
yarn.timeline-service.state-store-class | org.apache.hadoop.yarn.server.timeline.recovery.LeveldbTimelineStateStore | 配置Timeline Server State存储实现类 |
yarn.timeline-service.leveldb-state-store.path | <file-path> | 配置timeline server state存储路径 |
Label
Node Label能够实现将YARN节点进行分区管理,从而实现在提交应用时指定在哪里执行。通过Node Label同时能够实现ACL控制,以及与调度队列组合实现资源占比的配比。
在我们这个项目中,我们期望通过Node Label来实现在指定机器上执行Flink Job,但是经过验证后发现,虽然Node Label能够满足这种需要,但是由于我们的任务是long running的实时JOB,需要保证所有JOB必须能够运行起来,而如果给Node打上了Label,如果Label相关分区资源不足,虽然整个集群还有剩余资源,但是该应用还是没办法正常启动,所以在实际结论上,不会使用Label。
一下是关于Label的一些配置
属性 | 值 | 说明 |
---|---|---|
yarn.node-labels.enabled | true | 是否开启Node的label属性 |
yarn.node-labels.fs-store.root-dir | <hdfs-path>/<file-path> | 配置Node Label信息的存储路径 |
Label相关的命令:
Add cluster node labels list:
- Executing
yarn rmadmin -addToClusterNodeLabels "label_1(exclusive=true/false),label_2(exclusive=true/false)"
to add node label. - If user don’t specify “(exclusive=…)”, exclusive will be true by default.
- Run
yarn cluster --list-node-labels
to check added node labels are visible in the cluster.
Remove cluster node labels:
- To remove one or more node labels, execute the following command:
yarn rmadmin -removeFromClusterNodeLabels "<label>[,<label>,...]"
. The command argument should be a comma-separated list of node labels to remove. - It is not allowed to remove a label which has been associated with queues, i.e., one or more queues have access to this label.
- To verify if specified node labels have been successfully removed, run
yarn cluster --list-node-labels
.
Configuring nodes to labels mapping in Centralized NodeLabel setup
- Executing
yarn rmadmin -replaceLabelsOnNode “node1[:port]=label1 node2=label2” [-failOnUnknownNodes]
. Added label1 to node1, label2 to node2. If user don’t specify port, it adds the label to all NodeManagers running on the node. If option-failOnUnknownNodes
is set, this command will fail if specified nodes are unknown.
其他
属性 | 值 | 说明 |
---|---|---|
yarn.nodemanager.local-dirs | <file-path> | 配置NM本地文件路径,用于进行文件数据缓存 |
yarn.nodemanager.delete.debug-delay-sec | 600 | 配置应用完成后删除应用相关文件的延时时间,默认是0.在系统调试阶段,可以通过配置这个参数来保留应用产生的相关文件,这样便于进行问题排查。应用相关的文件位于 ${yarn.nodemanager.local-dirs}指定的路径下。 |
yarn.nodemanager.env-whitelist | JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_HOME,LANG,TZ,LD_LIBRARY_PATH,... | 指定YARN应用系统变量白名单,需要增加的系统变量可以在yarn-env.sh中添加 |
1.3.2 capacity-scheduler.xml
YARN提供了两种Scheduler:CapacityScheduler和FairScheduler。由于本次验证的Flink on YARN的场景是实时运行的Long-running的JOB,特点是必须保证所有的任务都能启动起来,所以核心逻辑还是需要底层硬件资源池能够cover住这些任务,所以跟调度策略不太匹配。本次只验证了一些简单的CapacityScheduler配置,如下。
属性 | 值 | 说明 | |
---|---|---|---|
yarn.scheduler.capacity.maximum-applications | 10000 | 配置最大并行应用数量 | |
yarn.scheduler.capacity.maximum-am-resource-percent | 1 | 配置AM容器能够使用的资源占比,也就是能够控制并行运行的应用数量 | |
yarn.scheduler.capacity.resource-calculator | org.apache.hadoop.yarn.util.resource.DominantResourceCalculator | 配置资源计算器的实现类,用于在调度器内进行资源比较计算。默认是DefaultResourceCalculator只使用内存;DominantResourceCalculator使用内存和CPU进行计算 | |
yarn.scheduler.capacity.root.queues | default | 声明调度器提供的队列 | |
yarn.scheduler.capacity.root.default.capacity | 100 | 声明default队列使用的资源占比 | |
yarn.scheduler.capacity.root.default.user-limit-factor | 1 | 声明默认队列针对每个用户使用资源的限制比例 | |
yarn.scheduler.capacity.root.default.maximum-capacity | 100 | 声明默认队列的最大资源占比 | |
yarn.scheduler.capacity.root.default.state | RUNNING | 声明默认队列的状态 | |
yarn.scheduler.capacity.root.default.acl_submit_applications | * | 配置向队列提交任务的ACL策略 | |
yarn.scheduler.capacity.root.default.acl_administer_queue | * | 配置管理队列的ACL策略 | |
yarn.scheduler.capacity.root.default.acl_application_max_priority | * | 配置带有优先级配置的提交应用操作的ACL策略,例如 [user={name} group={name} max_priority={priority} default_priority={priority}] | |
yarn.scheduler.capacity.root.default.maximum-application-lifetime | -1 | 配置每个应用的最大生存时间,如果该值小于等于0,则禁用该特性。如果应用存活的时间超过了这个时间(包含排队时间),则将会将杀掉这些应用 | |
yarn.scheduler.capacity.root.default.default-application-lifetime | -1 | 声明应用的默认存活时长,如果值小于等于0,则禁用该特性。该值不能超过${yarn.scheduler.capacity.root.default.maximum-application-lifetime} | |
yarn.scheduler.capacity.node-locality-delay | 40 | 声明进行rack-local调度尝试次数 | |
yarn.scheduler.capacity.rack-locality-additional-delay | -1 | 配置进行rack-local的额外尝试次数 | |
yarn.scheduler.capacity.queue-mappings | 配置队列映射规则,语法为 [u | g]:[name]:[queue_name][,next mapping]* | |
yarn.scheduler.capacity.queue-mappings-override.enable | false | 是否开启queue映射规则的覆写 | |
yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments | 1 | 控制OFF_SWITCH的数量【还不知道干什么用】 | |
yarn.scheduler.capacity.application.fail-fast | false | 当RM重启恢复之前的队列不可用时,是否强制RM恢复失败 | |
yarn.scheduler.capacity.workflow-priority-mappings | 配置应用优先级规则,语法为:[workflowId]:[full_queue_name]:[priority][,next_mapping]* | ||
yarn.scheduler.capacity.workflow-priority-mappings-override.enable | false | 是否开启优先级映射 | |
yarn.scheduler.capacity.root.accessible-node-labels | <label> | 配置队列能够访问的label | |
yarn.scheduler.capacity.root.accessible-node-labels.<lable>.capacity | 100 | 配置队列能够访问的label的资源比例 | |
yarn.scheduler.capacity.root.default.accessible-node-labels.<label>.capacity | 100 | 配置对立额能够访问的label的资源比例 |