一、并行算法初步尝试
template<typename Iterator,typename T>
struct accumulate_block
{
void operator()(Iterator first,Iterator last,T& result)
{
result = std::accumulate(first, last, result);//累积所有任务块的总和
}
};
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
unsigned long const length = std::distance(first, last);//distance是计算两个iterator的距离。
if(!length)
return init;
unsigned long const min_per_thread = 25;
/*
* 用范围内元素的总数量(length + min_per_thread)除以线程(块)中最小任务数,从而确定启动线程的最大数量。
* 这样能避免无谓的计算资源的浪费。
* */
unsigned long const max_threads =
(length+min_per_thread-1) / min_per_thread;
unsigned long const hardware_threads =
std::thread::hardware_concurrency();//返回能同时并发在一个程序中的线程数量
/*
* 计算量的最大值和硬件支持线程数中,较小的值为启动线程的数量。
* 因为上下文频繁的切换会降低线程的性能,所以你肯定不想启动的线程数多于硬件支持的线程数量。
*
* 当 std::thread::hardware_concurrency() 返回0,你可以选择一个合适的数作为你的选择;在本例中,我选择了"2"。
* 也就是说,无论如何都会开启两个线程
* */
unsigned long const num_threads =
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
//每个线程中处理的元素数量,是范围中元素的总量除以线程的个数得出的。
unsigned long const block_size = length / num_threads;
std::vector<T> results(num_threads);//用于存放当前能开启的线程中,每个线程处理的任务块
std::vector<std::thread> threads(num_threads-1);//存放总共能开启多少线程,因为在启动之前已经有了一个线程(主线程), 所以-1。
Iterator block_start = first;
for(unsigned long i=0; i < (num_threads-1); ++i)
{
Iterator block_end = block_start;
std::advance(block_end, block_size);//block_end迭代器指向当前任务块的末尾
threads[i] = std::thread(
accumulate_block<Iterator,T>(),
block_start, block_end, std::ref(results[i]) );//启动一个新线程为当前块累加结果
block_start = block_end;//当迭代器指向当前块的末尾时,启动下一个块
}
/*
* 启动所有线程后,线程会处理最终块的结果。对于分配不均,因为知道最终块是哪一个,那么这个块中有多少个元素就无所谓了。
* */
accumulate_block<Iterator,T>()(block_start,last,results[num_threads-1]);
/*
* 把容器中的thread对象逐个join
* */
std::for_each(threads.begin(),threads.end(),
std::mem_fn(&std::thread::join));
return std::accumulate(results.begin(),results.end(),init);//使用 std::accumulate 将所有结果进行累加
}
此程序的一些要求和缺点:
- 对于创建出results容器,需要保证T有默认构造函数。
- 当线程运行时,所有必要的信息都需要传入到线程中去,包括存储计算结果的位置。不过,有时候显得太麻烦,可以用识别线程的来解决,可以传递一个标识数,例如 i来标记线程在容器中的位置。不过,当需要标识的函数在调用栈的深层,同时其他线程也可调用该函数,那么标识数就会变的捉襟见肘。因此,标准库为每个线程附加了唯一标识符。
二、识别线程
线程标识类型是 std::thread::id,有两种方式可以获取它:
- 可以通过调用 std::thread 对象的成员函数
get_id()
获取线程标识。 - 在当前线程中调用
std::this_thread::get_id()
获取线程标识。
std::thread::id 的对象可以自由的拷贝和对比:
- 如果两个对象的 std::thread::id 相等,那它们就是同一个线程,或者都“没有线程”。
- 如果不等,那么就代表了两个不同线程,或者一个有线程,另一没有。
C++标准库允许程序员将 std::thread::id 的对象做为容器的键值,同时也提供std::hash<std::thread::id> 容器,所以 std::thread::id 也可以作为无序容器的键值。
三、共享数据带来的问题
当一个或多个线程要修改共享数据时,就会产生很多麻烦。
例子:
从一个列表中删除一个节点的步骤如下(不变量为此双链表)
a. 找到要删除的节点N
b. 更新前一个节点指向N的指针,让这个指针指向N的下一个节点
c. 更新后一个节点指向N的指针,让这个指正指向N的前一个节点
d. 删除节点N
b) 和 c) 在相同的方向上指向和原来已经不一致了,修改了共享数据,同时也破坏了不变量。在并发程序中,极有可能因为其他线程访问刚刚被删除的数据,而引发错误:条件竞争。
四、条件竞争
条件竞争的定义
如上例子,并发中竞争条件的形成,取决于一个以上线程的相对执行顺序,每个线程都抢着完成自己的任务,也被称为 “恶性条件竞争” 。
C++标准也定义了数据竞争这个术语:并发的去修改一个独立对象,数据竞争时可怕的未定义行为的起因。
如何避免恶性条件竞争?
1. 通过加锁或者无锁的方式来避免
- 使用C++标准库提供的互斥量,确保只有进行修改的线程才能看到不变量被破坏时的中间状态。
- 对数据结构和不变量的设计进行修改,修改完的结构必须能完成一系列不可分割的变化,也就是 保证每个不变量保持稳定的状态, 这就是所谓的无锁编程,不过会使得程序变得很复杂。
2.使用事务的方式去处理数据结构的更新
- 所需的一些数据和读取都存储在事务日志中,然后将之前的操作合为一步,再进行提交。 当数据结构被另一个线程修改后,或处理已经重启的情况下,提交就会无法进行 , 这称作为“软件事务内存”(STM)。不过,C++没有对STM进行直接支持。