标签:des blog io for art cti div ar
#ifndef __DEF_H__
#define __DEF_H__
#include <stddef.h>
#include <pthread.h>
#include <stdio.h>
#define TRUE 1
#define FALSE 0
//任务结构体
typedef struct{
void (*thread_function_ptr_) (void*);
void *arg_;
}task_t;
//队列结构体
typedef struct node_t{
task_t data_; //队列结点内存放任务
struct node_t *next_;
}node_t, *pnode_t;
typedef struct{
pnode_t head_;
pnode_t tail_;
size_t size_;
}queue_t;
//线程池结构体
typedef struct{
size_t size_; //线程池大小
pthread_t *threads_; //线程id数组
queue_t queue_; //任务队列
int is_started_; // 线程池状态 是否开启 c 没有bool
pthread_mutex_t mutex_; //互斥锁 各线程互斥访问任务队列
pthread_cond_t cond_; //条件队列 同步主线程和各子线程
}pool_t;
// 线程池的接口
void thread_pool_init(pool_t *pool, size_t size); //初始化线程池
void thread_pool_start(pool_t *pool); // 开启线程池
void thread_pool_add_task(pool_t *pool, task_t task); //往线程池中加入新的任务
int thread_pool_get_task(pool_t *pool, task_t *task);//从线程池中取出任务
void thread_pool_stop(pool_t *pool); // 关闭线程池
void thread_pool_deatroy(pool_t *pool); //销毁 线程池
int thread_pool_is_started(pool_t *pool);// 线程池是否开启
size_t thread_pool_get_size_of_queue(pool_t *pool); //返回线程池任务队列的大小
void* thread_pool_thread_func(void *); //每个线程执行的函数
//队列的接口
void queue_init(queue_t *queue); //初始化队列 这里动态创建队列
void queue_push(queue_t *queue, task_t task); // 入队列
void queue_pop(queue_t *queue); //出队列
void queue_destroy(queue_t *queue); //销毁队列
void queue_clear(queue_t *queue); //清空队列
int queue_is_empty(queue_t *queue); //判断队列是否为空
size_t queue_size(queue_t *queue); //返回队列的大小
task_t queue_top(queue_t *queue); // 返回队首元素
#endif
#include "def.h"
#include <stdlib.h>
#include <assert.h>
void queue_init(queue_t *queue){
queue->head_ = NULL;
queue->tail_ = NULL;
queue->size_ = 0;
}
void queue_push(queue_t *queue, task_t task){
pnode_t pCur = (pnode_t )malloc(sizeof(node_t));
pCur->data_ = task;
pCur->next_ = NULL;
if(queue_is_empty(queue)){
queue->head_= queue->tail_ = pCur;
}
else{
queue->tail_->next_ = pCur;
queue->tail_ = pCur;
}
queue->size_++;
}
void queue_pop(queue_t *queue){
assert(!queue_is_empty(queue));
pnode_t pCur = queue->head_;
queue->head_ = queue->head_->next_;
free(pCur);
queue->size_ --;
}
void queue_destroy(queue_t *queue){
queue_clear(queue);
}
void queue_clear(queue_t *queue){
while(!queue_is_empty(queue)){
queue_pop(queue);
}
}
int queue_is_empty(queue_t *queue){
return queue->size_ == 0;
}
size_t queue_size(queue_t *queue){
return queue->size_;
}
task_t queue_top(queue_t *queue){
return queue->head_->data_;
}
#include "def.h"
#include <stdio.h>
#include <stdlib.h>
#define POOL_SIZE 3
void quare(void *arg){
int num = (int)arg;
printf("%d * %d = %d\n", num, num, num * num);
}
int main(int argc, const char *argv[])
{
pool_t pool;
task_t task;
srand(10000);
thread_pool_init(&pool, POOL_SIZE);
thread_pool_start(&pool);
while(1){
task.thread_function_ptr_ = quare;
task.arg_ = (void *)(rand()%100);
thread_pool_add_task(&pool, task);
sleep(1);
}
thread_pool_stop(&pool);
thread_pool_destroy(&pool);
return 0;
}
#include "def.h"
#include <stdio.h>
/*
* 测试队列
*/
void func_ptr(void *arg){
printf("arg = %d\n", (int)arg);
}
int main(int argc, const char *argv[])
{
queue_t queue;
queue_init(&queue);
task_t task, task_2;
task.thread_function_ptr_ = func_ptr;
task.arg_ = (void *)10;
queue_push(&queue, task);
task_2 = queue_top(&queue);
printf("task_2.arg = %d\n",(int)task_2.arg_);
queue_pop(&queue);
printf("queue_is_empty = %d\n", queue_is_empty(&queue));
return 0;
}
#include "def.h"
#include <stdlib.h>
#include <assert.h>
void thread_pool_init(pool_t *pool, size_t size){
pool->size_ = size;
pool->threads_ = (pthread_t *)malloc(pool->size_ * sizeof(pthread_t));
queue_init(&pool->queue_);
pool->is_started_ = FALSE;
pthread_mutex_init(&pool->mutex_, NULL);
pthread_cond_init(&pool->cond_, NULL);
}
void *thread_pool_thread_func(void * arg){
pool_t *pool = (pool_t *)arg;
task_t task;
while(1){
int ret = thread_pool_get_task(pool, &task);
if(ret == TRUE)
task.thread_function_ptr_(task.arg_);
else //此时说明线程池关闭
break;
}
}
void thread_pool_start(pool_t *pool){
if(pool->is_started_ == FALSE){
pool->is_started_ = TRUE;
int i;
for(i = 0; i < pool->size_; i++){
pthread_create(&pool->threads_[i], NULL,thread_pool_thread_func, (void*)pool);
}
}
}
void thread_pool_add_task(pool_t *pool, task_t task){
assert(pool->is_started_);
pthread_mutex_lock(&pool->mutex_);
queue_push(&pool->queue_, task); //将新任务加入任务队列中去
pthread_cond_signal(&pool->cond_);
pthread_mutex_unlock(&pool->mutex_);
}
int thread_pool_get_task(pool_t *pool, task_t *task){ // 根据返回值判断是否成功取出任务
pthread_mutex_lock(&pool->mutex_);
while(queue_is_empty(&pool->queue_) && pool->is_started_ == TRUE){
pthread_cond_wait(&pool->cond_, &pool->mutex_);
}
if(pool->is_started_ == FALSE){//有可能是关闭线程池时 被唤醒的
pthread_mutex_unlock(&pool->mutex_);
return FALSE;
}
*task = queue_top(&pool->queue_);
queue_pop(&pool->queue_);
pthread_mutex_unlock(&pool->mutex_);
return TRUE;
}
void thread_pool_stop(pool_t *pool){
if(pool->is_started_ == FALSE)
return;
pool->is_started_ = FALSE;
pthread_cond_broadcast(&pool->cond_); //唤醒所有睡眠线程 结束回收资源
int i;
for(i = 0; i < pool->size_; i++){
pthread_join(pool->threads_[i], NULL);
}
queue_clear(&pool->queue_); // 清空任务队列
}
void thread_pool_destroy(pool_t *pool){ // 销毁线程池
thread_pool_stop(pool);
pthread_mutex_destroy(&pool->mutex_); // 销毁互斥锁和条件变量
pthread_cond_destroy(&pool->cond_);
free(pool->threads_); //释放动态分配的内存 线程数组和任务队列
queue_destroy(&pool->queue_);
}
int thread_pool_is_started(pool_t *pool){
return pool->is_started_ == TRUE;
}
size_t thread_pool_get_size_of_queue(pool_t *pool){
return pool->queue_.size_;
}
0726------Linux基础----------线程池,布布扣,bubuko.com
0726------Linux基础----------线程池
标签:des blog io for art cti div ar
原文地址:http://www.cnblogs.com/monicalee/p/3874178.html