不同于IO堆叠对象在IO完成时通过触发事件或触发CompletionRoutine回调函数,IOCP模型将socket和完成端口对象(CompletionPort,简称CP对象)绑定,当IO操作完成时,会改变该对象的状态,而我们通过完成端口对象,即可以确认IO操作是不是完成。
创建完成端口对象的API以下:
HANDLE WINAPI CreateIoCompletionPort( __in HANDLE FileHandle, __in_opt HANDLE ExistingCompletionPort, __in ULONG_PTR CompletionKey, __in DWORD NumberOfConcurrentThreads );
CreateIoCompletionPort既可以创建完成端口对象,也能够用来将socket和完成对象绑定,通过对其赋予不同的参数,可以实现不同的功能:
FileHandle ——创建CP对象时传入INVALID_HANDLE_VALUE;绑定socket和CP对象时传入socket描写符。
ExistingCompletionPort——创建CP对象时传入NULL;绑定socket和CP对象传入完成端口对象的句柄;
CompletionKey —— 创建CP对象是传入0;绑定socket和CP对象时作为参数传递给GetQueuedCompletionStatus
NumberOfConcurrentThreads—— 分配给CP对象用于处理IO的线程数。如果参数是0,系统中的CPU个数就是最大的线程数。
返回值 —— 返回CP对象的句柄。
以下,我们创建了1个完成端口对象:
hComPort=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);
我们依然用CreateIoCompletionPort将socket和CP对象进行绑定:
CreateIoCompletionPort((HANDLE)clntSock,hComPort,(DWORD)handleInfo,0);
clntSock是我们要进行绑定的socket描写符,hComPort使我们之前创建的CP对象,而handleInfo是我们自定义的结构体,作为参数可以传给GetQueuedCompletionStatus函数。
GetQueuedCompletionStatus在我们定义的线程函数中调用,用于获得CP对象的状态。当IO操作未完成时,该函数会产生阻塞;若IO操作完成,函数产生返回。其函数原型以下:
BOOL WINAPI GetQueuedCompletionStatus( __in HANDLE CompletionPort, __out LPDWORD lpNumberOfBytes, __out PULONG_PTR lpCompletionKey, __out LPOVERLAPPED *lpOverlapped, __in DWORD dwMilliseconds );
CompletionPort —— 进行注册过的完成对象的句柄。
lpNumberOfBytes —— 完成IO对象时传递或接受的字节数。
lpCompletionKey —— 使用CreateIoCompletionPort注册时传递的参数,参数可以传递我们自定义的结构信息。
lpOverlapped —— 调用WSARecv或WSASend时传递的OVERLAPPED对象指针。
dwMilliseconds —— GetQueuedCompletionStatus阻塞的时间,如果设置成INFINITE时无穷期等待。
#include "stdafx.h" #include "stdio.h" #include "process.h" #include "stdlib.h" #include "WinSock2.h" #include "Windows.h" #pragma comment(lib,"ws2_32.lib") #define BUF_SIZE 100 #define READ 3 #define WRITE 5 typedef struct { SOCKEThClntSock; SOCKADDRclntAdr; }HANDLE_DATA,*LPHANDLE_DATA; typedef struct { OVERLAPPEDoverlapped; WSABUFwsaBuf; char buffer[BUF_SIZE]; int rwMode; }IO_DATA,*LPIO_DATA; unsigned WINAPI ThreadMain(LPVOID CompletionPortIO); void ErrorHandler(char* message); int _tmain(int argc, _TCHAR* argv[]) { WSADatawsaData; HANDLEhComPort; SYSTEM_INFOsysInfo; LPIO_DATAioInfo; LPHANDLE_DATAhandleInfo; SOCKETservSock; SOCKADDR_INservAddr; DWORDrecvBytes,i,flags=0; WSAStartup(MAKEWORD(2,2),&wsaData); hComPort=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0); GetSystemInfo(&sysInfo); for(i=0;i<sysInfo.dwNumberOfProcessors;i++) _beginthreadex(NULL,0,ThreadMain,(LPVOID)hComPort,0,NULL); servSock=WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED); if(servSock==INVALID_SOCKET) ErrorHandler("WSASocket Error"); memset(&servAddr,0,sizeof(servAddr)); servAddr.sin_family=AF_INET; servAddr.sin_addr.s_addr=htonl(INADDR_ANY); servAddr.sin_port=htons(atoi("8888")); if(bind(servSock,(SOCKADDR*)&servAddr,sizeof(servAddr))==SOCKET_ERROR) ErrorHandler("bind error"); if(listen(servSock,5)==SOCKET_ERROR) ErrorHandler("listen error"); while(1) { SOCKETclntSock; SOCKADDR_INclntAddr; int clntAddrSz; clntSock=accept(servSock,(SOCKADDR*)&clntAddr,&clntAddrSz); handleInfo=(LPHANDLE_DATA)malloc(sizeof(HANDLE_DATA)); handleInfo->hClntSock=clntSock; memcpy(&(handleInfo->clntAdr),&clntAddr,sizeof(clntAddr)); CreateIoCompletionPort((HANDLE)clntSock,hComPort,(DWORD)handleInfo,0); ioInfo=(LPIO_DATA)malloc(sizeof(IO_DATA)); memset(&(ioInfo->overlapped),0,sizeof(OVERLAPPED)); ioInfo->wsaBuf.len=BUF_SIZE; ioInfo->wsaBuf.buf=ioInfo->buffer; ioInfo->rwMode=READ; WSARecv(handleInfo->hClntSock,&(ioInfo->wsaBuf),1,&recvBytes,&flags,&(ioInfo->overlapped),NULL); } WSACleanup(); return 0; } unsigned WINAPI ThreadMain(LPVOID CompletionPortIO) { HANDLEhComPort=(HANDLE)CompletionPortIO; SOCKET sock; DWORDbytesTrans; LPHANDLE_DATAhandleInfo; LPIO_DATAioInfo; DWORD flags; while(1) { GetQueuedCompletionStatus(hComPort,&bytesTrans,(PULONG_PTR)&handleInfo,(LPOVERLAPPED*)&ioInfo,INFINITE); sock=handleInfo->hClntSock; if(ioInfo->rwMode==READ) { puts("message received!"); if(bytesTrans==0) { closesocket(sock); free(handleInfo); free(ioInfo); continue; } memset(&(ioInfo->overlapped),0,sizeof(OVERLAPPED)); ioInfo->wsaBuf.len=bytesTrans; ioInfo->rwMode=WRITE; WSASend(sock,&(ioInfo->wsaBuf),1,NULL,0,&(ioInfo->overlapped),NULL); ioInfo=(LPIO_DATA)malloc(sizeof(IO_DATA)); memset(&(ioInfo->overlapped),0,sizeof(OVERLAPPED)); ioInfo->wsaBuf.len=BUF_SIZE; ioInfo->wsaBuf.buf=ioInfo->buffer; ioInfo->rwMode=READ; WSARecv(sock,&(ioInfo->wsaBuf),1,NULL,&flags,&(ioInfo->overlapped),NULL); } else { puts("message sent"); free(ioInfo); } } return 0; } void ErrorHandler(char* message) { fputs(message,stderr); fputc('\n',stderr); exit(1); }
Github位置:
https://github.com/HymanLiuTS/NetDevelopment
克隆本项目:
git clone git@github.com:HymanLiuTS/NetDevelopment.git
获得本文源代码:
git checkout NL56
上一篇 Java 反射机制浅析
下一篇 python-初始篇(一)