在上一篇中我们介绍了 MPI 中多线程的使用,下面我们将介绍 MPI-3 中线程安全的 Mprobe。
Probe 和 Iprobe 操作的作用是在不实际接收消息的情况下检查消息中包含的信息,据此决定接收消息的具体方式——如为待接收消息申请空间等。
但是 Probe 和 Iprobe 操作不是线程安全的,如在多线程中使用 Probe 或 Iprobe 来确定一个消息的来源和大小,然后接收该消息的操作就不是线程安全的,有可能会导致程序无限阻塞或发生错误。因此在多线程环境下,必须通过同步/互斥机制使得同一时刻仅有一个线程执行与期望通信相匹配的操作,如某个线程的接收操作使用 Probe 或 Iprobe 检测得到的 <源进程,tag> 参数时,不得再有其它线程使用同样的 <源进程,tag> 二元组去匹配同一个发送操作来接收数据。
线程安全性在上一篇中已经作过介绍,具体来说,线程安全性是指多个线程可以同时执行消息传递的相关调用而不会相互影响。MPI-3 中引进了线程安全的 Mprobe 和 Improbe 操作,该操作与一个新引进的对象 MPI.Message 绑定,该对象标识被匹配到的特定消息,某个消息一旦被 Mprobe 或 Improbe 匹配到,就不会再被其它的 Mprobe 或 Improbe 匹配到,也不会被相应的 Mrecv 接收到,因此可以在多线程环境下安全地使用,避免 Probe 和 Iprobe 的以上限制,增加程序的安全性并简化编程。被 Mprobe 或 Improbe 匹配到的消息必须由 MPI.Message.Recv 或 MPI.Message.Irecv 来接收,而不能以通常的 MPI.Comm.Recv 或 MPI.Comm.Irecv 来接收。
方法接口
非线程安全的 probe
MPI.Comm.probe(self, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
检测与指定参数相匹配的消息。source
为消息源,tag
为消息 tag,status
如果非 None,为一个 MPI.Status 对象。可使用 MPI.ANY_SOURCE 和 MPI.ANY_TAG 来匹配任意消息。该函数为阻塞操作,直到检测到所匹配的消息才会结束阻塞而返回,此时如果传递了 status
参数,则会为其设置所检测到的消息的状态信息。
注意:此是以小写字母开头的方法,一般应该用来检测被 pickle 系列化的消息。
MPI.Comm.iprobe(self, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
MPI.Comm.probe 的非阻塞版本,参数同。该函数被调用后会立即返回,并且可以对相同的消息检测多次,如果检测到所匹配的消息,会返回 True,否则返回 False。
注意:此是以小写字母开头的方法,一般应该用来检测被 pickle 系列化的消息。
MPI.Comm.Probe(self, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
MPI.Comm.probe 的以大写字母开头的版本,为阻塞操作,一般应该用来检测以缓冲区形式发送的消息。
MPI.Comm.Iprobe(self, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
MPI.Comm.iprobe 的以大写字母开头的版本,为非阻塞操作,一般应该用来检测以缓冲区形式发送的消息。
线程安全的 mprobe
线程安全的 mprobe/improbe/Mprobe/Improbe 都有两种等价的形式,一种为 MPI.Comm 类的方法, 另一种为 MPI.Message 类的类方法。这两种形式的区别是,前者直接在一个通信子对象上调用,后者需将通信子作为一个参数传人。两者都返回一个 MPI.Message 对象。注意:被 mprobe/improbe/Mprobe/Improbe 检测到的消息应该用 MPI.Message 类的接收方法 recv/irecv/Recv/Irecv 来接收,阻塞的检测方法可以用阻塞的接收方法也可以用非阻塞的接收方法,同样非阻塞的检测方法可以用阻塞的接收方法也可以用非阻塞的接收方法,但以小(大)写字母开头的检测方法一般应该与以小(大)写字母开头的接收方法配对使用。
下面是这些方法的使用接口:
MPI.Comm.mprobe(self, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
MPI.Message.probe(type cls, Comm comm, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
MPI.Message.recv(self, Status status=None)
阻塞的 mprobe 和阻塞的 recv,用于 pickle 系列化的消息。
MPI.Comm.improbe(self, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
MPI.Message.iprobe(type cls, Comm comm, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
MPI.Message.irecv(self)
非阻塞的 improbe 和非阻塞的 irecv,用于 pickle 系列化的消息。
MPI.Comm.Mprobe(self, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
MPI.Message.Probe(type cls, Comm comm, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
MPI.Message.Recv(self, buf, Status status=None)
阻塞的 Mprobe 和阻塞的 Recv,用于缓冲区消息。
MPI.Comm.Improbe(self, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
MPI.Message.Iprobe(type cls, Comm comm, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
MPI.Message.Irecv(self, buf)
非阻塞的 Improbe 和非阻塞的 Irecv,用于缓冲区消息。
例程
下面首先展示非线程安全的 probe 操作在多线程环境下可能导致的错误。在下面这个例程中,进程 1 的两个线程都使用 probe 来检测消息,然后接收检测到的消息。如果这两个线程在接收之前检测到同一个消息,则先执行接收操作的线程会成功接收到消息,而另一个线程再执行接收动作时,因为该消息已经被前一个线程接收,该线程可能会无限阻塞,或者发生错误。
# probe_error.py
"""
This is an example which shows the incorrect use of probe in
multi-threading environment.
Run this with 2 processes like:
$ mpiexec -n 2 python probe_error.py
"""
import sys
import time
import numpy as np
import threading
from mpi4py import MPI
if MPI.Query_thread() < MPI.THREAD_MULTIPLE:
sys.stderr.write("MPI does not provide enough thread support\n")
sys.exit(0)
comm = MPI.COMM_WORLD
rank = comm.rank
# -----------------------------------------------------------------------------------
# mprobe and recv
if rank == 0:
comm.send(11, dest=1, tag=11)
comm.send(22, dest=1, tag=22)
elif rank == 1:
def recv():
status = MPI.Status()
comm.probe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
print '%s probed source = %d, tag = %d' % (threading.currentThread().getName(), status.source, status.tag)
time.sleep(0.5)
recv_obj = comm.recv(source=status.source, tag=status.tag)
print '%s receives %d from rank 0' % (threading.currentThread().getName(), recv_obj)
# create thread by using a function
recv_thread1 = threading.Thread(target=recv, name='[rank-%d recv_thread 1]' % rank)
recv_thread2 = threading.Thread(target=recv, name='[rank-%d recv_thread 2]' % rank)
# start the threads
recv_thread1.start()
recv_thread2.start()
# wait for complete
recv_thread1.join()
recv_thread2.join()
运行结果如下:
$ mpiexec -n 2 python probe_error.py
[rank-1 recv_thread 1] probed source = 0, tag = 11
[rank-1 recv_thread 2] probed source = 0, tag = 11
[rank-1 recv_thread 1] receives 11 from rank 0
以上例程中,线程 1 和 线程 2 都检测到 source 为 0,tag 为 11 的同一个消息,线程 1 成功接收该消息,线程 2 则无限阻塞。
下面给出线程安全的 mprobe 使用例程。
# mprobe.py
"""
Demonstrates the usage of mprobe.
Run this with 2 processes like:
$ mpiexec -n 2 python mprobe.py
"""
import sys
import numpy as np
import threading
from mpi4py import MPI
if MPI.Query_thread() < MPI.THREAD_MULTIPLE:
sys.stderr.write("MPI does not provide enough thread support\n")
sys.exit(0)
comm = MPI.COMM_WORLD
rank = comm.rank
# -----------------------------------------------------------------------------------
# mprobe and recv
if rank == 0:
comm.send(11, dest=1, tag=11)
comm.send(22, dest=1, tag=22)
elif rank == 1:
def recv():
status = MPI.Status()
msg = comm.mprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
# msg = MPI.Message.probe(comm, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
recv_obj = msg.recv()
print '%s receives %d from rank 0' % (threading.currentThread().getName(), recv_obj)
# create thread by using a function
recv_thread1 = threading.Thread(target=recv, name='[rank-%d recv_thread 1]' % rank)
recv_thread2 = threading.Thread(target=recv, name='[rank-%d recv_thread 2]' % rank)
# start the threads
recv_thread1.start()
recv_thread2.start()
# wait for complete
recv_thread1.join()
recv_thread2.join()
comm.barrier()
# ------------------------------------------------------------------------------------
# Mprobe and Recv
if rank == 0:
comm.Send(np.array([33, 33]), dest=1, tag=33)
comm.Send(np.array([44, 44]), dest=1, tag=44)
elif rank == 1:
def recv():
status = MPI.Status()
msg = comm.Mprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
# msg = MPI.Message.Probe(comm, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
recv_buf = np.array([-1, -1])
msg.Recv(recv_buf)
print '%s receives %s from rank 0' % (threading.currentThread().getName(), recv_buf)
# create thread by using a function
recv_thread1 = threading.Thread(target=recv, name='[rank-%d recv_thread 1]' % rank)
recv_thread2 = threading.Thread(target=recv, name='[rank-%d recv_thread 2]' % rank)
# start the threads
recv_thread1.start()
recv_thread2.start()
# wait for complete
recv_thread1.join()
recv_thread2.join()
comm.barrier()
# ------------------------------------------------------------------------------------
# improbe and irecv
if rank == 0:
comm.send(55, dest=1, tag=55)
comm.send(66, dest=1, tag=66)
elif rank == 1:
def recv():
status = MPI.Status()
msg = None
while not msg:
msg = comm.improbe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
# msg = MPI.Message.iprobe(comm, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
if msg is None:
print '%s improbe is not completed...' % threading.currentThread().getName()
req = msg.irecv()
recv_obj = req.wait()
print '%s receives %d from rank 0' % (threading.currentThread().getName(), recv_obj)
# create thread by using a function
recv_thread1 = threading.Thread(target=recv, name='[rank-%d recv_thread 1]' % rank)
recv_thread2 = threading.Thread(target=recv, name='[rank-%d recv_thread 2]' % rank)
# start the threads
recv_thread1.start()
recv_thread2.start()
# wait for complete
recv_thread1.join()
recv_thread2.join()
comm.barrier()
# ------------------------------------------------------------------------------------
# Improbe and Irecv
if rank == 0:
comm.Send(np.array([77, 77]), dest=1, tag=77)
comm.Send(np.array([88, 88]), dest=1, tag=88)
elif rank == 1:
def recv():
status = MPI.Status()
msg = None
while not msg:
msg = comm.Improbe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
# msg = MPI.Message.Iprobe(comm, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
if msg is None:
print '%s Improbe is not completed...' % threading.currentThread().getName()
recv_buf = np.array([-1, -1])
req = msg.Irecv(recv_buf)
req.Wait()
print '%s receives %s from rank 0' % (threading.currentThread().getName(), recv_buf)
# create thread by using a function
recv_thread1 = threading.Thread(target=recv, name='[rank-%d recv_thread 1]' % rank)
recv_thread2 = threading.Thread(target=recv, name='[rank-%d recv_thread 2]' % rank)
# start the threads
recv_thread1.start()
recv_thread2.start()
# wait for complete
recv_thread1.join()
recv_thread2.join()
运行结果如下:
$ mpiexec -n 2 python mprobe.py
[rank-1 recv_thread 1] receives 11 from rank 0
[rank-1 recv_thread 2] receives 22 from rank 0
[rank-1 recv_thread 1] receives [33 33] from rank 0
[rank-1 recv_thread 2] receives [44 44] from rank 0
[rank-1 recv_thread 1] improbe is not completed...
[rank-1 recv_thread 1] receives 55 from rank 0
[rank-1 recv_thread 2] improbe is not completed...
[rank-1 recv_thread 2] receives 66 from rank 0
[rank-1 recv_thread 1] Improbe is not completed...
[rank-1 recv_thread 1] receives [77 77] from rank 0
[rank-1 recv_thread 2] Improbe is not completed...
[rank-1 recv_thread 2] receives [88 88] from rank 0
以上介绍了非线程安全的 Probe 和 MPI-3 中线程安全的 Mprobe,在下一篇中我们将介绍 MPI-3 中大的计数及相关函数。