该程序来源于传智博客教师课件,本人只是自己对照着写了一遍并做了注释,该模型可以承载大量客户端的连接二不会出现卡顿等情况,前提是我们交互的数据很少,如果交互数据较大,该例子还是有些小问题的。大家可以自己拓展,配合多线程可以实现大数据多客户端连接传输的程序。具体代码如下:
代码实现
#include <stdio.h> #include <sys/socket.h> #include <sys/epoll.h> #include <arpa/inet.h> #include <fcntl.h> #include <unistd.h> #include <errno.h> #include <string.h> #define MAX_EVENTS 1024 #define BUFLEN 128 #define SERV_PORT 8080 struct myevent_s { int fd; // 监控的文件描述符 int events; // 监控什么事件 void* arg; // 给回调函数传的参数 void (*call_back)(int fd, int events, void* arg); int status; // 当前监听标志,1为在监听,0为未监听 char buf[BUFLEN]; // 用以发送和接收传递数据的buf int len; // 用来存放 buf 长度 long last_active; // 最后交互时间,有超时判断 }; int g_efd; struct myevent_s g_events[MAX_EVENTS + 1]; void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void*), void* arg) { ev->fd = fd; // 设置文件描述符 ev->call_back = call_back; // 设置回调函数 ev->events = 0; // 设置事件为0 ev->arg = arg; // 设置回调函数的参数,在使用时就是传递结构体自身 ev->status = 0; // 设置当前监控状态 ev->last_active = time(NULL); // 设置最后修改时间 return; } void recvdata(int fd, int events, void* arg); void senddata(int fd, int events, void* arg); void eventadd(int efd, int events, struct myevent_s* ev) { // 声明并初始化一个epoll所需结构体 struct epoll_event epv = {0, {0}}; ev->events = events; //设置结构体中的事件,在循环处理过程中会判断使用 epv.events = events; //设置epoll监控的事件 epv.data.ptr = ev; //把结构体传给epoll的联合体,触发事件时会拷贝它到数组 // 用以给epoll_ctl传递参数 int op; if (ev->status == 1) //如果status是1证明是正在监控,再次进入该函数是想修改 { op = EPOLL_CTL_MOD; //将op设定为修改模式 } else //否则证明是首次添加到监控列表 { op = EPOLL_CTL_ADD; //将op设定为添加模式 ev->status = 1; } // 将文件描述符添加到epoll监控列表 int epoll = epoll_ctl(efd, op, ev->fd, &epv); if (epoll < 0) { printf("event add failed [fd=%d], events[%d]\n", ev->fd, events); } else { printf("event add ok [fd=%d], op=%d, events[%0X]\n", ev->fd, op, events); } return; } void eventdel(int efd, struct myevent_s* ev) { if (ev->status != 1) // 没有监控该事件直接退出 { return; } struct epoll_event epv = {0, {0}}; epv.data.ptr = ev; ev->status = 0; // 从epoll 监控列表中删除该文件描述符的监控 epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv); return; } void acceptconn(int lfd, int events, void* arg) { struct sockaddr_in cin; socklen_t len = sizeof(cin); int cfd; int i; if ((cfd = accept(lfd, (struct sockaddr*)&cin, &len)) == -1) { if (errno != EAGAIN && errno != EINTR) { /* 没做出错处理 */ } printf("%s: accept, %s\n", __func__, strerror(errno)); return; } do { // 遍历全局的数组寻找到一个当前没有被监听的数组,记录到i中 for (i = 0; i < MAX_EVENTS; i++) { if (g_events[i].status == 0) { break; } } // 判断 i 是不是与数组最大上限一致,如果一致证明没有空位了 if (i == MAX_EVENTS) { printf("%s: max connect limit[%d]\n", __func__, MAX_EVENTS); break; } int flag = 0; if ((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0) { printf("%s: fcntl nonblocking failed, %s\n", __func__, strerror(errno)); break; } // 将这个新来的cfd初始化,让其默认相应recvdata函数 // 把结构体自身作为回调函数的参数传递,让其可以在回调函数中使用该结构体 eventset(&g_events[i], cfd, recvdata, &g_events[i]); // 添加这个新的文件描述符到监控列表 eventadd(g_efd, EPOLLIN, &g_events[i]); }while (0); printf("new connect [%s:%d][time:%ld], pos[%d]\n", inet_ntoa(cin.sin_addr), // 客户端ip ntohs(cin.sin_port), // 客户端port g_events[i].last_active, // 客户端最后交互时间 i); // 客户端在全局数组中的位置 } void recvdata(int fd, int events, void* arg) { // 先把参数转成自身结构体 struct myevent_s* ev = (struct myevent_s*)arg; // 接收数据 int len = recv(fd, ev->buf, sizeof(ev->buf), 0); // 接收完数据后立即从监控列表中将这个文件描述符删除 eventdel(g_efd, ev); // 如果接收到了数据 if (len > 0) { // 把读取到的字符长度赋值给结构体len,在后面的发送事件中会用到这个长度 ev->len = len; // 把buf加上\0打印一下内容 ev->buf[len] = '\0'; printf("C[%d]:%s\n", fd, ev->buf); // 转为发送事件 eventset(ev, fd, senddata, ev); eventadd(g_efd, EPOLLOUT, ev); } else if (len == 0) { close(ev->fd); printf("[fd=%d] pos[%d], closed\n", fd, ev - g_events); } else { close(ev->fd); printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno)); } } void senddata(int fd, int events, void* arg) { // 把参数转成自身结构体 struct myevent_s* ev = (struct myevent_s*)arg; // ev->len 是recvdata时设定的长度,ev->buf读时也用,写时也用 int len = send(fd, ev->buf, ev->len, 0); // 从监控列表删除 eventdel(g_efd, ev); if (len > 0) { // 如果读取到了数据,那么打印接收的数据 printf("send[fd=%d], [%d]%s\n", fd, len, ev->buf); // 再次将事件添加到监控列表,回调函数改回recvdata eventset(ev, fd, recvdata, ev); // 监控 EPOLLIN 事件 eventadd(g_efd, EPOLLIN, ev); } else { // 否则判定为出现错误,关闭文件描述符并打印错误信息 close(ev->fd); printf("send[fd=%d] error %s\n", fd, strerror(errno)); } return; } void initlistensocket(int efd, short port) { int lfd = socket(AF_INET, SOCK_STREAM, 0); // 修改为非阻塞模式 fcntl(lfd, F_SETFL, O_NONBLOCK); // 初始化结构体,g_events[MAX_EVENTS] 是结构体数组最后一个元素,用来保存lfd eventset(&g_events[MAX_EVENTS], lfd, acceptconn, &g_events[MAX_EVENTS]); // 添加到监控事件列表 eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]); struct sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_addr.s_addr = INADDR_ANY; sin.sin_port = htons(port); bind(lfd, (struct sockaddr*)&sin, sizeof(sin)); listen(lfd, 20); return; } int main(int argc, char* argv[]) { // 接收参数传递过来的端口 unsigned short port = SERV_PORT; if (argc == 2) { port = atoi(argv[1]); } // 创建epoll句柄储存到全局变量g_efd中 g_efd = epoll_create(MAX_EVENTS + 1); if (g_efd <= 0) { printf("create efd in %s err %s\n", __func__, strerror(errno)); } // 调用上面初始化socket的函数 initlistensocket(g_efd, port); // 创建 epoll 事件循环所需的数组 struct epoll_event events[MAX_EVENTS + 1]; printf("server running : port[%d]\n", port); int checkpos = 0; int i; while (1) { // 验证客户端超时机制 // 取当前时间 long now = time(NULL); for (i = 0; i < 100; i++, checkpos++) { // 判断下标变量是否超过了数组最大值 if (checkpos == MAX_EVENTS) { checkpos = 0; } // 如果状态不是监听中,那么跳出到下一个循环 if (g_events[checkpos].status != 1) { continue; } // 当前时间-最后一次交互时间 long duration = now - g_events[checkpos].last_active; // 如果差超过了60秒,那么关闭这个socket if (duration >= 60) { close(g_events[checkpos].fd); printf("[fd=%d] timeout\n", g_events[checkpos].fd); eventdel(g_efd, &g_events[checkpos]); } } // 等待事件发生 int nfd = epoll_wait(g_efd, events, MAX_EVENTS + 1, 1000); if (nfd < 0) { printf("epoll_wait error, exit\n"); break; } for (i = 0; i < nfd; i++) { // events[i].data.ptr 中存放的是myevent_s结构体,将其解析出来 struct myevent_s* ev = (struct myevent_s*)events[i].data.ptr; // 如果epoll事件和结构体中记录的事件都是EPOLLIN那么调用结构体中的回调函数 if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) { ev->call_back(ev->fd, events[i].events, ev->arg); } if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) { ev->call_back(ev->fd, events[i].events, ev->arg); } } } return 0; }