mpi4py 快速上手

上一篇中我们介绍了如何安装和使用 mpi4py,下面我们以几个简单的例子来展示怎么使用 mpi4py 来进行并行编程,以使读者能够快速地上手使用 mpi4py。这些例子来自 mpi4py 的 Document,有些做了一些适当的改动。

点到点通信

传递通用的 Python 对象(阻塞方式)

这种方式非常简单易用,适用于任何可被 pickle 系列化的 Python 对象,但是在发送和接收端的 pickle 和 unpickle 操作却并不高效,特别是在传递大量的数据时。另外阻塞式的通信在消息传递时会阻塞进程的执行。

# p2p_blocking.py

from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {'a': 7, 'b': 3.14}
    print 'process %d sends %s' % (rank, data)
    comm.send(data, dest=1, tag=11)
elif rank == 1:
    data = comm.recv(source=0, tag=11)
    print 'process %d receives %s' % (rank, data)

运行结果如下:

$ mpiexec -n 2 python p2p_blocking.py
process 0 sends {'a': 7, 'b': 3.14}
process 1 receives {'a': 7, 'b': 3.14}

传递通用的 Python 对象(非阻塞方式)

这种方式非常简单易用,适用于任何可被 pickle 系列化的 Python 对象,但是在发送和接收端的 pickle 和 unpickle 操作却并不高效,特别是在传递大量的数据时。非阻塞式的通信可以将通信和计算进行重叠从而大大改善性能。

# p2p_non_blocking.py

from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {'a': 7, 'b': 3.14}
    print 'process %d sends %s' % (rank, data)
    req = comm.isend(data, dest=1, tag=11)
    req.wait()
elif rank == 1:
    req = comm.irecv(source=0, tag=11)
    data = req.wait()
    print 'process %d receives %s' % (rank, data)

运行结果如下:

$ mpiexec -n 2 python p2p_non_blocking.py
process 0 sends {'a': 7, 'b': 3.14}
process 1 receives {'a': 7, 'b': 3.14}

传递 numpy 数组(高效快速的方式)

对类似于数组这样的数据,准确来说是具有单段缓冲区接口(single-segment buffer interface)的 Python 对象,如 numpy 数组及内置的 bytes/string/array 等,可以用一种更为高效的方式直接进行传递,而不需要经过 pickle 系列化和恢复。以这种方式传递数据需要使用通信子对象的以大写字母开头的方法,如 Send(),Recv(),Bcast(),Scatter(),Gather() 等。

# p2p_numpy_array.py

import numpy
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# passing MPI datatypes explicitly
if rank == 0:
    data = numpy.arange(10, dtype='i')
    print 'process %d sends %s' % (rank, data)
    comm.Send([data, MPI.INT], dest=1, tag=77)
elif rank == 1:
    data = numpy.empty(10, dtype='i')
    comm.Recv([data, MPI.INT], source=0, tag=77)
    print 'process %d receives %s' % (rank, data)

# automatic MPI datatype discovery
if rank == 0:
    data = numpy.arange(10, dtype=numpy.float64)
    print 'process %d sends %s' % (rank, data)
    comm.Send(data, dest=1, tag=13)
elif rank == 1:
    data = numpy.empty(10, dtype=numpy.float64)
    comm.Recv(data, source=0, tag=13)
    print 'process %d receives %s' % (rank, data)

运行结果如下:

$ mpiexec -n 2 python p2p_numpy_array.py
process 0 sends [0 1 2 3 4 5 6 7 8 9]
process 1 receives [0 1 2 3 4 5 6 7 8 9]
process 0 sends [0. 1. 2. 3. 4. 5. 6. 7. 8. 9.]
process 1 receives [0. 1. 2. 3. 4. 5. 6. 7. 8. 9.]

集合通信

广播(Broadcast)

广播操作将根进程的数据复制到同组内其他所有进程中。

广播通用的 Python 对象

# bcast.py

from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {'key1' : [7, 2.72, 2+3j],
            'key2' : ( 'abc', 'xyz')}
    print 'before broadcasting: process %d has %s' % (rank, data)
else:
    data = None
    print 'before broadcasting: process %d has %s' % (rank, data)

