多路 io 转接模型 select/poll

多路io转发服务器模型也是为了解决大并发多客户端场景下的问题,比多进程、多线程开销要少。多进程多线程常规情况下都是使用 accept 或 read 函数在阻塞等接收客户端发送过来的数据,而多路io模型则是提供了一个系统函数,该函数负责阻塞判断各路被监控的文件描述符是否有数据读取或写入操作,当有数据读取或写入时再让 accept 或 read 去直接处理从而不会阻塞,系统函数可能会同时返回多个有数据的文件描述符等待后面的代码处理,所以效率上要比多进程和多线程同时只在一个位置阻塞获取数据效率要高一些,下面就介绍一下多路 io 模型 select 和 poll,poll 模型较 select 模型还存在一些优势,在本文后面将介绍。


公共头文件和客户端代码

/* wrap.h */
#ifndef __WRAP_H__
#define __WRAP_H__

void perr_exit(const char* s);
int Accept(int fd, struct sockaddr* sa, socklen_t* salenptr);
void Bind(int fd, const struct sockaddr* sa, socklen_t salen);
void Connect(int fd, const struct sockaddr* sa, socklen_t salen);
void Listen(int fd, int backlog);
int Socket(int family, int type, int protocol);
ssize_t Read(int fd, void* ptr, size_t nbytes);
ssize_t Write(int fd, const void* ptr, size_t nbytes);
void Close(int fd);
ssize_t Readn(int fd, void* vptr, size_t n);
ssize_t Writen(int fd, const void* vptr, size_t n);
static ssize_t my_read(int fd, char* ptr);
ssize_t Readline(int fd, void* vptr, size_t maxlen);

#endif
#include <stdlib.h>
#include <errno.h>
#include <sys/socket.h>
#include <stdio.h>
#include <unistd.h>
#include "wrap.h"

void perr_exit(const char* s)
{
	perror(s);
	exit(1);
}

int Accept(int fd, struct sockaddr* sa, socklen_t* salenptr)
{
	int n;

again:
	if ( (n = accept(fd, sa, salenptr)) < 0 )
	{
		if ((errno == ECONNABORTED) || (errno == EINTR))
		{
			goto again;
		}
		else
		{
			perr_exit("accept error");
		}
	}
	return n;
}

void Bind(int fd, const struct sockaddr* sa, socklen_t salen)
{
	if ( bind(fd, sa, salen) < 0 )
	{
		perr_exit("bind error");
	}
}

void Connect(int fd, const struct sockaddr* sa, socklen_t salen)
{
	if ( connect(fd, sa, salen) < 0 )
	{
		perr_exit("connect error");
	}
}

void Listen(int fd, int backlog)
{
	if ( listen(fd, backlog) < 0 )
	{
		perr_exit("listen error");
	}
}

int Socket(int family, int type, int protocol)
{
	int n = socket(family, type, protocol);
	if ( n < 0 )
	{
		perr_exit("socket error");
	}
	return n;
}

ssize_t Read(int fd, void* ptr, size_t nbytes)
{
	ssize_t n;

again:
	if ( (n = read(fd, ptr, nbytes)) == -1)
	{
		if (errno == EINTR)
		{
			goto again;
		}
		else
		{
			return -1;
		}
	}
	return n;
}

ssize_t Write(int fd, const void* ptr, size_t nbytes)
{
	ssize_t n;

again:
	if ( (n = write(fd, ptr, nbytes)) == -1)
	{
		if (errno == EINTR)
		{
			goto again;
		}
		else
		{
			return -1;
		}
	}
	return n;
}

void Close(int fd)
{
	if (close(fd) == -1)
	{
		perr_exit("close error");
	}
}

ssize_t Readn(int fd, void* vptr, size_t n)
{
	size_t nleft;
	ssize_t nread;
	char* ptr;

	ptr = vptr;
	nleft = n;
	while (nleft > 0)
	{
		if ( (nread = read(fd, ptr, nleft)) < 0 )
		{
			if (errno == EINTR)
			{
				nread = 0;
			}
			else
			{
				return -1;
			}
		}
		else if (nread == 0)
		{
			break;
		}
		
		nleft -= nread;
		ptr += nread;
	}
}

ssize_t Writen(int fd, const void* vptr, size_t n)
{
	size_t nleft;
	ssize_t nwritten;
	const char* ptr;

	ptr = vptr;
	nleft = n;
	while (nleft > 0)
	{
		if ( (nwritten = write(fd, ptr, nleft)) <= 0)
		{
			if (nwritten < 0 && errno == EINTR)
			{
				nwritten = 0;
			}
			else
			{
				return -1;
			}
		}

		nleft -= nwritten;
		ptr += nwritten;
	}

	return n;
}

