Linux高性能编程-线程池

今天我们来学习Linux线程池,线程池是高并发场景必须具备的软件组件,很多开源项目都会使用线程池,话不多说,直接开始。

1.线程池简介

1.1 什么是线程池?

    线程池就是提前创建好一批线程,通过一个池子来管理所有的线程。当有任务时,从池子中取出一个线程去执行该任务,执行结束后,线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。

    我们通过几个图来讲解一下线程池的作用。

1)常规多线程

图片

    常规多线程方式,每个任务都会创建一个新的线程来处理,任务处理完后线程会销毁。

    优点:在任务量比较少的情况下,任务执行效率比较高。

    缺点:1.随着任务量增多,线程数量会越来越多,线程开销(CPU,内存开销)很大,如果不控制线程数量,系统会出现异常。2.缺乏任务管理机制。3.缺乏多线程管理机制。

2)简单的线程池

图片

    简单线程池通过任务队列和线程池完成。

    新任务产生后存储在任务队列,线程池中的空闲线程从任务队列获取任务并执行。

    优点:1.线程数量可控,能够保证系统安全。2.任务和线程统一管理,方便程序设计。

    缺点:1.没有任务拒绝机制,任务会堆积在任务队列。2.线程池线程数量固定,无法动态调节线程数量,导致任务处理效率不高。

3)完善的线程池

图片

    完善的线程池需要具备一下几个优点:

多线程动态管理。任务实时响应。高并发安全保护机制。

    线程池被分为核心线程和动态线程,核心线程会一直运行保证基础业务,当任务越来越多的时候,核心线程无法保证任务快速响应,此时需要通过创建动态线程提高线程响应速度。

    当动态线程到达极限时,系统的处理能力到达瓶颈,此时需要启动安全保护机制,通过任务拒绝,保证系统安全。

1.2 线程池优点

通过上述分析,我们了解到线程池有以下优点:

降低开销:线程池通过重用已存在的线程,降低了线程的创建和销毁所带来的开销,提高了性能。控制并发数:线程池能够有效控制同时执行的线程数量,防止因线程过多导致系统资源耗尽或性能下降,提高系统稳定性。提高响应速度:由于线程池中的线程可以复用,减少了线程创建的时间,因此提高了任务的响应速度。提供丰富功能:线程池不仅可以执行普通任务,还提供定时执行、定期执行、单线程执行以及并发数控制等高级功能,使得线程的管理和使用更加灵活和高效。

2.线程池设计

2.1 整体设计

    设计一个完善的线程池,我们需要设计几个核心模块:状态管理、线程管理,任务管理。

状态管理:管理线程池的生命周期。线程管理:管理核心线程和动态线程。任务管理:管理任务申请,任务拒绝以及任务高效调度。

2.2 详细设计

    线程池定义(struct thread_pool):

