PPL-并行模式库

PPL简介

并行模式库 (PPL) 提供命令式编程模型,以促进开发并发应用程序的可扩展性和易用性。 PPL 构建在并发运行时的计划和资源管理组件上。
通过提供并行作用于数据的泛型安全算法和容器,提高应用程序代码与基础线程机制之间的抽象级别。 使用 PPL 还可以开发通过为共享状态提供替代方案实现缩放的应用程序。

PPL 提供以下功能:

  • 任务并行︰ 一种工作在Windows线程池上,可以并行地执行多个任务的机制
  • 并行算法︰ 利用并发运行时(Concurrency Runtime ),并行处理数据集合的泛型算法
  • 并行容器和对象︰ 提供对其元素的安全并发访问的泛型容器类型

并行算法

并行模式库 (PPL) 提供了在数据集合并发执行工作的算法。 这些算法类似于标准模板库 (STL)所提供的。
并行算法由并发运行时中的现有功能组成。 例如, concurrency:: parallel_for 算法使用 concurrency:: structured_task_group 对象执行并行循环迭代。 parallel_for 算法以最佳方式工作分区给定可用数量的计算资源。

parallel_for 算法

简介

Concurrency:: parallel_for 算法重复地以并行方式执行相同的任务。 每个任务是由迭代值参数化。 当您具有不共享资源分布该循环的迭代循环主体时,此算法很有用。
parallel_for 算法以可最佳并行执行的方式对任务进行分区。 当工作负载不平衡时,此算法还会使用工作窃取算法和范围窃取来平衡这些分区。 当某个循环迭代以协作方式被阻塞时,则运行时将原本分配当前线程的迭代范围重新分配给其他线程或处理器。 同样,当某个线程完成迭代范围,则运行时将原本属于其他线程的工作重新分配给该线程。 parallel_for 算法还支持 嵌套并行。 当一个并行循环中包含另一个并行循环时,运行时之间以高效并行执行的方式协调处理循环主体的资源。

parallel_for 算法有多个重载版本。 第一个版本采用起始值、 结束值和工作函数 (lambda 表达式、 函数对象或函数指针)。 第二个版本所依据采用起始值、 结束值、 一个步长值和工作函数。 此函数的第一个版本使用 1 作为步长值。 其余版本采用分区程序对象,使您能够指定 parallel_for如何在线程之间对范围进行分区。 分区程序在 分区工作 中有更详细地介绍。
您可以将许多 for 循环,转换成 parallel_for。 但是, parallel_for 算法与 for 语句以下不同点︰

  • parallel_for 算法不再以预先确定的顺序执行这些任务。

  • parallel_for 算法不支持任意终止条件。 parallel_for 当迭代变量的当前值小于上一次时,并行算法就停止了。

  • _Index_type 类型参数必须是整数类型。 此整型可以有符号或无符号。

  • 循环迭代必须前滚。 如果 _Step 参数是小于 1,parallel_for 算法将引发类型的异常 std::invalid_argument

  • parallel_for 算法的异常处理机制不同于 for 循环。 如果在并行循环主体中同时发生多个异常,运行时仅会将一个异常传播到调用parallel_for的线程。 此外,当某个循环迭代抛出异常,则运行时不会立即停止整个循环。 相反,该循环处于已取消状态,并且运行时会丢弃尚未启动的任何任务。 有关异常处理和并行算法的详细信息,请参阅 异常处理

尽管 parallel_for 算法不支持任意终止条件,但你可以使用取消以停止所有的任务。 有关取消的详细信息,请参阅 取消。

注意
对负载均衡和取消之类的功能的支持,可能会抵消并行执行循环主体可能带来的好处,尤其当循环体相对较小时。
您可以在并行循环中使用分区程序来尽量减少此开销。 有关详细信息,请参阅 分区工作

示例

下面的示例演示的基本结构 parallel_for 算法。 本示例向控制台打印范围 [1,5] 并行中的每个值。

// parallel-for-structure.cpp
// compile with: /EHsc
#include <ppl.h>
#include <array>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // Print each value from 1 to 5 in parallel.
   parallel_for(1, 6, [](int value) {
      wstringstream ss;
      ss << value << L' ';
      wcout << ss.str();
   });
}

该示例产生下面的示例输出︰
1 2 4 3 5

