C++11使用std::packaged_task和lambda构建一个简单的线程池

本例使用std::packaged_task和lambda表达式构建了一个简单的线程池。
众所周知,线程池的核心就是一个后台运行的线程组和一个不断提交任务的threadsafe_queue。线程组不断从queue里面拿取任务,进行执行。
本例就是简单的线程池,没有做工作偷取线程这些,后面两节应该有。
本例是C++ Concurrency in Action一书的源码,但是原书附的源码有诸多错误,无法运行。
本例是一个能work的sample。
代码如下,
conanfile.txt

[requires]
boost/1.72.0

[generators]
cmake

CMakeLists.txt

cmake_minimum_required(VERSION 3.3)

project(9_3_parallel_accumulate_thread_pool)

set(ENV{PKG_CONFIG_PATH} "$ENV{PKG_CONFIG_PATH}:/usr/local/lib/pkgconfig/")

set ( CMAKE_CXX_FLAGS "-pthread")
set(CMAKE_CXX_STANDARD 17)
add_definitions(-g)

include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake)
conan_basic_setup()

include_directories(${INCLUDE_DIRS})
LINK_DIRECTORIES(${LINK_DIRS})

file( GLOB main_file_list ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp) 

foreach( main_file ${main_file_list} )
    file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${main_file})
    string(REPLACE ".cpp" "" file ${filename})
    add_executable(${file}  ${main_file})
    target_link_libraries(${file} ${CONAN_LIBS} pthread)
endforeach( main_file ${main_file_list})

threadsafe_queue.hpp

#ifndef _FREDRIC_THREAD_SAFE_QUEUE_HPP_
#define _FREDRIC_THREAD_SAFE_QUEUE_HPP_

#include <mutex>
#include <string>
#include <queue>
#include <memory>
#include <atomic>
#include <condition_variable>
#include <exception>

template <typename T>
class threadsafe_queue {
private:
    struct node {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };

    std::mutex head_mutex;
    std::mutex tail_mutex;

    std::unique_ptr<node> head;
    node* tail;

    std::condition_variable data_cond;
    
    node* get_tail() {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }

    std::unique_ptr<node> pop_head() {
        std::unique_ptr<node> old_head = std::move(head);
        head = std::move(old_head->next);
        return old_head;
    }

    std::unique_lock<std::mutex> wait_for_data() {
        std::unique_lock<std::mutex> head_lock(head_mutex);
        data_cond.wait(head_lock, [&]() {
            return head.get() != get_tail();
        });

        return std::move(head_lock);
    }

    std::unique_ptr<node> wait_pop_head() {
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        return pop_head();
    }

    std::unique_ptr<node> wait_pop_head(T& value) {
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        value = std::move(*head->data);
        return pop_head();
    }

    std::unique_ptr<node> try_pop_head() {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if(head.get() == get_tail()) {
            return std::unique_ptr<node>();
        }
        return pop_head();
    }

    std::unique_ptr<node> try_pop_head(T& value) {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if(head.get() == get_tail()) {
            return std::unique_ptr<node>();
        }
        value = std::move(*head->data);
        return pop_head();
    } 

public:
    threadsafe_queue():
        head(new node), tail(head.get()) {}
    
    threadsafe_queue(threadsafe_queue const&) = delete;
    threadsafe_queue& operator=(threadsafe_queue const&) = delete;

    void push(T new_value) {
        std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
        std::unique_ptr<node> p (new node);
        {
            std::lock_guard<std::mutex> tail_lock(tail_mutex);
            tail->data = new_data;
            node* const new_tail = p.get();
            tail->next = std::move(p);
            tail = new_tail;
        }

        data_cond.notify_one();
    }

    std::shared_ptr<T> wait_and_pop() {
        std::unique_ptr<node> const old_head = wait_pop_head();
        return old_head->data;
    }

    void wait_and_pop(T& value) {
        wait_pop_head(value);
    }

    bool empty() {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        return (head.get() == get_tail());
    }

    std::shared_ptr<T> try_pop() {
        std::unique_ptr<node> old_head = try_pop_head();
        return old_head ? old_head->data: std::shared_ptr<T>();
    }

    bool try_pop(T& value) {
        std::unique_ptr<node> old_head = try_pop_head(value);
        return old_head != nullptr;
    }
};

#endif

thread_pool.hpp

