C++简单实现线程池

一、线程池简介

  1. 什么是线程池

    线程池主要是用来复用线程,减少线程的频繁创建与销毁。

  2. 线程池的优点

    节省资源 吧

二、线程池的基本结构

  1. 线程池管理器,设置线程池参数,控制线程池的运行与停止。、
  2. 工作线程(Worker)初始化线程池是会有固定的线程存在与线程队列中并处于空闲状态。
  3. 一个线程队列 queue,线程队列的任务就是从任务队列中提取
  4. 一个缓冲任务队列 queue,其中存储的是任务(函数指针),此队列向外提供接口,能够让任务添加进来,同时保证并发安全(加互斥锁)。

三、运行过程

  1. 主程序当前没有任务要执行,线程池中的任务队列为空闲状态.
  2. 主程序添加小于等于线程池中线程数量的任务,任务缓冲队列为空。
  3. 主程序添加任务数量大于当前线程池中线程数量的任务,且多出来的任务小于缓冲队列的大小,缓存任务队列的任务等待执行。
  4. 主程序添加任务数量大于当前线程池中线程数量的任务,且任务缓冲队列已满.
  5. 动态扩展线程池大小:在任务缓冲队列已满的情况下,可以尝试动态扩展线程池的大小,以容纳更多的任务。这样可以确保所有任务都能被处理,避免任务丢失。
  6. 拒绝策略:当任务缓冲队列已满且无法动态扩展线程池大小时,可以采用一些拒绝策略来处理无法执行的任务。常见的拒绝策略包括:
    • 抛出异常:直接抛出异常,通知调用者任务无法执行。
    • 丢弃最旧的任务:从任务缓冲队列中丢弃最早添加的任务,以腾出空间来执行新的任务。
    • 调用者运行:由调用线程直接执行该任务,而不放入任务队列中。
  7. 定时重试:将无法立即执行的任务重新放入任务队列中,并通过定时器定期重试执行这些任务,直到任务被成功执行为止。
  8. 优先级队列:使用优先级队列管理任务,确保重要任务能够优先执行,可以避免一些次要任务占用队列空间。

四、具体实现部分

  1. 实现线程安全的队列,对队列的入队,出队操作加锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// safe_queue.h
private:
queue<T> m_queue;// 队列
mutex m_mutex;// 互斥锁
public:
SafeQueue(){}
SafeQueue(SafeQueue && other){}
~SafeQueue(){}


bool empty(){
unique_lock <mutex> lock(m_mutex);// 检测是否可以访问
return m_queue.empty();
}

int size(){
unique_lock <mutex> lock(m_mutex);
return m_queue.size();
}

void push(T &t){
unique_lock <mutex> lock(m_mutex);
m_queue.push(t);
}

