码迷,mamicode.com
首页 > 编程语言 > 详细

用户模式下的线程同步

时间:2015-04-13 18:13:33      阅读:222      评论:0      收藏:0      [点我收藏+]

标签:

  1 /******************************************************************************
  2 Module:  Queue.cpp
  3 Notices: Copyright (c) 2008 Jeffrey Richter & Christophe Nasarre
  4 ******************************************************************************/
  5 
  6 
  7 #include "..\CommonFiles\CmnHdr.h"     /* See Appendix A. */
  8 #include <windowsx.h>
  9 #include <tchar.h>
 10 #include <StrSafe.h>
 11 #include "Resource.h"
 12 
 13 
 14 ///////////////////////////////////////////////////////////////////////////////
 15 
 16 
 17 class CQueue {
 18 public:
 19    struct ELEMENT {
 20       int   m_nThreadNum;
 21       int   m_nRequestNum;
 22       // Other element data should go here
 23    };
 24    typedef ELEMENT* PELEMENT;
 25 
 26 private:
 27    struct INNER_ELEMENT {
 28       int      m_nStamp;  // 0 means empty
 29       ELEMENT  m_element;
 30    };
 31    typedef INNER_ELEMENT* PINNER_ELEMENT;
 32 
 33 private:
 34    PINNER_ELEMENT m_pElements;      // Array of elements to be processed
 35    int            m_nMaxElements;   // Maximum # of elements in the array
 36    int            m_nCurrentStamp;  // Keep track of the # of added elements
 37    
 38 private:
 39    int GetFreeSlot();
 40    int GetNextSlot(int nThreadNum);
 41 
 42 public:
 43    CQueue(int nMaxElements);
 44    ~CQueue();
 45    BOOL IsFull();
 46    BOOL IsEmpty(int nThreadNum);
 47    void AddElement(ELEMENT e);
 48    BOOL GetNewElement(int nThreadNum, ELEMENT& e);
 49 };
 50 
 51 
 52 ///////////////////////////////////////////////////////////////////////////////
 53 
 54 
 55 CQueue::CQueue(int nMaxElements) {
 56 
 57    // Allocate and initialize the elements
 58    m_pElements = (PINNER_ELEMENT) 
 59       HeapAlloc(GetProcessHeap(), 0, sizeof(INNER_ELEMENT) * nMaxElements);
 60    ZeroMemory(m_pElements, sizeof(INNER_ELEMENT) * nMaxElements);
 61 
 62    // Initialize the element counter
 63    m_nCurrentStamp = 0;
 64 
 65    // Remember the max number of elements
 66    m_nMaxElements = nMaxElements;
 67 }
 68 
 69 
 70 CQueue::~CQueue() {
 71 
 72    HeapFree(GetProcessHeap(), 0, m_pElements);
 73 }
 74 
 75 
 76 BOOL CQueue::IsFull() {
 77    
 78    return(GetFreeSlot() == -1);
 79 }
 80 
 81 
 82 BOOL CQueue::IsEmpty(int nThreadNum) {
 83 
 84    return(GetNextSlot(nThreadNum) == -1);
 85 }
 86 
 87 
 88 int CQueue::GetFreeSlot() {
 89 
 90    // Look for the first element with a 0 stamp
 91    for(int current = 0; current < m_nMaxElements; current++) {
 92       if (m_pElements[current].m_nStamp == 0)
 93          return(current);
 94    }
 95    
 96    // No free slot was found
 97    return(-1);
 98 }
 99 
