码迷,mamicode.com
首页 > 编程语言 > 详细

Linux下简易线程池

时间:2015-07-21 12:41:01      阅读:182      评论:0      收藏:0      [点我收藏+]

标签:

线程池简介

 

 

简易线程池实现

  线程池头文件threadpool.h如下:

 1 #ifndef THREADPOOL_H
 2 #define THREADPOOL_H
 3 
 4 #include <stdio.h>
 5 #include <stdlib.h>
 6 #include <unistd.h>
 7 #include <pthread.h>
 8 
 9 /**
10 * 线程体数据结构
11 */
12 typedef struct runner
13 {
14     void(*callback)(void* arg);    // 回调函数指针
15     void* arg;                     // 回调函数的参数
16     struct runner* next;
17 } thread_runner;
18 
19 /**
20 * 线程池数据结构
21 */
22 typedef struct
23 {
24     pthread_mutex_t mutex;             // 互斥量
25     pthread_cond_t cond;               // 条件变量
26     thread_runner* runner_head;        // 线程池中所有等待任务的头指针
27     thread_runner* runner_tail;        // 线程池所有等待任务的尾指针
28     int shutdown;                      // 线程池是否销毁
29     pthread_t* threads;                // 所有线程
30     int max_thread_size;               // 线程池中允许的活动线程数目
31 } thread_pool;
32 
33 /**
34 * 线程体
35 */
36 void run(void *arg);
37 
38 /**
39 *   初始化线程池
40 *   参数:
41 *   pool:指向线程池结构有效地址的动态指针
42 *   max_thread_size:最大的线程数
43 */
44 void threadpool_init(thread_pool* pool, int max_thread_size);
45 
46 /**
47 *   向线程池加入任务
48 *   参数:
49 *   pool:指向线程池结构有效地址的动态指针
50 *   callback:线程回调函数
51 *   arg:回调函数参数
52 */
53 void threadpool_add_runner(thread_pool* pool, void(*callback)(void *arg), void *arg);
54 
55 /**
56 *   销毁线程池
57 *   参数:
58 *   ppool:指向线程池结构有效地址的动态指针地址(二级指针),销毁后释放内存,该指针为NULL
59 */
60 void threadpool_destroy(thread_pool** ppool);
61 
62 #endif

  线程池实现文件threadpool.c如下:

  1 #include "threadpool.h"
  2 
  3 #define DEBUG 1
  4 
  5 /**
  6 *   初始化线程池
  7 *   参数:
  8 *   pool:指向线程池结构有效地址的动态指针
  9 *   max_thread_size:最大的线程数
 10 */
 11 void threadpool_init(thread_pool* pool, int max_thread_size)
 12 {
 13     // 初始化互斥量
 14     pthread_mutex_init(&(pool->mutex), NULL);
 15     // 初始化条件变量
 16     pthread_cond_init(&(pool->cond), NULL);
 17     pool->runner_head = NULL;
 18     pool->runner_tail = NULL;
 19     pool->max_thread_size = max_thread_size;
 20     pool->shutdown = 0;
 21     
 22     // 创建所有分离态线程(即创建线程池)
 23     pool->threads = (pthread_t *)malloc(max_thread_size * sizeof(pthread_t));
 24     int i = 0;
 25     for (i = 0; i < max_thread_size; i++)
 26     {
 27         pthread_attr_t attr;
 28         pthread_attr_init(&attr);
 29         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
 30         pthread_create(&(pool->threads[i]), &attr, (void*)run, (void*)pool);
 31     }
 32 #ifdef DEBUG
 33     printf("threadpool_init-> create %d detached thread\n", max_thread_size);
 34 #endif
 35 }
 36 
 37 /**
 38 * 线程体
 39 */
 40 void run(void *arg)
 41 {
 42     thread_pool* pool = (thread_pool*)arg;
 43     while (1)
 44     {
 45         // 加锁
 46         pthread_mutex_lock(&(pool->mutex));
 47 #ifdef DEBUG
 48         printf("run-> locked\n");
 49 #endif
 50         // 如果等待队列为0并且线程池未销毁,则处于阻塞状态(即等待任务的唤醒)
 51         while (pool->runner_head == NULL && !pool->shutdown)
 52         {
 53             pthread_cond_wait(&(pool->cond), &(pool->mutex));
 54         }
 55         //如果线程池已经销毁
 56         if (pool->shutdown)
 57         {
 58             // 解锁
 59             pthread_mutex_unlock(&(pool->mutex));
 60 #ifdef DEBUG
 61             printf("run-> unlocked and thread exit\n");
 62 #endif
 63             pthread_exit(NULL);
 64         }
 65         // 取出链表中的头元素
 66         thread_runner *runner = pool->runner_head;
 67         pool->runner_head = runner->next;
 68         // 解锁
 69         pthread_mutex_unlock(&(pool->mutex));
 70 #ifdef DEBUG
 71         printf("run-> unlocked\n");
 72 #endif
 73         // 调用回调函数,执行任务
 74         (runner->callback)(runner->arg);
 75         free(runner);
 76         runner = NULL;
 77 #ifdef DEBUG
 78         printf("run-> runned and free runner\n");
 79 #endif
 80     }
 81     pthread_exit(NULL);
 82 }
 83 
 84 /**
 85 *   向线程池加入任务
 86 *   参数:
 87 *   pool:指向线程池结构有效地址的动态指针
 88 *   callback:线程回调函数
 89 *   arg:回调函数参数
 90 */
 91 void threadpool_add_runner(thread_pool* pool, void(*callback)(void *arg), void *arg)
 92 {
 93     // 构造一个新任务
 94     thread_runner *newrunner = (thread_runner *)malloc(sizeof(thread_runner));
 95     newrunner->callback = callback;
 96     newrunner->arg = arg;
 97     newrunner->next = NULL;
 98     // 加锁
 99     pthread_mutex_lock(&(pool->mutex));
100 #ifdef DEBUG
101     printf("threadpool_add_runner-> locked\n");
102 #endif
103     // 将任务加入到等待队列中
104     if (pool->runner_head != NULL)
105     {
106         pool->runner_tail->next = newrunner;
107         // 尾指针指到最后一个任务
108         pool->runner_tail = newrunner;
109     }
110     else
111     {
112         pool->runner_head = newrunner;
113         pool->runner_tail = newrunner;
114     }
115     // 解锁
116     pthread_mutex_unlock(&(pool->mutex));
117 #ifdef DEBUG
118     printf("threadpool_add_runner-> unlocked\n");
119 #endif
120     // 唤醒一个等待线程
121     pthread_cond_signal(&(pool->cond));
122 #ifdef DEBUG
123     printf("threadpool_add_runner-> add a runner and wakeup a waiting thread\n");
124 #endif
125 }
126 
127 /**
128 *   销毁线程池
129 *   参数:
130 *   ppool:指向线程池结构有效地址的动态指针地址(二级指针)
131 */
132 void threadpool_destroy(thread_pool** ppool)
133 {
134     thread_pool *pool = *ppool;
135     // 防止2次销毁
136     if (!pool->shutdown)
137     {
138         pool->shutdown = 1;
139         // 唤醒所有等待线程,线程池要销毁了
140         pthread_cond_broadcast(&(pool->cond));
141         // 等待所有线程中止
142         sleep(1);
143 #ifdef DEBUG
144         printf("threadpool_destroy-> wakeup all waiting threads\n");
145 #endif
146         // 回收空间
147         free(pool->threads);
148         // 销毁等待队列
149         thread_runner *head = NULL;
150         while (pool->runner_head != NULL)
151         {
152             head = pool->runner_head;
153             pool->runner_head = pool->runner_head->next;
154             free(head);
155         }
156 
157 #ifdef DEBUG
158         printf("threadpool_destroy-> all runners freed\n");
159 #endif
160         // 条件变量和互斥量也别忘了销毁
161         pthread_mutex_destroy(&(pool->mutex));
162         pthread_cond_destroy(&(pool->cond));
163 
164 #ifdef DEBUG
165         printf("threadpool_destroy-> mutex and cond destoryed\n");
166 #endif
167         free(pool);
168         (*ppool) = NULL;
169 
170 #ifdef DEBUG
171         printf("threadpool_destroy-> pool freed\n");
172 #endif
173     }
174 }

  测试文件如下:

 1 #include "threadpool.h"
 2 
 3 void threadrun(void* arg)
 4 {
 5     int *i = (int *)arg;
 6     printf("%d\n", *i);
 7 }
 8 
 9 int main(void)
10 {
11     thread_pool *pool = malloc(sizeof(thread_pool));
12     threadpool_init(pool, 2);
13     
14     int i;
15     int tmp[3];
16     for (i = 0; i < 3; i++)
17     {
18         tmp[i] = i;
19         threadpool_add_runner(pool, threadrun, &tmp[i]);
20     }
21     
22     sleep(1);
23     threadpool_destroy(&pool);
24     printf("main-> %p\n", pool);
25     printf("main-> test over\n");
26     
27     return 0;
28 }

  程序运行结果如下:

  技术分享

参考资料

  Linux线程池(C语言描述) - 互斥量+条件变量同步

 

Linux下简易线程池

标签:

原文地址:http://www.cnblogs.com/xiehongfeng100/p/4663942.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!