注意
因为 parallel_for 算法作用于并行每个项目,向控制台打印值的顺序将会不同。
有关完整的示例使用 parallel_for 算法,请参阅 如何︰ 编写 parallel_for 循环

parallel_for_each 算法

Concurrency:: parallel_for_each 算法对迭代容器(如 STL提供的)并行执行任务。 它跟parallel_for 算法一样使用相同的分区逻辑。

parallel_for_each 算法类似于 STL for_each 算法,不同之处在于 parallel_for_each 算法并发执行任务。 像其他并行算法一样, parallel_for_each 不会按特定顺序执行这些任务。

尽管 parallel_for_each 算法支持前向迭代器和随机访问迭代器,但随机访问迭代器表现更好。

示例

下面的示例演示parallel_for_each 算法的基本结构。 此示例并行地将 std:: array对象中的每个值打印到控制台 。

// parallel-for-each-structure.cpp
// compile with: /EHsc
#include <ppl.h>
#include <array>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // Create an array of integer values.
   array<int, 5> values = { 1, 2, 3, 4, 5 };

   // Print each value in the array in parallel.
   parallel_for_each(begin(values), end(values), [](int value) {
      wstringstream ss;
      ss << value << L' ';
      wcout << ss.str();
   });
}
/* Sample output:
   5 4 3 1 2
*/

输出如下:
4 5 1 2 3

注意
因为 parallel_for_each 算法并行作用于每个元素,向控制台打印值的顺序将会不同。
有关完整的示例使用 parallel_for_each 算法,请参阅 如何︰ 编写 parallel_for_each 循环

parallel_invoke 算法

Concurrency:: parallel_invoke 算法以并行方式执行一组任务。 每个任务完成之前不返回。 当您想要在同一时间执行多个独立的任务时,此算法很有用。

parallel_invoke 算法将一系列工作函数作为其参数 (lambda 函数、 函数对象或函数指针)。 parallel_invoke 算法被重载成以 2 到 10 个参数之间的形式。 传递给 parallel_invoke的每个函数必须都必须是不带参的。

像其他并行算法 parallel_invoke 不会按特定顺序执行这些任务。 任务并行 解释了parallel_invoke 算法如何关联不同的任务和任务组。

示例

下面的示例演示 parallel_invoke 算法的基本结构。 此示例用三个局部变量并发调用 twice函数,并输出结果到控制台。

// parallel-invoke-structure.cpp
// compile with: /EHsc
#include <ppl.h>
#include <string>
#include <iostream>

using namespace concurrency;
using namespace std;

// Returns the result of adding a value to itself.
template <typename T>
T twice(const T& t) {
   return t + t;
}

int wmain()
{
   // Define several values.
   int n = 54;
   double d = 5.6;
   wstring s = L"Hello";

   // Call the twice function on each value concurrently.
   parallel_invoke(
      [&n] { n = twice(n); },
      [&d] { d = twice(d); },
      [&s] { s = twice(s); }
   );

   // Print the values to the console.
   wcout << n << L' ' << d << L' ' << s << endl;
}

输出:
108 11.2 HelloHello

有关使用 parallel_invoke 算法的完整示例,请参阅 如何︰ 使用 parallel_invoke 来编写并行排序例程如何︰ 使用 parallel_invoke 来执行并行操作

parallel_transform 和 parallel_reduce 算法

Concurrency:: parallel_transformconcurrency:: parallel_reduce 算法分别是 STL 算法 std:: transformstd:: accumulate的并行版本。 并发运行时版本的行为类似于 STL 版本,只不过因为它们是并行执行的,所以操作顺序不确定。 当您使用的集合足够大,可从并行处理中获得性能和可扩展性优势时,请使用这些算法。
注意

因为这些迭代器会生成稳定的内存地址,所以 parallel_transform 算法和 parallel_reduce 算法仅支持随机访问、双向和向前迭代器。 
而且这些迭代器必须生成非 const 左值。

parallel_transform 算法

您可以使用 parallel transform 算法执行许多数据并行化操作。 例如,你可以:

  • 调整图像的亮度,并执行其他图像处理操作。

  • 在两个向量之间求和或取点积,并对向量执行其他数值计算。

  • 执行三维射线跟踪,其中每次迭代引用一个必须呈现的像素。

