mpi4py 中的非阻塞 I/O 操作

上一篇中我们介绍了 mpi4py 中读/写文件中数组的方法,下面我们将介绍 mpi4py 中的非阻塞 I/O 操作。

MPI 支持前面介绍过的所有读/写方法(包括使用独立文件指针和显式偏移地址的读/写方法,非集合和集合读/写方法)的非阻塞版本。非阻塞读/写方法的名称一般是在对应的阻塞读/写方法的名称前面加上 I,如 Iread/Iwrite,Iread_at/Iwrite_at 等。非阻塞读/写方法和非阻塞通信方法一样,调用完后会立即返回一个 MPI.Request 对象,可以使用其 Test,Wait 等方法来检测或等待其完成。使用非阻塞 I/O 操作的一大优势在于可以潜在地重叠或部分重叠 I/O 操作和同时发生的计算和通信。

非集合操作

下面是非集合的非阻塞读/写方法的使用接口,它们和对应的阻塞版本有几乎完全一样的参数(少了 status 参数,实际上是将 status 参数转移到了其返回的 MPI.Request 对象的相关方法上)。调用这些非阻塞方法会立即返回一个 MPI.Request 对象,其返回并不意味着本次 I/O 操作已经完成,必须使用其返回的 MPI.Request 对象的 Test,Wait 及其变种等方法来检测和等待其完成。

非阻塞独立文件指针读/写

MPI.File.Iread(self, buf)

MPI.File.Iwrite(self, buf)

非阻塞显式偏移地址

MPI.File.Iread_at(self, Offset offset, buf)

MPI.File.Iwrite_at(self, Offset offset, buf)

分步集合操作

对集合 I/O 操作,MPI 3.1 之前的版本只支持一种称作分步集合 I/O (split collective I/O) 的非阻塞集合 I/O 方法。这是一种带有一定限制性的非阻塞 I/O 操作。之所以称作分步集合 I/O,是因为其把集合访问文件拆分成了两个动作——启动集合访问和结束集合访问。要使用这种 I/O 称作,用户必须先调用一个 “begin” 方法,如 Read_all_begin,来启动集合 I/O 操作,然后在合适的地方调用一个匹配的 “恩典” 方法,如 Read_all_end,来完成该 I/O 操作。这种集合操作的限制在于,进程在某一时刻对某个文件句柄只允许有一个分步集合 I/O 操作处于活动状态。即使使用多线程,也不允许一个进程内同时有两个线程对同一个文件句柄并发地执行两个或更多个分步集合操作。也就是说当在某个文件句柄上调用了一个分步集合 I/O 的 “begin” 方法后,在没有调用其匹配的 “end” 方法结束该分步集合 I/O 操作之前,不能再在这个文件句柄上调用第二个分步集合 I/O 的 “begin” 方法。由于这一限制,一个分步集合 I/O 操作的 “begin” 方法不返回 MPI.Request 对象,也不返回其它结果(实际上其返回指为 None)。其紧接着的在同一个文件句柄上调用的 “end” 方法会匹配该分步集合 I/O 的 “begin”方法。MPI 标准允许 MPI 实现在 “begin” 调用时完成整个 I/O 操作,或是在 “end” 调用时完成整个 I/O 操作,也或者是在 “begin” 和 “end” 之间完成。另外,因为这些方法都是集合操作,因此必须由打开文件的进程组中的所有进程一起调用。分步集合 I/O 操作必须在 “begin” 和 “end” 方法中都指定相同的数据缓冲区参数(这样做的目的是为了防止某些编译器会优化运行时寄存器顺序而导致执行错误)。在分步集合 I/O 操作执行期间,即在 “begin” 和 “end” 方法之间,不能再对相同文件句柄并发地执行任何其他集合操作。如果使用多线程,必须在相同线程内调用一对匹配的 “begin” 和 “end” 操作。

下面是分步集合读/写方法的使用接口:

非阻塞独立文件指针读/写

MPI.File.Read_all_begin(self, buf)

MPI.File.Read_all_end(self, buf, Status status=None)

MPI.File.Write_all_begin(self, buf)

MPI.File.Write_all_end(self, buf, Status status=None)

非阻塞显式偏移地址

MPI.File.Read_at_all_begin(self, Offset offset, buf)

MPI.File.Read_at_all_end(self, buf, Status status=None)

MPI.File.Write_at_all_begin(self, Offset offset, buf)

MPI.File.Write_at_all_end(self, buf, Status status=None)

集合操作

在 MPI 3.1 中新增加了真正意义上的非阻塞集合 I/O 操作方法。这些方法的引入旨在最终替换掉上面介绍的分步集合 I/O 操作方法,因为它们克服了分步集合 I/O 的相关限制,比如说其允许在同一个文件句柄上的多个不同的非阻塞集合 I/O 操作重叠在一起,类似于下图所示。因此在支持这些方法的 MPI 环境下,应优先使用这些方法而不是分步集合 I/O 相关方法。

