iocp

  1. iocp模型的设计初衷是配合线程池处理大规模数据处理
  2. 一般建议使用iocp 收数据
  3. iocp是微软封装好的高集成度的使用方式
  4. socket异常/不在需要时 要及时 closesocket,否则GetQueuedCompletionStatus会阻塞

使用逻辑

  1. 创建iocp对象(CreateIoCompletionPort)
  2. 创建多个线程 等待iocp消息(GetQueuedCompletionStatus)
  3. 监听端口 将 建立的socket关联到 iocp对象(CreateIoCompletionPort),并且投递一次 消息接受请求(WSARecv)
  4. GetQueuedCompletionStatus中如果接收到某个 socket异常/关闭,要及时closesocket

必备知识

WSASend

在IOCP中投递WSASend返回WSA_IO_PENDING的时候,表示异步投递已经成功,但是稍后发送才会完成。这其中涉及到了三个缓冲区。

网卡缓冲区,TCP/IP层缓冲区,程序缓冲区。

  1. 情况一:调用WSASend发送正确的时候(即立即返回,且没有错误),TCP/IP将数据从程序缓冲区中拷贝到TCP/IP层缓冲区中,然后不锁定该程序缓冲区,由上层程序自己处理。TCP/IP层缓冲区在网络合适的时候,将其数据拷贝到网卡缓冲区,进行真正的发送。
  2. 情况二:调用WSASend发送错误,但是错误码是WSA_IO_PENDING的时候,表示此时TCP/IP层缓冲区已满,暂时没有剩余的空间将程序缓冲区的数据拷贝出来,这时系统将锁定用户的程序缓冲区,按照书上说的WSASend指定的缓冲区将会被锁定到系统的非分页内存中。直到TCP/IP层缓冲区有空余的地方来接受拷贝我们的程序缓冲区数据才拷贝走,并将给IOCP一个完成消息。
  3. 情况三:调用WSASend发送错误,但是错误码不是WSA_IO_PENDING,此时应该是发送错误,应该释放该SOCKET对应的所有资源。

WSARecv

  1. 情况一:调用WSARecv正确,TCP/IP将数据从TCP/IP层缓冲区拷贝到缓冲区,然后由我们的程序自行处理了。清除TCP/IP层缓冲区数据。
  2. 情况二:调用WSARecv错误,但是返回值是WSA_IO_PENDING,此时是因为TCP/IP层缓冲区中没有数据可取,系统将会锁定我们投递的WSARecv的buffer,直到TCP/IP层缓冲区中有新的数据到来。
  3. 情况三:调用WSARecv错误,错误值不是WSA_IO_PENDING,此时是接收出错,应该释放该SOCKET对应的所有资源。

WSANOBUF

在以上情况中有几个非常要注意的事情:

  1. 系统锁定非分页内存的时候,最小的锁定大小是4K(当然,这个取决于您系统的设置,也可以设置小一些,在注册表里面可以改)
  2. 当我们投递了很多WSARecv或者WSASend的时候,不管我们投递的Buffer有多大(0除外),系统在出现IO_PENGDING的时候,都会锁定我们4K的内存。这也就是经常有开发者出现WSANOBUF的情况原因了。

我们在解决这个问题的时候,要针对WSASend和WSARecv做处理

  1. 投递WSARecv的时候,可以采用一个巧妙的设计,先投递0大小Buf的WSARecv,如果返回,表示有数据可以接收,我们开启真正的recv将数据从TCP/IP层缓冲区取出来,直到WSA_IO_PENGDING.
  2. 对投递的WSARecv以及WSASend进行计数统计,如果超过了我们预定义的值,就不进行WSASend或者WSARecv投递了。
  3. 现在我们应该就可以明白为什么WSASend会返回小于我们投递的buffer空间数据值了,是因为TCP/IP层缓冲区小于我们要发送的缓冲区,TCP/IP只会拷贝他剩余可被Copy的缓冲区大小的数据走,然后给我们的WSASend的已发送缓冲区设置为移走的大小,下一次投递的时候,如果TCP/IP层还未被发送,将返回WSA_IO_PENGDING
  4. 在很多地方有提到,可以关闭TCP/IP层缓冲区,可以提高一些效率和性能,这个从上面的分析来看,有这个可能,要实际的网络情况去实际分析了。

