生产者-消费者问题是一个经典的进程同步问题,该问题最早由Dijkstra提出,用以演示他提出的信号量机制。在同一个进程地址空间内执行的N个线程生产者线程生产物品,然后将物品放置在一个空缓冲区中供N个消费者线程消费。消费者线程从缓冲区中获得物品,然后释放缓冲区。当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费者线程释放出一个空缓冲区。当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻塞,直到新的物品被生产出来。
生产者个数、消费者个数、还是缓冲区大小、每个生产者生产产品的个数等。
生产者-消费者并发执行的过程。消费者消费完所有的产品结束。
//main.cpp
#include "Storage.h" //代码如下
#include <signal.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdio.h>
#include <iostream>
using namespace std;
const int sleepTime = 1;
struct passStruct
{
pthread_mutex_t *m_mutex; //所有线程所共享的互斥量
int *m_products; //生产者一次生产的产品数
Storage *m_storage; //共享的存储区
int *m_nThreadNum; //线程标号
};
pthread_cond_t notempty;
pthread_cond_t notfull;
void *producer(void *arg)
{
//获取从控制线程中传来的数值
passStruct tmp = *static_cast<passStruct *>(arg);
//生产者每次生产的产品数
int *products = tmp.m_products;
//控制线程中已经初始化了的互斥量
pthread_mutex_t *mutex = tmp.m_mutex;
//获取仓库
Storage *storage = tmp.m_storage;
char strToStore[128],strCurrentStored[128];
sprintf(strToStore,"\tAdd %d product to storage...\n",*products);
while (true)
{
//获取互斥量
pthread_mutex_lock(mutex);
//如果当前仓库已满(已经没有空间可以填充),则打印阻塞消息
if (!storage -> isHaveSpace())
{
cout << "++ producer " << *(tmp.m_nThreadNum) << " Block\n" << endl;
//等待条件变量,并且释放互斥量
pthread_cond_wait(?full,mutex);
}
//仓库中已经有空间了,先睡一会再进行生产
sleep(sleepTime);
//打印激活消息
cout << "++ producer " << *(tmp.m_nThreadNum) << " Activate" << endl;
//开始生产产品
write(STDOUT_FILENO,strToStore,strlen(strToStore));
storage -> addToStorage(*products);
//打印当前仓库总的产品数
sprintf(strCurrentStored,"\t\tcurrent storage %d products\n\n",
storage -> currentCount());
write(STDOUT_FILENO,strCurrentStored,strlen(strCurrentStored));
//通知等待信号量notempty的第一个消费者线程,当前仓库非空
pthread_cond_signal(?empty);
pthread_mutex_unlock(mutex);
//睡一会儿参与下一轮竞争
sleep(sleepTime);
}
pthread_exit(NULL);
}
void *consumer(void *arg)
{
passStruct tmp = *static_cast<passStruct *>(arg);
pthread_mutex_t *mutex = tmp.m_mutex;
Storage *storage = tmp.m_storage;
string strToDisplay("\tGet a Product from storage...\n");
char strCurrentStored[128];
while (true)
{
//获取互斥量
pthread_mutex_lock(mutex);
//当前仓库中已空(仓库中没有产品),则打印阻塞消息,并等待条件变量的到来
if (!storage -> isHaveProduct())
{
cout << "-- consumer " << *(tmp.m_nThreadNum) << " Block\n" << endl;
pthread_cond_wait(?empty,mutex);
}
//当前仓库中已经有产品了^^,先睡一会在进行消费
sleep(sleepTime);
//打印激活消息
cout << "-- consumer " << *(tmp.m_nThreadNum) << " Activate" << endl;
write(STDOUT_FILENO,strToDisplay.c_str(),strToDisplay.size());
storage -> getFromStorage();
//打印当前仓库中的产品数
sprintf(strCurrentStored,"\t\tcurrent storage %d products\n\n",
storage -> currentCount());
write(STDOUT_FILENO,strCurrentStored,strlen(strCurrentStored));
//告知第一个阻塞在notfull条件变量的生产者线程,当前仓库已经有空间了
pthread_cond_signal(?full);
pthread_mutex_unlock(mutex);
//先睡一会再参与竞争
sleep(sleepTime);
}
pthread_exit(NULL);
}
//信号捕捉函数
void onSignal(int signalNumber)
{
switch (signalNumber)
{
//如果捕捉到SIGUSR1,则整个程序退出,SIGUSR1由stop.sh程序产生
case SIGUSR1:
cout << "Main Program Ending..." << endl;
_exit(0);
break;
case SIGINT:
cout << "Can‘t Kill The Program with Ctrl+C, Please Use the Shell Script stop.sh!" << endl;
sleep(5);
break;
default:
break;
}
}
int main()
{
//注册信号
signal(SIGUSR1,onSignal);
signal(SIGINT,onSignal);
//freopen("back.txt","w",stdout);
//初始化互斥量以及条件变量
pthread_mutex_t *mutex = new pthread_mutex_t;
pthread_mutex_init(mutex,NULL);
pthread_cond_init(?empty,NULL);
pthread_cond_init(?full,NULL);
int numberOfProducer;
cout << "Please input the number of Producer: ";
cin >> numberOfProducer;
int numberOfConsumer;
cout << "Please input the number of Consumer: ";
cin >> numberOfConsumer;
int numberOfProducts;
cout << "Please input the number of Products for ONE PRODUCER: ";
cin >> numberOfProducts;
int sizeOfStorage;
cout << "Please input the size of the Storage: ";
cin >> sizeOfStorage;
Storage storage(sizeOfStorage);
//初始化所传递的值
passStruct passValue;
passValue.m_mutex = mutex;
passValue.m_products = &numberOfProducts;
passValue.m_storage = &storage;
pthread_t pthreadProducer,pthreadConsumer;
for (int i = 0; i != numberOfConsumer; ++i)
{
passValue.m_nThreadNum = new int(i+1);
pthread_create(&pthreadConsumer,NULL,consumer,
static_cast<void *>(&passValue));
}
for (int i = 0; i != numberOfProducer; ++i)
{
passValue.m_nThreadNum = new int(i+1);
pthread_create(&pthreadProducer,NULL,producer,
static_cast<void *>(&passValue));
}
//等待线程结束
pthread_join(pthreadProducer,NULL);
pthread_join(pthreadConsumer,NULL);
pthread_mutex_destroy(mutex);
delete mutex;
pthread_cond_destroy(?empty);
pthread_cond_destroy(?full);
return 0;
}//Storage.h
#ifndef STORAGE_H_INCLUDED
#define STORAGE_H_INCLUDED
class Storage
{
public:
Storage(int);
~Storage();
int currentCount()
{
return hasBeenStored;
}
bool isHaveSpace()
{
if (hasBeenStored < bufferSize)
{
return true;
}
return false;
}
bool isHaveProduct()
{
if (hasBeenStored != 0)
{
return true;
}
return false;
}
bool isEmpty()
{
return hasBeenStored == 0;
}
void addToStorage(int n);
void getFromStorage();
private:
int bufferSize;
int hasBeenStored;
};
#endif // STORAGE_H_INCLUDED//Storage.cpp
#include "Storage.h"
Storage::Storage(int n = 0):bufferSize(n),hasBeenStored(0)
{
}
Storage::~Storage()
{
}
void Storage::addToStorage(int n)
{
if (hasBeenStored + n > bufferSize)
hasBeenStored = bufferSize;
else
hasBeenStored += n;
}
void Storage::getFromStorage()
{
if (!isEmpty())
-- hasBeenStored;
else
hasBeenStored = 0;
}//Makefile CC = g++ CPPFLAGS = -Wall -g -pthread SOURCES = $(wildcard *.cpp) OBJECTS = $(SOURCES:.cpp=.o) BIN = main .PHONY: all clean all: $(BIN) $(BIN): $(OBJECTS) $(CC) $(CPPFLAGS) -o $@ $^ @echo "# # # # # # OK! # # # # # " %.o: %.cpp $(CC) $(CPPFLAGS) -c $^ -o $@ clean: -rm -rf $(BIN) $(OBJECTS) *.cbp *.layout
main程序的启动脚本
#!/bin/bash # A Shell Script for Start the main program BIN=main ISEXIST=$(/bin/ls | /bin/grep main$) #if this program not exits, make it. if [ "$ISEXIST" = "" ] ; then /usr/bin/make fi #if this program not running, start it. PID=$(/usr/bin/pgrep $BIN) if [ "$PID" = "" ] ; then ./main fi
main程序的终止脚本
#!/bin/bash # A Shell Script for Stop the main Program PID=$(/usr/bin/pgrep main) if [ "$PID" != "" ] ; then /bin/kill -USR1 $PID #给main程序发送SIGUSR1信号 fi
原文地址:http://blog.csdn.net/zjf280441589/article/details/41287091