Ray——Python并行学习框架

下述文章是参考知乎“枯芒草”写的"ray框架学习笔记"所写

原文网址:[Modern Parallel and Distributed Python: A Quick Tutorial on Ray | by Robert Nishihara | Towards Data Science]

作者信息

(https://towardsdatascience.com/modern-parallel-and-distributed-python-a-quick-tutorial-on-ray-99f8d70369b8) [需要科学上网浏览]

2019年2月11日,Robert Nishihara 发表于Towards Data Science

[水平有限,翻译的并不十分准确,如果有条件还是看原文比较好]

Modern Parallel and Distributed Python: A Quick Tutorial on Ray

A fast, simple framework for distributed applications

Ray 是一个用于并行和分布式 Python 的开源项目

平行和分布式计算是现代应用的主要特征。我们需要利用多个核心或多台机器来加速应用程序或大规模运行它们。用于抓取网络和响应搜索查询的基础设施不是运行在某人笔记本电脑上的单线程程序,而是相互通信和交互的服务集合。

[pic.1]:云承诺在所有方向(内存、计算、存储等)都具有无限的可伸缩性。实现这一承诺需要新的工具来编写云和构建分布式应用程序

这篇文章将描述如何使用 Ray 轻松地构建可以从笔记本电脑扩展到大型集群的应用程序。

Why Ray?

许多教程解释了如何使用 Python 的多处理模块。遗憾的是,多处理模块在处理现代应用程序的需求方面受到严重限制。这些要求包括:

• 在多台计算机上运行相同的代码

• 构建具有状态且可以通信的微服务和参与者

• 优雅地处理机器故障

• 高效处理大型项目和海量数据

Ray 解决了所有这些问题,使简单的事情变得简单,使复杂的行为成为可能。

pic.2

Necessary Concepts

传统编程依赖于两个核心概念: 函数和类。使用这些构建块,编程语言允许我们构建无数的应用程序。

但是,当我们将应用程序迁移到分布式设置时,概念通常会发生变化。

一方面,我们有 OpenMPI、 Python 多处理和 ZeroMQ 等工具,它们提供用于发送和接收消息的低级原语。这些工具非常强大,但它们提供了不同的抽象,因此必须从头开始重写单线程应用程序才能使用它们。

另一方面,我们有专门针对领域的工具,比如用于模型培训的 TensorFlow、用于数据处理和 SQL 的 Spark 以及用于流处理的 Flink。这些工具提供更高层次的抽象,如神经网络、数据集和流。但是,由于它们与串行编程所使用的抽象不同,因此必须重新编写应用程序以利用它们。

从低级原语到高级抽象的分布式轴计算工具

Ray 占据了一个独特的中间地带。而不是引入新的概念。Ray 获取函数和类的现有概念,并将它们作为任务和参与者转换为分布式设置。这种 API 选择允许串行应用程序并行化,而不需要进行重大修改。

Starting Ray

ray.init()命令启动所有相关的 Ray 进程。在集群上,这是唯一需要更改的行(我们需要传递集群地址)。这些程序包括:

• 许多用于并行执行 Python 函数的工作进程(大约每个 CPU 内核一个工作进程)

• 为 workers (和其他机器)分配“任务”的调度程序进程。任务是 Ray 调度的工作单元,对应于一个函数调用或方法调用

• 共享内存对象存储,用于在工作线程之间高效共享对象(无需创建副本)

• 存储在机器出现故障时重新运行任务所需的元数据的内存数据库

与线程相比,Ray workers 是独立的进程,因为由于全局解释器锁 GIL 的限制,Python 对多线程的支持非常有限。

Parallelism with Tasks

为了将 Python 函数 f 转换为“远程函数”(可以远程异步执行的函数) ,我们使用@ray.remote 声明该函数。然后,通过 f.remote () 进行的函数调用将立即返回未来(future 是对最终输出的引用) ,实际的函数执行将在后台进行(我们将此执行称为任务)。

'''在 Python 中运行并行任务的代码'''
import ray
import time

# Start Ray.
ray.init()

@ray.remote
def f(x):
    time.sleep(1)
    return x

# Start 4 tasks in parallel.
result_ids = []
for i in range(4):
    result_ids.append(f.remote(i))
    
# Wait for the tasks to complete and retrieve the results.
# With at least 4 cores, this will take 1 second.
results = ray.get(result_ids)  # [0, 1, 2, 3]

因为对 f.remote (i)的调用立即返回,所以只需要运行该行四次,就可以并行执行 f 的四个副本。

Task Dependencies

任务还可以依赖于其他任务。接下来,multi_ Matrix 任务使用两个 create _ Matrix 任务的输出,因此在前两个任务执行完毕之前它不会开始执行。前两个任务的输出将自动作为参数传递给第三个任务,期货将被替换为相应的值)。通过这种方式,可以将任务与任意 DAG 依赖关系组合在一起。

'''说明三个任务的代码,其中第三个任务取决于前两个任务的输出'''
import numpy as np

@ray.remote
def create_matrix(size):
    return np.random.normal(size=size)

@ray.remote
def multiply_matrices(x, y):
    return np.dot(x, y)

x_id = create_matrix.remote([1000, 1000])
y_id = create_matrix.remote([1000, 1000])
z_id = multiply_matrices.remote(x_id, y_id)

# Get the results.
z = ray.get(z_id)

Aggregating Values Efficiently

任务依赖关系可以以更复杂的方式使用。例如,假设我们希望将8个值聚合在一起。此示例使用整数相加,但在许多应用程序中,跨多台机器聚合大型向量可能是一个瓶颈。在这种情况下,更改单行代码可以将聚合的运行时间从线性变为聚合值数量的对数。