static ssize_t my_read(int fd, char* ptr)
{
	static int read_cnt;
	static char* read_ptr;
	static char read_buf[100];

	if (read_cnt <= 0)
	{
again:
		if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0 )
		{
			if (errno == EINTR)
			{
				goto again;
			}
			return -1;
		}
		else if (read_cnt == 0)
		{
			return 0;
		}

		read_ptr = read_buf;
	}

	read_cnt--;
	*ptr = *read_ptr++;
	return 1;
}

ssize_t Readline(int fd, void* vptr, size_t maxlen)
{
	ssize_t n, rc;
	char c, *ptr;

	ptr = vptr;
	for (n = 1; n < maxlen; n++)
	{
		if ( (rc = my_read(fd, &c)) == 1)
		{
			*ptr++ = c;
			if (c == '\n')
			{
				break;
			}
		}
		else if (rc == 0)
		{
			*ptr = 0;
			return n - 1;
		}
		else
		{
			return -1;
		}

		*ptr = 0;
		return n;
	}
}

公共客户端代码

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#include "wrap.h"

#define MAXLINE 80
#define SERV_PORT 8000

int main(int argc, char *argv[])
{
	struct sockaddr_in servaddr;
	char buf[MAXLINE];
	int sockfd, n;
	sockfd = Socket(AF_INET, SOCK_STREAM, 0);
	bzero(&servaddr, sizeof(servaddr));
	servaddr.sin_family = AF_INET;
	inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);
	servaddr.sin_port = htons(SERV_PORT);
	Connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
	while (fgets(buf, MAXLINE, stdin) != NULL) {
		Write(sockfd, buf, strlen(buf));
		n = Read(sockfd, buf, MAXLINE);
		if (n == 0)
			printf("the other side has been closed.\n");
		else
			Write(STDOUT_FILENO, buf, n);
	}
	Close(sockfd);
	return 0;
}

select 模型服务端代码

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "wrap.h"

#define MAXLINE 100
#define SERV_PORT 8000

int main(int argc, char* argv[])
{
	int sock;
	sock = Socket(AF_INET, SOCK_STREAM, 0);
	
	struct sockaddr_in srvaddr;
	bzero(&srvaddr, sizeof(srvaddr));
	srvaddr.sin_family		= AF_INET;
	srvaddr.sin_addr.s_addr = htonl(INADDR_ANY);
	srvaddr.sin_port		= htons(SERV_PORT);
	Bind(sock, (struct sockaddr*)&srvaddr, sizeof(srvaddr));

	Listen(sock, 128);

	// 用于保存所有已经连接过的客户端文件描述符
	int i;
	int client[FD_SETSIZE];
	for (i = 0; i < FD_SETSIZE; i++)
	{
		client[i] = -1;
	}

	// 初始化当前最大的文件描述符变量
	int max_fd = sock;
	// 当前已经有多少客户端连接
	int max_idx = -1;

	int ret;
	fd_set reset, allset;
	FD_ZERO(&allset);
	FD_SET(sock, &allset);

	socklen_t cnt_len;
	struct sockaddr_in cnt_addr;
	int conn;
	char ip[INET_ADDRSTRLEN];
	int port;
	int read_ret;
	char buf[MAXLINE];

	for ( ; ; )
	{
		reset = allset;
		ret = select(max_fd + 1, &reset, NULL, NULL, NULL);
		if (ret == 0)
		{
			perr_exit("select erro");
		}

		// 如果 sock 文件描述符有数据到来
		if (FD_ISSET(sock, &reset))
		{
			cnt_len = sizeof(cnt_addr);
			conn = Accept(sock, (struct sockaddr*)&cnt_addr, &cnt_len);
			inet_ntop(AF_INET, &cnt_addr.sin_addr, ip, sizeof(ip));
			port = ntohs(cnt_addr.sin_port);

			printf("received from %s at port %d\n", ip, port);

			// 把新的文件描述符添加到数组里面
			for (i = 0; i < FD_SETSIZE; i++)
			{
				if (client[i] < 0)
				{
					client[i] = conn;
					break;
				}
			}

			// 判断是否达到了文件描述符上限
			if (i == FD_SETSIZE)
			{
				fputs("too many clients\n", stderr);
				exit(1);
			}

			// 将新来的连接文件描述符添加到监控列表中
			FD_SET(conn, &allset);
			// 更新最大的文件描述符数量,提供select第一个参数使用
			if (conn > max_fd)
			{
				max_fd = conn;
			}
			// 更新当前已经连接的客户端数量,后面for遍历使用
			if (i > max_idx)
			{
				max_idx = i;
			}
			// 如果select返回值-1==0证明已经没有需要处理的数据了
			if (--ret == 0)
			{
				continue;
			}
		}

		// 处理非 acctep 接收的数据
		for (i = 0; i <= max_idx; i++)
		{
			if (client[i] < 0)
			{
				continue;
			}
			if (FD_ISSET(client[i], &reset))
			{
				if ( (read_ret = Read(client[i], buf, MAXLINE)) == 0 )
				{
					Close(client[i]);
					FD_CLR(client[i], &reset);
					client[i] = -1;
				}
				else
				{
					int j;
					for (j = 0; j < read_ret; j++)
					{
						buf[j] = toupper(buf[j]);
					}
					Write(client[i], buf, read_ret);
				}

				if (--ret == 0);
				{
					break;
				}
			}
		}
	}
	return 0;
}

