mpi4py 进阶之 mpiutil

上一篇中我们介绍了 mpi4py 的若干使用技巧,并且简要介绍了 caput 及其 mpiutil 模块,下面我们将介绍 mpiutil 中提供的若干方便和易用的函数,这些函数可以使我们更加方便地进行 Python 并行编程,并且使我们的程序很容易地做到兼容非 MPI 编程环境。

函数接口

以下介绍的所有函数都可以兼容非 MPI 环境(此时 _comm 为 None),当 mpi4py 可用时 _comm 是 MPI.COMM_WORLD,也可以传递一个其它的通信子,此时将在该通信子上执行相应的操作。

mpilist(full_list, method='con', comm=_comm)

将一个序列 full_list 按照方法 method 划分成 n (n 为 comm 的 size,当 comm 为 None 时, 其 size = 1) 份并分配给每个进程,每个进程分得的元素数目相等(如果可以均分)或相差 1 (如果不能均分,rank 较小的进程会多 1 个),返回每个进程得到的子序列。当 fullist 的元素数目少于进程数目时,后面的进程会因为分不到元素而返回一个空序列。method 的取值可以为:

  • 'con':连续划分,即 rank 较小的进程得到 full_list 中前面的元素,为默认值;
  • 'alt':交替划分,即 rank = 0 的进程得到位置为 [0, n, 2n, ...] 的元素, rank == 1 的进程得到位置为 [1, n+1, 2n+1, ...] 的元素, 等;
  • 'rand':随机划分,效果相当于对原系列随机重排后再按确定的方式分配给每个进程。

comm 可以是 None 或一个通信子对象,其默认值 _comm 为 None (mpi4py 不可用时) 或者 MPI.COMM_WORLD (mpi4py 可用时),也可以传递一个其它的通信子对象,此时将 full_list 划分给该通信子对象所包含的每个进程。

mpirange(*args, **kargs)

MPI 版本的 range 函数,参数同 range 函数,另外加可选的参数 method (默认值 con) 和 comm (默认值为 _comm),这两个参数的意思同上面介绍的 mpilist。这个函数的执行效果是 mpilist(range(*args), method, comm),即将 range 函数生成的序列作为 full_list 调用 mpilist 函数,返回每个进程得到的子序列。

barrier(comm=_comm)

栅障同步,参数 comm 的意义同函数 mpilist。

bcast(data, root=0, comm=_comm)

广播操作,数据从 root 进程广播到所有其它进程,参数 comm 的意义同函数 mpilist。

reduce(sendobj, root=0, op=None, comm=_comm)

规约操作,将数据按照方法 op (默认值 None 会执行 MPI.SUM) 规约到 root 进程,参数 comm 的意义同函数 mpilist。

allreduce(sendobj, op=None, comm=_comm)

全规约操作,将数据按照方法 op 作全规约,参数 comm 的意义同函数 mpilist。

gather_list(lst, root=None, comm=_comm)

将各个进程的列表 lst 收集到 root 进程中并合并成一个新的列表,如果 root 为一个整数,则只有 rank 为该整数的进程会收集到数据并返回合并的列表,其它进程返回 None,如果 root 为 None,则所有进程都会收集(全收集操作),每个进程都会返回合并的列表。参数 comm 的意义同函数 mpilist。

parallel_map(func, glist, root=None, method='con', comm=_comm)

将序列 glist 按照方法 method 划分给每个进程,然后将函数 func 作用到每个进程所得的子序列的每个元素上,函数的返回值会被收集到 root 中经合并后返回。如果 root 为一个整数,则只有 rank 为该整数的进程会收集到数据并返回合并的列表,其它进程返回 None,如果 root 为 None,则所有进程都会收集(全收集操作),每个进程都会返回合并的列表。参数 methodcomm 的意义同函数 mpilist。

split_all(n, comm=_comm)

将一个长度为 n 的序列顺序连续地划分给每个进程,返回一个三元组 (num, start, end),其中 num,start,end 都为长度为 comm 的 size (1 如果 comm 为 None)的 numpy 数组,分别给出每个进程分配到的元素数目,每个进程分配到的元素在原系列中的起始和结束位置。参数 comm 的意义同函数 mpilist。

split_local(n, comm=_comm)

将一个长度为 n 的序列顺序连续地划分给每个进程,返回一个三元组 (num, start, end),其中 num,start,end 都为整数,分别给出该进程自身分配到的元素数目,该进程分配到的元素在原系列中的起始和结束位置。参数 comm 的意义同函数 mpilist。

gather_local(global_array, local_array, local_start, root=0, comm=_comm)