下面的示例显示用于调用 parallel_transform 算法的基本结构。 此示例中采用两种方式对std::vector对象中的每个元素求反。 第一种方法是使用 lambda 表达式。 第二种方法是使用 std::negate, ,它派生自 std::unary_function

// basic-parallel-transform.cpp
// compile with: /EHsc
#include <ppl.h>
#include <random>

using namespace concurrency;
using namespace std;

int wmain()
{
    // Create a large vector that contains random integer data.
    vector<int> values(1250000);
    generate(begin(values), end(values), mt19937(42));

    // Create a vector to hold the results.
    // Depending on your requirements, you can also transform the 
    // vector in-place.
    vector<int> results(values.size());

    // Negate each element in parallel.
    parallel_transform(begin(values), end(values), begin(results), [](int n) {
        return -n;
    });

    // Alternatively, use the negate class to perform the operation.
    parallel_transform(begin(values), end(values), begin(results), negate<int>());
}

注意
本示例演示 parallel_transform 的基本用法。 由于工作函数不会执行大量工作,因此本示例中不会有显著的性能提升。

parallel_transform 算法有两个重载。 第一个重载采用一个输入范围和一个一元函数。 该一元函数可以是采用一个参数的 lambda 表达式、一个函数对象或从 unary_function 派生的一个类型。 第二个重载采用两个输入范围和一个二元函数。 该二元函数可以采用两个参数、 函数对象或派生自的类型的 lambda 表达式 std:: binary_function。 下面的示例阐释了这些差异。

    //
    // Demonstrate use of parallel_transform together with a unary function.

    // This example uses a lambda expression.
    parallel_transform(begin(values), end(values), 
        begin(results), [](int n) { 
            return -n;
        });

    // Alternatively, use the negate class:
    parallel_transform(begin(values), end(values), 
        begin(results), negate<int>());

    //
    // Demonstrate use of parallel_transform together with a binary function.

    // This example uses a lambda expression.
    parallel_transform(begin(values), end(values), begin(results), 
        begin(results), [](int n, int m) {
            return n * m;
        });

    // Alternatively, use the multiplies class:
    parallel_transform(begin(values), end(values), begin(results), 
        begin(results), multiplies<int>());

注意
您为 parallel_transform 的输出提供的迭代器必须与输入迭代器完全重叠或根本不重叠。 如果输入迭代器和输出迭代器部分重叠,则此算法的行为是未指定的。

parallel_reduce 算法

当您具有满足关联属性的操作序列时,parallel_reduce 算法很有用。 (此算法不要求可交换属性。)以下是可以使用 parallel_reduce 执行的一些操作:

  • 将矩阵的序列相乘以生成一个矩阵。

  • 用矩阵序列乘以一个向量来生成一个向量。

  • 计算字符串序列的长度。

  • 将一个元素列表(例如字符串)组合为一个元素。

下面的基本示例演示如何使用 parallel_reduce 算法将一个字符串序列组合为一个字符串。 与 parallel_transform 的示例一样,此基本示例中不会有性能提升。

// basic-parallel-reduce.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include <string> 
#include <vector>

using namespace concurrency;
using namespace std;

int wmain()
{
    // Create a vector of strings.
    vector<wstring> words;
    words.push_back(L"Lorem ");
    words.push_back(L"ipsum ");
    words.push_back(L"dolor ");
    words.push_back(L"sit ");
    words.push_back(L"amet, ");
    words.push_back(L"consectetur ");
    words.push_back(L"adipiscing ");
    words.push_back(L"elit.");

    // Reduce the vector to one string in parallel.
    wcout << parallel_reduce(begin(words), end(words), wstring()) << endl;
}

/* Output:
   Lorem ipsum dolor sit amet, consectetur adipiscing elit.
*/

在许多情况下,您可以将 parallel_reduce 当成是parallel_for_eachconcurrency:: combinable 类一起使用的的简写形式。

示例︰ 并行执行映射和归约

一个 映射 操作将函数应用于序列中的每个值。 一个 归约 操作会将序列中的每一个元素组合成一个值。 可以使用标准模板库 (STL) std:: transformstd:: accumulate 类来执行映射和归约操作。 但是,对于许多问题,您可以使用 parallel_transform 算法并行执行映射操作,并使用 parallel_reduce 算法并行执行归约操作。