函数原型

//创建iocp对象
HANDLE hd_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,//创建iocp必须用这个参数
                                        NULL,//创建iocp是这里设定为 NULL
                                        0,//创建iocp是这里设定为 NULL
                                        2//最多允许几个线程运行
                                        );
//从消息队列中取出消息(完成的消息)
bRet = GetQueuedCompletionStatus(hd_iocp,
                                 &dwTransferBytes,//返回接收到的数据大小
                                 &dwCompleteKey,//socket绑定到iocp时设定的值,我们这里是socket句柄,这里的指针不能指向局部变量,建议new出来一个副本
                                 &lpOl,         //请求数据时我们利用这个参数夹带私货
                                 INFINITE);
//将链接绑定到iocp
HANDLE dwRet = CreateIoCompletionPort((HANDLE)sClient,//这里填写支持ol的对象,建议直接填写socket句柄
                                      hd_iocp,//这里填写绑定的iocp
                                      (ULONG_PTR)&sClient,//处理iocp事件时会接收到这个参数
                                      0//iocp的线程数,如果函数用于绑定iocp,那么这里设置的值会被忽略
                                      );
//请求接收数据
int nRet = WSARecv(sClient, //请求的socket
                   &wsaBuf,// 接收数据的配置信息
                   1,
                   &dwReadedBytes, //获取 读取了多少数据
                   &flags,//指定 接收数据的方式,取出数据,偷看数据等等
                   &pRecvOl->ol,//内部使用,指向WSAOVERLAPPED结构的指针
                   NULL//指向完成接收操作完成时调用的完成例程的指针,我们这里忽略
                   );
// 请求发送数据
int nRet = WSASend(dst_socket,//请求的socket
                   &wsaBuf,// 接收数据的配置信息
                   1,
                   &dwSendedBytes,//获取 发送了多少数据
                   0,//指定 发送数据的方式
                   &pWriteOl->ol,//内部使用,指向WSAOVERLAPPED结构的指针
                   NULL//指向完成接收操作完成时调用的完成例程的指针,我们这里忽略
                   );

实例

// iocp模型.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//
    
#include "pch.h"
#include <iostream>
#include <thread>
#include <mutex>
#include <map>
#include <Winsock2.h>
#include <WS2tcpip.h>
#pragma comment(lib, "ws2_32.lib")
    
    
std::mutex mutex_data_send_buf;
std::map<SOCKET, std::string> sockets_send_buf;
    
/*
线程池里面的GetQueuedCompletionStatus有时候会阻塞
可以使用关闭IOCP句柄或者使用PostQueuedCompletionStatus( hCompletionPort, DWORD(0), 0, NULL );来退出GetQueuedCompletionStatus
*/
    
////////////自定义结构体,用于夹带私货
/*
iocp模型中 请求接收数据时 可以传递一个参数(WSAOVERLAPPED)给 最后的消息处理者
我们重定义一个结构体 包含 WSAOVERLAPPED,以及我们自己的私货
这样 处理消息时就可以直接获取到我们夹杂的私货
私货一般是接收到的数据
*/
    
enum IOTYPE
{
    IO_READ,
    IO_WRITE,
    IO_CLOSE,
    IO_ACCEPT,
};
    
struct MyOverlapped
{
    MyOverlapped(IOTYPE type)
    {
        nType = type;
        memset(&ol, 0, sizeof(OVERLAPPED));
    }
    
    IOTYPE nType;
    WSAOVERLAPPED ol;
    char pRecvBuf[8192];
    
