MPI-3 中线程安全的 Mprobe

上一篇中我们介绍了 MPI 中多线程的使用,下面我们将介绍 MPI-3 中线程安全的 Mprobe。

Probe 和 Iprobe 操作的作用是在不实际接收消息的情况下检查消息中包含的信息,据此决定接收消息的具体方式——如为待接收消息申请空间等。

但是 Probe 和 Iprobe 操作不是线程安全的,如在多线程中使用 Probe 或 Iprobe 来确定一个消息的来源和大小,然后接收该消息的操作就不是线程安全的,有可能会导致程序无限阻塞或发生错误。因此在多线程环境下,必须通过同步/互斥机制使得同一时刻仅有一个线程执行与期望通信相匹配的操作,如某个线程的接收操作使用 Probe 或 Iprobe 检测得到的 <源进程,tag> 参数时,不得再有其它线程使用同样的 <源进程,tag> 二元组去匹配同一个发送操作来接收数据。

线程安全性在上一篇中已经作过介绍,具体来说,线程安全性是指多个线程可以同时执行消息传递的相关调用而不会相互影响。MPI-3 中引进了线程安全的 Mprobe 和 Improbe 操作,该操作与一个新引进的对象 MPI.Message 绑定,该对象标识被匹配到的特定消息,某个消息一旦被 Mprobe 或 Improbe 匹配到,就不会再被其它的 Mprobe 或 Improbe 匹配到,也不会被相应的 Mrecv 接收到,因此可以在多线程环境下安全地使用,避免 Probe 和 Iprobe 的以上限制,增加程序的安全性并简化编程。被 Mprobe 或 Improbe 匹配到的消息必须由 MPI.Message.Recv 或 MPI.Message.Irecv 来接收,而不能以通常的 MPI.Comm.Recv 或 MPI.Comm.Irecv 来接收。

方法接口

非线程安全的 probe

MPI.Comm.probe(self, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)

检测与指定参数相匹配的消息。source 为消息源,tag 为消息 tag,status 如果非 None,为一个 MPI.Status 对象。可使用 MPI.ANY_SOURCE 和 MPI.ANY_TAG 来匹配任意消息。该函数为阻塞操作,直到检测到所匹配的消息才会结束阻塞而返回,此时如果传递了 status 参数,则会为其设置所检测到的消息的状态信息。

注意:此是以小写字母开头的方法,一般应该用来检测被 pickle 系列化的消息。

MPI.Comm.iprobe(self, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)

MPI.Comm.probe 的非阻塞版本,参数同。该函数被调用后会立即返回,并且可以对相同的消息检测多次,如果检测到所匹配的消息,会返回 True,否则返回 False。

注意:此是以小写字母开头的方法,一般应该用来检测被 pickle 系列化的消息。

MPI.Comm.Probe(self, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)

MPI.Comm.probe 的以大写字母开头的版本,为阻塞操作,一般应该用来检测以缓冲区形式发送的消息。

MPI.Comm.Iprobe(self, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)

MPI.Comm.iprobe 的以大写字母开头的版本,为非阻塞操作,一般应该用来检测以缓冲区形式发送的消息。

线程安全的 mprobe

线程安全的 mprobe/improbe/Mprobe/Improbe 都有两种等价的形式,一种为 MPI.Comm 类的方法, 另一种为 MPI.Message 类的类方法。这两种形式的区别是,前者直接在一个通信子对象上调用,后者需将通信子作为一个参数传人。两者都返回一个 MPI.Message 对象。注意:被 mprobe/improbe/Mprobe/Improbe 检测到的消息应该用 MPI.Message 类的接收方法 recv/irecv/Recv/Irecv 来接收,阻塞的检测方法可以用阻塞的接收方法也可以用非阻塞的接收方法,同样非阻塞的检测方法可以用阻塞的接收方法也可以用非阻塞的接收方法,但以小(大)写字母开头的检测方法一般应该与以小(大)写字母开头的接收方法配对使用。

下面是这些方法的使用接口:

MPI.Comm.mprobe(self, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
MPI.Message.probe(type cls, Comm comm, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)

MPI.Message.recv(self, Status status=None)

阻塞的 mprobe 和阻塞的 recv,用于 pickle 系列化的消息。

MPI.Comm.improbe(self, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
MPI.Message.iprobe(type cls, Comm comm, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)

MPI.Message.irecv(self)

非阻塞的 improbe 和非阻塞的 irecv,用于 pickle 系列化的消息。

MPI.Comm.Mprobe(self, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
MPI.Message.Probe(type cls, Comm comm, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)

MPI.Message.Recv(self, buf, Status status=None)

阻塞的 Mprobe 和阻塞的 Recv,用于缓冲区消息。

MPI.Comm.Improbe(self, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
MPI.Message.Iprobe(type cls, Comm comm, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)

MPI.Message.Irecv(self, buf)

非阻塞的 Improbe 和非阻塞的 Irecv,用于缓冲区消息。

例程

下面首先展示非线程安全的 probe 操作在多线程环境下可能导致的错误。在下面这个例程中,进程 1 的两个线程都使用 probe 来检测消息,然后接收检测到的消息。如果这两个线程在接收之前检测到同一个消息,则先执行接收操作的线程会成功接收到消息,而另一个线程再执行接收动作时,因为该消息已经被前一个线程接收,该线程可能会无限阻塞,或者发生错误。

# probe_error.py

"""
This is an example which shows the incorrect use of probe in
multi-threading environment.

Run this with 2 processes like:
$ mpiexec -n 2 python probe_error.py
"""

import sys
import time
import numpy as np
import threading
from mpi4py import MPI


if MPI.Query_thread() < MPI.THREAD_MULTIPLE:
    sys.stderr.write("MPI does not provide enough thread support\n")
    sys.exit(0)

comm = MPI.COMM_WORLD
rank = comm.rank

# -----------------------------------------------------------------------------------
# mprobe and recv
if rank == 0:
    comm.send(11, dest=1, tag=11)
    comm.send(22, dest=1, tag=22)
elif rank == 1:

    def recv():

        status = MPI.Status()
        comm.probe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        print '%s probed source = %d, tag = %d' % (threading.currentThread().getName(), status.source, status.tag)

        time.sleep(0.5)

        recv_obj = comm.recv(source=status.source, tag=status.tag)
        print '%s receives %d from rank 0' % (threading.currentThread().getName(), recv_obj)

    # create thread by using a function
    recv_thread1 = threading.Thread(target=recv, name='[rank-%d recv_thread 1]' % rank)
    recv_thread2 = threading.Thread(target=recv, name='[rank-%d recv_thread 2]' % rank)

    # start the threads
    recv_thread1.start()
    recv_thread2.start()

    # wait for complete
    recv_thread1.join()
    recv_thread2.join()

运行结果如下:

$ mpiexec -n 2 python probe_error.py
[rank-1 recv_thread 1] probed source = 0, tag = 11
[rank-1 recv_thread 2] probed source = 0, tag = 11
[rank-1 recv_thread 1] receives 11 from rank 0

以上例程中,线程 1 和 线程 2 都检测到 source 为 0,tag 为 11 的同一个消息,线程 1 成功接收该消息,线程 2 则无限阻塞。

下面给出线程安全的 mprobe 使用例程。

# mprobe.py

"""
Demonstrates the usage of mprobe.

Run this with 2 processes like:
$ mpiexec -n 2 python mprobe.py
"""

import sys
import numpy as np
import threading
from mpi4py import MPI


if MPI.Query_thread() < MPI.THREAD_MULTIPLE:
    sys.stderr.write("MPI does not provide enough thread support\n")
    sys.exit(0)

comm = MPI.COMM_WORLD
rank = comm.rank

# -----------------------------------------------------------------------------------
# mprobe and recv
if rank == 0:
    comm.send(11, dest=1, tag=11)
    comm.send(22, dest=1, tag=22)
elif rank == 1:

    def recv():

        status = MPI.Status()
        msg = comm.mprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        # msg = MPI.Message.probe(comm, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)

        recv_obj = msg.recv()
        print '%s receives %d from rank 0' % (threading.currentThread().getName(), recv_obj)

    # create thread by using a function
    recv_thread1 = threading.Thread(target=recv, name='[rank-%d recv_thread 1]' % rank)
    recv_thread2 = threading.Thread(target=recv, name='[rank-%d recv_thread 2]' % rank)

    # start the threads
    recv_thread1.start()
    recv_thread2.start()

    # wait for complete
    recv_thread1.join()
    recv_thread2.join()


comm.barrier()

# ------------------------------------------------------------------------------------
# Mprobe and Recv
if rank == 0:
    comm.Send(np.array([33, 33]), dest=1, tag=33)
    comm.Send(np.array([44, 44]), dest=1, tag=44)
elif rank == 1:

    def recv():

        status = MPI.Status()
        msg = comm.Mprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        # msg = MPI.Message.Probe(comm, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)

        recv_buf = np.array([-1, -1])
        msg.Recv(recv_buf)
        print '%s receives %s from rank 0' % (threading.currentThread().getName(), recv_buf)

    # create thread by using a function
    recv_thread1 = threading.Thread(target=recv, name='[rank-%d recv_thread 1]' % rank)
    recv_thread2 = threading.Thread(target=recv, name='[rank-%d recv_thread 2]' % rank)

    # start the threads
    recv_thread1.start()
    recv_thread2.start()

    # wait for complete
    recv_thread1.join()
    recv_thread2.join()


comm.barrier()

# ------------------------------------------------------------------------------------
# improbe and irecv
if rank == 0:
    comm.send(55, dest=1, tag=55)
    comm.send(66, dest=1, tag=66)
elif rank == 1:

    def recv():

        status = MPI.Status()
        msg = None
        while not msg:
            msg = comm.improbe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
            # msg = MPI.Message.iprobe(comm, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
            if msg is None:
                print '%s improbe is not completed...' % threading.currentThread().getName()

        req = msg.irecv()
        recv_obj = req.wait()
        print '%s receives %d from rank 0' % (threading.currentThread().getName(), recv_obj)

    # create thread by using a function
    recv_thread1 = threading.Thread(target=recv, name='[rank-%d recv_thread 1]' % rank)
    recv_thread2 = threading.Thread(target=recv, name='[rank-%d recv_thread 2]' % rank)

    # start the threads
    recv_thread1.start()
    recv_thread2.start()

    # wait for complete
    recv_thread1.join()
    recv_thread2.join()


comm.barrier()

# ------------------------------------------------------------------------------------
# Improbe and Irecv
if rank == 0:
    comm.Send(np.array([77, 77]), dest=1, tag=77)
    comm.Send(np.array([88, 88]), dest=1, tag=88)
elif rank == 1:

    def recv():

        status = MPI.Status()
        msg = None
        while not msg:
            msg = comm.Improbe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
            # msg = MPI.Message.Iprobe(comm, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
            if msg is None:
                print '%s Improbe is not completed...' % threading.currentThread().getName()

        recv_buf = np.array([-1, -1])
        req = msg.Irecv(recv_buf)
        req.Wait()
        print '%s receives %s from rank 0' % (threading.currentThread().getName(), recv_buf)

    # create thread by using a function
    recv_thread1 = threading.Thread(target=recv, name='[rank-%d recv_thread 1]' % rank)
    recv_thread2 = threading.Thread(target=recv, name='[rank-%d recv_thread 2]' % rank)

    # start the threads
    recv_thread1.start()
    recv_thread2.start()

    # wait for complete
    recv_thread1.join()
    recv_thread2.join()

运行结果如下:

$ mpiexec -n 2 python mprobe.py
[rank-1 recv_thread 1] receives 11 from rank 0
[rank-1 recv_thread 2] receives 22 from rank 0
[rank-1 recv_thread 1] receives [33 33] from rank 0
[rank-1 recv_thread 2] receives [44 44] from rank 0
[rank-1 recv_thread 1] improbe is not completed...
[rank-1 recv_thread 1] receives 55 from rank 0
[rank-1 recv_thread 2] improbe is not completed...
[rank-1 recv_thread 2] receives 66 from rank 0
[rank-1 recv_thread 1] Improbe is not completed...
[rank-1 recv_thread 1] receives [77 77] from rank 0
[rank-1 recv_thread 2] Improbe is not completed...
[rank-1 recv_thread 2] receives [88 88] from rank 0

以上介绍了非线程安全的 Probe 和 MPI-3 中线程安全的 Mprobe,在下一篇中我们将介绍 MPI-3 中大的计数及相关函数。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,444评论 25 707
  • 《圣经》里说:女人是男人的一根断掉的肋骨做的。在现实里,一个男人断了十根肋骨,该当如何?我在快递公司上班,有被人打...
    2668e9ad2f35阅读 475评论 6 2
  • 灰色调、成人无力的眼神、小孩天真的眼神、带得紧紧的口罩 这是这副画的重点 什么时候,我们不在只关心眼前和自己的利益...
    3_be阅读 223评论 0 0