码迷,mamicode.com
首页 > 其他好文 > 详细

简单说一个IOCP不好的地方

时间:2015-01-10 20:56:35      阅读:750      评论:0      收藏:0      [点我收藏+]

标签:

   IOCP是windows下IO事件处理的最高效的一种方式了,结合OVERLAPPED IO可以实现真正的完全异步IO。windows在此种模式下提供了一站式服务,只要你提交一个IO请求,接下来windows替你处理其他所有的工作,你只需要等着接受windows的完成通知就行了。

    响马大叔在他的孢子社区有了一个帖子再谈select, iocp, epoll,kqueue及各种I/O复用机制对此有比较全面的对比介绍了,故而本文不对IOCP这方面的内容再做赘述了,相反说说自己在自己开发过程中认为IOCP不好的地方。

    IOCP不好的地方体现这个地方:一个File/Socket Handle是不能多次调用CreateIoCompletionPort()绑定到不同的IOCP上的,只有第一次是成功的,第二次开始是参数错误失败!因此一旦绑定了一个IOCP就没法迁移到其他的IOCP了,这个是我经过实际的代码测试和分析ReactOS代码实现得出的结论。测试代码如下

技术分享
 1 int main(int argc, char *argv[])
 2 {
 3     HANDLE iocp;
 4     HANDLE iocp1;
 5     SOCKET s;
 6     HANDLE ret;
 7     
 8     WSADATA wsa_data;
 9     WSAStartup(MAKEWORD(2, 2), &wsa_data);
10     
11     iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 4);
12     iocp1 = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 4);
13     s = create_client_socket();
14     
15     assert(NULL != iocp);
16     assert(NULL != iocp1);
17     
18     ret = CreateIoCompletionPort((HANDLE)s, iocp, 0, 0);
19     printf("first bind, ret: %lu, error: %u\n", (long)ret, GetLastError());
20 
21     ret = CreateIoCompletionPort((HANDLE)s, iocp1, 0, 0);
22     printf("second bind, ret: %lu, error: %u\n", (long)ret, GetLastError());
23     
24     CloseHandle(iocp);
25     CloseHandle(iocp1);
26     closesocket(s);
27     
28     WSACleanup();
29     
30     return 0;
31 }
View Code

运行结果

Administrator@attention /e/tinylib/windows/net_iocp
$ iocp.exe
first bind, ret: 60, error: 0
second bind, ret: 0, error: 87

ReactOS-0.3.12-REL-src的代码体现在NtSetInformationFile()中以下代码片段

技术分享
 1     /* FIXME: Later, we can implement a lot of stuff here and avoid a driver call */
 2     /* Handle IO Completion Port quickly */
 3     if (FileInformationClass == FileCompletionInformation)
 4     {
 5         /* Check if the file object already has a completion port */
 6         if ((FileObject->Flags & FO_SYNCHRONOUS_IO) ||
 7             (FileObject->CompletionContext))
 8         {
 9             /* Fail */
10             Status = STATUS_INVALID_PARAMETER;
11         }
12         else
13         {
14             /* Reference the Port */
15             CompletionInfo = Irp->AssociatedIrp.SystemBuffer;
16             Status = ObReferenceObjectByHandle(CompletionInfo->Port,
17                                                IO_COMPLETION_MODIFY_STATE,
18                                                IoCompletionType,
19                                                PreviousMode,
20                                                (PVOID*)&Queue,
21                                                NULL);
22             if (NT_SUCCESS(Status))
23             {
24                 /* Allocate the Context */
25                 Context = ExAllocatePoolWithTag(PagedPool,
26                                                 sizeof(IO_COMPLETION_CONTEXT),
27                                                 IOC_TAG);
28                 if (Context)
29                 {
30                     /* Set the Data */
31                     Context->Key = CompletionInfo->Key;
32                     Context->Port = Queue;
33                     if (InterlockedCompareExchangePointer((PVOID*)&FileObject->
34                                                           CompletionContext,
35                                                           Context,
36                                                           NULL))
37                     {
38                         /*
39                          * Someone else set the completion port in the
40                          * meanwhile, so dereference the port and fail.
41                          */
42                         ExFreePool(Context);
43                         ObDereferenceObject(Queue);
44                         Status = STATUS_INVALID_PARAMETER;
45                     }
46                 }
47                 else
48                 {
49                     /* Dereference the Port now */
50                     ObDereferenceObject(Queue);
51                     Status = STATUS_INSUFFICIENT_RESOURCES;
52                 }
53             }
54         }
55 
56         /* Set the IRP Status */
57         Irp->IoStatus.Status = Status;
58         Irp->IoStatus.Information = 0;
59     }
View Code

