tcp connection 或已连接套接字(Established socket),可以理解为一个逻辑上的双向通道,分别支持读写。不过在读通道上,数据包的读操作一般都是串行的;写通道上,数据包的写入也是串行的。对于net/http库实现的tcp server而言,每次有一个新的客户端connect,server端都会获取一个已连接套接字(等价于一个有效的tcp conn,后面对这两个概念不做区分),为其分配一个独立的goroutine,串行地读取request、处理request并写入response。
goroutine在已连接的tcp conn上读取请求时,在阻塞模式下,会等待有数据时真正开始读套接字缓冲区;非阻塞模式下,需要通过polling机制休眠当前goroutine,直到数据到来后被唤醒,然后开始读套接字缓冲区。
Server端套接字的监听运行在一个独立的goroutine里,如果同时有100个tcp conn,那么就会创建100个goroutine分别去处理conn上的请求。
那么问题来了:如果有1000个tcp conn,就需要1000个goroutine。那么问题来了,用1个行不行?
答案是可以,I/O多路复用的功能就是支持同时检查N个tcp conn,并在任何一个有数据可读时返回。
为了充分说明其区别,首先回顾下5种I/O模型工作方式的图:
这里复用上篇文章提到的概念,讲数据读取流程分为两个阶段:
第一阶段:read数据可用/write缓冲区可用之前,等待的过程
第二阶段:read数据可用/write缓冲区可用之后,数据拷贝的过程
在阻塞式I/O和非阻塞式I/O中,一个tcp conn上的两个阶段是由一个goroutine来处理;
I/O多路复用模型下,一个goroutine就可以支持批量等待多个tcp conn上数据可读的信号。简单的处理方式是,在这个goroutine里遍历所有可用的tcp conn,逐个从socket读取数据、解析数据成request、逻辑处理生成response、向socket写response。
Linux下提供了select、poll、epoll这三类系统调用支持批量等待数据可读信号。他们通用的逻辑有:
需要定义一组要监听的套接字和要监听的事件;
定义polling的timeout值:0值表示不等待,大于0表示最长等待timeout时间;小于零或空指针表示永久等待;
返回值是有事件发生的socket数;
错误码被重置(同时返回值是-1);
函数返回后,接收参数均会被修改;
不同的地方在于:对于要监听的套接字和事件的定义方式不一样,对参数的改动方式也不一样。
select函数的声明如下:
int select(
int nfds,
fd_set *readfds,
fd_set *writefds,
fd_set *errorfds,
struct timeval *timeout);
其中 nfds 指定了监听的套接字数,readfds/writefds/errorfds分别指定了要监听的读/写/异常的套接字集,timeout指定了最长等待时间。
这里值得重点关注的是 struct fd_set,逻辑上它是一个长度为1024的bit数组,在实现过程中可以用长度为32的int32数组表示,也可以用长度为16的int64数组表示。考虑到big endian和little endian的影响,每个操作系统里在不同的硬件架构下采用不同的表示方式。linux下的一个实现是:
typedef struct {
uint32_t fd32[(FD_SETSIZE + 31) / 32];
} fd_set;
FD_SETSIZE 通常是1024。由于已连接套接字的编号从0开始,依次递增;断开连接后,id会被释放出来。所以fd_set 可以支持监听1024个已连接套接字。
可以看到,select最多监听1024个套接字,而且每次调用都必须把三个fd_set(用户态)都传过去,拷贝到内核态进行处理,之后将更新结果再同步到用户态的fd_set。调用完成后,需要遍历fd_set,才能知道哪些套接字发生了改变。
poll函数的声明如下:
int poll(
struct pollfd *fds,
int nfds,
int timeout);struct pollfd {
int fd;
short int events;
short int revents;
};
同样的,nfds 指定了监听的套接字数,但具体哪些套接字上的哪些事件被监听没有按照信号拆分,而是按照套接字去拆分,表现为一个长度为nfds的pollfd数组,收到的事件也通过一个新字段revents来判断,而不是修改传入的字段。
这种表现方式的好处是,能监听的套接字不再受限于1024个,能定义的事件也不止read/write/error三个。poll支持很多类型的事件,并且支持了消息的优先级。每次进行polling时,仍然需要把要监听的所有套接字和事件信息(用户态)都传过去,拷贝到内核态处理,内核将更新结果再同步到用户态的pollfd数组。调用完成后,需要便利pollfd数组,才能知道哪些套接字发生了变化。
epoll针对select和poll的问题进行了优化,主要在于每次polling时,只需要传入一个epoll fd,而不是要监听的套接字集合。实现上包含三个系统调用:
// 创建一个epoll fd
int epoll_create1(int flags);// 增加/删除/更新监听的套接字
int epoll_ctl(
int epfd, // epoll fd
int op, // 操作:add/del/update
int fd, // 监听的套接字
struct epoll_event *event); // 监听哪些事件int epoll_wait(
int epfd, // epoll fd
struct epoll_event *events, // 有事件发生的fd,需要提前分配好内存
int maxevents, // events的长度
int timeout); // 超时事件,-1表示一直block
相对于select和poll,epoll模式下内核承担了维护套接字状态的任务,使用红黑树去实现O(logN)复杂度的查找、插入、删除和更新。用户态层面上,epoll拆分了三个系统调用,通过这种拆分,大大减少了epoll_wait时用户态和内核态之间的数据拷贝。
后面的部分,我们用select去实现echo server。
Go语言中的系统调用代码是通过命令生成的,对于 linux amd64 的代码存放在文件 zsyscall_linux_amd64.go 下,生成命令为:
// mksyscall.pl -tags linux,amd64 syscall_linux.go syscall_linux_amd64.go
之所以能这样做,是因为所有的指令本质上都是向linux系统发送的信号,不同的指令用不同的编号表示,通过函数Syscall或Syscall6向操作系统发送这些信号。以 Select 为例,内部调用是通过Syscall6发送 SYS_SELECT 信号。
// 位置: syscall/zsyscall_linux_amd64.go
func Select(nfd int, r *FdSet, w *FdSet, e *FdSet, timeout *Timeval) (n int, err error) {
r0, _, e1 := Syscall6(SYS_SELECT, uintptr(nfd), uintptr(unsafe.Pointer(r)), uintptr(unsafe.Pointer(w)), uintptr(unsafe.Pointer(e)), uintptr(unsafe.Pointer(timeout)), 0)
n = int(r0)
if e1 != 0 {
err = errnoErr(e1)
}
return
}// 位置: runtime/internal/syscall/syscall_linux.go,具体实现在汇编里
// Syscall6 calls system call number 'num' with arguments a1-6.
func Syscall6(num, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2, errno uintptr)// 位置: runtime/internal/syscall/asm_linux_amd64.s
// func Syscall6(num, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2, errno uintptr)
//
// Syscall # in AX, args in DI SI DX R10 R8 R9, return in AX DX.
//
// Note that this differs from "standard" ABI convention, which would pass 4th
// arg in CX, not R10.
TEXT ·Syscall6(SB),NOSPLIT,$0-80
MOVQ num+0(FP), AX // syscall entry
MOVQ a1+8(FP), DI
MOVQ a2+16(FP), SI
MOVQ a3+24(FP), DX
MOVQ a4+32(FP), R10
MOVQ a5+40(FP), R8
MOVQ a6+48(FP), R9
SYSCALL
// 省略部分代码
可以发现,除了指令ID,额外的6个参数类型都是uintptr,也就是说 *FdSet 和 *Timeval 被强转成C语言的指针,这要求这两个结构体和C语言里struct fd_set 和 struct timeval的内存布局也是一致的。
之前我们用BIO的模式实现了一个echo server,现在增加Select对这个服务进行改造。
第一步是net.Listener的创建流程是一样的,都是socket/bind/listen组合(省略错误处理逻辑):
var (
family = syscall.AF_INET
sotype = syscall.SOCK_STREAM
_ = "tcp"
listenBacklog = syscall.SOMAXCONN
serverip = net.IPv4(0, 0, 0, 0)
serverport = 8080
)sockfd, err := syscall.Socket(family, sotype, 0)
syscall.CloseOnExec(sockfd)
addr, err := ipToSockaddrInet4(serverip, serverport)
err := syscall.Bind(sockfd, &addr)
err := syscall.Listen(sockfd, listenBacklog)
其次是监听新的tcp conn,并处理tcp conn上的请求。
BIO模式下是for循环+Accept实现,然后创建一个新的goroutine处理新的tcp conn;
使用Select以后,使用for循环+Select+Accept/Read实现。监听套接字(Server端)和已连接套接字(新的tcp conn)都被存放到 readfds *syscall.FdSet,在Select看来没有本质区别。
值得注意的是,Select并不会Accept或Read套接字上的数据,只是监听信号。Select函数返回以后,对于监听套接字,我们通过syscall.Accept获取新的已连接套接字;对于已连接套接字,通过syscall.Read读取数据。下面是一个简单的代码实现:
var nfds = sockfd // sockfd是监听套接字
var fdSet syscall.FdSet
// 讲监听套接字加入read fdSet
fdsetutil.SetFdBit(sockfd, &fdSet)
// 已建立套接字存储在一个map里
clientFdMap := make(map[int]struct{}, 1024)for {
// select会修改这个值,所以拷贝一份fdSet
r := fdSet
// timeout = nil, Select 会被阻塞直到有一个 fd 可用
nReady, err := syscall.Select(nfds+1, &r, nil, nil, nil)
if err != nil {
panic("select error")
}// 处理监听套接字
if fdsetutil.IsSetFdBit(sockfd, &r) {
clientSockfd, clientSockAddr, err := syscall.Accept(sockfd)
if err != nil {
log.Printf("accept sockfd %d error=%v\n", sockfd, err)
continue
}
clientSockAddrInet4 := clientSockAddr.(*syscall.SockaddrInet4)
log.Printf("Connected with new client, sock addr = %v:%d\n", clientSockAddrInet4.Addr, clientSockAddrInet4.Port)
clientFdMap[clientSockfd] = struct{}{}
fdsetutil.SetFdBit(clientSockfd, &fdSet)
if clientSockfd > nfds {
nfds = clientSockfd
}
}// 处理已连接套接字
for clientSockFd := range clientFdMap {
if fdsetutil.IsSetFdBit(clientSockFd, &r) {
var buf [32 * 1024]byte
nRead, err := syscall.Read(clientSockFd, buf[:])
if err != nil {
log.Printf("fails to read data from sockfd %d, err=%v\n", clientSockFd, err)
_ = syscall.Close(clientSockFd)
fdsetutil.ClearFdBit(clientSockFd, &fdSet)
delete(clientFdMap, clientSockFd)
} else if nRead == 0 {
// Client closed
log.Printf("client sock %d closed\n", clientSockFd)
_ = syscall.Close(clientSockFd)
fdsetutil.ClearFdBit(clientSockFd, &fdSet)
delete(clientFdMap, clientSockFd)
} else {
log.Printf("read %d bytes from sock %d\n", nRead, clientSockFd)
if _, err := syscall.Write(clientSockFd, buf[:nRead]); err != nil {
log.Printf("fails to write data %s into sockfd %d, err=%v\n", buf[:nRead], sockfd, err)
}
}
}
}
}
在Go语言里,Linux amd64下syscall.FdSet的定义是:
// 位置: syscall/ztypes_linux_amd64.go
type FdSet struct {
Bits [16]int64
}
我们实现fdsetutil库实现FdSet的读写,对应C语言里的宏定义 FD_CLR, FD_COPY, FD_ISSET, FD_SET, FD_ZERO。
点击左下角“查看原文”阅读这段代码完整的版本(代码在gist上,如果网络不好需要多试几次)。
通过Select改造以后,不再对每个新的tcp conn创建goroutine。结果是polling的效率提高了,不过从套接字读取数据、数据处理、向套接字写数据这三个计算过程都落到了一个goroutine上。如果连接数过多或处理逻辑比较耗时,并不能发挥多核的优势。比如下面这两种常见的情形:
处理逻辑包含大量的rpc调用时,当前的goroutine可能会被休眠而不能去处理其他请求;
计算逻辑比较耗时,单个M(GMP里的M)一直在忙没空处理其他tcp conn上的请求;
对于第一种情况,在网络IO场景下,runtime对对goroutine的调度优化完全无法发挥出来;
对于第二种情况,多核CPU的计算优势发挥不出来;
从性能层面上看,Select改造似乎并没有多少优势,针对网络IO密集型的服务,性能可能还不如Go语言采用的BIO模式。
但从历史上来看,网络IO复用是Blocking IO的迭代,性能上肯定会有所提升。那么问题出在哪里呢?
我们不妨跳出Go语言,回到Java/Python等更早期的语言。其中一个差别是在多线程的支持上,Go语言有Goroutine,依仗runtime的GMP模型进行调度;Java依赖操作系统的线程;Python是伪多线程。
在Java里,tcp conn上的数据处理可以交给线程池,Go里面对应的是Goroutine池。Goroutine池化以后的性能相对于线程池,优势可能没那么明显,这就回到了GMP经典面试题:操作系统线程和Goroutine有什么区别?Goroutine是如何实现的?是否有必要池化?
几乎每个人都能答出来:Goroutine更轻量级;Goroutine运行在用户态,线程同时存在用户态和内核态(Linux下);每个P都有一个本地Goroutine队列,所有P共享一个全局Goroutine队列;M数量受限于CPU核数,Goroutine数量却不受此限制;Goroutine处理网络IO时,被休眠和唤醒的成本比较低,poll_runtime_pollWait依赖epoll对所有套接字统一进行polling等等。联想有点远了,后面专门聊聊这个话题。
简单来说,Go runtime和net库已经考虑这些问题,Goroutine的调度优势+基于epoll的netpoller带来的性能优势,既能避免网络IO只占用少量的CPU资源,又能保证其他CPU资源被充分利用,比常规的IO多路复用性能更佳(如果有大量的性能优化,结论可能会有所不同)。
这篇文章聊聊了IO多路复用的基本概念,并使用Select对echo server进行简单的改造。下一篇文章我们继续聊一聊IO多路复用,重点放到epoll上。
推荐阅读