复制
typedef struct thread_pool { struct list_head head; //任务队列 int task_max; //任务队列最大长度 int task_num; //任务队列实际长度 int core_num; //核心线程数量 int dyn_max; //动态线程最大数量 atomic_int dyn_num; //动态线程实际数量 pthread_t *core_th; //核心线程pthread_t数组 pthread_t *dyn_th; //动态线程pthread_t数组 int status; //线程池状态 pthread_mutex_t mutex; //互斥锁 pthread_cond_t cond; //条件变量 } thread_pool_t;1.2.3.4.5.6.7.8.9.10.11.12.13.

    线程池通过struct thread_pool结构体定义,每个成员的作用已在代码中注释。

    创建线程池:

复制
thread_pool_t* thread_pool_create(int core_num, int dyn_max) { thread_pool_t *tp = (thread_pool_t *)malloc(sizeof(thread_pool_t)); if (!tp) return NULL; pthread_mutex_init(&tp->mutex, NULL); pthread_cond_init(&tp->cond, NULL); list_head_init(&tp->head); tp->status = INIT_STATUS; tp->task_max = TASK_MAX; tp->task_num = 0; tp->dyn_max = dyn_max; atomic_init(&tp->dyn_num, 0); tp->core_num = core_num; tp->core_th = malloc(core_num * sizeof(pthread_t)); for (int i = 0; i < tp->core_num; i++) { //创建核心线程 thread_arg *th_arg = (thread_arg *)malloc(sizeof(thread_arg)); *th_arg = (thread_arg) {.tp = tp, .type = CORE_THREAD, .task = NULL}; pthread_create(&tp->core_th[i], NULL, thread_proc, (void *)th_arg); } tp->status = RUNNING_STATUS; //设置线程池为running状态 return tp; }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.

    线程池创建时会指定核心线程数量以及最大动态线程数量,核心线程跟着线程池一起创建。

    动态线程根据实际任务量动态创建,为了防止创建过多的动态线程,需限制动态线程最大数量。

    销毁线程池:

复制
void thread_pool_destroy_p(thread_pool_t **ptp) { if (!ptp) return; thread_pool_t *tp = *ptp; while(atomic_load(&tp->dyn_num)) { //等待动态线程全部退出 usleep(10 * 1000); } for (int i = 0; i < tp->core_num; i++) { //等待核心线程全部退出 pthread_join(tp->core_th[i], NULL); } free(tp->core_th); pthread_mutex_destroy(&tp->mutex); pthread_cond_destroy(&tp->cond); tp->status = STOP_STATUS; //设置线程池为stop状态 free(tp); ptp = NULL; return; }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.

    线程池销毁需要回收核心线程和动态线程,核心线程采用pthread_join方式回收。

    动态线程设置分离属性,线程池成员dyn_num(原子变量)用于记录仍在工作的动态线程数量,dyn_num等于0时表示当前所有的动态线程都已经退出。

2.2.1 状态管理

    线程池整个生命周期可分为4个状态:

复制
#define INIT_STATUS (1<<0) //init状态 #define RUNNING_STATUS (1<<1) //running状态 #define SHUTDOWN_STATUS (1<<2) //shutdown状态 #define STOP_STATUS (1<<3) //stop状态1.2.3.4.
init状态:线程池创建时的状态,init状态线程不能处理任务。running状态:线程池初始化完毕后设置为running状态,此时线程池能够提取任务并执行。shutdown状态:线程池正常或者异常退出,设置为shutdown状态,此时线程池能继续处理工作队列中剩余任务,但无法再接收新的任务。stop状态:线程池所有线程全部释放,线程池被销毁。2.2.2 线程管理

    核心线程和动态线程处理函数:

复制
void *thread_proc(void *arg) { thread_arg *th_arg = (thread_arg *)arg; thread_pool_t *tp = th_arg->tp; pid_t tid = gettid(); //动态线程设置成分离模式,方便管理 if (th_arg->type == DYN_THREAD) pthread_detach(pthread_self()); int count = 0; task_t *task = NULL; while(1) { if (th_arg->task) { //创建线程并执行首次任务 printf("tid:%d first process task\n", tid); task = th_arg->task; th_arg->task = NULL; } else { task = thread_get_task(tp); //从任务队列中获取任务并执行 if (!task) { if (tp->status != RUNNING_STATUS) break; //线程池被关闭,线程退出 count++; if ((count >= 10) && (th_arg->type == DYN_THREAD)) { printf("dyn thread 10s break\n"); break; //动态线程空闲10秒自动退出 } continue; } count = 0; } task->cb(task->arg); task_freep(&task); } free(th_arg); if (th_arg->type == DYN_THREAD) { atomic_fetch_sub(&tp->dyn_num, 1); } return NULL; }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.

1)核心线程

    核心线程数量固定,在线程池创建时会创建所有的核心线程,线程池退出时会销毁所有的核心线程。

2)动态线程

    动态线程的管理比较复杂,需要根据任务数量做动态调整,任务量大时,动态线程会被创建,提高线程池任务响应速率,任务量小时,空闲的动态线程会被回收,从而减少线程开销。

2.2.3 任务管理

    任务定义(struct task):

复制
typedef enum TASK_TYPE { FREE_TYPE, NOFREE_TYPE, }TASK_TYPE; typedef void (*func)(void *arg); typedef struct task { struct list_head list; //队列节点 func cb; //回调函数 void *arg; //回调函数参数 TASK_TYPE type; //任务参数是否需要释放 }task_t;1.2.3.4.5.6.7.8.9.10.11.12.

    任务通过struct task定义,任务主要成员:

list:队列节点,用于插入和移除任务队列。cb:回调函数,任务处理函数。arg:回调函数参数。type:回调函数参数是否需要释放标志。

    任务申请流程如下图,创建一个新的任务后,需要做一些检测才能将任务加入线程池,检测不通过则执行任务拒绝,从而保证线程池始终处于安全高效运行状态。

    任务执行完毕后,需释放任务,回收资源。

图片

    线程池一定要做任务管理,任务管理的目的有两个:

保证线程池安全,不会堆积过多任务,消耗CPU和内存资源。提高任务处理效率。

3.线程池测试

    测试环境:树莓派4B,4核,4GB。

    分别采用多线程和线程池方式测试CPU密集型和IO密集型任务,对比两种方式性能和效率的差异。每毫秒产生1个任务,总共测试10000个任务。

    通过time命令执行测试程序,记录测试程序执行情况。

    测试代码如下:

    (完整代码请联系博主获取)

复制
#define CORE_THREAD_NUM (4) //核心线程数量 #define DYN_THREAD_MAX (32) //动态线程最大数量 #define TASK_MAX (128) //任务队列任务最大数量 #define ENABLE_THREAD_POOL (0) //是否开启线程池,0:关闭 1:开启 #define TEST_TASK_NUM (10000) //测试任务数量 #define TASK_INTERVAL (1000) //任务产生间隔时间,单位:毫秒 #define TEST_TASK_TYPE (0) //任务类型,0:CPU密集型 1:IO密集型 #define NOP_TIMES (10000000) //CPU密集型任务执行空指令次数 #define RAND_RANGE (1 << 18) //IO密集型任务休眠时间随机范围,单位:毫秒 void cpu_stress() { for (int i = 0; i < NOP_TIMES; i++) { ; } } void rand_sleep() { srand(time(0)); int ms = rand() & (RAND_RANGE - 1); usleep(ms); } //任务处理函数 void task_cb(void *arg) { #if !TEST_TASK_TYPE cpu_stress(); #else rand_sleep(); #endif return; } atomic_int running_threads; void *thread_proc1(void *arg) { atomic_fetch_add(&running_threads, 1); pthread_detach(pthread_self()); task_t *task = (task_t *)arg; task->cb(task->arg); free(task); atomic_fetch_sub(&running_threads, 1); return NULL; } int main(int argc, char *argv[]) { #if ENABLE_THREAD_POOL thread_pool_t *tp = thread_pool_create(CORE_THREAD_NUM, DYN_THREAD_MAX); if (!tp) { printf("thread_pool_create error"); return -1; } int seq = 0; while(1) { usleep(TASK_INTERVAL); task_t *task = task_create(task_cb, NULL, FREE_TYPE); if (!task) { printf("task create error\n"); usleep(10 * 1000); continue; } int ret = thread_add_task(tp, task); if (ret == -1) { //任务拒绝 task_freep(&task); usleep(10 * 1000); continue; } if (seq++ >= TEST_TASK_NUM) { thread_pool_exit(tp); thread_pool_destroy_p(&tp); break; } } printf("thread pool test done------\n"); #else atomic_init(&running_threads, 0); int seq = 0; int old_num = 0; while(1) { usleep(TASK_INTERVAL); task_t *task = task_create(task_cb, NULL, FREE_TYPE); if (!task) { printf("task create error\n"); usleep(10 * 1000); continue; } pthread_t th; int ret = pthread_create(&th, NULL, thread_proc1, (void *)task); if (ret != 0) { free(task); usleep(10 * 1000); continue; } int num = atomic_load(&running_threads); if (old_num < num) { old_num = num; printf("running_threads:%d\n", num); } if (seq++ >= TEST_TASK_NUM) break; } printf("threads test done------\n"); #endif return 0; }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80.81.82.83.84.85.86.87.88.89.90.91.92.93.94.95.96.97.98.99.100.101.102.103.104.105.106.107.108.109.

1)CPU密集型场景测试

    测试参数如下:

图片

    多线程测试结果-->:

图片

    CPU使用率397%(已使用完),最高同时创建913个线程,完成测试时间2分钟,用户时间7分51秒,系统时间0.5秒。

    线程池测试结果-->:

图片

    CPU使用率398%(已使用完),核心线程4个,动态线程32个,共36各个线程,完成测试时间1分56秒,用户时间7分41秒,系统时间0.04秒。

    小节:CPU密集场景多线程和线程池方式处理效率相差不大,多线程方式最多同时创建900多个线程,会消耗大量系统资源。线程池方式线程始终控制在36个,比较安全。

2)IO密集型场景测试

    测试参数如下:

图片

    多线程测试结果-->:

图片

    CPU使用率10.2%(已使用完),最高同时创建212个线程,完成测试时间11秒,用户时间0.15秒,系统时间为1秒。

    线程池测试结果-->:

图片

图片

    CPU使用率3.6%(已使用完),核心线程4个,动态线程32个,共36各个线程,完成测试时间20秒,用户时间0.25秒,系统时间为0.32秒。

    小节:IO密集型场景线程处于IO阻塞状态,CPU使用率并不高,此时可以适当增加线程数量来提高CPU利用率。

总结:

线程池能够有效控制线程数量,防止线程过多对系统造成危害。线程池能够高效管理任务,使软件开发更方便、高效。从测试结果来看,多线程和线程池方式处理任务效率相差并不大,即使频繁的创建和销毁线程,也未对效率产生很大影响。

THE END
本站服务器由亿华云赞助提供-企业级高防云服务器