bool pop(T &t){
unique_lock <mutex> lock(m_mutex);
if (m_queue.empty()) return false;

t = move(m_queue.front());
m_queue.pop();
return true;
}

};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class ThreadPool {
private:
bool m_shutdown; // 线程池是否关闭
SafeQueue<std::function<void()>> m_queue; // 执行函数安全队列,即任务队列
std::vector<std::thread> m_threads; // 工作线程队列​
std::mutex m_conditional_mutex; // 线程休眠锁互斥变量
std::condition_variable m_conditional_lock; // 线程环境锁,可以让线程处于休眠或者唤醒状态

class Worker {
// work_id
private:
int m_id;

ThreadPool *m_pool;//所属线程池指针
public:
Worker(ThreadPool* pool, const int id): m_pool(pool),m_id(id) // 构造函数
{
}

/*重载'()'运算符可以使对象具备函数调用的特性,增强了对象的可调用性和灵活性,使其适用于各种不同的场景和需求。
*/
void operator()(){
//一个不返回任何数值(void)且不接受任何参数的函数。在这个函数类型中,'void' 表示函数没有返回值,而 '()' 表示函数没有参数。
function<void()> func;
bool dequeued;// 判断是否正在取出队列任务
// 从任务队列获取任务,需要加锁
while(!m_pool->m_shutdown){
//加锁作用域
{
unique_lock<mutex> lock(m_pool->m_conditional_mutex);
if(m_pool->m_queue.empty()){
m_pool->m_conditional_lock.wait(lock); // 等待条件变量通知,开启线程
}
dequeued = m_pool->m_queue.pop(func);
}
if(dequeued){
func();
}
}
}
};

public:
ThreadPool(const int n_threads = 4): m_threads(std::vector<std::thread>(n_threads)), m_shutdown(false)
{
}
// 禁用了拷贝构造函数,防止通过拷贝构造函数创建 ThreadPool 类的对象的副本。
ThreadPool(const ThreadPool &) = delete;
// 禁用了移动构造函数,防止通过移动构造函数创建 ThreadPool 类的对象。
ThreadPool(ThreadPool &&) = delete;
//禁用了拷贝赋值运算符,防止通过拷贝赋值运算符将一个 ThreadPool 对象赋值给另一个对象。
ThreadPool &operator=(const ThreadPool &) = delete;
//禁用了移动赋值运算符,防止通过移动赋值运算符将一个 ThreadPool 对象赋值给另一个对象。
ThreadPool &operator=(ThreadPool &&) = delete;

void init(){
for (int i = 0;i<m_threads.size();i++){
// 创建线程,传入线程池地址与workid
m_threads.at(i) = thread(Worker(this,i));
}
}
//
void shutdown(){
m_shutdown = true;
m_conditional_lock.notify_all(); // 通知,唤醒所有工作线程
for (int i = 0; i < m_threads.size(); ++i)
{
if (m_threads.at(i).joinable()) // 判断线程是否在等待
{
m_threads.at(i).join(); // 将线程加入到等待队列
}
}
}


template <typename F,typename... Args>
auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))> {
// 调用打包
// function<decltype(f(args...))()> func = bind(forward<F>(f),forward<Args>(args));
std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
//std::packaged_task可以用来封装任何可以调用的目标,从而用于实现异步的调用。
//使用std::make_shared<>()方法,声明了一个std::packaged_task<decltype(f(args...))()>类型的智能指针
auto task_ptr = make_shared<packaged_task<decltype(f(args...))()>>(func);
std::function<void()> warpper_func = [task_ptr]()
{
(*task_ptr)();
};
// 队列通用安全封包函数,并压入安全队列
m_queue.push(warpper_func);
// 唤醒一个等待中的线程
m_conditional_lock.notify_one();
// 返回先前注册的任务指针
return task_ptr->get_future();
}
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// main.cpp
#include<bits/stdc++.h>
#include"ThreadPool.h"
#include<random>
using namespace std;

random_device rd;
mt19937 mt(rd());
uniform_int_distribution<int> dist(-1000, 1000); // 均匀分布的整数

auto rnd = bind(dist,mt);

// 设置线程睡眠时间
void simulate_hard_computation()
{
this_thread::sleep_for(chrono::milliseconds(2000 + rnd()));
}
// 添加两个数字的简单函数并打印结果
void multiply(const int a, const int b)
{
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
}

// 添加并输出结果
void multiply_output(int &out, const int a, const int b)
{
simulate_hard_computation();
out = a * b;
std::cout << a << " * " << b << " = " << out << std::endl;
}

// 结果返回
int multiply_return(const int a, const int b)
{
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
return res;
}

void example(){
ThreadPool pool(3);
pool.init();
for(int i=0;i<=2;i++){
//TODO
for(int j=0;j<=9;j++){
//TODO
pool.submit(multiply,i,j);

}
}
// 使用ref传递的输出参数提交函数
int output_ref;
auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);

// 等待乘法输出完成
future1.get();
std::cout << "Last operation result is equals to " << output_ref << std::endl;

// 使用return参数提交函数
auto future2 = pool.submit(multiply_return, 5, 3);


// 等待乘法输出完成
int res = future2.get();
std::cout << "Last operation result is equals to " << res << std::endl;


// 关闭线程池
pool.shutdown();
}
int main(){
example();
return 0;
}

参考资料:

基于C++11实现线程池的工作原理 - 靑い空゛ - 博客园 (cnblogs.com)

基于C++11的线程池(threadpool),简洁且可以带任意多的参数 - _Ong - 博客园 (cnblogs.com)

C11实现的 100行线程池 解析 - C 无所不能 - SegmentFault 思否