编译运行测试 select 模型代码

编译客户端:gcc client.c wrap.c -o client

编译服务端:gcc select_server.c wrap.c -o select_server

运行结果:

2015-07-11 11:30:48

poll 模型服务端代码和说明

poll 模型较 select 模型的优势:

  • poll 在调用时,传入事件和传出事件是分开的,而 select 是在一个参数中实现的传入传出。
  • select 模型被限制为最大可连接1024个客户端,这是内核代码写死的,二 poll 则真正的是根据系统 open file 的数量来决定可以连接多少客户端的
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <poll.h>
#include <errno.h>
#include "wrap.h"

#define MAXLINE 100
#define SERV_PORT 8000
#define OPEN_MAX 50000

int main(int argc, char* argv[])
{
	int sock;
	sock = Socket(AF_INET, SOCK_STREAM, 0);

	struct sockaddr_in srvaddr;
	memset(&srvaddr, 0, sizeof(srvaddr));
	srvaddr.sin_family = AF_INET;
	srvaddr.sin_addr.s_addr = htonl(INADDR_ANY);
	srvaddr.sin_port = htons(SERV_PORT);
	socklen_t socklen = sizeof(srvaddr);

	Bind(sock, (struct sockaddr*)&srvaddr, socklen);
	Listen(sock, 128);

	struct pollfd client[OPEN_MAX];
	client[0].fd = sock;
	client[0].events = POLLRDNORM;

	int i;
	// 把整个数组中的文件描述符都初始化为-1
	for (i = 1; i < OPEN_MAX; i++)
	{
		client[i].fd = -1;
	}

	int max_idx = 0;
	int ret, read_ret;
	int conn;
	struct sockaddr_in cnt_addr;
	int cnt_len = sizeof(cnt_addr);
	char ip[INET_ADDRSTRLEN];
	int port;
	char buf[MAXLINE];

	while (1)
	{
		// 第一个参数是初始化过的数组
		// 第二个参数是当前监控的最大的文件描述符数量
		// 第三个参数是设定超时时间,-1为一直等待
		// 当 poll 返回时一定是有某一个文件描述符出现了被监控的事件
		ret = poll(client, max_idx + 1, -1);

		// 判断是否是有新的客户端连接了
		if (client[0].revents & POLLRDNORM)
		{
			conn = Accept(sock, (struct sockaddr*)&cnt_addr, &cnt_len);
			inet_ntop(conn, &cnt_addr, ip, sizeof(ip));
			port = ntohs(cnt_addr.sin_port);
			printf("received from %s at port %d\n", ip, port);

			// 将新来的连接文件描述符添加到数组中
			for (i = 1; i < OPEN_MAX; i++)
			{
				if (client[i].fd < 0)
				{
					client[i].fd = conn;
					break;
				}
			}

			// 判断是否超过了最大的连接数量
			if (i == OPEN_MAX)
			{
				perr_exit("too many clients");
			}

			// 监听新来的连接
			client[i].events = POLLRDNORM;

			// 扩大数组下标的数量
			if (i > max_idx)
			{
				max_idx = i;
			}

			if (--ret <= 0)
			{
				continue;
			}
		}

		for (i = 1; i <= max_idx; i++)
		{
			if (client[i].fd < 0)
			{
				continue;
			}
			if (client[i].revents & (POLLRDNORM | POLLERR))
			{
				if ( (read_ret = Read(client[i].fd, buf, MAXLINE)) < 0 )
				{
					if (errno == ECONNRESET)
					{
						printf("client[%d] aborted connection\n", i);
						Close(client[i].fd);
						client[i].fd = -1;
					}
					else
					{
						perr_exit("read error");
					}
				}
				else if (read_ret == 0)
				{
					printf("client[%d] closed connection\n", i);
					Close(client[i].fd);
					client[i].fd = -1;
				}
				else
				{
					int j;
					for (j = 0; j < read_ret; j++)
					{
						buf[j] = toupper(buf[j]);
					}
					Write(client[i].fd, buf, read_ret);
				}

				if (--ret <= 0)
				{
					break;
				}
			}
		}
	}

	return 0;
}

编译运行测试 poll 模型代码

编译客户端:gcc client.c wrap.c -o client

编译服务端:gcc poll_server.c wrap.c -o poll_server

运行效果:

2015-07-11 11:39:18

说说你的想法