The previous article introduced the allreduce methods in mpi4py. This one covers the reduce-scatter operation.
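For a reduce-scatter on an intracommunicator, the input vectors held by the processes are first combined element-wise by the reduction operation, and the resulting vector is then split and scattered to the processes. The effect is the same as a reduce to some root process followed by a scatter from that root.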
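The two steps on an intracommunicator can be emulated in plain Python without any MPI calls. The following is only a minimal sketch of the semantics. The helper name `reduce_scatter_block_sim` is hypothetical and not part of mpi4py, and it assumes equal-length input vectors whose length is divisible by the number of simulated ranks.

```python
from functools import reduce
import operator


def reduce_scatter_block_sim(send_bufs, op=operator.add):
    """Emulate a block reduce-scatter (hypothetical helper, no MPI involved).

    send_bufs[i] is the input vector of simulated rank i.
    Returns a list whose i-th entry is the block rank i would receive.
    """
    nprocs = len(send_bufs)
    length = len(send_bufs[0])
    if any(len(buf) != length for buf in send_bufs):
        raise ValueError("all input vectors must have the same length")
    if length % nprocs != 0:
        raise ValueError("vector length must be divisible by the number of ranks")
    # Step 1: element-wise reduction across all simulated ranks.
    reduced = [reduce(op, column) for column in zip(*send_bufs)]
    # Step 2: scatter equal-sized blocks; block i goes to rank i.
    block = length // nprocs
    return [reduced[i * block:(i + 1) * block] for i in range(nprocs)]


# Four simulated ranks, each holding the vector 0..7.
send_bufs = [list(range(8)) for _ in range(4)]
result = reduce_scatter_block_sim(send_bufs)
for rank, chunk in enumerate(result):
    print("rank %d receives %s" % (rank, chunk))
```

With four ranks and the vector 0..7 on each, the reduced vector is 0 4 8 12 16 20 24 28, and each rank ends up with two consecutive elements of it.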
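For a reduce-scatter on an intercommunicator with the two associated groups, group A and group B, the reduction of the data supplied by all processes in A is scattered to the processes in B, and vice versa.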
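The cross-group data flow on an intercommunicator can be sketched in the same MPI-free way. This is a simplified illustration, not mpi4py code. The helper names are hypothetical, and the sketch assumes the block variant with a vector length divisible by the size of both groups.

```python
from functools import reduce
import operator


def reduce_vectors(bufs, op=operator.add):
    """Element-wise reduction of equal-length vectors (hypothetical helper)."""
    return [reduce(op, column) for column in zip(*bufs)]


def split_equal(vector, nparts):
    """Split a vector into nparts equal, consecutive blocks."""
    block = len(vector) // nparts
    return [vector[i * block:(i + 1) * block] for i in range(nparts)]


def inter_reduce_scatter_block_sim(bufs_a, bufs_b, op=operator.add):
    """Emulate the data flow between two groups (no MPI involved).

    The reduction of group A's vectors is scattered over group B,
    and the reduction of group B's vectors is scattered over group A.
    """
    recv_b = split_equal(reduce_vectors(bufs_a, op), len(bufs_b))
    recv_a = split_equal(reduce_vectors(bufs_b, op), len(bufs_a))
    return recv_a, recv_b


# Group A has 2 simulated ranks, group B has 3; both use vectors of length 6.
bufs_a = [[1, 2, 3, 4, 5, 6] for _ in range(2)]
bufs_b = [[10, 20, 30, 40, 50, 60] for _ in range(3)]
recv_a, recv_b = inter_reduce_scatter_block_sim(bufs_a, bufs_b)
print("group A receives:", recv_a)
print("group B receives:", recv_b)
```

Here the three ranks of group B each receive two elements of A's sum, while the two ranks of group A each receive three elements of B's sum.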
Method interface
The reduce-scatter methods in mpi4py (methods of the MPI.Comm class) have the following interfaces:
Reduce_scatter_block(self, sendbuf, recvbuf, Op op=SUM)
Reduce_scatter(self, sendbuf, recvbuf, recvcounts=None, Op op=SUM)
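Note: there are no corresponding methods that start with a lowercase letter. Compared with Reduce_scatter_block, Reduce_scatter takes an additional recvcounts argument that specifies how many elements each process receives, so the amount of data may differ from process to process. The scatter step of Reduce_scatter therefore behaves like Scatterv, whereas the scatter step of Reduce_scatter_block behaves like Scatter, i.e. every process receives the same amount of data.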
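The difference between the two scatter steps comes down to how the reduced vector is cut into pieces. The sketch below shows that bookkeeping in plain Python. `split_by_counts` is a hypothetical helper, not an mpi4py function, and it assumes the counts sum to the vector length.

```python
from itertools import accumulate


def split_by_counts(vector, recvcounts):
    """Cut vector into consecutive pieces of the given sizes (hypothetical helper)."""
    if sum(recvcounts) != len(vector):
        raise ValueError("recvcounts must sum to the vector length")
    # Running totals give the end offset of each piece; starts are shifted by one.
    ends = list(accumulate(recvcounts))
    starts = [0] + ends[:-1]
    return [vector[s:e] for s, e in zip(starts, ends)]


reduced = [0, 4, 8, 12, 16, 20, 24, 28]

# Scatterv-like split: every rank may receive a different number of elements.
uneven = split_by_counts(reduced, [2, 3, 1, 2])
# Scatter-like split: every rank receives the same number of elements.
even = split_by_counts(reduced, [2, 2, 2, 2])

print("uneven:", uneven)
print("even:  ", even)
```

The equal split is just the special case in which all entries of recvcounts are identical.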
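For Reduce_scatter_block and Reduce_scatter on an intracommunicator, the sendbuf argument can be set to MPI.IN_PLACE. In that case recvbuf serves as both the send buffer and the receive buffer: each process takes its input from recvbuf, and the reduced result is written back into recvbuf. When the result is shorter than recvbuf, only the leading part of recvbuf is overwritten.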
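The in-place behaviour can also be emulated without MPI. This is a rough sketch only: `reduce_scatter_block_in_place_sim` is a hypothetical helper, and it assumes equal-length buffers whose length is divisible by the number of simulated ranks. It overwrites just the leading block of each buffer and leaves the remaining elements as they were.

```python
from functools import reduce
import operator


def reduce_scatter_block_in_place_sim(bufs, op=operator.add):
    """Emulate a block reduce-scatter with in-place buffers (no MPI involved).

    bufs[i] plays the role of recvbuf on simulated rank i: it supplies the
    input and its leading block is overwritten with the rank's result.
    """
    nprocs = len(bufs)
    block = len(bufs[0]) // nprocs
    # Reduce from the original contents before any buffer is modified.
    reduced = [reduce(op, column) for column in zip(*bufs)]
    for rank, buf in enumerate(bufs):
        # Only the first `block` elements are replaced; the tail is untouched.
        buf[:block] = reduced[rank * block:(rank + 1) * block]


bufs = [list(range(8)) for _ in range(4)]
reduce_scatter_block_in_place_sim(bufs)
for rank, buf in enumerate(bufs):
    print("rank %d has %s" % (rank, buf))
```

Each simulated buffer keeps its last six input values, with only the first two replaced by that rank's share of the sum.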
Example
The following example shows how to use the reduce-scatter operation.
# reduce_scatter.py
"""
Demonstrates the usage of Reduce_scatter_block, Reduce_scatter.
Run this with 4 processes like:
$ mpiexec -n 4 python reduce_scatter.py
"""
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
# ------------------------------------------------------------------------------
# reduce scatter a numpy array by using Reduce_scatter_block
send_buf = np.arange(8, dtype='i')
recv_buf = np.empty(2, dtype='i')
# first step: reduce
# rank 0 | 0 1 2 3 4 5 6 7
# rank 1 | 0 1 2 3 4 5 6 7
# rank 2 | 0 1 2 3 4 5 6 7
# rank 3 | 0 1 2 3 4 5 6 7
# --------+-------------------------
# SUM | 0 4 8 12 16 20 24 28
# second step: scatter
# rank 0 | rank 1 | rank 2 | rank 3
# ---------+----------+---------+---------
# 0 4 | 8 12 | 16 20 | 24 28
comm.Reduce_scatter_block(send_buf, recv_buf, op=MPI.SUM)
print('Reduce_scatter_block by SUM: rank %d has %s' % (rank, recv_buf))
# ------------------------------------------------------------------------------
# reduce scatter a numpy array by using Reduce_scatter_block with MPI.IN_PLACE
recv_buf = np.arange(8, dtype='i')
# with MPI.IN_PLACE, recv_buf is used as both send buffer and receive buffer
# the first two elements of recv_buf will be filled with the scattered results
comm.Reduce_scatter_block(MPI.IN_PLACE, recv_buf, op=MPI.SUM)
print('Reduce_scatter_block by SUM with MPI.IN_PLACE: rank %d has %s' % (rank, recv_buf))
# ------------------------------------------------------------------------------
# reduce scatter a numpy array by using Reduce_scatter
send_buf = np.arange(8, dtype='i')
recvcounts = [2, 3, 1, 2]
recv_buf = np.empty(recvcounts[rank], dtype='i')
# first step: reduce
# rank 0 | 0 1 2 3 4 5 6 7
# rank 1 | 0 1 2 3 4 5 6 7
# rank 2 | 0 1 2 3 4 5 6 7
# rank 3 | 0 1 2 3 4 5 6 7
# --------+-------------------------
# SUM | 0 4 8 12 16 20 24 28
# second step: scatterv with [2, 3, 1, 2]
# rank 0 | rank 1 | rank 2 | rank 3
# ---------+----------+---------+---------
# 0 4 | 8 12 16 | 20 | 24 28
comm.Reduce_scatter(send_buf, recv_buf, recvcounts=recvcounts, op=MPI.SUM)
print('Reduce_scatter by SUM: rank %d has %s' % (rank, recv_buf))
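Running it produces the following output. The order in which lines from different ranks appear can vary from run to run: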
$ mpiexec -n 4 python reduce_scatter.py
Reduce_scatter_block by SUM: rank 0 has [0 4]
Reduce_scatter_block by SUM: rank 1 has [ 8 12]
Reduce_scatter_block by SUM: rank 3 has [24 28]
Reduce_scatter_block by SUM with MPI.IN_PLACE: rank 0 has [0 4 2 3 4 5 6 7]
Reduce_scatter by SUM: rank 0 has [0 4]
Reduce_scatter_block by SUM: rank 2 has [16 20]
Reduce_scatter_block by SUM with MPI.IN_PLACE: rank 2 has [16 20 2 3 4 5 6 7]
Reduce_scatter by SUM: rank 2 has [20]
Reduce_scatter_block by SUM with MPI.IN_PLACE: rank 1 has [ 8 12 2 3 4 5 6 7]
Reduce_scatter by SUM: rank 1 has [ 8 12 16]
Reduce_scatter_block by SUM with MPI.IN_PLACE: rank 3 has [24 28 2 3 4 5 6 7]
Reduce_scatter by SUM: rank 3 has [24 28]
This article introduced the reduce-scatter methods in mpi4py. The next one covers the all-to-all scatter operation.