下面的示例将按串行方式计算质数和所需的时间与按并行方式计算质数和所需的时间进行比较。 映射阶段会将非质数值转换为 0,而归约阶段将对这些值求和。

// parallel-map-reduce-sum-of-primes.cpp
// compile with: /EHsc
#include <windows.h>
#include <ppl.h>
#include <array>
#include <numeric>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// Determines whether the input value is prime.
bool is_prime(int n)
{
   if (n < 2)
      return false;
   for (int i = 2; i < n; ++i)
   {
      if ((n % i) == 0)
         return false;
   }
   return true;
}

int wmain()
{   
   // Create an array object that contains 200000 integers.
   array<int, 200000> a;

   // Initialize the array such that a[i] == i.
   iota(begin(a), end(a), 0);

   int prime_sum;
   __int64 elapsed;

   // Compute the sum of the numbers in the array that are prime.
   elapsed = time_call([&] {
       transform(begin(a), end(a), begin(a), [](int i) { 
         return is_prime(i) ? i : 0; 
      });
      prime_sum = accumulate(begin(a), end(a), 0);
   });   
   wcout << prime_sum << endl;   
   wcout << L"serial time: " << elapsed << L" ms" << endl << endl;

   // Now perform the same task in parallel.
   elapsed = time_call([&] {
      parallel_transform(begin(a), end(a), begin(a), [](int i) { 
         return is_prime(i) ? i : 0; 
      });
      prime_sum = parallel_reduce(begin(a), end(a), 0);
   });
   wcout << prime_sum << endl;
   wcout << L"parallel time: " << elapsed << L" ms" << endl << endl;
}
/* Sample output:
   1709600813
   serial time: 7406 ms
   
   1709600813
   parallel time: 1969 ms
*/

注意
有关执行映射和化简操作的并行的另一个示例,请参阅 如何︰ 并行执行映射和归约操作

分区工作

若要在数据源上并行化一个操作,不可或缺的步骤为将操作源 分区成可被多线程并发操作的几块。 分区程序将指定并行算法如何在线程间划分区间范围。 如本文档前面所述,PPL 使用的是默认分区机制,该默认分区机制创建初始工作负荷并在工作负荷不平衡时使用工作窃取算法和范围窃取来平衡这些分区。 例如,当某个循环迭代完成一个迭代范围时,运行时会将其他线程的工作重新分配给该线程。 但是,在某些方案中,您可能希望指定另一个更适用于您的问题的分区机制。

parallel_forparallel_for_eachparallel_transform 算法提供有一附加参数 _Partitioner 的重载版本。 此参数定义了用于划分工作的分区程序类型。 以下是 PPL 定义的分区程序种类:

concurrency::affinity_partitioner
将工作划分为一个固定数量的范围(通常是可用于在循环中工作的辅助线程的数量)。 此分区程序类型与 static_partitioner 类似,但通过将范围映射到辅助线程的方式改善了缓存的关联。 当在相同数据集中多次执行一个循环(例如一个循环内的循环)且数据适合缓存时,此分区程序类型可提高性能。 此分区程序不完全适用取消。 它也不使用协作阻塞语义,因此不能与具有前向依赖关系的并行循环一起使用。

concurrency::auto_partitioner
将工作划分为一个初始数量的范围(通常是可用于在循环中工作的辅助线程的数量)。 当您不调用采用 _Partitioner 参数的重载的并行算法时,运行时默认使用此类型。 每个范围可以划分为子范围,从而实现负载平衡。 当一个工作范围完成时,运行时会将其他线程工作的子范围重新分配给该线程。 如果您的工作负荷不在另外一个类别下或者您需要完全支持取消或协作阻塞,请使用该分区程序。

concurrency::simple_partitioner
将工作划分到范围中,使每个范围至少拥有给定区块大小所指定的迭代的数目。 此分区程序类型加入了负载平衡;然而,运行时未将范围划分为子范围。 对于每个辅助,运行时将在 _Chunk_size 迭代完成后检查取消情况并执行负载平衡。

concurrency::static_partitioner
将工作划分为一个固定数量的范围(通常是可用于在循环中工作的辅助线程的数量)。 此分区程序类型不使用工作窃取,开销较小,可以一定程度上提高性能。当一个并行循环的每次迭代执行固定和统一数量的工作而且您不需要支持取消或前向协作阻塞时,请使用此分区程序类型。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351

推荐阅读更多精彩内容