下述文章是参考知乎“枯芒草”写的"ray框架学习笔记"所写
原文网址:[Modern Parallel and Distributed Python: A Quick Tutorial on Ray | by Robert Nishihara | Towards Data Science]
(https://towardsdatascience.com/modern-parallel-and-distributed-python-a-quick-tutorial-on-ray-99f8d70369b8) [需要科学上网浏览]
2019年2月11日,Robert Nishihara 发表于Towards Data Science。
[水平有限,翻译的并不十分准确,如果有条件还是看原文比较好]
Modern Parallel and Distributed Python: A Quick Tutorial on Ray
A fast, simple framework for distributed applications
Ray 是一个用于并行和分布式 Python 的开源项目。
平行和分布式计算是现代应用的主要特征。我们需要利用多个核心或多台机器来加速应用程序或大规模运行它们。用于抓取网络和响应搜索查询的基础设施不是运行在某人笔记本电脑上的单线程程序,而是相互通信和交互的服务集合。
[pic.1]:云承诺在所有方向(内存、计算、存储等)都具有无限的可伸缩性。实现这一承诺需要新的工具来编写云和构建分布式应用程序
这篇文章将描述如何使用 Ray 轻松地构建可以从笔记本电脑扩展到大型集群的应用程序。
Why Ray?
许多教程解释了如何使用 Python 的多处理模块。遗憾的是,多处理模块在处理现代应用程序的需求方面受到严重限制。这些要求包括:
• 在多台计算机上运行相同的代码
• 构建具有状态且可以通信的微服务和参与者
• 优雅地处理机器故障
• 高效处理大型项目和海量数据
Ray 解决了所有这些问题,使简单的事情变得简单,使复杂的行为成为可能。
Necessary Concepts
传统编程依赖于两个核心概念: 函数和类。使用这些构建块,编程语言允许我们构建无数的应用程序。
但是,当我们将应用程序迁移到分布式设置时,概念通常会发生变化。
一方面,我们有 OpenMPI、 Python 多处理和 ZeroMQ 等工具,它们提供用于发送和接收消息的低级原语。这些工具非常强大,但它们提供了不同的抽象,因此必须从头开始重写单线程应用程序才能使用它们。
另一方面,我们有专门针对领域的工具,比如用于模型培训的 TensorFlow、用于数据处理和 SQL 的 Spark 以及用于流处理的 Flink。这些工具提供更高层次的抽象,如神经网络、数据集和流。但是,由于它们与串行编程所使用的抽象不同,因此必须重新编写应用程序以利用它们。
Ray 占据了一个独特的中间地带。而不是引入新的概念。Ray 获取函数和类的现有概念,并将它们作为任务和参与者转换为分布式设置。这种 API 选择允许串行应用程序并行化,而不需要进行重大修改。
Starting Ray
ray.init()
命令启动所有相关的 Ray 进程。在集群上,这是唯一需要更改的行(我们需要传递集群地址)。这些程序包括:
• 许多用于并行执行 Python 函数的工作进程(大约每个 CPU 内核一个工作进程)
• 为 workers (和其他机器)分配“任务”的调度程序进程。任务是 Ray 调度的工作单元,对应于一个函数调用或方法调用
• 共享内存对象存储,用于在工作线程之间高效共享对象(无需创建副本)
• 存储在机器出现故障时重新运行任务所需的元数据的内存数据库
与线程相比,Ray workers 是独立的进程,因为由于全局解释器锁 GIL 的限制,Python 对多线程的支持非常有限。
Parallelism with Tasks
为了将 Python 函数 f
转换为“远程函数”(可以远程异步执行的函数) ,我们使用@ray.remote
声明该函数。然后,通过 f.remote ()
进行的函数调用将立即返回未来(future 是对最终输出的引用) ,实际的函数执行将在后台进行(我们将此执行称为任务)。
'''在 Python 中运行并行任务的代码'''
import ray
import time
# Start Ray.
ray.init()
@ray.remote
def f(x):
time.sleep(1)
return x
# Start 4 tasks in parallel.
result_ids = []
for i in range(4):
result_ids.append(f.remote(i))
# Wait for the tasks to complete and retrieve the results.
# With at least 4 cores, this will take 1 second.
results = ray.get(result_ids) # [0, 1, 2, 3]
因为对 f.remote (i)
的调用立即返回,所以只需要运行该行四次,就可以并行执行 f
的四个副本。
Task Dependencies
任务还可以依赖于其他任务。接下来,multi_ Matrix
任务使用两个 create _ Matrix
任务的输出,因此在前两个任务执行完毕之前它不会开始执行。前两个任务的输出将自动作为参数传递给第三个任务,期货将被替换为相应的值)。通过这种方式,可以将任务与任意 DAG 依赖关系组合在一起。
'''说明三个任务的代码,其中第三个任务取决于前两个任务的输出'''
import numpy as np
@ray.remote
def create_matrix(size):
return np.random.normal(size=size)
@ray.remote
def multiply_matrices(x, y):
return np.dot(x, y)
x_id = create_matrix.remote([1000, 1000])
y_id = create_matrix.remote([1000, 1000])
z_id = multiply_matrices.remote(x_id, y_id)
# Get the results.
z = ray.get(z_id)
Aggregating Values Efficiently
任务依赖关系可以以更复杂的方式使用。例如,假设我们希望将8个值聚合在一起。此示例使用整数相加,但在许多应用程序中,跨多台机器聚合大型向量可能是一个瓶颈。在这种情况下,更改单行代码可以将聚合的运行时间从线性变为聚合值数量的对数。
如上所述,要将一个任务的输出作为输入提供给后续任务,只需将第一个任务返回的将来作为参数传递给第二个任务。Ray 的调度程序将自动考虑这个任务依赖关系。第二个任务在第一个任务完成之前不会执行,第一个任务的输出将自动发送到正在执行第二个任务的机器上。
'''以线性方式与以树结构方式聚合值的代码'''
import time
@ray.remote
def add(x, y):
time.sleep(1)
return x + y
# Aggregate the values slowly. This approach takes O(n) where n is the
# number of values being aggregated. In this case, 7 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(id1, 3)
id3 = add.remote(id2, 4)
id4 = add.remote(id3, 5)
id5 = add.remote(id4, 6)
id6 = add.remote(id5, 7)
id7 = add.remote(id6, 8)
result = ray.get(id7)
# Aggregate the values in a tree-structured pattern. This approach
# takes O(log(n)). In this case, 3 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(3, 4)
id3 = add.remote(5, 6)
id4 = add.remote(7, 8)
id5 = add.remote(id1, id2)
id6 = add.remote(id3, id4)
id7 = add.remote(id5, id6)
result = ray.get(id7)
上面的代码非常明确,但是请注意,可以使用 while
循环以更简洁的方式实现这两种方法。
'''
两个聚合方案的更简洁的实现。
这两个代码块之间的唯一区别是“ add.remote”的输出是放在列表的前面还是后面
'''
# Slow approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:
values = [add.remote(values[0], values[1])] + values[2:]
result = ray.get(values[0])
# Fast approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:
values = values[2:] + [add.remote(values[0], values[1])]
result = ray.get(values[0])
From Classes to Actors
在不使用类的情况下编写有趣的应用程序是具有挑战性的,在分布式设置中,这一点与在单个核上一样正确。
Ray 允许您使用 Python 类并使用@Ray.remote
声明它。远程装潢师。每当类被实例化时,Ray 就创建一个新的“ actor”,这是一个在集群中某处运行并保存对象副本的进程。对该参与者的方法调用转换为在该参与者进程上运行的任务,并且可以访问和更改该参与者的状态。通过这种方式,参与者允许在多个任务之间共享可变状态,而远程函数则不允许。
各个参与者按顺序执行方法(每个方法都是原子的) ,因此不存在竞态条件。并行可以通过创建多个参与者来实现。
'''将 Python 类实例化为参与者的代码示例'''
@ray.remote
class Counter(object):
def __init__(self):
self.x = 0
def inc(self):
self.x += 1
def get_value(self):
return self.x
# Create an actor process.
c = Counter.remote()
# Check the actor's counter value.
print(ray.get(c.get_value.remote())) # 0
# Increment the counter twice and check the value again.
c.inc.remote()
c.inc.remote()
print(ray.get(c.get_value.remote())) # 2
上面的例子是参与者最简单的可能用法。Counter.remote ()
行创建一个新的 Actor 进程,该进程具有 Counter
对象的副本。对 c.get_value.remote()
和 c.inc.remote()
的调用在远程执行组件进程上执行任务并改变执行组件的状态。
Actor Handles
在上面的示例中,我们只从主 Python 脚本调用了角色上的方法。角色最强大的一个方面是,我们可以将句柄传递给角色,这允许其他角色或其他任务调用同一个角色上的所有方法。
下面的示例创建一个存储消息的参与者。几个工作任务重复地将消息推送给参与者,主 Python 脚本定期读取消息。
'''从多个并发任务调用参与者方法的代码'''
import time
@ray.remote
class MessageActor(object):
def __init__(self):
self.messages = []
def add_message(self, message):
self.messages.append(message)
def get_and_clear_messages(self):
messages = self.messages
self.messages = []
return messages
# Define a remote function which loops around and pushes
# messages to the actor.
@ray.remote
def worker(message_actor, j):
for i in range(100):
time.sleep(1)
message_actor.add_message.remote(
"Message {} from worker {}.".format(i, j))
# Create a message actor.
message_actor = MessageActor.remote()
# Start 3 tasks that push messages to the actor.
[worker.remote(message_actor, j) for j in range(3)]
# Periodically get the messages and print them.
for _ in range(100):
new_messages = ray.get(message_actor.get_and_clear_messages.remote())
print("New messages:", new_messages)
time.sleep(1)
# This script prints something like the following:
# New messages: []
# New messages: ['Message 0 from worker 1.', 'Message 0 from worker 0.']
# New messages: ['Message 0 from worker 2.', 'Message 1 from worker 1.', 'Message 1 from worker 0.', 'Message 1 from worker 2.']
# New messages: ['Message 2 from worker 1.', 'Message 2 from worker 0.', 'Message 2 from worker 2.']
# New messages: ['Message 3 from worker 2.', 'Message 3 from worker 1.', 'Message 3 from worker 0.']
# New messages: ['Message 4 from worker 2.', 'Message 4 from worker 0.', 'Message 4 from worker 1.']
# New messages: ['Message 5 from worker 2.', 'Message 5 from worker 0.', 'Message 5 from worker 1.']
Actor 是非常强大的。它们允许您获取一个 Python 类,并将其实例化为一个微服务,可以从其他角色和任务甚至其他应用程序查询这个微服务。
任务和参与者是 Ray 提供的核心抽象。这两个概念非常普遍,可以用来实现复杂的应用程序,包括Ray的内置强化学习库、超参数调优、加速Pandas等等。
Learn More About Ray
- Check out the code on GitHub.
- View the Ray documentation.
- Ask and answer questions on the Ray forum.
- Check out libraries built with Ray for scaling reinforcement learning, scaling hyperparameter tuning, scaling model serving, and scaling data processing.