iocp
iocp
模型的设计初衷是配合线程池处理大规模数据处理- 一般建议使用
iocp
收数据 iocp
是微软封装好的高集成度的使用方式socket
异常/不在需要时 要及时closesocket
,否则GetQueuedCompletionStatus
会阻塞
使用逻辑
- 创建
iocp
对象(CreateIoCompletionPort
) - 创建多个线程 等待
iocp
消息(GetQueuedCompletionStatus
) - 监听端口 将 建立的
socket
关联到iocp
对象(CreateIoCompletionPort
),并且投递一次 消息接受请求(WSARecv
) GetQueuedCompletionStatus
中如果接收到某个socket
异常/关闭,要及时closesocket
必备知识
WSASend
在IOCP中投递WSASend
返回WSA_IO_PENDING
的时候,表示异步投递已经成功,但是稍后发送才会完成。这其中涉及到了三个缓冲区。
网卡缓冲区,TCP/IP层缓冲区,程序缓冲区。
- 情况一:调用WSASend发送正确的时候(即立即返回,且没有错误),TCP/IP将数据从程序缓冲区中拷贝到TCP/IP层缓冲区中,然后不锁定该程序缓冲区,由上层程序自己处理。TCP/IP层缓冲区在网络合适的时候,将其数据拷贝到网卡缓冲区,进行真正的发送。
- 情况二:调用WSASend发送错误,但是错误码是WSA_IO_PENDING的时候,表示此时TCP/IP层缓冲区已满,暂时没有剩余的空间将程序缓冲区的数据拷贝出来,这时系统将锁定用户的程序缓冲区,按照书上说的WSASend指定的缓冲区将会被锁定到系统的非分页内存中。直到TCP/IP层缓冲区有空余的地方来接受拷贝我们的程序缓冲区数据才拷贝走,并将给IOCP一个完成消息。
- 情况三:调用WSASend发送错误,但是错误码不是WSA_IO_PENDING,此时应该是发送错误,应该释放该SOCKET对应的所有资源。
WSARecv
- 情况一:调用WSARecv正确,TCP/IP将数据从TCP/IP层缓冲区拷贝到缓冲区,然后由我们的程序自行处理了。清除TCP/IP层缓冲区数据。
- 情况二:调用WSARecv错误,但是返回值是WSA_IO_PENDING,此时是因为TCP/IP层缓冲区中没有数据可取,系统将会锁定我们投递的WSARecv的buffer,直到TCP/IP层缓冲区中有新的数据到来。
- 情况三:调用WSARecv错误,错误值不是WSA_IO_PENDING,此时是接收出错,应该释放该SOCKET对应的所有资源。
WSANOBUF
在以上情况中有几个非常要注意的事情:
- 系统锁定非分页内存的时候,最小的锁定大小是4K(当然,这个取决于您系统的设置,也可以设置小一些,在注册表里面可以改)
- 当我们投递了很多
WSARecv
或者WSASend
的时候,不管我们投递的Buffer有多大(0除外),系统在出现IO_PENGDING的时候,都会锁定我们4K的内存。这也就是经常有开发者出现WSANOBUF的情况原因了。
我们在解决这个问题的时候,要针对WSASend和WSARecv做处理
- 投递
WSARecv
的时候,可以采用一个巧妙的设计,先投递0大小Buf的WSARecv,如果返回,表示有数据可以接收,我们开启真正的recv将数据从TCP/IP层缓冲区取出来,直到WSA_IO_PENGDING
. - 对投递的
WSARecv
以及WSASend
进行计数统计,如果超过了我们预定义的值,就不进行WSASend
或者WSARecv
投递了。 - 现在我们应该就可以明白为什么WSASend会返回小于我们投递的buffer空间数据值了,是因为TCP/IP层缓冲区小于我们要发送的缓冲区,TCP/IP只会拷贝他剩余可被Copy的缓冲区大小的数据走,然后给我们的WSASend的已发送缓冲区设置为移走的大小,下一次投递的时候,如果TCP/IP层还未被发送,将返回
WSA_IO_PENGDING
- 在很多地方有提到,可以关闭TCP/IP层缓冲区,可以提高一些效率和性能,这个从上面的分析来看,有这个可能,要实际的网络情况去实际分析了。
函数原型
//创建iocp对象
HANDLE hd_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,//创建iocp必须用这个参数
NULL,//创建iocp是这里设定为 NULL
0,//创建iocp是这里设定为 NULL
2//最多允许几个线程运行
);
//从消息队列中取出消息(完成的消息)
bRet = GetQueuedCompletionStatus(hd_iocp,
&dwTransferBytes,//返回接收到的数据大小
&dwCompleteKey,//socket绑定到iocp时设定的值,我们这里是socket句柄,这里的指针不能指向局部变量,建议new出来一个副本
&lpOl, //请求数据时我们利用这个参数夹带私货
INFINITE);
//将链接绑定到iocp
HANDLE dwRet = CreateIoCompletionPort((HANDLE)sClient,//这里填写支持ol的对象,建议直接填写socket句柄
hd_iocp,//这里填写绑定的iocp
(ULONG_PTR)&sClient,//处理iocp事件时会接收到这个参数
0//iocp的线程数,如果函数用于绑定iocp,那么这里设置的值会被忽略
);
//请求接收数据
int nRet = WSARecv(sClient, //请求的socket
&wsaBuf,// 接收数据的配置信息
1,
&dwReadedBytes, //获取 读取了多少数据
&flags,//指定 接收数据的方式,取出数据,偷看数据等等
&pRecvOl->ol,//内部使用,指向WSAOVERLAPPED结构的指针
NULL//指向完成接收操作完成时调用的完成例程的指针,我们这里忽略
);
// 请求发送数据
int nRet = WSASend(dst_socket,//请求的socket
&wsaBuf,// 接收数据的配置信息
1,
&dwSendedBytes,//获取 发送了多少数据
0,//指定 发送数据的方式
&pWriteOl->ol,//内部使用,指向WSAOVERLAPPED结构的指针
NULL//指向完成接收操作完成时调用的完成例程的指针,我们这里忽略
);
实例
// iocp模型.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//
#include "pch.h"
#include <iostream>
#include <thread>
#include <mutex>
#include <map>
#include <Winsock2.h>
#include <WS2tcpip.h>
#pragma comment(lib, "ws2_32.lib")
std::mutex mutex_data_send_buf;
std::map<SOCKET, std::string> sockets_send_buf;
/*
线程池里面的GetQueuedCompletionStatus有时候会阻塞
可以使用关闭IOCP句柄或者使用PostQueuedCompletionStatus( hCompletionPort, DWORD(0), 0, NULL );来退出GetQueuedCompletionStatus
*/
////////////自定义结构体,用于夹带私货
/*
iocp模型中 请求接收数据时 可以传递一个参数(WSAOVERLAPPED)给 最后的消息处理者
我们重定义一个结构体 包含 WSAOVERLAPPED,以及我们自己的私货
这样 处理消息时就可以直接获取到我们夹杂的私货
私货一般是接收到的数据
*/
enum IOTYPE
{
IO_READ,
IO_WRITE,
IO_CLOSE,
IO_ACCEPT,
};
struct MyOverlapped
{
MyOverlapped(IOTYPE type)
{
nType = type;
memset(&ol, 0, sizeof(OVERLAPPED));
}
IOTYPE nType;
WSAOVERLAPPED ol;
char pRecvBuf[8192];
int send_size = 0;
char *pSendBuf=NULL;
};
////////////自定义结构体,用于夹带私货
/*
windows专属的 初始化api
使用Winsock 之前一定要运行这段代码
*/
void initial_win_socket()
{
WORD wVersionRequested;
WSADATA wsaData;
int err;
wVersionRequested = MAKEWORD(2, 2);
err = WSAStartup(wVersionRequested, &wsaData);
if(err != 0)
{
/* Tell the user that we could not find a usable */
/* WinSock DLL. */
return;
}
}
//请求接收数据,接收结果传递给iocp消息处理器
void PostRecv(SOCKET sClient)
{
DWORD dwReadedBytes = 0;
//1 生成我们的私有结构体
MyOverlapped* pRecvOl = new MyOverlapped(IO_READ);
//2 配置 接收数据的 地址
WSABUF wsaBuf;
wsaBuf.buf = pRecvOl->pRecvBuf;
wsaBuf.len = 8192;
DWORD flags = 0;
//3 请求接收数据
int nRet = WSARecv(sClient,
&wsaBuf,
1,
&dwReadedBytes,
&flags,
&pRecvOl->ol,
NULL);
if(nRet == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
{
printf("数据接收错误\r\n");
return;
}
else
{
printf("数据接收正常\r\n");
return;
}
}
/*
请求发送数据
本设计使用了双缓冲队列发送
1. 凡是直接使用PostSend发送的数据都会直接发送
2. 并且 PostSend 处理之后会触发 消息处理器
->查询 socket消息队列,循环取出消息进行发送
两种方案二选一:
要么直接使用消息队列发送
要么让消息队列为空,每次都调用 PostSend 直接投递数据
*/
void PostSend(SOCKET dst_socket,const char * data,int n_size)
{
DWORD dwSendedBytes = 0;
//1 生成我们的私有结构体
MyOverlapped* pWriteOl = new MyOverlapped(IO_WRITE);
//2 配置 发送数据的 地址
pWriteOl->pSendBuf = new char[n_size];
pWriteOl->send_size = n_size;
memcpy(pWriteOl->pSendBuf,data,n_size);
WSABUF wsaBuf;
wsaBuf.buf = pWriteOl->pSendBuf;
wsaBuf.len = n_size;
DWORD flags = 0;
//3 请求发送数据
int nRet = WSASend(dst_socket,
&wsaBuf,
1,
&dwSendedBytes,
0,
&pWriteOl->ol,
NULL);
if(nRet == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
{
printf("数据发送错误\r\n");
return;
}
else
{
printf("数据发送正常\r\n");
return;
}
}
/*
输入 ip端口 开始绑定,监听端口
将建立的链接 关联到iocp上
*/
void bind_port(const char * bind_ip, int bind_port, HANDLE hd_iocp)
{
//1 建立SOCKET(套接字)
SOCKET s = socket(AF_INET,
SOCK_STREAM,//数据流形式
IPPROTO_TCP);
if(s == INVALID_SOCKET)
{
return;
}
int nRet;
//2 配置监听端口的结构体
sockaddr_in name;
name.sin_family = AF_INET;
name.sin_port = htons(bind_port);
inet_pton(AF_INET, bind_ip, &(name.sin_addr.S_un.S_addr));//将ip地址转化为 api使用的比特流形式
// 3 将socket 绑定到端口
nRet = bind(s,
(struct sockaddr*)&name,
sizeof(name));
if(nRet == SOCKET_ERROR)
{
return;
}
//4 开始监听
nRet = listen(s, SOMAXCONN/* 设置socket队列最大数量*/);
if(nRet == SOCKET_ERROR)
{
return;
}
//5 等待新的链接建立
sockaddr_in recvName = { 0 };
recvName.sin_family = AF_INET;
int nLength = sizeof(sockaddr_in);
while(1)
{
SOCKET sClient = accept(s, (sockaddr*)&recvName, &nLength);//等待链接
if(sClient == INVALID_SOCKET)
{
//closesocket(sClient);
continue;
}
//将链接绑定到iocp
SOCKET *new_set = new SOCKET;
*new_set = sClient;
HANDLE dwRet = CreateIoCompletionPort((HANDLE)sClient,//这里填写支持ol的对象,建议直接填写socket句柄
hd_iocp,//这里填写绑定的iocp
(ULONG_PTR)new_set,//处理iocp事件时会接收到这个参数,这里一定要分配新的内存空间
1//iocp的线程数,如果函数用于绑定iocp,那么这里设置的值会被忽略
);
if(dwRet == INVALID_HANDLE_VALUE)
{
break;
}
//请求新的数据
PostRecv(sClient);
}
closesocket(s);
}
//消息处理器
/*
由于是异步处理socket
这里接收到的数据可能是有多个 send构成的 或0.?个send构成
注意消息的提取与分离
*/
void iocp_msg_receive_thread(HANDLE hd_iocp)
{
DWORD dwTransferBytes = 0;
ULONG_PTR dwCompleteKey = 0;
LPOVERLAPPED lpOl = NULL;
BOOL bRet = false;
while(true)
{
//从消息队列中取出消息(完成的消息)
bRet = GetQueuedCompletionStatus(hd_iocp,
&dwTransferBytes,//返回接收到的数据大小
&dwCompleteKey,//socket绑定到iocp时设定的值,我们这里是socket句柄
&lpOl, //请求数据时我们利用这个参数夹带私货
INFINITE);
//获取socket句柄
SOCKET s = *(SOCKET*)dwCompleteKey;
//处理消息
MyOverlapped* pOl = CONTAINING_RECORD(lpOl, MyOverlapped, ol);
if(!bRet)
{
printf("数据接收异常,关闭socket:%d\r\n",s);
closesocket(s);
if(dwCompleteKey != NULL)
{
delete (SOCKET*)dwCompleteKey;
}
if(pOl->pSendBuf!=NULL)
{
delete pOl->pSendBuf;
pOl->pSendBuf = NULL;
}
if(pOl != NULL)
{
delete pOl;
pOl = NULL;
}
continue;
}
//读取消息
if(pOl->nType==IO_READ)
{
printf("线程%d接收到_socket%d 数据:\r\n", std::this_thread::get_id(), s);
for(int i = 0; i < dwTransferBytes; i++)
{
//printf("%c", *((char *)&(pOl->pRecvBuf[0])+i));
printf("%c", (pOl->pRecvBuf)[i]);
}
puts("\r\n");
//回复信息
mutex_data_send_buf.lock();
sockets_send_buf[s].append("这是来自服务器的回应");//增加要发送的数据到队列里
//PostQueuedCompletionStatus();
PostSend(s,"",0);//强行发送0字节,这里只是为了触发我们的消息处理器
mutex_data_send_buf.unlock();
//再次请求收取信息
PostRecv(s);
}
//发送消息
else if(pOl->nType == IO_WRITE)
{
mutex_data_send_buf.lock();
//移除已经发送成的消息
if(dwTransferBytes > 0 &&
sockets_send_buf[s].size()>= dwTransferBytes
)
{
sockets_send_buf[s].erase(0, dwTransferBytes);
}
//还有多少数据没有发送
int size_data_residue = sockets_send_buf[s].size();
//发送剩余的消息
if(size_data_residue > 0)
{
//一次性全部发送
PostSend(s, sockets_send_buf[s].c_str(), size_data_residue);
//强行制定发送的分片大小
//PostSend(s, sockets_send_buf[s].c_str(), 2);
}
delete pOl->pSendBuf;
mutex_data_send_buf.unlock();
}
delete pOl;
}
}
int main()
{
initial_win_socket();
//1 创建iocp对象
HANDLE hd_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,//创建iocp必须用这个参数
NULL,//创建iocp是这里设定为 NULL
0,//创建iocp是这里设定为 NULL
2//最多允许几个线程运行
);
//2 端口监听 绑定iocp
new std::thread(bind_port, "0.0.0.0", 10086, hd_iocp);
//3 创建消息处理线程
std::thread t0(iocp_msg_receive_thread,hd_iocp);
std::thread t1(iocp_msg_receive_thread,hd_iocp);
std::thread t2(iocp_msg_receive_thread,hd_iocp);
t2.join();
}