data = comm.bcast(data, root=0)
print 'after broadcasting: process %d has %s' % (rank, data)

运行结果如下:

$ mpiexec -n 2 python bcast.py
before broadcasting: process 0 has {'key2': ('abc', 'xyz'), 'key1': [7, 2.72, (2+3j)]}
after broadcasting: process 0 has {'key2': ('abc', 'xyz'), 'key1': [7, 2.72, (2+3j)]}
before broadcasting: process 1 has None
after broadcasting: process 1 has {'key2': ('abc', 'xyz'), 'key1': [7, 2.72, (2+3j)]}

广播 numpy 数组

# Bcast.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = np.arange(10, dtype='i')
    print 'before broadcasting: process %d has %s' % (rank, data)
else:
    data = np.zeros(10, dtype='i')
    print 'before broadcasting: process %d has %s' % (rank, data)

comm.Bcast(data, root=0)

print 'after broadcasting: process %d has %s' % (rank, data)

运行结果如下:

$ mpiexec -n 2 python Bcast.py
before broadcasting: process 0 has [0 1 2 3 4 5 6 7 8 9]
after broadcasting: process 0 has [0 1 2 3 4 5 6 7 8 9]
before broadcasting: process 1 has [0 0 0 0 0 0 0 0 0 0]
after broadcasting: process 1 has [0 1 2 3 4 5 6 7 8 9]

发散(Scatter)

发散操作从组内的根进程分别向组内其它进程散发不同的消息。

发散通用的 Python 对象

# scatter.py

from mpi4py import MPI


comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    data = [ (i + 1)**2 for i in range(size) ]
    print 'before scattering: process %d has %s' % (rank, data)
else:
    data = None
    print 'before scattering: process %d has %s' % (rank, data)

data = comm.scatter(data, root=0)
print 'after scattering: process %d has %s' % (rank, data)

运行结果如下:

$ mpiexec -n 3 python scatter.py
before scattering: process 0 has [1, 4, 9]
after scattering: process 0 has 1
before scattering: process 1 has None
after scattering: process 1 has 4
before scattering: process 2 has None
after scattering: process 2 has 9

发散 numpy 数组

# Scatter.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendbuf = None
if rank == 0:
    sendbuf = np.empty([size, 10], dtype='i')
    sendbuf.T[:, :] = range(size)
print 'before scattering: process %d has %s' % (rank, sendbuf)

recvbuf = np.empty(10, dtype='i')
comm.Scatter(sendbuf, recvbuf, root=0)
print 'after scattering: process %d has %s' % (rank, recvbuf)

运行结果如下:

$ mpiexec -n 3 python Scatter.py
before scattering: process 0 has [[0 0 0 0 0 0 0 0 0 0]
[1 1 1 1 1 1 1 1 1 1]
[2 2 2 2 2 2 2 2 2 2]]
before scattering: process 1 has None
before scattering: process 2 has None
after scattering: process 0 has [0 0 0 0 0 0 0 0 0 0]
after scattering: process 2 has [2 2 2 2 2 2 2 2 2 2]
after scattering: process 1 has [1 1 1 1 1 1 1 1 1 1]

收集(Gather)

收集操作是发散的逆操作,根进程从其它进程收集不同的消息依次放入自己的接收缓冲区内。

收集通用的 Python 对象

# gather.py

from mpi4py import MPI


comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

data = (rank + 1)**2
print 'before gathering: process %d has %s' % (rank, data)

data = comm.gather(data, root=0)
print 'after scattering: process %d has %s' % (rank, data)

运行结果如下:

$ mpiexec -n 3 python gather.py
before gathering: process 0 has 1
after scattering: process 0 has [1, 4, 9]
before gathering: process 1 has 4
after scattering: process 1 has None
before gathering: process 2 has 9
after scattering: process 2 has None

收集 numpy 数组

# Gather.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendbuf = np.zeros(10, dtype='i') + rank
print 'before gathering: process %d has %s' % (rank, sendbuf)

recvbuf = None
if rank == 0:
    recvbuf = np.empty([size, 10], dtype='i')

comm.Gather(sendbuf, recvbuf, root=0)
print 'after gathering: process %d has %s' % (rank, recvbuf)