100 
101 int CQueue::GetNextSlot(int nThreadNum) {
102    
103    // By default, there is no slot for this thread
104    int firstSlot = -1;
105    
106    // The element can‘t have a stamp higher than the last added
107    int firstStamp = m_nCurrentStamp+1;
108    
109    // Look for the even (thread 0) / odd (thread 1) element that is not free 
110    for(int current = 0; current < m_nMaxElements; current++) {
111    
112       // Keep track of the first added (lowest stamp) in the queue
113       // --> so that "first in first out" behavior is ensured
114       if ((m_pElements[current].m_nStamp != 0) &&  // free element
115           ((m_pElements[current].m_element.m_nRequestNum % 2) == nThreadNum) &&
116           (m_pElements[current].m_nStamp < firstStamp)) {
117 
118          firstStamp = m_pElements[current].m_nStamp;
119          firstSlot = current;
120       }
121    }
122    
123    return(firstSlot);
124 }
125 
126 
127 void CQueue::AddElement(ELEMENT e) {
128 
129    // Do nothing if the queue is full
130    int nFreeSlot = GetFreeSlot();
131    if (nFreeSlot == -1)
132       return;
133 
134    // Copy the content of the element
135    m_pElements[nFreeSlot].m_element = e;
136 
137    // Mark the element with the new stamp
138    m_pElements[nFreeSlot].m_nStamp = ++m_nCurrentStamp;
139 }
140 
141 
142 BOOL CQueue::GetNewElement(int nThreadNum, ELEMENT& e) {
143 
144    int nNewSlot = GetNextSlot(nThreadNum);
145    if (nNewSlot == -1)
146       return(FALSE);
147 
148    // Copy the content of the element
149    e = m_pElements[nNewSlot].m_element;
150 
151    // Mark the element as read
152    m_pElements[nNewSlot].m_nStamp = 0;
153 
154    return(TRUE);
155 }
156 
157 
158 ///////////////////////////////////////////////////////////////////////////////
159 
160 
161 CQueue               g_q(10);    // The shared queue
162 volatile LONG        g_fShutdown;// Signals client/server threads to die
163 HWND                 g_hWnd;     // How client/server threads give status
164 SRWLOCK              g_srwLock;  // Reader/writer lock to protect the queue
165 CONDITION_VARIABLE   g_cvReadyToConsume;  // Signaled by writers
166 CONDITION_VARIABLE   g_cvReadyToProduce;  // Signaled by readers
167 
168 
169 // Handles to all reader/writer threads
170 HANDLE g_hThreads[MAXIMUM_WAIT_OBJECTS];
171 
172 // Number of reader/writer threads  
173 int    g_nNumThreads = 0;
174 
175 
176 ///////////////////////////////////////////////////////////////////////////////
177 
178 
179 void AddText(HWND hWndLB, PCTSTR pszFormat, ...) {
180 
181    va_list argList;
182    va_start(argList, pszFormat);
183 
184    TCHAR sz[20 * 1024];
185    _vstprintf_s(sz, _countof(sz), pszFormat, argList);
186    ListBox_SetCurSel(hWndLB, ListBox_AddString(hWndLB, sz));
187    
188    va_end(argList);
189 }
190 
191 
192 BOOL ConsumeElement(int nThreadNum, int nRequestNum, HWND hWndLB) {
193 
194    // Get access to the queue to consume a new element
195    AcquireSRWLockShared(&g_srwLock); 
196 
197    // Fall asleep until there is something to read.
198    // Check if, while it was asleep, 
199    // it was not decided that the thread should stop
200    while (g_q.IsEmpty(nThreadNum) && !g_fShutdown) {
201       // There was not a readable element
202       AddText(hWndLB, TEXT("[%d] Nothing to process"), nThreadNum);
203          
204       // The queue is empty
205       // --> Wait until a writer adds a new element to read
206       //     and come back with the lock acquired in shared mode
207       SleepConditionVariableSRW(&g_cvReadyToConsume, &g_srwLock, 
208          INFINITE, CONDITION_VARIABLE_LOCKMODE_SHARED);
209    }
210 
211    // When thread is exiting, the lock should be released for writer
212    // and readers should be signaled through the condition variable
213    if (g_fShutdown) {
214       // Show that the current thread is exiting
215       AddText(hWndLB, TEXT("[%d] bye bye"), nThreadNum);
216 
217       // Another writer thread might still be blocked on the lock
218       // --> release it before exiting
219       ReleaseSRWLockShared(&g_srwLock);
220 
221       // Notify other readers that it is time to exit
222       // --> release readers
223       WakeConditionVariable(&g_cvReadyToConsume);
224 
225       return(FALSE);
226    }
227 
228    // Get the first new element
229    CQueue::ELEMENT e;
230    // Note: No need to test the return value since IsEmpty
231    //       returned FALSE
232    g_q.GetNewElement(nThreadNum, e);
233       
234    // No need to keep the lock any longer
235    ReleaseSRWLockShared(&g_srwLock);
236 
237    // Show result of consuming the element
238    AddText(hWndLB, TEXT("[%d] Processing %d:%d"), 
239       nThreadNum, e.m_nThreadNum, e.m_nRequestNum);
240 
241    // A free slot is now available for writer threads to produce
242    // --> wake up a writer thread
243    WakeConditionVariable(&g_cvReadyToProduce);
244 
245    return(TRUE);
246 }
247 
248 
249 DWORD WINAPI ReaderThread(PVOID pvParam) {
250 
251    int nThreadNum = PtrToUlong(pvParam);
252    HWND hWndLB = GetDlgItem(g_hWnd, IDC_SERVERS);
253 
254    for (int nRequestNum = 1; !g_fShutdown; nRequestNum++) {
255 
256       if (!ConsumeElement(nThreadNum, nRequestNum, hWndLB))
257          return(0);
258 
259       Sleep(2500);   // Wait before reading another element
260    }
261    
262    // g_fShutdown has been set during Sleep
263    // --> Show that the current thread is exiting
264    AddText(hWndLB, TEXT("[%d] bye bye"), nThreadNum);
265 
266    return(0);
267 }
268 
269 
270 ///////////////////////////////////////////////////////////////////////////////
271 
272 
273 DWORD WINAPI WriterThread(PVOID pvParam) {
274 
275    int nThreadNum = PtrToUlong(pvParam);
276    HWND hWndLB = GetDlgItem(g_hWnd, IDC_CLIENTS);
277 
278    for (int nRequestNum = 1; !g_fShutdown; nRequestNum++) {
279 
280       CQueue::ELEMENT e = { nThreadNum, nRequestNum };
281 
282       // Require access for writing
283       AcquireSRWLockExclusive(&g_srwLock);
284 
285       // If the queue is full, fall asleep as long as the condition variable 
286       // is not signaled
287       // Note: During the wait for acquiring the lock, 
288       //       a stop might have been received
289       if (g_q.IsFull() & !g_fShutdown) {
290          // No more room in the queue
291          AddText(hWndLB, TEXT("[%d] Queue is full: impossible to add %d"), 
292             nThreadNum, nRequestNum);
293 
294          // --> Need to wait for a reader to empty a slot before acquiring 
295          //     the lock again 
296          SleepConditionVariableSRW(&g_cvReadyToProduce, &g_srwLock, 
297             INFINITE, 0);
298       }
299 
300       // Other writer threads might still be blocked on the lock
301       // --> Release the lock and notify the remaining writer threads to quit
302       if (g_fShutdown) {
303          // Show that the current thread is exiting
304          AddText(hWndLB, TEXT("[%d] bye bye"), nThreadNum);
305 
306          // No need to keep the lock any longer
307          ReleaseSRWLockExclusive(&g_srwLock);
308 
309          // Signal other blocked writers threads that it is time to exit
310          WakeAllConditionVariable(&g_cvReadyToProduce);
311 
312          // Bye bye
313          return(0);
314       } else {
315          // Add the new ELEMENT into the queue
316          g_q.AddElement(e);
317 
318          // Show result of processing element
319          AddText(hWndLB, TEXT("[%d] Adding %d"), nThreadNum, nRequestNum);
320 
321          // No need to keep the lock any longer
322          ReleaseSRWLockExclusive(&g_srwLock);
323 
324          // Signal reader threads that there is an element to consume
325          WakeAllConditionVariable(&g_cvReadyToConsume);
326 
327          // Wait before adding a new element
328          Sleep(1500);
329       }
330    }
331 
332    // Show that the current thread is exiting
333    AddText(hWndLB, TEXT("[%d] bye bye"), nThreadNum);
334 
335    return(0);
336 }
337 
338 
339 
340 ///////////////////////////////////////////////////////////////////////////////
341 
342 
343 BOOL Dlg_OnInitDialog(HWND hWnd, HWND hWndFocus, LPARAM lParam) {
344 
345     chSETDLGICONS(hWnd, IDI_QUEUE);
346 
347     g_hWnd = hWnd; // Used by client/server threads to show status
348 
349     // 初始化  SRWLock
350     InitializeSRWLock(&g_srwLock);
351 
352     // 初始化  条件变量
353     InitializeConditionVariable(&g_cvReadyToConsume);
354     InitializeConditionVariable(&g_cvReadyToProduce);
355 
356     // Will be set to TRUE in order to end threads
357     g_fShutdown = FALSE;
358 
359     // 创建写入线程,INT_PTR和指针长度相等的int,x传入线程函数
360     DWORD dwThreadID;
361     for (int x = 0; x < 4; x++)
362         g_hThreads[g_nNumThreads++] = 
363         chBEGINTHREADEX(NULL, 0, WriterThread, (PVOID) (INT_PTR) x, 
364         0, &dwThreadID);
365 
366     // 创建读取线程
367     for (int x = 0; x < 2; x++)
368         g_hThreads[g_nNumThreads++] = 
369         chBEGINTHREADEX(NULL, 0, ReaderThread, (PVOID) (INT_PTR) x, 
370         0, &dwThreadID);
371 
372     return(TRUE);
373 }
374 
375 
376 ///////////////////////////////////////////////////////////////////////////////
377 
378 
379 void StopProcessing() {
380 
381    if (!g_fShutdown) {
382       // Ask all threads to end
383       InterlockedExchange(&g_fShutdown, TRUE);
384 
385       // Free all threads waiting on condition variables
386       WakeAllConditionVariable(&g_cvReadyToConsume);
387       WakeAllConditionVariable(&g_cvReadyToProduce);
388 
389       // Wait for all the threads to terminate & then clean up
390       WaitForMultipleObjects(g_nNumThreads, g_hThreads, TRUE, INFINITE);
391 
392       // Don‘t forget to clean up kernel resources
393       // Note: This is not really mandatory since the process is exiting
394       while (g_nNumThreads--)
395          CloseHandle(g_hThreads[g_nNumThreads]);
396 
397       // Close each list box
398       AddText(GetDlgItem(g_hWnd, IDC_SERVERS), TEXT("---------------------"));
399       AddText(GetDlgItem(g_hWnd, IDC_CLIENTS), TEXT("---------------------"));
400    }
401 }
402 
403 
404 DWORD WINAPI StoppingThread(PVOID pvParam) {
405 
406    StopProcessing();
407    return(0);
408 }
409 
410 
411 ///////////////////////////////////////////////////////////////////////////////
412 
413 
414 void Dlg_OnCommand(HWND hWnd, int id, HWND hWndCtl, UINT codeNotify) {
415 
416    switch (id) {
417       case IDCANCEL:
418          EndDialog(hWnd, id);
419          break;
420 
421       case IDC_BTN_STOP: 
422       {
423          // StopProcessing can‘t be called from the UI thread
424          // or a deadlock will occur: SendMessage() is used 
425          // to fill up the listboxes
426          // --> Another thread is required
427          DWORD dwThreadID;
428          CloseHandle(chBEGINTHREADEX(NULL, 0, StoppingThread, 
429             NULL, 0, &dwThreadID));
430          
431          // This button can‘t be pushed twice
432          Button_Enable(hWndCtl, FALSE);
433       }
434       break;
435    }
436 }
437 
438 
439 ///////////////////////////////////////////////////////////////////////////////
440 
441 
442 INT_PTR WINAPI Dlg_Proc(HWND hWnd, UINT uMsg, WPARAM wParam, LPARAM lParam) {
443    
444    switch (uMsg) {
445       chHANDLE_DLGMSG(hWnd, WM_INITDIALOG, Dlg_OnInitDialog);
446       chHANDLE_DLGMSG(hWnd, WM_COMMAND,    Dlg_OnCommand);
447    }
448    return(FALSE);
449 }
450 
451 
452 ///////////////////////////////////////////////////////////////////////////////
453 
454 
455 int WINAPI _tWinMain(HINSTANCE hinstExe, HINSTANCE, PTSTR pszCmdLine, int) {
456 
457    DialogBox(hinstExe, MAKEINTRESOURCE(IDD_QUEUE), NULL, Dlg_Proc);
458    StopProcessing();
459    return(0);
460 }
461 
462 
463 //////////////////////////////// End of File //////////////////////////////////

 

用户模式下的线程同步

标签:

原文地址:http://www.cnblogs.com/dLong/p/4422588.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!