    int send_size = 0;
    char *pSendBuf=NULL;
};
////////////自定义结构体,用于夹带私货
    
    
/*
windows专属的 初始化api
使用Winsock 之前一定要运行这段代码
*/
void initial_win_socket()
{
    WORD wVersionRequested;
    WSADATA wsaData;
    int err;
    
    wVersionRequested = MAKEWORD(2, 2);
    
    err = WSAStartup(wVersionRequested, &wsaData);
    if(err != 0)
    {
        /* Tell the user that we could not find a usable */
        /* WinSock DLL.                                  */
        return;
    }
    
}
    
    
//请求接收数据,接收结果传递给iocp消息处理器
void PostRecv(SOCKET sClient)
{
    
    
    DWORD dwReadedBytes = 0;
    //1 生成我们的私有结构体
    MyOverlapped* pRecvOl = new MyOverlapped(IO_READ);
    
    //2 配置 接收数据的 地址
    WSABUF wsaBuf;
    wsaBuf.buf = pRecvOl->pRecvBuf;
    wsaBuf.len = 8192;
    
    DWORD flags = 0;
    
    //3 请求接收数据
    int nRet = WSARecv(sClient,
                       &wsaBuf,
                       1,
                       &dwReadedBytes,
                       &flags,
                       &pRecvOl->ol,
                       NULL);
    
    if(nRet == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
    {
        printf("数据接收错误\r\n");
        return;
    }
    else
    {
        printf("数据接收正常\r\n");
        return;
    }
}
    
/*
请求发送数据
本设计使用了双缓冲队列发送
1. 凡是直接使用PostSend发送的数据都会直接发送
2. 并且 PostSend 处理之后会触发  消息处理器
                                        ->查询 socket消息队列,循环取出消息进行发送
两种方案二选一:
要么直接使用消息队列发送
要么让消息队列为空,每次都调用 PostSend 直接投递数据
*/
void PostSend(SOCKET dst_socket,const char * data,int n_size)
{
    DWORD dwSendedBytes = 0;
    //1 生成我们的私有结构体
    MyOverlapped* pWriteOl = new MyOverlapped(IO_WRITE);
    
    //2 配置 发送数据的 地址
    pWriteOl->pSendBuf = new char[n_size];
    pWriteOl->send_size = n_size;
    memcpy(pWriteOl->pSendBuf,data,n_size);
    
    WSABUF wsaBuf;
    wsaBuf.buf = pWriteOl->pSendBuf;
    wsaBuf.len = n_size;
    
    DWORD flags = 0;
    
    //3 请求发送数据
    int nRet = WSASend(dst_socket,
                       &wsaBuf,
                       1,
                       &dwSendedBytes,
                       0,
                       &pWriteOl->ol,
                       NULL);
    if(nRet == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
    {
        printf("数据发送错误\r\n");
        return;
    }
    else
    {
        printf("数据发送正常\r\n");
        return;
    }
}
    
/*
输入  ip端口 开始绑定,监听端口
将建立的链接 关联到iocp上
    
*/
void bind_port(const char * bind_ip, int bind_port, HANDLE hd_iocp)
{
    //1 建立SOCKET(套接字)
    SOCKET s = socket(AF_INET,
                      SOCK_STREAM,//数据流形式
                      IPPROTO_TCP);
    if(s == INVALID_SOCKET)
    {
        return;
    }
    int nRet;
    
    //2 配置监听端口的结构体
    sockaddr_in name;
    name.sin_family = AF_INET;
    name.sin_port = htons(bind_port);
    inet_pton(AF_INET, bind_ip, &(name.sin_addr.S_un.S_addr));//将ip地址转化为 api使用的比特流形式
    
    // 3 将socket 绑定到端口
    nRet = bind(s,
        (struct sockaddr*)&name,
                sizeof(name));
    
    if(nRet == SOCKET_ERROR)
    {
        return;
    }
    //4 开始监听
    nRet = listen(s, SOMAXCONN/* 设置socket队列最大数量*/);
    if(nRet == SOCKET_ERROR)
    {
        return;
    }
    
    //5 等待新的链接建立
    sockaddr_in recvName = { 0 };
    recvName.sin_family = AF_INET;
    int nLength = sizeof(sockaddr_in);
    
    while(1)
    {
        SOCKET sClient = accept(s, (sockaddr*)&recvName, &nLength);//等待链接
    
        if(sClient == INVALID_SOCKET)
        {
            //closesocket(sClient);
            continue;
        }
    
        //将链接绑定到iocp
        SOCKET *new_set = new SOCKET;
        *new_set = sClient;
        HANDLE dwRet = CreateIoCompletionPort((HANDLE)sClient,//这里填写支持ol的对象,建议直接填写socket句柄
                                              hd_iocp,//这里填写绑定的iocp
                                              (ULONG_PTR)new_set,//处理iocp事件时会接收到这个参数,这里一定要分配新的内存空间
                                              1//iocp的线程数,如果函数用于绑定iocp,那么这里设置的值会被忽略
        );
    
        if(dwRet == INVALID_HANDLE_VALUE)
        {
            break;
        }
        //请求新的数据
        PostRecv(sClient);
    }
    
    closesocket(s);
}
    
    
    
//消息处理器
/*
由于是异步处理socket 
这里接收到的数据可能是有多个 send构成的 或0.?个send构成
注意消息的提取与分离
*/
void iocp_msg_receive_thread(HANDLE hd_iocp)
{
    DWORD dwTransferBytes = 0;
    ULONG_PTR dwCompleteKey = 0;
    LPOVERLAPPED lpOl = NULL;
    BOOL bRet = false;
    
    while(true)
    {
        //从消息队列中取出消息(完成的消息)
        bRet = GetQueuedCompletionStatus(hd_iocp,
                                         &dwTransferBytes,//返回接收到的数据大小
                                         &dwCompleteKey,//socket绑定到iocp时设定的值,我们这里是socket句柄
                                         &lpOl,         //请求数据时我们利用这个参数夹带私货
                                         INFINITE);
    
        //获取socket句柄
        SOCKET s = *(SOCKET*)dwCompleteKey;
        //处理消息
        MyOverlapped* pOl = CONTAINING_RECORD(lpOl, MyOverlapped, ol);
    
        if(!bRet)
        {
            printf("数据接收异常,关闭socket:%d\r\n",s);
            closesocket(s);
            if(dwCompleteKey != NULL)
            {
                delete (SOCKET*)dwCompleteKey;
            }
            if(pOl->pSendBuf!=NULL)
            {
                delete pOl->pSendBuf;
                pOl->pSendBuf = NULL;
            }
            if(pOl != NULL)
            {
                delete pOl;
                pOl = NULL;
            }
            continue;
        }
    
        //读取消息
        if(pOl->nType==IO_READ)
        {
            printf("线程%d接收到_socket%d 数据:\r\n", std::this_thread::get_id(), s);
            for(int i = 0; i < dwTransferBytes; i++)
            {
                //printf("%c", *((char *)&(pOl->pRecvBuf[0])+i));
                printf("%c", (pOl->pRecvBuf)[i]);
            }
            puts("\r\n");
    
            //回复信息
            mutex_data_send_buf.lock();
            sockets_send_buf[s].append("这是来自服务器的回应");//增加要发送的数据到队列里
            //PostQueuedCompletionStatus();
            PostSend(s,"",0);//强行发送0字节,这里只是为了触发我们的消息处理器
            mutex_data_send_buf.unlock();
    
            //再次请求收取信息
            PostRecv(s);
        }
        //发送消息
        else if(pOl->nType == IO_WRITE)
        {
            mutex_data_send_buf.lock();
    
            //移除已经发送成的消息
            if(dwTransferBytes > 0 &&
               sockets_send_buf[s].size()>= dwTransferBytes
               )
            {
                sockets_send_buf[s].erase(0, dwTransferBytes);
            }
    
            //还有多少数据没有发送
            int size_data_residue = sockets_send_buf[s].size();
    
            //发送剩余的消息
            if(size_data_residue > 0)
            {
                //一次性全部发送
                PostSend(s, sockets_send_buf[s].c_str(), size_data_residue);
                
                //强行制定发送的分片大小
                //PostSend(s, sockets_send_buf[s].c_str(), 2);
            }
            delete pOl->pSendBuf;
            mutex_data_send_buf.unlock();
        }
        delete pOl;
    }
    
    
}
    
    
int main()
{
    initial_win_socket();
    
    //1 创建iocp对象
    HANDLE hd_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,//创建iocp必须用这个参数
                                            NULL,//创建iocp是这里设定为 NULL
                                            0,//创建iocp是这里设定为 NULL
                                            2//最多允许几个线程运行
    );
    
    //2 端口监听 绑定iocp
    new std::thread(bind_port, "0.0.0.0", 10086, hd_iocp);
    
    //3 创建消息处理线程
    std::thread t0(iocp_msg_receive_thread,hd_iocp);
    std::thread t1(iocp_msg_receive_thread,hd_iocp);
    std::thread t2(iocp_msg_receive_thread,hd_iocp);
    t2.join();
    
}
Last modification:November 17, 2018
如果觉得我的文章对你有用,请随意赞赏