IO多路复用,是指复用线程/goroutine对多个套接字批量进行polling。对此,linux提供的系统调用有select、poll和epoll,它们引入的时间线如下:
(每个连接分配一个线程)
select: 1983年左右, 在4.2BSD首次引入;
poll: 1998年左右, 在Linux kernel 2.1.23首次引入;
epoll: 2002年左右, 在Linux kernel 2.5.44首次引入;
这三个系统调用在引入时,相对于前者的优化还是比较明显的:
select首次支持IO多路复用,但最多只能监听1024个套接字;
poll突破了1024的限制,但每次polling都要传入要监听的套接字数组,返回结果包含所有要监听的套接字,需要逐个遍历、检查才能知道哪个有事件发生;
epoll在内核态用红黑树维护要监听的套接字及其状态,只返回有事件发生的套接字,同时优化了polling时用户态和内核态交换的数据量,因为polling不需要传入要监听的套接字数组。
注意:这些系统调用也支持监听普通的文件描述符(打开的文件句柄),但这里我们先不关注这一点。
从操作系统的视角来看,套接字上有数据可读/可写的事件发生是一个中断信号。被中断信号唤醒以后,将其翻译出来,更新监听的套接字数组(select/poll),或将事件添加到已就绪套接字列表(epoll)。
理清概念以后,我们看epoll的用法。
要使用epoll,首先需要创建一个epoll实例,对应系统调用epoll_create:
#include <sys/epoll.h>int epoll_create(int size);
int epoll_create1(int flags);
一般情况下我们用更新的epoll_create1,暂且不管flags的值。如果使用面向对象的思维,epoll实例在逻辑上包含下面4个结构:
interest list: 需要监听的套接字,实际上用红黑树结构存储 struct rb_root_cached rbr;
ready list: 有IO事件发生的套接字,用链表存储 struct list_head rdllist;
wait queue: 它与ep_poll_callback回调函数绑定。当wait queue里的fd有数据ready时,中断信号会唤醒这个对象
file: epoll文件描述符信息
每个epoll实例都对应内核中的一个eventpoll对象,它结构如下:
struct eventpoll {
/*
* This mutex is used to ensure that files are not removed
* while epoll is using them. This is held during the event
* collection loop, the file cleanup path, the epoll file exit
* code and the ctl operations.
*/
struct mutex mtx;/* Wait queue used by sys_epoll_wait() */
wait_queue_head_t wq;/* Wait queue used by file->poll() */
wait_queue_head_t poll_wait;/* List of ready file descriptors */
struct list_head rdllist;/* Lock which protects rdllist and ovflist */
rwlock_t lock;/* RB tree root used to store monitored fd structs */
struct rb_root_cached rbr;/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transferring ready events to userspace w/out
* holding ->lock.
*/
struct epitem *ovflist;/* wakeup_source used when ep_scan_ready_list is running */
struct wakeup_source *ws;/* The user that created the eventpoll descriptor */
struct user_struct *user;struct file *file;
// ... 省略部分代码
};
获取epoll实例以后,可以通过epoll_ctl添加套接字到interest list:
#include <sys/epoll.h>int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
这里epfd是epoll实例的文件描述符,fd是要监听的套接字的文件描述符。epoll_ctl 对应的内核代码是:
int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds,
bool nonblock)
{
int error;
int full_check = 0;
struct fd f, tf;
struct eventpoll *ep;
struct epitem *epi;
struct eventpoll *tep = NULL;
// ... 省略后面的代码
它的运行逻辑是:
根据 epfd 获取对应的struct eventpoll实例
根据 fd 获取对应的struct fd实例,生成对应的struct epitem实例
通过 ep_find 判断fd是否已经被监听
如果fd没有被监听并且op=EPOLL_CTL_ADD,则通过 ep_insert 写入interest list
第三步是通过epoll_wait进行polling:
它对应的内核调用是 do_epoll_wait,核心逻辑代理给 ep_poll 函数,ep_poll 做的事情是: 从 ready list 读取事件,并拷贝到用户态的数组里。如果没有事件,则休眠当前的线程等待下一次被唤醒。
与对 select/poll 的封装类似,Go语言中的 epoll_create1、epoll_ctl、epoll_wait 的代码都是生成的。数据结构都是用 unsafe方法强转:
// 位置: syscall/zsyscall_linux_amd64.go
func EpollCreate1(flag int) (fd int, err error) {
r0, _, e1 := RawSyscall(SYS_EPOLL_CREATE1, uintptr(flag), 0, 0)
fd = int(r0)
if e1 != 0 {
err = errnoErr(e1)
}
return
}func EpollCtl(epfd int, op int, fd int, event *EpollEvent) (err error) {
_, _, e1 := RawSyscall6(SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0)
if e1 != 0 {
err = errnoErr(e1)
}
return
}
不过,比较奇怪的是,在 runtime/netpoll_epoll.go 里,并没有使用syscall下的函数,而是自己声明一波:
// 位置: runtimie/netpoll_epoll.gofunc epollcreate(size int32) int32
func epollcreate1(flags int32) int32
对应的实现是汇编写的:
// 位置: runtime/sys_linux_amd64.s// int32 runtime·epollcreate(int32 size);
TEXT runtime·epollcreate(SB),NOSPLIT,$0
MOVL size+0(FP), DI
MOVL $SYS_epoll_create, AX
SYSCALL
MOVL AX, ret+8(FP)
RET
// int32 runtime·epollcreate1(int32 flags);
TEXT runtime·epollcreate1(SB),NOSPLIT,$0
MOVL flags+0(FP), DI
MOVL $SYS_epoll_create1, AX
SYSCALL
MOVL AX, ret+8(FP)
RET
这么丑的实现,仔细对比一下: 一个是直接用汇编实现epollcreate1,一个是调用RawSyscall,而RawsSyscall本身是汇编实现的,这块效率能提高多少呢?哪位大佬了解的话,帮忙解答一下。
第一步: 创建监听套接字sockfd
// 创建套接字
sockfd, err := syscall.Socket(family, sotype, 0)syscall.CloseOnExec(sockfd)
// epoll edge-triggered 模式支持nonblock
syscall.SetNonblock(sockfd, true)// 接收到Ctrl+C信号后,关闭socket
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
// signal channel 监听
}()// bind
addr, err := ipToSockaddrInet4(serverip, serverport)
err := syscall.Bind(sockfd, &addr)// listen
err := syscall.Listen(sockfd, listenBacklog)
第二步: 创建epoll实例,讲监听套接字sockfd注册到epoll实例
epfd, err := syscall.EpollCreate1(0)
// 默认是 level-triggered,效率更高的poll
epEvent := syscall.EpollEvent{
Fd: int32(sockfd),
Events: uint32(syscall.EPOLLIN) | uint32(-syscall.EPOLLET),
}err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, sockfd, &epEvent)
epoll 支持两种模式监听事件,分别是:
edge-triggered (边缘触发): 事件到达以后,只会触发一次(即便该事件的数据一次没有处理完);
level-triggered (水平触发): 事件到达以后,如果数据没有处理完,epoll_wait会持续收到事件;
边缘触发适合nonblock的套接字,不过在read/write时,需要保证套接字上的数据被处理完。
第三步: 在for循环里epoll_wait批量监听套接字。这部分代码有三个点先提一下:
监听套接字和已连接套接字都是nonblock模式,所以syscall.EAGAIN错误需要被特殊处理
采用了边缘触发模式
accept/read的操作都在一个for循环里,保证收到数据后,数据被消费完
// events用来接收有epoll事件,必须提前分配好内存
events := make([]syscall.EpollEvent, 128, 128)
var buf [32 * 1024]bytefor {
// msec < 0, EpollWait 会被阻塞直到有一个 fd 可用
nReady, err := syscall.EpollWait(epfd, events, -1)
if err != nil {
log.Printf("epoll_wait error=%v\n", err)
panic(fmt.Errorf("epoll_wait error=%v", err))
}for i := 0; i < nReady; i++ {
ev := &events[i]
if isError(ev.Events) {
/* An error has occured on this fd, or the socket is not
ready for reading (why were we notified then?) */
log.Printf("epoll error: %s\n", err)
// 取消监听
_ = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, int(ev.Fd), ev)
_ = syscall.Close(int(events[i].Fd))
continue
}if ev.Fd == int32(sockfd) {
// 监听套接字sockfd(server端套接字
// 处理新创建的tcp connection
for {
clientfd, _, err := syscall.Accept(sockfd)
if err == syscall.EAGAIN {
// 所有新创建的tcp conn均已被处理
break
}
// 设置为nonblock
if err := syscall.SetNonblock(clientfd, true); err != nil {
log.Printf("fails to set client socket %v as nonblock, err=%s\n", clientfd, err)
continue
}
epEvent.Fd = int32(clientfd)
epEvent.Events = uint32(syscall.EPOLLIN) | uint32(-syscall.EPOLLET)
_ = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, clientfd, &epEvent)
}
} else {
// 已连接套接字 tcp conn
for {
nRead, err := syscall.Read(int(ev.Fd), buf[:])
if err == syscall.EAGAIN {
// 数据已经读完了
break
} else if err != nil {
// 非syscall.EAGAIN错误
_ = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, int(ev.Fd), ev)
_ = syscall.Close(int(ev.Fd))
break
} else if nRead == 0 { // EOF
// Client closed
_ = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, int(ev.Fd), ev)
_ = syscall.Close(int(ev.Fd))
break
} else {
syscall.Write(int(ev.Fd), buf[:nRead])
}
}
}
}
}
在 epoll(7) — Linux manual page 里有一个epoll的例子,它的监听套接字(LISTEN socket)是阻塞模式,所以默认采用了水平触发;但并不妨碍通过边缘触发模式注册新的已连接套接字(ESTABLISHED socket, tcp conn)。
目前市面上最流行的异步IO库是libuv和libev。libuv支撑了NodeJS等框架中EventLoop的实现,libev只是出现比较早。然而Redis并没有采用这两个方案,而是自己实现了一个。对EventLoop感兴趣的同学,可以去看下对应的源代码。
关于epoll的介绍就先到这里。如果需要查看epoll实现的echo server代码,点击左下角的“查看原文”。下一篇文章,我们探讨下Go语言runtime下的netpoll,看它如何结合了epoll和goroutine,实现高性能的网络IO。
推荐阅读