并行框架ray
ray使用的是共享内存的并行模型,ray使用自主研发的Plasma来管理共享内存,用Redis来管理对象,可以在代码或服务器中初始化集群。
一、ray的基本用法
-
启停
ray集群ray允许多台相同操作系统的电脑组合成一个大的并行集群,支持物理机、AWS等云服务器以及docker容器等方式搭建成集群。需要注意的是,每台机子的ray、Python的版本必须对应,否则会出现很多奇怪的问题。 但是,给每一台机器都装上相同的环境无疑是痛苦的,最优的解决方法是使用共享NFS服务器下的环境,可以实现机器规模的随意增删。- Python脚本启动或者连接已存在集群的方式
import ray # 启动或者连接集群 ray.init( address='auto', # 集群地址,只开了一个集群的话一般用 auto。当存在多个集群时,不能用auto,必须指定一个确定的地址与端口。例如:xxx.xxx.xxx.xxx:xxxx log_to_driver=False, # 设定不发送log给终端 dashboard_host='0.0.0.0', # dashboard提供了集群资源、并行任务对应的函数和状态 dashboard_port=8265, ) # 断开与集群的连接 ray.shutdown() # 如果是程序开启的集群,则会被关闭;如果是命令行开启的集群,则仅仅是当前程序退出集群。 - 命令行方式启动
# 命令行形式启动 ray start --head --port=6379 --include-dashboard=true --dashboard-host=0.0.0.0 --dashboard-port=8265 # 其中`head`为头结点的参数,不是头结点的结点不需这个参数 # 命令行形式关闭 ray stop # 但是注意,这句命令会关闭当前节点的所有ray实例,无论是不是连接在同一集群的。 - yaml脚本启动
# yaml脚本给定了一种十分方便的启停ray集群的方式,可定制性极高,且能一次开启与关闭多个节点。可以在github中找到 ray up -y xxxx.yaml # 关闭 ray down -y xxxx.yaml
- Python脚本启动或者连接已存在集群的方式
-
task
以函数为任务单位的并行,使用方法十分简单,只需要加上ray的装饰器即可。但是注意,只要是ray修饰过的方法,都必须使用
.remote()的方式来调用。import ray # 简单的task例子(必须存在已启动的集群) # 使用该修饰器来装饰 @ray.remote def test_ray(i): i = i + 1 print(i) return i """ 1. 可以通过列表解析的方式来进行一批任务的分配 2. ray修饰过后的函数必须通过.remote()的形式来调用 3. 调用函数的返回值仅仅是一个ray object的ID,用于当做进程号的标识,并不是最终结果 4. 具体的返回值必须使用ray.get()才能获得 """ result = ray.get([test_ray.remote(i) for i in range(10)]) print(result) # 由于列表解析存储的ray对象是顺序的,所以结果是顺序的,但从打印值可以 # 看出,执行时是随机的 """ Output: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] (pid=49221) 9 (pid=49239) 6 (pid=49219) 5 (pid=49222) 4 (pid=49225) 3 (pid=49254) 2 (pid=49254) 7 (pid=49254) 10 (pid=49244) 1 (pid=49244) 8 """ # 未调用get的remote只会是一个ray object print(test_ray.remote(1)) """ Output: ObjectRef(c6953afc4a9f69e9ffffffffffffffffffffffff0100000001000000) """ -
Actors
ray可以将一个类当做远程类,从而进行并行。但是要注意的是,每一个Actors类之间是并行的,但每一个Actors内部的所有任务却是
串行的。""" 1. 实例化也需要使用.remote的方式进行 """ @ray.remote class paralleler: def add(self, index, a, b): print('processor {} cal a + b: {}'.format(index, a+b)) def sub(self, index, a, b): print('processor {} cal a - b: {}'.format(index, a-b)) # 通过多个actors来并行 actors = [paralleler.remote() for i in range(4)] # 此处不get是因为每个列表元素是由两个ray object组成的元组,无法get [(actor.add.remote(index, i, j), actor.sub.remote(index, i, j)) for i, j in zip(range(2), range(2)) for index, actor in enumerate(actors)] # !! 要注意!! 当log_to_driver=False时,默认每个进程的输出都不会打印出来 """ Output: (pid=41311) processor 3 cal a + b: 0 (pid=41311) processor 3 cal a - b: 0 (pid=41311) processor 3 cal a + b: 2 (pid=41311) processor 3 cal a - b: 0 (pid=41307) processor 2 cal a + b: 0 (pid=41307) processor 2 cal a - b: 0 (pid=41307) processor 2 cal a + b: 2 (pid=41307) processor 2 cal a - b: 0 (pid=41319) processor 1 cal a + b: 0 (pid=41319) processor 1 cal a - b: 0 (pid=41319) processor 1 cal a + b: 2 (pid=41319) processor 1 cal a - b: 0 (pid=41305) processor 0 cal a + b: 0 (pid=41305) processor 0 cal a - b: 0 (pid=41305) processor 0 cal a + b: 2 (pid=41305) processor 0 cal a - b: 0 """ -
资源请求
可以向ray指定请求多少
cpu、GPU、内存、共享内存plasma、对象数据库redis。但后两者都只能在最初创建集群时设定,其余的可以在后面使用到时才设定。- 在初始化时指定计算资源
# 命令行启动 ray start --head --port=6379 --num-cpus=20# Python初始化 ray.init( address='auto', num_cpus=10 )- 代码计算中指定计算资源
# 在函数定义时请求资源,其中memory单位是Byte @ray.remote(num_cpus=0.5, num_gpus=1, memory=1024*21024*1024) def test_ray(i): i = i + 1 print(i) return i # 或者在远程调用时设置 ray.get([test_ray.options(num_cpus=0.5, num_gpus=1, memory=1024*21024*1024).remote(i) for i in range(10)])- 分配策略
from ray.util.placement_group import placement_group bundle1 = {"GPU": 2} bundle2 = {"extra_resource": 2} """ 策略一共有4种: 1. STRICT_PACK: 只在一个结点上完成运算 2. PACK: 所有提供的包都尽最大努力打包到一个节点上。如果严格打包不可行(即,某些捆绑包不适合该节点),则可以将捆绑包放置到其他节点节点上。 3. STRICT_SPREAD: 在多个节点上发放捆绑包 4. SPREAD:尽可能多地在多个节点上发放捆绑包 """ pg = placement_group([bundle1, bundle2], strategy="STRICT_PACK") @ray.remote(num_cpus=2) def f(): return True ray.get(f.options(placement_group=pg).remote()) # 使用资源分配策略 -
共享变量
ray的共享变量是基于
plasma和redis数据库的,前者将活跃的变量放于共享内存,而后者会将对象写入磁盘以供读取。两者均可以在初始化时被指定位置和大小。需要注意的是:共享变量的机制是,同一节点的共享变量只保留一份,可以多个任务随时访问,而不同节点间的数据(集群可见),则需要访问的结点会从数据存储的相应结点下载一份数据,再存入其本地的plasma,以供共享。
# 使用put和get就可以上传或者拉取变量 @ray.remote def test(num_id): print(num_id + 1) # ray id对象在远程函数可以当做正常变量使用 num = 30 num_id = ray.put(num) # 仅仅返回一个ray id print(num_id) print(ray.get(num_id)) # 取值 ray.get(test.remote(num_id)) # !! 但是,一旦被上传到共享变量的变量id,是read-only的,无论在外面还是 # 远程函数内都无法修改 """ Output: ObjectRef(ffffffffffffffffffffffffffffffffffffffff0100000001000000) 30 (pid=56473) 31 """
二、ray的优缺点:
-
优点:
- 代码及其简单,基本直接加上注解就可以并行了
- 集群管理相对方便很多,并且稳定
- 共享内存也减少了内存的使用量。
- 效率高,线程分配得当便会快很多。
-
缺点:
- 如果采用的是
task,只能并行函数,所以并行部分代码必须抽象为函数 - 采用
Actor类并行的话,每个对象内的函数操作是串行的,所以要提前设计分配好任务。 - 集群参数太多,而默认的最大内存和存储都不够,需要手动去调节
- 任务是分发式的,即会将任务分发给最有可能完成的结点去完成,而不会一整个函数它帮你识别需要并行的部分,这需要自己详细设计
- 缓存太多,会保存很多以前的缓存
- 共享内存的数据是read-only的,真正要修改还是只能通过返回值去接收,这致使需要额外的变量来存中间结果
- 大型任务发现加速并不明显。
- 所有环境的版本必须保持一致,否则无法正常运作
- 如果采用的是
三、细节:
-
分发任务是优先负载均衡地使用CPU:
- 当要求的CPU资源很多时,会尽量均衡地每个结点都分发接近数量的任务
- 如果要求资源很少时,优先使用工作节点的资源
plasma的共享object只会存储在当前的结点,同一节点的
ray.get()操作不会产生多于内存,不同节点则会传输完整的变量并上传到redis object stores。进行
ray.put()操作 得到的返回结果是一个ray对象的ID,其在注册了ray.remote解释器的函数中将会被当成普通对象使用;但是在正常函数中无法使用切片或索引,其余的操作还是可以的。ray进行ray.put()操作和return操作都会占用新的内存在同一结点中,只要已经存在了某个对象,那么对其
ray.get()并不会开辟新的内存。
四、问题记录:
-
[Errno 104]:
[Errno 104] Connection reset by peer ConnectionError: Error 104 while writing to socket. Connection reset by peer. """ 问题出现的时机:当单个对象的键或值大于512MB,尝试进行put操作,将对象存入redis数据库,超出了redis对象的限制时,会触发此错误 解决方案: 1. 拆分数据,使每个对象小于512MB(不好保证,但小了肯定正常) 2. 解除redis的限制(查看ray如何调用redis) """ -
如何打开dashboard来监测ray集群的使用情况:
# 必须主动设置包含dashboard,并且设置ip为0.0.0.0,表示所有ip都可以监测 ray start --head --port=6379 --include-dashboard=true --dashboard-host=0.0.0.0 --dashboard-port=8265 # !!! 注意:想要访问dashboard的结点在ray.init()时也必须包含dashboard,否则无法访问 ray.init( address='auto', # 集群地址,只开了一个的话一般用 auto log_to_driver=False, # 设定不发送log给终端 dashboard_host='0.0.0.0', dashboard_port=8265, ) -
如何指定plasma的object优先存在哪个结点呢:
- 只会保存在本地的plasma(所谓共享,指的都是单机的,不同的结点仍需要拷贝,只是说别的结点知道怎么拷贝、从哪里拷贝)
-
随着ray的运行,写入硬盘的内容逐渐增多,主要是以下原因:
- redis的对象写入了磁盘:可以试着指定对象位置
-
ray的对象在remote函数中传输:
- 明明已经将对象进行
ray.put()操作,得到了一个ID,但是在使用注册了ray.remote的函数时,如果次对象本身的占用内存很大,还是会传输很慢,甚至被redis拒绝通信,很想知道为什么会这样。
- 明明已经将对象进行