将各个进程中的 numpy 数组 local_array 收集到 root 进程的 global_array 中,local_start 指明 local_array 放置在 global_array 中的起始位置,是一个长度为 global_array 维数的 tuple,其每一个元素指明放置在该维的起始位置。如果 root 为一个整数,则 local_array 只会被收集到 rank 为该整数的进程中,其它进程可以设置 global_array 为 None,如果 root 为 None,则 local_array 会被收集到所有进程中。参数 comm 的意义同函数 mpilist。

gather_array(local_array, axis=0, root=0, comm=_comm)

将各个进程中的 numpy 数组 local_array 沿着轴 axis 收集到 root 进程,合并成一个大的 numpy 数组后返回。如果 root 为一个整数,则 local_array 只会被收集到 rank 为该整数的进程中,其它进程会返回 None,如果 root 为 None,则 local_array 会被收集到所有进程中。参数 comm 的意义同函数 mpilist。

scatter_local(global_array, local_array, local_start, root=None, comm=_comm)

将 numpy 数组 global_array 散发到各个进程的 local_array 中,local_start 指明从 global_array 散发的起始位置,是一个长度为 global_array 维数的 tuple,其每一个元素指明该维的起始位置。如果 root 为一个整数,则只会从 rank 为该整数的进程的 global_array 中散发数据到所有其它进程中,因此其它进程的 global_array 可以为 None,如果 root 为 None,则每个进程会从各自的 global_array 中获取对应的数据放置到 local_array 中,因此一般要求每个进程的 global_array 都相同(但也可以不同)。参数 comm 的意义同函数 mpilist。

scatter_array(global_array, axis=0, root=None, comm=_comm)

将 numpy 数组 global_array 按照轴 axis 散发到各个进程,各个进程返回所得到的子数组。global_arrayaxis 轴会尽量均分,如果不能均分,则 rank 较小的进程会多 1,如果不够分,则 rank 最大的若干进程会返回空数组。如果 root 为一个整数,则只会从 rank 为该整数的进程的 global_array 中散发数据到所有其它进程中,因此其它进程的 global_array 可以为 None,如果 root 为 None,则每个进程会从各自的 global_array 中获取对应的数据,因此一般要求每个进程的 global_array 都相同(但也可以不同)。参数 comm 的意义同函数 mpilist。

例程

下面给出以上介绍的若干函数的使用例程。

# mpiutil_funcs.py

"""
Demonstrates the usage of mpilist, mpirange, bcast, gather_list, parallel_map,
split_all, split_local, gather_array, scatter_array.

Run this with 4 processes like:
$ mpiexec -n 4 python mpiutil_funcs.py
"""

import sys
import time
import numpy as np
from caput import mpiutil


rank = mpiutil.rank
size = mpiutil.size

sec = 5 # seconds to wait

def separator(sec, tag):
    # sleep, sync, and flush to avoid output of different parts being mixed
    time.sleep(sec)
    mpiutil.barrier()
    sys.stdout.flush()

    if rank == 0:
        print
        print '-' * 35 + ' ' + tag + ' ' + '-' * 35


# mpilist
separator(sec, 'mpilist')
full_list = [1, 2.5, 'a', True, (3, 4), {'x':1}]
local_list = mpiutil.mpilist(full_list)
print "rank %d has %s with method = 'con'" % (rank, local_list)
local_list = mpiutil.mpilist(full_list, method='alt')
print "rank %d has %s with method = 'alt'" % (rank, local_list)
local_list = mpiutil.mpilist(full_list, method='rand')
print "rank %d has %s with method = 'rand'" % (rank, local_list)

# mpirange
separator(sec, 'mpirange')
local_ary = mpiutil.mpirange(1, 7)
print "rank %d has %s with method = 'con'" % (rank, local_ary)
local_ary = mpiutil.mpirange(1, 7, method='alt')
print "rank %d has %s with method = 'alt'" % (rank, local_ary)
local_ary = mpiutil.mpirange(1, 7, method='rand')
print "rank %d has %s with method = 'rand'" % (rank, local_ary)

# bcast
separator(sec, 'bcast')
if rank == 0:
    sendobj = 'obj'
else:
    sendobj = None
sendobj = mpiutil.bcast(sendobj, root=0)
print 'rank %d has sendobj = %s after bcast' % (rank, sendobj)

# gather_list
separator(sec, 'gather_list')
if rank == 0:
    lst = [0.5, 2]
elif rank == 1:
    lst = ['a', False, 'xy']
elif rank == 2:
    lst = [{'x': 1}]
else:
    lst = []
lst = mpiutil.gather_list(lst, root=None)
print 'rank %d has %s after gather_list' % (rank, lst)