重叠非阻塞集合 I/O 操作

非阻塞独立文件指针读/写

MPI.File.Iread_all(self, buf)

MPI.File.Iwrite_all(self, buf)

非阻塞显式偏移地址

MPI.File.Iread_at_all(self, Offset offset, buf)

MPI.File.Iwrite_at_all(self, Offset offset, buf)

使用共享文件指针的非阻塞读/写操作

共享文件指针相关操作将在下一篇中介绍。

例程

下面给出使用例程。

# array_io.py

"""
Demonstrates the usage of nonblocking I/O.

Run this with 6 processes like:
$ mpiexec -n 6 python array_io.py
"""

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# create a p x q Cartesian process grid
p, q = 2, 3
cart_comm = comm.Create_cart([p, q])
# get the row and column coordinate of each process in the process grid
ri, ci = cart_comm.Get_coords(rank)

# the global array
m, n = 10, 12
global_ary = np.arange(m*n, dtype='i').reshape(m, n)
rs, re = (m/p)*ri, (m/p)*(ri+1) # start and end of row
cs, ce = (n/q)*ci, (n/q)*(ci+1) # start and end of column
# local array of each process
local_ary = np.ascontiguousarray(global_ary[rs:re, cs:ce])
print 'rank %d has local_ary with shape %s' % (rank, local_ary.shape)

filename = 'temp.txt'

# the etype
etype = MPI.INT

# construct filetype
gsizes = [m, n] # global shape of the array
distribs = [MPI.DISTRIBUTE_BLOCK, MPI.DISTRIBUTE_BLOCK] # block distribution in both dimensions
dargs = [MPI.DISTRIBUTE_DFLT_DARG, MPI.DISTRIBUTE_DFLT_DARG] # default distribution args
psizes = [p, q] # process grid in C order
filetype = MPI.INT.Create_darray(p*q, rank, gsizes, distribs, dargs, psizes)
filetype.Commit()

# -------------------------------------------------------------------------------
# use collective I/O or non-collective I/O if collective I/O is not implemented

# open the file for read and write, create it if it does not exist,
# and delete it on close
fh = MPI.File.Open(comm, filename, amode= MPI.MODE_CREATE | MPI.MODE_RDWR | MPI.MODE_DELETE_ON_CLOSE)

# set the file view
fh.Set_view(0, etype, filetype)

try:
    # collectively write the data to file
    req = fh.Iwrite_all(local_ary)
except NotImplementedError:
    print 'Iwrite_all not implemented, use Iwrite instead'
    # non-collectively write the data to file
    req = fh.Iwrite(local_ary)

# do some computatin or communication here during the nonblocking I/O operation
cnt = 0
while(not req.Test()):
    cnt += 1
print 'rank %d has cnt = %d' % (rank, cnt)

# reset file view
fh.Set_view(0, etype, etype)

# check what's in the file
if rank == 0:
    buf = np.zeros(m * n, dtype='i').reshape(m, n)
    req = fh.Iread_at(0, buf)
    req.Wait()
    assert np.allclose(buf, global_ary)

# close the file
fh.Close()


# -------------------------------------------------------------------------------
# use split collective I/O

# open the file for read and write, create it if it does not exist,
# and delete it on close
fh = MPI.File.Open(comm, filename, amode= MPI.MODE_CREATE | MPI.MODE_RDWR | MPI.MODE_DELETE_ON_CLOSE)

# set the file view
fh.Set_view(0, etype, filetype)

# begin the split collective write
fh.Write_all_begin(local_ary)

# do some computatin or communication here during the nonblocking I/O operation
for i in range(10):
    pass

# end the split collective write
fh.Write_all_end(local_ary)

# reset file view
fh.Set_view(0, etype, etype)

# check what's in the file
if rank == 0:
    buf = np.zeros(m * n, dtype='i').reshape(m, n)
    req = fh.Iread_at(0, buf)
    req.Wait()
    assert np.allclose(buf, global_ary)

# close the file
fh.Close()

运行结果如下:

$ mpiexec -n 6 python nonblocking_io.py
rank 2 has local_ary with shape (5, 4)
rank 3 has local_ary with shape (5, 4)
rank 4 has local_ary with shape (5, 4)
rank 5 has local_ary with shape (5, 4)
rank 0 has local_ary with shape (5, 4)
rank 1 has local_ary with shape (5, 4)
rank 5 has cnt = 679
rank 0 has cnt = 578
rank 2 has cnt = 833
rank 3 has cnt = 1988
rank 4 has cnt = 941
rank 1 has cnt = 1199

以上介绍了 mpi4py 中的非阻塞 I/O 操作,在下一篇中我们将介绍 mpi4py 中的共享文件指针 I/O 操作。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354

推荐阅读更多精彩内容