MSDN中也明确提倡开发者启动多个线程使用GetQueuedCompletionStatus()挂在一个IOCP上来处理IO事件,我是如此理解了的,原文如下

NumberOfConcurrentThreads
[in] Maximum number of threads that the operating system allows to concurrently process I/O completion packets for the I/O completion port. If this parameter is zero, the system allows as many concurrently running threads as there are processors in the system.

Although any number of threads can call the GetQueuedCompletionStatus function to wait for an I/O completion port, each thread is associated with only one completion port at a time. That port is the port that was last checked by the thread.

可这对应有另外一个问题:会导致同一个IO handle的完成事件被分散到不同的线程中处理,从而在处理同一个handle的IO事件时会引入额外的并发竞争,对此我也写了代码进行测试确认,如下

技术分享
  1 /* 
  2  编译命令
  3  gcc iocp.c -o iocp -lws2_32 -g
  4  
  5  测试命令
  6   nc -u 192.168.100.101 1993
  7  快速反复发送数据
  8  
  9  实际运行结果
 10 Administrator@attention /e/code
 11 $ gdb -q iocp.exe
 12 Reading symbols from e:\code\iocp.exe...done.
 13 (gdb) r
 14 Starting program: e:\code\iocp.exe
 15 [New Thread 5252.0x1330]
 16 [New Thread 5252.0xcf0]
 17 thread: 3312, 5 bytes received for 168 notified by IOCP
 18 thread: 4912, 3 bytes received for 168 notified by IOCP
 19 thread: 3312, 6 bytes received for 168 notified by IOCP
 20 thread: 4912, 5 bytes received for 168 notified by IOCP
 21 thread: 4912, 2 bytes received for 168 notified by IOCP
 22 thread: 4912, 3 bytes received for 168 notified by IOCP
 23 thread: 3312, 4 bytes received for 168 notified by IOCP
 24  */
 25 
 26 #include <stdio.h>
 27 #include <stdlib.h>
 28 
 29 #define WIN32_LEAN_AND_MEAN
 30 #include <windows.h>
 31 #include <winsock2.h>
 32 #include <process.h>
 33 
 34 HANDLE iocp;
 35 SOCKET s_udp;
 36 
 37 void routine(void)
 38 {
 39     unsigned threadId;
 40     
 41     ULONG_PTR key;
 42     LPOVERLAPPED povlp;
 43     BOOL result;
 44     
 45     char buffer[65535];
 46     WSABUF wsabuf;
 47     DWORD received;
 48     DWORD flag;
 49     struct sockaddr_in peer_addr;
 50     int addr_len;
 51     WSAOVERLAPPED ovlp;
 52     int error;
 53 
 54     while (1)
 55     {
 56         wsabuf.len = sizeof(buffer);
 57         wsabuf.buf = buffer;
 58         received = 0;
 59         flag = 0;
 60         addr_len = sizeof(peer_addr);
 61         memset(&peer_addr, 0, addr_len);
 62         memset(&ovlp, 0, sizeof(ovlp));
 63         
 64         threadId = GetCurrentThreadId();
 65 
 66         if (WSARecvFrom(s_udp, &wsabuf, 1, &received, &flag, (struct sockaddr*)&peer_addr, &addr_len, &ovlp, NULL) == 0)
 67         {
 68             printf("thread: %u, %u bytes received for %lu imediately\n", threadId, received, s_udp);
 69             continue;
 70         }
 71         
 72         result = GetQueuedCompletionStatus(iocp, &received, &key, &povlp, 10);
 73         if (FALSE == result)
 74         {
 75             error = WSAGetLastError();
 76             if (WAIT_TIMEOUT != error)
 77             {
 78                 printf("GetQueuedCompletionStatus() failed, error: %d\n", error);
 79             }
 80             continue;
 81         }
 82 
 83         printf("thread: %u, %u bytes received fro %lu notified by IOCP\n", threadId, received, s_udp);
 84     }
 85 
 86     return;
 87 }
 88 
 89 unsigned __stdcall thread(void *arg)
 90 {
 91     routine();
 92     
 93     return 0;
 94 }
 95 
 96 SOCKET create_udp_socket(unsigned short port, const char *ip)
 97 {
 98     SOCKET fd;
 99     struct sockaddr_in addr;
100     unsigned long value = 1;
101 
102     fd = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED);
103     if (INVALID_SOCKET == fd)
104     {
105         printf("create_udp_socket: socket() failed, errno: %d", WSAGetLastError());
106         return INVALID_SOCKET;
107     }
108 
109     memset(&addr, 0, sizeof(addr));
110     addr.sin_family = AF_INET;
111     addr.sin_addr.s_addr = (NULL != ip ? inet_addr(ip) : INADDR_ANY);
112     addr.sin_port = htons(port);
113     if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) != 0)
114     {
115         printf("create_server_socket: bind() failed, erron: %d", WSAGetLastError());
116         closesocket(fd);
117         return INVALID_SOCKET;
118     }
119 
120     return fd;
121 }
122 
123 int main(int argc, char *argv[])
124 {
125     unsigned threadId;
126     HANDLE t;
127     WSADATA wsadata;
128     
129     WSAStartup(MAKEWORD(2,2), &wsadata);
130     
131     iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 4);
132     s_udp = create_udp_socket(1993, "0.0.0.0");
133     CreateIoCompletionPort((HANDLE)s_udp, iocp, 0, 0);
134     
135     t = (HANDLE)_beginthreadex(NULL, 0, thread, NULL, 0, &threadId);
136     
137     routine();
138     
139     WaitForSingleObject(t, INFINITE);
140     CloseHandle(t);
141     closesocket(s_udp);
142     CloseHandle(iocp);
143     
144     WSACleanup();
145 
146     return 0;
147 }
View Code

 

如此的话,由于这些并发竞争的存在实际上差不多抵消了开多个线程进行并发处理的好处,还不如将所有的IO事件全部放在同一个线程中进行处理,还能省去很多锁的开销。不过现代的程序几乎完全是在多核的CPU上运行的,如果因为IOCP,你让所有相关的工作全部放在一个线程里进行处理,又不能充分利用多核的并行优势。实际上我们在设计并发模型时,经常开多个worker来实现负载均衡,但IOCP以上的限制是与之相冲突的。

linux下的epoll就额外提供了del操作,可以使得一个fd可以随时从当期的epoll中detach出去,又立马add进另外一个epoll,如此的话就可以开多个worker线程开跑多个epoll,从而可以将不同fd均摊到不同的worker中实现负载均衡。这种均衡操作在实际的业务中是很常见的,会需要你根据业务逻辑,将不同的fd交给其他的线程来处理,若使用IOCP的话就不太方便了。

这些就算是我对IOCP吐槽的一个地方了。

~~end~~

 

简单说一个IOCP不好的地方

标签:

原文地址:http://www.cnblogs.com/lanyuliuyun/p/4215568.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!