# parallel_map
separator(sec, 'parallel_map')
glist = range(6)
result = mpiutil.parallel_map(lambda x: x*x, glist, root=0)
if rank == 0:
    print 'result = %s' % result

# split_all
separator(sec, 'split_all')
print 'rank %d has: %s' % (rank, mpiutil.split_all(6))

# split_local
separator(sec, 'split_local')
print 'rank %d has: %s' % (rank, mpiutil.split_local(6))

# gather_array
separator(sec, 'gather_array')
if rank == 0:
    local_ary = np.array([[0, 1], [6, 7]])
elif rank == 1:
    local_ary = np.array([[2], [8]])
elif rank == 2:
    local_ary = np.array([[3], [9]])
if rank == 3:
    local_ary = np.array([[4, 5], [10, 11]])
global_ary = mpiutil.gather_array(local_ary, axis=1, root=0)
if rank == 0:
    print 'global_ary = %s' % global_ary

# scatter_array
separator(sec, 'scatter_array')
local_ary = mpiutil.scatter_array(global_ary, axis=1, root=0)
print 'rank %d has local_ary = %s' % (rank, local_ary)

运行结果如下:

$ mpiexec -n 4 python mpiutil_funcs.py
Starting MPI rank=3 [size=4]
Starting MPI rank=2 [size=4]
Starting MPI rank=0 [size=4]
Starting MPI rank=1 [size=4]

----------------------------------- mpilist -----------------------------------
rank 1 has ['a', True] with method = 'con'
rank 1 has [2.5, {'x': 1}] with method = 'alt'
rank 2 has [(3, 4)] with method = 'con'
rank 2 has ['a'] with method = 'alt'
rank 0 has [1, 2.5] with method = 'con'
rank 0 has [1, (3, 4)] with method = 'alt'
rank 3 has [{'x': 1}] with method = 'con'
rank 3 has [True] with method = 'alt'
rank 0 has [{'x': 1}, 'a'] with method = 'rand'
rank 3 has [True] with method = 'rand'
rank 1 has [2.5, (3, 4)] with method = 'rand'
rank 2 has [1] with method = 'rand'

----------------------------------- mpirange -----------------------------------
rank 1 has [3, 4] with method = 'con'
rank 1 has [2, 6] with method = 'alt'
rank 0 has [1, 2] with method = 'con'
rank 0 has [1, 5] with method = 'alt'
rank 2 has [5] with method = 'con'
rank 2 has [3] with method = 'alt'
rank 3 has [6] with method = 'con'
rank 3 has [4] with method = 'alt'
rank 3 has [3] with method = 'rand'
rank 2 has [2] with method = 'rand'
rank 0 has [5, 1] with method = 'rand'
rank 1 has [4, 6] with method = 'rand'

----------------------------------- bcast -----------------------------------
rank 1 has sendobj = obj after bcast
rank 3 has sendobj = obj after bcast
rank 2 has sendobj = obj after bcast
rank 0 has sendobj = obj after bcast

----------------------------------- gather_list -----------------------------------
rank 1 has [0.5, 2, 'a', False, 'xy', {'x': 1}] after gather_list
rank 3 has [0.5, 2, 'a', False, 'xy', {'x': 1}] after gather_list
rank 2 has [0.5, 2, 'a', False, 'xy', {'x': 1}] after gather_list
rank 0 has [0.5, 2, 'a', False, 'xy', {'x': 1}] after gather_list

----------------------------------- parallel_map -----------------------------------
result = [0, 1, 4, 9, 16, 25]

----------------------------------- split_all -----------------------------------
rank 3 has: [[2 2 1 1]
[0 2 4 5]
[2 4 5 6]]
rank 0 has: [[2 2 1 1]
[0 2 4 5]
[2 4 5 6]]
rank 2 has: [[2 2 1 1]
[0 2 4 5]
[2 4 5 6]]
rank 1 has: [[2 2 1 1]
[0 2 4 5]
[2 4 5 6]]

----------------------------------- split_local -----------------------------------
rank 1 has: [2 2 4]
rank 0 has: [2 0 2]
rank 2 has: [1 4 5]
rank 3 has: [1 5 6]

----------------------------------- gather_array -----------------------------------
global_ary = [[ 0  1  2  3  4  5]
[ 6  7  8  9 10 11]]

----------------------------------- scatter_array -----------------------------------
rank 0 has local_ary = [[0 1]
[6 7]]
rank 1 has local_ary = [[2 3]
[8 9]]
rank 3 has local_ary = [[ 5]
[11]]
rank 2 has local_ary = [[ 4]
[10]]

以上我们介绍了 mpiutil 中提供的若干方便和易用的函数,在下一篇中我们将介绍建立在 numpy array 基础上的并行分布式数组 MPIArray。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容