左边的依赖关系图的深度为7。右边的依赖关系图的深度为3。计算得到了相同的结果,但是右边的计算要快得多

如上所述,要将一个任务的输出作为输入提供给后续任务,只需将第一个任务返回的将来作为参数传递给第二个任务。Ray 的调度程序将自动考虑这个任务依赖关系。第二个任务在第一个任务完成之前不会执行,第一个任务的输出将自动发送到正在执行第二个任务的机器上。

'''以线性方式与以树结构方式聚合值的代码'''
import time

@ray.remote
def add(x, y):
    time.sleep(1)
    return x + y

# Aggregate the values slowly. This approach takes O(n) where n is the
# number of values being aggregated. In this case, 7 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(id1, 3)
id3 = add.remote(id2, 4)
id4 = add.remote(id3, 5)
id5 = add.remote(id4, 6)
id6 = add.remote(id5, 7)
id7 = add.remote(id6, 8)
result = ray.get(id7)

# Aggregate the values in a tree-structured pattern. This approach
# takes O(log(n)). In this case, 3 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(3, 4)
id3 = add.remote(5, 6)
id4 = add.remote(7, 8)
id5 = add.remote(id1, id2)
id6 = add.remote(id3, id4)
id7 = add.remote(id5, id6)
result = ray.get(id7)

上面的代码非常明确,但是请注意,可以使用 while 循环以更简洁的方式实现这两种方法。

'''
两个聚合方案的更简洁的实现。
这两个代码块之间的唯一区别是“ add.remote”的输出是放在列表的前面还是后面
'''
# Slow approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:
    values = [add.remote(values[0], values[1])] + values[2:]
result = ray.get(values[0])


# Fast approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:
    values = values[2:] + [add.remote(values[0], values[1])]
result = ray.get(values[0])

From Classes to Actors

在不使用类的情况下编写有趣的应用程序是具有挑战性的,在分布式设置中,这一点与在单个核上一样正确。

Ray 允许您使用 Python 类并使用@Ray.remote 声明它。远程装潢师。每当类被实例化时,Ray 就创建一个新的“ actor”,这是一个在集群中某处运行并保存对象副本的进程。对该参与者的方法调用转换为在该参与者进程上运行的任务,并且可以访问和更改该参与者的状态。通过这种方式,参与者允许在多个任务之间共享可变状态,而远程函数则不允许。

各个参与者按顺序执行方法(每个方法都是原子的) ,因此不存在竞态条件。并行可以通过创建多个参与者来实现。

'''将 Python 类实例化为参与者的代码示例'''
@ray.remote
class Counter(object):
    def __init__(self):
        self.x = 0
    
    def inc(self):
        self.x += 1
    
    def get_value(self):
        return self.x

# Create an actor process.
c = Counter.remote()

# Check the actor's counter value.
print(ray.get(c.get_value.remote()))  # 0

# Increment the counter twice and check the value again.
c.inc.remote()
c.inc.remote()
print(ray.get(c.get_value.remote()))  # 2

上面的例子是参与者最简单的可能用法。Counter.remote ()行创建一个新的 Actor 进程,该进程具有 Counter 对象的副本。对 c.get_value.remote()c.inc.remote() 的调用在远程执行组件进程上执行任务并改变执行组件的状态。

Actor Handles

在上面的示例中,我们只从主 Python 脚本调用了角色上的方法。角色最强大的一个方面是,我们可以将句柄传递给角色,这允许其他角色或其他任务调用同一个角色上的所有方法。

下面的示例创建一个存储消息的参与者。几个工作任务重复地将消息推送给参与者,主 Python 脚本定期读取消息。

'''从多个并发任务调用参与者方法的代码'''
import time


@ray.remote
class MessageActor(object):
    def __init__(self):
        self.messages = []
    
    def add_message(self, message):
        self.messages.append(message)
    
    def get_and_clear_messages(self):
        messages = self.messages
        self.messages = []
        return messages


# Define a remote function which loops around and pushes
# messages to the actor.
@ray.remote
def worker(message_actor, j):
    for i in range(100):
        time.sleep(1)
        message_actor.add_message.remote(
            "Message {} from worker {}.".format(i, j))


# Create a message actor.
message_actor = MessageActor.remote()

# Start 3 tasks that push messages to the actor.
[worker.remote(message_actor, j) for j in range(3)]

# Periodically get the messages and print them.
for _ in range(100):
    new_messages = ray.get(message_actor.get_and_clear_messages.remote())
    print("New messages:", new_messages)
    time.sleep(1)

# This script prints something like the following:
# New messages: []
# New messages: ['Message 0 from worker 1.', 'Message 0 from worker 0.']
# New messages: ['Message 0 from worker 2.', 'Message 1 from worker 1.', 'Message 1 from worker 0.', 'Message 1 from worker 2.']
# New messages: ['Message 2 from worker 1.', 'Message 2 from worker 0.', 'Message 2 from worker 2.']
# New messages: ['Message 3 from worker 2.', 'Message 3 from worker 1.', 'Message 3 from worker 0.']
# New messages: ['Message 4 from worker 2.', 'Message 4 from worker 0.', 'Message 4 from worker 1.']
# New messages: ['Message 5 from worker 2.', 'Message 5 from worker 0.', 'Message 5 from worker 1.']

Actor 是非常强大的。它们允许您获取一个 Python 类,并将其实例化为一个微服务,可以从其他角色和任务甚至其他应用程序查询这个微服务。

任务和参与者是 Ray 提供的核心抽象。这两个概念非常普遍,可以用来实现复杂的应用程序,包括Ray的内置强化学习库、超参数调优、加速Pandas等等。

Learn More About Ray

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容