运行结果如下:

$ mpiexec -n 3 python Gather.py
before gathering: process 0 has [0 0 0 0 0 0 0 0 0 0]
after gathering: process 0 has [[0 0 0 0 0 0 0 0 0 0]
[1 1 1 1 1 1 1 1 1 1]
[2 2 2 2 2 2 2 2 2 2]]
before gathering: process 1 has [1 1 1 1 1 1 1 1 1 1]
after gathering: process 1 has None
before gathering: process 2 has [2 2 2 2 2 2 2 2 2 2]
after gathering: process 2 has None

最后让我们比较一下以小写字母开头的 send()/recv() 方法与以大写字母开头的 Send()/Recv() 方法在传递 numpy 数组时的性能差异。

比较 send()/recv() 和 Send()/Recv()

# send_recv_timing.pu

import time
import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = np.random.randn(10000).astype(np.float64)
else:
    data = np.empty(10000, dtype=np.float64)

comm.barrier()

# use comm.send() and comm.recv()
t1 = time.time()
if rank == 0:
    comm.send(data, dest=1, tag=1)
else:
    comm.recv(source=0, tag=1)
t2 = time.time()
if rank == 0:
    print 'time used by send/recv: %f seconds' % (t2 - t1)

comm.barrier()

# use comm.Send() and comm.Recv()
t1 = time.time()
if rank == 0:
    comm.Send(data, dest=1, tag=2)
else:
    comm.Recv(data, source=0, tag=2)
t2 = time.time()
if rank == 0:
    print 'time used by Send/Recv: %f seconds' % (t2 - t1)

运行结果如下:

$ mpiexec -n 2 python send_recv_timing.py
time used by send/recv: 0.000412 seconds
time used by Send/Recv: 0.000091 seconds

可以看出在代码几乎一样的情况下,以大写字母开头的 Send()/Recv() 方法对 numpy 数组的传递效率要高的多,因此在涉及 numpy 数组的并行操作时,应尽量选择以大写字母开头的通信方法。

以上通过几个简单的例子介绍了怎么在 Python 中利用 mpi4py 进行并行编程,可以看出 mpi4py 使得在 Python 中进行 MPI 并行编程非常容易,也比在 C、C++、Fortran 中调用 MPI 的应用接口进行并行编程要方便和灵活的多,特别是 mpi4py 提供的基于 pickle 的通用 Python 对象传递机制,使我们在编程过程中完全不用考虑所传递的数据类型和数据长度。这种灵活性和易用性虽然会有一些性能上的损失,但是在传递的数据量不大的情况下,这种性能损失是可以忽略的。当需要传递大量的数组类型的数据时,mpi4py 提供的以大写字母开头的通信方法使得数据可以以接近 C、C++、Fortran 的速度在不同的进程间高效地传递。对 numpy 数组,这种高效性却并不损失或很少损失其灵活性和易用性,因为 mpi4py 可以自动推断出 numpy 数组的类型及数据长度信息,因此一般情况下不用显式的指定。这给我们利用 numpy 的数组进行高性能的并行计算编程带来莫大的方便。

后面我们将详细地介绍 mpi4py 所提供的各种方法及其具体的用法。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容

  • 前言 计算机编程语言很多,但是适合高性能数值计算的语言却并不多,在高性能计算的项目中通常会使用到的语言有 Fort...
    自可乐阅读 19,686评论 3 22
  • 来源:NumPy Tutorial - TutorialsPoint 译者:飞龙 协议:CC BY-NC-SA 4...
    布客飞龙阅读 32,748评论 6 96
  • http://python.jobbole.com/85231/ 关于专业技能写完项目接着写写一名3年工作经验的J...
    燕京博士阅读 7,562评论 1 118
  • 感动,那个尿了裤子的爸爸 王建军 早上看了一张照片,触动了我心中最柔软的部分。 美国南犹他州一位父亲接到学校电话说...
    东营王建军阅读 690评论 2 5
  • 前几天在唯品会上抢了几盒面膜,就在昨天快递师傅送来了。收到快递的时候,在包装盒子上看到了这句话:“珍惜一切,就算没...
    夏玫小墨阅读 1,945评论 0 1