#ifndef _FREDRIC_THREAD_POOL_HPP_
#define _FREDRIC_THREAD_POOL_HPP_
#include "thread_safe_queue.hpp"
#include <thread>
#include <vector>
#include <atomic>
#include <functional>
#include <utility>
#include <future>
#include <utility>
#include <functional>
#include <memory>

struct join_threads {

    std::thread& operator[](int index) {
        return threads[index];
    }

    void add_thread(std::thread&& thread) {
        threads.emplace_back(std::move(thread));
    }

    ~join_threads() {
        for(std::thread& thread: threads) {
            if(thread.joinable()) {
                thread.join();
            }
        }
    }
private:
    std::vector<std::thread> threads;
};


class function_wrapper {
    struct impl_base {
        virtual void call() = 0;
        virtual ~impl_base() {}
    };

    template <typename F>
    struct impl_type: impl_base {
        F f;
        impl_type(F&& f_): f(std::move(f_)) {}

        void call() {
            f();
        }
    };
    
    std::unique_ptr<impl_base> impl;

public:
    function_wrapper() {}

    // 这个wrapper wrapper的是 packaged_task
    template <typename F>
    function_wrapper(F&& f):
        impl(new impl_type<F>(std::move(f))) {}

    void call() {
        impl->call();
    }

    function_wrapper(function_wrapper&& other): impl(std::move(other.impl)) {}

    function_wrapper& operator=(function_wrapper&& other) {
        impl = std::move(other.impl);
        return *this;
    }

    function_wrapper(function_wrapper const&) = delete;
    function_wrapper(function_wrapper&) = delete;
    function_wrapper& operator=(function_wrapper const&) = delete;
};

class thread_pool {
    std::atomic<bool> done;
    threadsafe_queue<function_wrapper> work_queue;
    join_threads joiner;

    void work_thread() {
        while(!done) {
           function_wrapper task;
            if(work_queue.try_pop(task)) {
                task.call();
            } else {
                std::this_thread::yield();
            }
        }
    }

public:
    thread_pool():
        done(false) {
        unsigned const thread_count = std::thread::hardware_concurrency();

        try {
            for(unsigned i=0; i<thread_count; ++i) {
                joiner.add_thread(std::thread(&thread_pool::work_thread, this));
            }
        } catch(...) {
            done = true;
            throw;
        }
    }

    ~thread_pool() {
        done = true;
    }

    template <typename FunctionType>
    std::future<typename std::result_of<FunctionType()>::type> submit(FunctionType f) {
        typedef typename std::result_of<FunctionType()>::type result_type;
        std::packaged_task<result_type()> task(std::move(f));
        std::future<result_type> res = task.get_future();
        work_queue.push(std::move(task));
        return res;
    }
};
#endif

main.cpp

#include "thread_pool.hpp"
#include <iostream>
#include <algorithm>
#include <numeric>


template <typename Iterator, typename T>
struct accumulate_block {
    T operator()(Iterator first, Iterator last) {
        return std::accumulate(first, last, T());
    }
};

template <typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
    unsigned long const length = std::distance(first, last);

    if(!length) {
        return init;
    }

    unsigned long const block_size = 25;
    unsigned long const num_blocks = (length + block_size - 1)/block_size;
    std::vector<std::future<T>> futures(num_blocks - 1);

    thread_pool pool;
    Iterator block_start = first;
    for(unsigned long i=0; i<num_blocks - 1; ++i) {
        Iterator block_end = block_start;
        std::advance(block_end, block_size);
        // 这里不能传引用,否则会产生竟态
        futures[i] = pool.submit([=]() {
            return accumulate_block<Iterator, T>()(block_start, block_end);
        });
        block_start = block_end;
    }

    T last_result = accumulate_block<Iterator, T>()(block_start, last);
    T result = init;
    for(unsigned long i=0; i<num_blocks - 1; ++i) {
        result += futures[i].get();
    }

    result += last_result;
    return result;
}

int main(int argc, char* argv[]) {
    std::vector<int> v(100);
    for(std::size_t i=0; i<100; ++i) {
        v[i] = i+1;
    }

    int res = parallel_accumulate(v.begin(), v.end(), 0);
    std::cout << "1 + 2 + ... + 100 = " << res << std::endl;
    return EXIT_SUCCESS;
}

本例使用线程池实现了并发计算 1+2+3... +100的任务。
你也可以完成其他任务,思路就是先分批进行拆分,然后提交到线程池进行执行就可以。
执行完成之后有一个std::future对象可以用于获取结果。
程序执行效果如图,


image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容