一. 通过httpServer服务端引用Accept
- 参考go 进阶 http标准库相关: 三. HttpServer 服务启动到Accept等待接收连接
二. Listener.Accept 等待连接
- Listener.Accept方法最终会调用到netFD的accept方法
- netFD下的accept()内部
- 首先会调用到FD的Accept接收新的 socket 连接,并返回新的socket对应的fd,
- 然后调用newFD构造一个新的netfd,
- 并通过init 方法完成初始化: netFD.init() --> poll.FD.Init() --> poll.pollDesc.init(),比如在pollDesc.init()中
3.1 执行poll_runtime_pollServerInit:通过该函数最终调用到netpollinit(),封装一个epoll文件描述符实例epollevent,并且使用sync.Once封装保证程序中只会创建一个
3.2 执行poll_runtime_pollOpen: 调用alloc()初始化总大小约为 4KB的pollDesc结构体,调用netpollopen(),将可读,可写,对端断开,边缘触发 的监听事件注册到epollevent中
func (fd *netFD) accept() (netfd *netFD, err error) {
// 调用netfd.FD的Accept接受新的 socket 连接,返回 socket 的 fd
d, rsa, errcall, err := fd.pfd.Accept()
...
// 构造一个新的netfd
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
poll.CloseFunc(d)
return nil, err
}
// 调用 netFD 的 init 方法完成初始化
if err = netfd.init(); err != nil {
netfd.Close()
return nil, err
}
lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}
- FD.Accept方法会使用 linux 系统调用 accept 接收新连接,创建对应的 socket
- 开启了一个无限for循环, 循环内会执行系统的比如linux的accept等待接收用户请求,创建socket
- 注意,在调用系统的accep()函数前,会执行一个Accept4Func()将当前fd设置为非阻塞的,所以accept方法会直接返回,
- accept()返回的 err 为nil 则表示正常建立新连接,不为nil,则判断err如果等于 syscall.EAGAIN当前 Socket 没有准备好并且"fd.pd.pollable() == true"当前的文件描述符是可轮询的,进入 pollDesc.waitRead 方法
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
...
for {
// 使用 linux 系统调用 accept 接收新连接,创建对应的 socket
s, rsa, errcall, err := accept(fd.Sysfd)
//因为listener fd在创建时已经设置成了非阻塞
//所以accept方法会直接返回,不管有没有新连接到来;如果 err == nil 则表示正常建立新连接,直接返回
if err == nil {
return s, rsa, "", err
}
//如果 err != nil,则判断 err == syscall.EAGAIN,符合条件则进入 pollDesc.waitRead 方法
switch err {
case syscall.EINTR:
continue
case syscall.EAGAIN: //syscall.EAGAIN 表示当前 Socket 没有准备好
if fd.pd.pollable() { //为true表示当前文件描述符是可轮询的
// 如果当前没有发生期待的 I/O 事件,那么 waitRead 会通过 park goroutine 让逻辑 block 在这里
//挂起,直到可读就会被唤醒
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
continue
}
return -1, nil, errcall, err
}
}
// 使用 linux 的 accept 系统调用接收新连接并把这个 socket fd 设置成非阻塞 I/O
ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
- 在pollDesc的 waitRead方法中会执行一个runtime_pollWait()函数(这个函数实际执行的是poll_runtime_pollWait())
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
//runtime_pollWait 实际上就是 src/runtime/netpoll.go 中的poll_runtime_pollWait 函数
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
- poll_runtime_pollWait 方法会检查当前 pollDesc 对象的 netFD 对应的文件描述符是否有 I/O 事件发生
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
errcode := netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
//通过for循环调用netpollblock()检查pollDesc 对象的 rg/wg 信号量
//这里的 for 循环是为了一直等到io事件
for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
}
return pollNoError
}
- netpollblock 函数检查 pollDesc 对象的 rg/wg 信号量是否处于 pdReady 就绪状态,如果不是将netpollblockcommit()传递给gopark(),执行gopark()把对应的g保存到pollDesc 的 rg或wg 当中挂起, 并持续等待 I/O 事件发生,也就是如果当前socket没有处于就绪状态,会把goroutinue park住,使其不使用cpu资源进行空转之类的操作
- 根据mode获取对应的信号量地址 gpp,判断当前是否pdReady。
- 判断当gpp的值如果等于 0 时,将gpp的值更替为pdWait,该操作属于原子操作且内部实现了自旋锁。
- 当值为pdWait之后,防止此时可能会有其他的并发操作修改 pd 里的内容,所以需要再次检查错误状态。gopark将当前 goroutine 置于等待状态并等待下一次的调度,但gopark仍有可能因为超时或者关闭会立即返回
- 通过原子操作将gpp的值设置为 0,返回修改前的值并判断是否pdReady。
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to pdWait
for {
//根据mode获取对应的信号量地址 gpp,判断当前是否pdReady
old := *gpp
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
//这段代码的逻辑是当gpp的值如果等于 0 时,将gpp的值更替为pdWait,该操作属于原子操作且内部实现了自旋锁
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}
//当值为pdWait之后,防止此时可能会有其他的并发操作修改 pd 里的内容,
//所以需要再次检查错误状态。gopark将当前 goroutine 置于等待状态并等待下一次的调度,
//但gopark仍有可能因为超时或者关闭会立即返回
if waitio || netpollcheckerr(pd, mode) == 0 {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// 通过原子操作将gpp的值设置为 0,返回修改前的值并判断是否pdReady
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
atomic.Xadd(&netpollWaiters, 1)
}
return r
}
- 查看 gopark()源码,实现了挂起协程的效果,实际是基于GMP模型,在 Go 的运行时系统中,每个 goroutine 都有一个对应的 M和 P,当调用 gopark 函数时,它会将自己从 M 中移除,并标记为等待状态(下方有专门讲解)
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
// 获取当前的m
mp := acquirem()
// 当前的g
gp := mp.curg
// 获取当前g的状态
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
// 设置变量
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
// 释放m
releasem(mp)
// 切换到g0栈操作
// can't do anything that might move the G between Ms here.
mcall(park_m)
}
三. Conn.Read读数据/Conn.Write写数据
- 参考net/http,在接收到请求后,accept向下执行,会封装一个Conn结构体变量, Conn是一个接口,内部提供了读写数据的方法,在读写数据时最终会执行到此处
- 我们先看一下 Conn.Read 方法是如何实现的,原理其实和 Listener.Accept 是一样的,具体调用链: 首先调用 conn 的 netFD.Read ,然后内部再调用 poll.FD.Read ,最后使用 Linux 的系统调用 read: syscall.Read 完成数据读取,
- 查看FD.Read方法
- 执行系统上的syscall.Read(), 因为 socket 在被 listener accept 的时候设置成了非阻塞 I/O,这里不管有没有可读的数据同样也是直接返回,然后根据一个error判断是否获取到需要的事件,如果error不为空表示没有执行pollDesc.waitRead挂起当前协程
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
//调用netFD下的read方法
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
func (fd *netFD) Read(p []byte) (n int, err error) {
//调用FD下的Read方法
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError("read", err)
}
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
// 尝试从该 socket 读取数据,因为 socket 在被 listener accept 的时候设置成
// 了非阻塞 I/O,所以这里同样也是直接返回,不管有没有可读的数据
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
// err == syscall.EAGAIN 表示当前没有期待的 I/O 事件发生,也就是 socket 不可读
if err == syscall.EAGAIN && fd.pd.pollable() {
// 如果当前没有发生期待的 I/O 事件,那么 waitRead
// 会通过 park goroutine 让逻辑 block 在这里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
// On MacOS we can see EINTR here if the user
// pressed ^Z. See issue #22838.
if runtime.GOOS == "darwin" && err == syscall.EINTR {
continue
}
}
err = fd.eofError(n, err)
return n, err
}
}
- conn.Write 和 conn.Read 的原理是一致的,它也是通过类似 pollDesc.waitRead 的 pollDesc.waitWrite 来 park 住 goroutine 直至期待的 I/O 事件发生才返回恢复执行
- 查看pollDesc.waitRead 内部调用了 poll.runtime_pollWait 也就是 runtime.poll_runtime_pollWait 来达成无 I/O 事件时 park 住 goroutine 的目的
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
err := netpollcheckerr(pd, int32(mode))
if err != pollNoError {
return err
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
// 进入 netpollblock 并且判断是否有期待的 I/O 事件发生,
// 这里的 for 循环是为了一直等到 io ready
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
}
return 0
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// gpp 保存的是 goroutine 的数据结构 g,这里会根据 mode 的值决定是 rg 还是 wg,
// 前面提到过,rg 和 wg 是用来保存等待 I/O 就绪的 gorouine 的,后面调用 gopark 之后,
// 会把当前的 goroutine 的抽象数据结构 g 存入 gpp 这个指针,也就是 rg 或者 wg
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// 这个 for 循环是为了等待 io ready 或者 io wait
for {
old := *gpp
// gpp == pdReady 表示此时已有期待的 I/O 事件发生,
// 可以直接返回 unblock 当前 goroutine 并执行响应的 I/O 操作
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
// 如果没有期待的 I/O 事件发生,则通过原子操作把 gpp 的值置为 pdWait 并退出 for 循环
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}
// waitio 此时是 false,netpollcheckerr 方法会检查当前 pollDesc 对应的 fd 是否是正常的,
// 通常来说 netpollcheckerr(pd, mode) == 0 是成立的,所以这里会执行 gopark
// 把当前 goroutine 给 park 住,直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,
// 然后 unpark 返回,在 gopark 内部会把当前 goroutine 的抽象数据结构 g 存入
// gpp(pollDesc.rg/pollDesc.wg) 指针里,以便在后面的 netpoll 函数取出 pollDesc 之后,
// 把 g 添加到链表里返回,接着重新调度 goroutine
if waitio || netpollcheckerr(pd, mode) == 0 {
// 注册 netpollblockcommit 回调给 gopark,在 gopark 内部会执行它,保存当前 goroutine 到 gpp
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent READY notification
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
- pollDesc.waitWrite 的内部实现原理和 pollDesc.waitRead 是一样的,都是基于 poll.runtime_pollWait --> runtime.poll_runtime_pollWait,这里就不再赘述
四. gopark() 阻塞
- 在accept()等待接收连接时,或者读写数据时,都会调用netpollblock()函数,函数内,检查 pollDesc 对象的 rg/wg 信号量是否处于 pdReady 就绪状态,如果不是,会将netpollblockcommit()传递给gopark(),执行gopark()把对应的g保存到pollDesc 的 rg或wg 当中实现挂起效果,(可以也要好好看一下netpollblock)
- gopark()有一个func类型的入参unlockf,在accept()等待接收连接或者读写数据时,传递的是netpollblockcommit()
- 查看 gopark()源码,基于GMP模型实现了挂起协程的效果,在 Go 的运行时系统中,每个 goroutine 都有一个对应的 M和 P,当一个 goroutine 调用 gopark 函数时,它会将自己从 M 中移除,并标记为等待状态
- 首先,通过 acquirem() 获取当前 M 和 G 。
- 然后,通过 releasem() 释放当前 M,这样当前的 goroutine 就不处于 M 的控制下,在调用 releasem() 之后,当前的 goroutine 依然占用着当前的 P,并尚未从 P 中移除,所以在下面还会调用park_m()将当前 goroutine 从 M 和 P 中移除
- 接着,该函数通过 mcall() 执行 park_m() ,在park_m()中首先会将当前g设置为_Gwaiting 等待运行状态 , 然后执行传递进来的netpollblockcommit() 尝试将当前协程g保存到 pollDesc 的 rg 或者 wg 指针里,如果失败返回false,会重新将协程标记为_Grunnable可运行状态等待下一次调度运行, 最终会执行" schedule()"进行调度执行其他可运行的 goroutine
// gopark 会停住当前的 goroutine 并且调用传递进来的回调函数 unlockf,
//从上面accept等待连接和读写数据的源码中我们可以知道当前这个函数是netpollblockcommit
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
// 检查是否有定时器到期,有则执行回调
if reason != waitReasonSleep {
checkTimeouts()
}
// 获得当前 M 和 G 的信息
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
// 检查当前 G 的状态是否可以进行 park 等待
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
// 将等待状态记录到当前 G 的状态里
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
// 释放当前 M(在调用 releasem() 之后,当前的 goroutine 依然占用着当前的 P,并尚未被从 P 中移除,
//所以在下面还会调用park_m()将当前 goroutine 从 M 和 P 中移除,并标记为等待状态_Gwaiting)
releasem(mp)
// 进行阻塞(切换到g0栈操作)
// gopark 最终会调用 park_m,在这个函数内部会调用 unlockf,也就是 netpollblockcommit,
// 然后会把当前的 goroutine,也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
mcall(park_m)
}
// park_m 暂停当前 goroutine 的执行,并从 M 和 P 中移除,标记为等待状态_Gwaiting
func park_m(gp *g) {
_g_ := getg()
if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}
// 将当前 goroutine 从 _Grunning 运行状态切换为 _Gwaiting 等待状态
casgstatus(gp, _Grunning, _Gwaiting)
dropg()
//判断是否存在 waitunlockf 函数
if fn := _g_.m.waitunlockf; fn != nil {
//存在则调用,实际就是调用上方注册的netpollblockcommit(),
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
//如果netpollblockcommit()返回false,说明当前g阻塞条件不满足,不能阻塞
//则表示等待结束,
//把当前的 goroutine, 也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
//重新将该 goroutine 标记为_Grunnable可运行状态,并在下一次调度时唤醒它。
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
// 进行调度,执行其他可运行的 goroutine。
schedule()
}
- netpollblockcommit() 函数内会通过cas也就是atomic.Casuintptr(),将当前协程g保存到 pollDesc 的 rg 或者 wg 指针里,进入阻塞状态并等待网络 i/o 事件的到来,如果通过atomic.Casuintptr()设置失败,说明 pollDesc 阻塞的 Goroutine 已经被其他线程唤醒或者当前 Goroutine 发生了异常,需要在下一次循环中重新尝试阻塞,此时函数会返回 false(注意我这里一直没看太懂,可能不太正确,慎重参考)
// netpollblockcommit 在 gopark 函数里被调用
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
// 通过原子操作把当前 goroutine 抽象的数据结构 g,也就是这里的参数 gp 存入 gpp 指针,
// 此时 gpp 的值是 pollDesc 的 rg 或者 wg 指针
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
//计数器累加,主要用于记录当前等待网络 i/o 事件的 Goroutine 数量,为网络轮询器的运行提供支持
atomic.Xadd(&netpollWaiters, 1)
}
return r
}
- netpollblockcommit()不同版本实现方式上可能有所区别,某些版本通过该函数好像还会把需要阻塞的协程g保存到 epoll 的 “interest list” 里,是个由红黑树实现的 eventpoll.rbr,此时 G 的状态由 _Grunning为_Gwaitting ,因此 G 必须被手动唤醒 协程是如何被唤醒的(注意我这里一直没看太懂,也没找到相关的文章,可能不太正确,慎重参考)
五. netpoll 唤醒等待队列中挂起的协程
- 在接收用户请求时会调用Accept()
- 创建初始化: 首先Accept中会创建pollDesc,创建FD文件描述符,调用一系列的init()进行初始化,并将关注的事件注册到epollevent中
- 因为fd在创建时已经设置成了非阻塞,所以accept方法会直接返回,accept()返回的 err 为nil 则表示正常建立新连接,不为nil,则判断err如果等于 syscall.EAGAIN当前 Socket 没有准备好并且"fd.pd.pollable() == true"当前的文件描述符是可轮询的,进入 pollDesc.waitRead 方法
- pollDesc 下的 waitRead 最终会调用到一个netpollblock 函数检查 pollDesc 对象的 rg/wg 信号量是否处于 pdReady 就绪状态,如果不是将netpollblockcommit()传递给gopark(),执行gopark()把对应的g保存到pollDesc 的 rg或wg 当中挂起, 并持续等待 I/O 事件发生,也就是如果当前socket没有处于就绪状态,会把goroutinue park住
- 接收到连接请求Accept()方法返回会拿到一个net.Conn连接,可以理解为拿到了一个基于TCP的HTTP连接,并将conn连接的状态标志为StateNew,Conn是一个接口,内部提供了:Read读取连接数据,Write发生数据,Close关闭连接等方法
- 查看Conn.Write 与 Conn.Read底层,原理大致相同,与accept也有点类似,以Read为例,底层会调用netFD的Read()方法—>FD的Read()方法,该
- 在Read方法中,也会判断是否有关注的事件发生,根据判断也会进入进入pollDesc.waitRead函数中,最终调用netpollblock()函数,检查 pollDesc 对象的 rg/wg 信号量是否处于 pdReady 就绪状态,如果不是,会将netpollblockcommit()传递给gopark(),执行gopark()把对应的g保存到pollDesc 的 rg或wg 当中挂起, 并持续等待 I/O 事件发生,
- 也就是在执行了Accept/Read/Write 操作后,如果没有关注的事件发生,会执行 gopark挂起goroutine,将当前协程保存到pollDesc 对象的 rg或wg属性上,某些版本好像会保存到 epoll 的 “interest list” 里,所以被阻塞的协程如何唤醒?
- 核心逻辑在runtime.netpoll函数中:
什么时候会调用 runtime.netpoll
- 在执行了Accept/Read/Write 操作后,如果没有关注的事件发生,会执行 gopark挂起goroutine,在gopark()函数执行到park_m(),该函数末尾会会调用一个scheduler()调度器函数
- 在启动go项目时底层执行runtime.rt0_go,该函数内会调用runtime.schedinit(SB) 初始化调度相关设置,调用runtime·mstart(SB)启动调度循环,在mstart()中也会调用schedule()开启调度
- runtime.schedule()是调度器运行的核心方法
- 执行getg()获取g0
- 执行sched.gcwaiting != 0,判断当前是不是正在GC,如果是则调用gcstopm休眠当前的M,挂起结束后继续执行
- 避免全局队列中的g被饿死,有一个if判断每隔61次调度轮回执行globrunqget(g.m.p.ptr(), 1)从全局队列找获取可执行g
- 如果上面获取不到,执行runqget()函数,在p.runnext获取g,如果获取不到则通过p的本地队列中获取,先获取队头的
- 如果没有获取到可运行的G,再调用 findrunnable()函数,从全局队列、epoll、别的 P 里获取获取g,找不到的话就将m休眠,等待唤醒
- 当找到一个g后,就会调用 execute 去执行g
- runtime.schedule()中会调用一个 runtime.findrunable() 方法,从全局队列、epoll、别的 P 里获取可运行的G,详细步骤:
- 调用 runqget():尝试从P本地队列中获取G,获取到返回
- 调用 globrunqget (): 尝试从全局队列中获取G,获取到返回
- 调用netpollinited()从网络IO轮询器中找到就绪的G,把这个G变为可运行的G
- 如果还是未获取到可运行的G, 会判断是否有其它非空闲的p,如果有,执行runqsteal()函数随机选一个P,尝试从这个P中allp[enum.position()]中偷去一半的G,注意有一个遍历,最多尝试四次
- 如果当前没有非空闲P,会判断当前M是否运行进入自旋状态,如果运行则进入自旋
- 未获取到可运行G,并且M没有进入自旋状态时跳入stop位置,判断此时P是否处于 GC mark 阶段,同时该 P 上存在一个后台 GC 标记工作的协程时,此时有待处理的标记任务,就会将该等待执行的标记协程转化成 runnable 状态,执行这个 Mark 任务
- 再次尝试从全局队列中,检查所有的P获取G,网络IO轮询器中获取可运行的G
- 还是获取不到会将P和M解绑,调用 stopm() 来休眠该M,那何时会重新唤醒该M呢?这就要看 wakep 函数了。一般来说新建一个goroutine或者有个goroutine准备好时,会调用 wakep 来唤醒M或者新建M
func schedule() {
...
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
...
}
func findrunnable() (gp *g, inheritTime bool) {
...
// Poll network.
if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
atomic.Store64(&sched.pollUntil, uint64(pollUntil))
if _g_.m.p != 0 {
throw("findrunnable: netpoll with p")
}
if _g_.m.spinning {
throw("findrunnable: netpoll with spinning")
}
if faketime != 0 {
// When using fake time, just poll.
delta = 0
}
list := netpoll(delta) // 同步阻塞调用 netpoll,直至有可用的 goroutine
atomic.Store64(&sched.pollUntil, 0)
atomic.Store64(&sched.lastpoll, uint64(nanotime()))
if faketime != 0 && list.empty() {
// Using fake time and nothing is ready; stop M.
// When all M's stop, checkdead will call timejump.
stopm()
goto top
}
lock(&sched.lock)
_p_ = pidleget() // 查找是否有空闲的 P 可以来就绪的 goroutine
unlock(&sched.lock)
if _p_ == nil {
injectglist(&list) // 如果当前没有空闲的 P,则把就绪的 goroutine 放入全局调度队列等待被执行
} else {
// 如果当前有空闲的 P,则 pop 出一个 g,返回给调度器去执行,
// 并通过调用 injectglist 把剩下的 g 放入全局调度队列或者当前 P 本地调度队列
acquirep(_p_)
if !list.empty() {
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
goto top
}
} else if pollUntil != 0 && netpollinited() {
pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
netpollBreak()
}
}
stopm()
goto top
}
- 另外go项目在运行时,会单独启动一个sysmon监控线程,用来做抢占式调度等操作, 在循环执行过程中检查距离上一次 runtime.netpoll 被调用是否超过了 10ms,若是则会去调用它拿到可运行的 goroutine 列表并通过调用 injectglist 把 g 列表放入全局调度队列或者当前 P 本地调度队列等待被执行
//go:nowritebarrierrec
func sysmon() {
...
// poll network if not polled for more than 10ms
lastpoll := int64(atomic.Load64(&sched.lastpoll))
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
list := netpoll(0) // non-blocking - returns list of goroutines
if !list.empty() {
incidlelocked(-1)
injectglist(&list)
incidlelocked(1)
}
}
...
}
runtime.netpoll 返回一组已经准备就绪的 Goroutine
- 查看 runtime.netpoll源码
- 创建size=128的epollevent数组
- 调用epollwait()从 epoll 的 eventpoll.rdllist双向列表中获取就绪的fd列表,到epollevent数组中,底层依赖epoll_wait系统调用函数
- 遍历epollevent数组也就是就绪的fd,调用netpollready,找到对应的goroutine,并将其状态从pdWait修改为pdReady
- 最后返回pdReady状态的 goroutine列表 gList
- 接下来将就绪的 goroutine 加入到调度队列中,等待调度运行
// netpoll检查是否有网络连接
//返回可运行的goroutine列表
// delay < 0:无限期阻塞
// delay == 0:不阻塞,只是轮询
// delay > 0:阻塞的时间不超过该纳秒
func netpoll(delay int64) gList {
if epfd == -1 {
return gList{}
}
// 根据特定的规则把 delay 值转换为 epollwait 的 timeout 值
var waitms int32
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
//定时器等待时间的任意上限
// 1e9 ms == ~11.5天
waitms = 1e9
}
var events [128]epollevent
retry:
// 超时等待就绪的 fd 读写事件
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
//如果定时睡眠被打断,请返回到
//重新计算我们现在应该睡多久
if waitms > 0 {
return gList{}
}
goto retry
}
// toRun 是一个 g 的链表,存储要恢复的 goroutines,最后返回给调用方
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
// Go scheduler 在调用 findrunnable() 寻找 goroutine 去执行的时候,
// 在调用 netpoll 之时会检查当前是否有其他线程同步阻塞在 netpoll,
// 若是,则调用 netpollBreak 来唤醒那个线程,避免它长时间阻塞
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
if ev.events != _EPOLLIN {
println("runtime: netpoll: break fd ready for", ev.events)
throw("runtime: netpoll: break fd ready for something unexpected")
}
if delay != 0 {
// netpollBreak could be picked up by a
// nonblocking poll. Only read the byte
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
atomic.Store(&netpollWakeSig, 0)
}
continue
}
// 判断发生的事件类型,读类型或者写类型等,然后给 mode 复制相应的值,
// mode 用来决定从 pollDesc 里的 rg 还是 wg 里取出 goroutine
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// 取出保存在 epollevent 里的 pollDesc
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
// 调用 netpollready,传入就绪 fd 的 pollDesc,
// 把 fd 对应的 goroutine 添加到链表 toRun 中
netpollready(&toRun, pd, mode)
}
}
//toRun是一个 g 的链表,存储要恢复的 goroutines
return toRun
}
// netpollready 调用 netpollunblock 返回就绪 fd 对应的 goroutine 的抽象数据结构 g
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
//获取对应的g的指针
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
//将对应的g加入到toRun列表中
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}
// netpollunblock 会依据传入的 mode 决定从 pollDesc 的 rg 或者 wg 取出当时 gopark 之时存入的
// goroutine 抽象数据结构 g 并返回
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
//mode == 'r' 代表当时 gopark 是为了等待读事件,而 mode == 'w' 则代表是等待写事件
gpp := &pd.rg
//根据传入的mode判断事件类型
if mode == 'w' {
gpp = &pd.wg
}
for {
// 取出 gpp 存储的 g
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
// Only set READY for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady
}
// 重置 pollDesc 的 rg 或者 wg,将读或者写信号量转换成 pdReady
if atomic.Casuintptr(gpp, old, new) {
// 如果该 goroutine 还是必须等待,则返回 nil
if old == pdWait {
old = 0
}
// 返回对应的 g指针
return (*g)(unsafe.Pointer(old))
}
}
}
// netpollBreak 往通信管道里写入信号去唤醒 epollwait
func netpollBreak() {
// 通过 CAS 避免重复的唤醒信号被写入管道,
// 从而减少系统调用并节省一些系统资源
if atomic.Cas(&netpollWakeSig, 0, 1) {
for {
var b byte
n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
if n == 1 {
break
}
if n == -_EINTR {
continue
}
if n == -_EAGAIN {
return
}
println("runtime: netpollBreak write failed with", -n)
throw("runtime: netpollBreak write failed")
}
}
}
六. 总结
复习前面
- 在通过net/http编写服务端时, 首先调用NewServeMux()创建多路复用器,编写对外接收请求的接口函数也就是处理器,然后调用多路复用器上的HandleFunc()方法,将接口与接口路径进行绑定,注册路由, 最后调用ListenAndServe()函数在指定端口开启监听,启动服务
- "net/http"下的ListenAndServe()函数,在该函数中首先会封装一个Server结构体变量,调用Server的ListenAndServe()方法,该方法中
- " net.Listen(“tcp”, addr)": 多路复用相关初始化,初始化socket,端口连接绑定,开启监听
- “srv.Serve(ln)”: 等待接收客户端连接Accept(),与接收到连接后的处理流程
- 查看srv.Serve(ln)内部逻辑,该方法内可以重点简化为:
- 方法内开启了一个无限for循环
- 在循环内部,调用Listener的Accept()方法,假设当前是TCP连接调用的就是TCPListener下的Accept(),阻塞监听客户端连接,是阻塞的(该方法内部有多路复用的相关逻辑,此处先不关注)
- 当有接收到连接请求后Accept()方法返回,拿到一个新的net.Conn连接实例,继续向下执行,封装net.Conn连接,设置连接状态为StateNew
- 通过协程执行连接的serve()方法,每一个连接开启一个goroutine来处理
- 然后for循环继续执行等待下一个连接
- 最后当接收到连接请求Accept()方法返回会拿到一个net.Conn连接,可以理解为拿到了一个基于TCP的HTTP连接,并将conn连接的状态标志为StateNew,Conn是一个接口,内部提供了:Read读取连接数据,Write发生数据,Close关闭连接等方法,比较重要,底层就是通过这几个方法处理请求读写的
-
然后开启一个goroutine执行它的serve()方法,这里对每一个连接开启一个goroutine来处理,这就是并发的体现,查看conn上的serve(),该方法内重点执行了:
- 首先调用newBufioReader() 封装了一个bufio.Reader
- 开启了一个无限for循环,循环内
- 调用conn的readRequest(ctx)方法读取请求的内容,比如解析HTTP请求协议,读取请求头,请求参数,封装Request和response,在解析时会读取请求头的 Content-Length,不为 0会通过TCPConn.Read() 方法读取指定长度的数据并存入请求体中,如果 Content-Length 为 0 或者没有设置,则请求体为空
- 封装serverHandler调用serverHandler上的ServeHTTP(w, w.req)方法进行路由匹配,找到对应的处理函数,执行我们写的业务逻辑
- 调用response的finishRequest()方法进行最后处理工作,当底层 bufio.Writer 缓冲区的大小达到阈值或者Flush() 被显式调用时,就会将缓冲区内的数据写入到底层连接中,并触发 Conn 的 Write() 方法将数据发送到客户端,另外finishRequest()方法还会进行一些比如异常处理,资源回收,状态更新等操作
- 最后调用conn的setState()设置连接状态为StateIdle,方便后续重用连接
引出后面
1. Accept 总结
- 首先了解 Accept 要先了解FD, 了解pollDesc,与pollDesc内部的rg,wg(前面的文档中有接收,此处不再赘述)
- Listener.Accept()可以分:创建初始化,与调度运行两部分去理解
- 创建初始化: 首先Accept中会创建pollDesc,创建FD文件描述符,调用一系列的init()进行初始化,并将关注的事件注册到epollevent中
- 调度运行: 当接收到连接后accept继续向下执行,会通过netpollblock()检查是否有关注的事件发生,调用gopark()把对应的g保存到pollDesc 的 rg或wg 当中挂起,持续等待关注的事件发生
- Listener.Accept() 创建初始化详解: 首先Listener.Accept() 方法内会调用netFD下的accept(),该函数中
- 首先会调用到FD的Accept接收新的 socket 连接,函数内部
1.1 开启了一个无限for循环, 循环内会执行系统的比如linux的accept等待接收用户请求,创建socket
1.2 注意,在调用系统的accep()函数前,会执行一个Accept4Func()将当前fd设置为非阻塞的,所以accept方法会直接返回
1.3 accept()返回的 err 为nil 则表示正常建立新连接,不为nil,则判断err如果等于 syscall.EAGAIN表示当前 Socket 没有准备好并且"fd.pd.pollable() == true"当前的文件描述符是可轮询的,进入 pollDesc.waitRead 方法
1.4 pollDesc 下的 waitRead 中会判断是否有需要的事件发生,如果没有会通过 park goroutine 让逻辑 block 在这里,直到接收到需要的io事件为止
- 然后调用newFD()构造一个新的netfd,
- 调用一系列的init 方法完成初始化: netFD.init() --> poll.FD.Init() --> poll.pollDesc.init(),在pollDesc.init()中
3.1 执行poll_runtime_pollServerInit:通过该函数最终调用到netpollinit(),封装一个epoll文件描述符实例epollevent,并且使用sync.Once封装保证程序中只会创建一个
3.2 执行poll_runtime_pollOpen: 调用alloc()初始化总大小约为 4KB的pollDesc结构体,调用netpollopen(),将可读,可写,对端断开,边缘触发 的监听事件注册到epollevent中
- Listener.Accept() 调度运行详解,netFD下的accept()调用的FD的Accept, 该函数中调用对应系统的accept()接收新的socket, 注意这个accept()是非阻塞的,当accept返回后,会判断一个error,如果是nil表示连接成功,如果不是nil,判断err如果等于syscall.EAGAIN当前 Socket 没有准备好并且"fd.pd.pollable() == true"当前的文件描述符是可轮询的,进入 pollDesc.waitRead(),在waitRead函数中调用了wait函数:
- 判断当前的文件描述符是否支持等待,如果不支持则返回错误;否则调用runtime_pollWait。
- runtime_pollWait实际上就是调用poll_runtime_pollWait函数,poll_runtime_pollWait()中会调用netpollblock()函数,检查 pollDesc 对象的 rg/wg 信号量是否处于 pdReady 就绪状态,如果不是,会将netpollblockcommit()传递给gopark(),执行gopark()把对应的g保存到pollDesc 的 rg或wg 当中挂起, 并持续等待 I/O 事件发生,也就是如果当前socket没有处于就绪状态,会把goroutinue park住,使其不使用cpu资源进行空转之类的操作
- gopark 与 netpollblockcommit()下方有专门讲解此处不赘述
2. 数据的读写总结
- 前面了解到在通过net/http编写服务端时, 首先调用NewServeMux()创建多路复用器,编写对外接收请求的接口函数也就是处理器,然后调用多路复用器上的HandleFunc()方法,将接口与接口路径进行绑定,注册路由, 最后调用ListenAndServe()函数在指定端口开启监听,启动服务
- "net/http"下的ListenAndServe()函数,在该函数中首先会封装一个Server结构体变量,调用Server的ListenAndServe()方法,该方法中
- " net.Listen(“tcp”, addr)": 多路复用相关初始化,初始化socket,端口连接绑定,开启监听
- “srv.Serve(ln)”: 等待接收客户端连接Accept(),与接收到连接后的处理流程
- 查看srv.Serve(ln)内部逻辑,该方法内可以重点简化为:
- 方法内开启了一个无限for循环
- 在循环内部,调用Listener的Accept()方法,假设当前是TCP连接调用的就是TCPListener下的Accept(),阻塞监听客户端连接,是阻塞的(该方法内部有多路复用的相关逻辑,此处先不关注)
- 当接收到连接请求Accept()方法返回会拿到一个net.Conn连接,可以理解为拿到了一个基于TCP的HTTP连接,并将conn连接的状态标志为StateNew,Conn是一个接口,内部提供了:Read读取连接数据,Write发生数据,Close关闭连接等方法,比较重要,底层就是通过这几个方法处理请求读写的
- 然后开启一个goroutine执行conn的serve()方法
- 通过以上我们了解到接收到请求后Accept()函数返回,会封装一个Conn连接,并将conn连接的状态标志为StateNew,Conn是一个接口,内部提供了:Read读取连接数据,Write发生数据,Close关闭连接等方法,了解到了会开启协程执行Conn上的Server()方法处理用户请求,在这个处理用户请求的server方法中
- 首先调用newBufioReader() 封装了一个bufio.Reader
- 开启了一个无限for循环,循环内
- 调用conn的readRequest(ctx)方法读取请求的内容,比如解析HTTP请求协议,读取请求头,请求参数,封装Request和response,在解析时会读取请求头的 Content-Length,不为 0会通过TCPConn.Read() 方法读取指定长度的数据并存入请求体中,如果 Content-Length 为 0 或者没有设置,则请求体为空
- 封装serverHandler调用serverHandler上的ServeHTTP(w, w.req)方法进行路由匹配,找到对应的处理函数,执行我们写的业务逻辑
- 调用response的finishRequest()方法进行最后处理工作,当底层 bufio.Writer 缓冲区的大小达到阈值或者Flush() 被显式调用时,就会将缓冲区内的数据写入到底层连接中,并触发 Conn 的 Write() 方法将数据发送到客户端,另外finishRequest()方法还会进行一些比如异常处理,资源回收,状态更新等操作
- 最后调用conn的setState()设置连接状态为StateIdle,方便后续重用连接
- 进而我们了解到在Conn上的Server()方法执行处理用户请求时,该方法内部会间接的调用到Conn上的:Read或Write实现数据的读写,查看Conn.Write 与 Conn.Read底层,原理大致相同,与accept也有点类似,以Read为例,底层会调用netFD的Read()方法—>FD的Read()方法,该Read方法中
- 开启了一个for循环,
- 在fd创建时就设置成了非阻塞的,执行"syscall.Read(fd.Sysfd, p)"系统的Read函数尝试通过socket读取数据,如果Read函数返回的err为空表示数据读取成功
- 如果err不为空,判断err为syscall.EAGAIN未准备好,并且当前fd是可轮询的fd.pd.pollable()返回true,进入pollDesc.waitRead函数中
- pollDesc.waitRead 内部调用了 poll.runtime_pollWait 也就是 runtime.poll_runtime_pollWait(),内部:
- 先调用netpollcheckerr()检查当前是否有错误发生。如果发生了错误,则返回错误的代码(比如说 EAGAIN)。
- 调用netpollblock()函数,检查 pollDesc 对象的 rg/wg 信号量是否处于 pdReady 就绪状态,如果不是,会将netpollblockcommit()传递给gopark(),执行gopark()把对应的g保存到pollDesc 的 rg或wg 当中挂起, 并持续等待 I/O 事件发生,也就是如果当前socket没有处于就绪状态,会把goroutinue park住,使其不使用cpu资源进行空转之类的操作
3. 阻塞总结
- 在accept()等待接收连接时,或者读写数据时,都会调用netpollblock()函数,函数内,检查 pollDesc 对象的 rg/wg 信号量是否处于 pdReady 就绪状态,如果不是,会将netpollblockcommit()传递给gopark(),执行gopark()把对应的g保存到pollDesc 的 rg或wg 当中实现挂起效果,(可以也要好好看一下netpollblock)
- gopark()有一个func类型的入参unlockf,在accept()等待接收连接或者读写数据时,传递的是netpollblockcommit()
- 查看 gopark()源码,基于GMP模型实现了挂起协程的效果,在 Go 的运行时系统中,每个 goroutine 都有一个对应的 M和 P,当一个 goroutine 调用 gopark 函数时,它会将自己从 M 中移除,并标记为等待状态
- 首先,通过 acquirem() 获取当前 M 和 G 。
- 然后,通过 releasem() 释放当前 M,这样当前的 goroutine 就不处于 M 的控制下,在调用 releasem() 之后,当前的 goroutine 依然占用着当前的 P,并尚未从 P 中移除,所以在下面还会调用park_m()将当前 goroutine 从 M 和 P 中移除
- 接着,该函数通过 mcall() 执行 park_m() ,在park_m()中首先会将当前g设置为_Gwaiting 等待运行状态 , 然后执行传递进来的netpollblockcommit() 尝试将当前协程g保存到 pollDesc 的 rg 或者 wg 指针里,如果失败返回false,会重新将协程标记为_Grunnable可运行状态等待下一次调度运行, 最终会执行" schedule()"进行调度执行其他可运行的 goroutine
- netpollblockcommit() 函数内会通过cas也就是atomic.Casuintptr(),将当前协程g保存到 pollDesc 的 rg 或者 wg 指针里,进入阻塞状态并等待网络 i/o 事件的到来,如果通过atomic.Casuintptr()设置失败,说明 pollDesc 阻塞的 Goroutine 已经被其他线程唤醒或者当前 Goroutine 发生了异常,需要在下一次循环中重新尝试阻塞,此时函数会返回 false
4. 唤醒
- 在执行了Accept/Read/Write 操作后,如果没有关注的事件发生,会执行 gopark挂起goroutine,在gopark()函数执行到park_m(),该函数末尾会会调用一个scheduler()调度器函数
- 在启动go项目时底层执行runtime.rt0_go,该函数内会调用runtime.schedinit(SB) 初始化调度相关设置,调用runtime·mstart(SB)启动调度循环,在mstart()中也会调用schedule()开启调度
- runtime.schedule()是调度器运行的核心方法
- 执行getg()获取g0
- 执行sched.gcwaiting != 0,判断当前是不是正在GC,如果是则调用gcstopm休眠当前的M,挂起结束后继续执行
- 避免全局队列中的g被饿死,有一个if判断每隔61次调度轮回执行globrunqget(g.m.p.ptr(), 1)从全局队列找获取可执行g
- 如果上面获取不到,执行runqget()函数,在p.runnext获取g,如果获取不到则通过p的本地队列中获取,先获取队头的
- 如果没有获取到可运行的G,再调用 findrunnable()函数,从全局队列、epoll、别的 P 里获取获取g,找不到的话就将m休眠,等待唤醒
- 当找到一个g后,就会调用 execute 去执行g
- runtime.schedule()中会调用一个 runtime.findrunable() 方法,从全局队列、epoll、别的 P 里获取可运行的G,详细步骤:
- 调用 runqget():尝试从P本地队列中获取G,获取到返回
- 调用 globrunqget (): 尝试从全局队列中获取G,获取到返回
- 调用netpollinited()从网络IO轮询器中找到就绪的G,把这个G变为可运行的G
- 如果还是未获取到可运行的G, 会判断是否有其它非空闲的p,如果有,执行runqsteal()函数随机选一个P,尝试从这个P中allp[enum.position()]中偷去一半的G,注意有一个遍历,最多尝试四次
- 如果当前没有非空闲P,会判断当前M是否运行进入自旋状态,如果运行则进入自旋
- 未获取到可运行G,并且M没有进入自旋状态时跳入stop位置,判断此时P是否处于 GC mark 阶段,同时该 P 上存在一个后台 GC 标记工作的协程时,此时有待处理的标记任务,就会将该等待执行的标记协程转化成 runnable 状态,执行这个 Mark 任务
- 再次尝试从全局队列中,检查所有的P获取G,网络IO轮询器中获取可运行的G
- 还是获取不到会将P和M解绑,调用 stopm() 来休眠该M,那何时会重新唤醒该M呢?这就要看 wakep 函数了。一般来说新建一个goroutine或者有个goroutine准备好时,会调用 wakep 来唤醒M或者新建M
- 另外go项目在运行时,会单独启动一个sysmon监控线程,用来做抢占式调度等操作, 在循环执行过程中检查距离上一次 runtime.netpoll 被调用是否超过了 10ms,若是则会去调用它拿到可运行的 goroutine 列表并通过调用 injectglist 把 g 列表放入全局调度队列或者当前 P 本地调度队列等待被执行
- 查看 runtime.netpoll源码
- 创建size=128的epollevent数组
- 调用epollwait()从 epoll 的 eventpoll.rdllist双向列表中获取就绪的fd列表,到epollevent数组中,底层依赖epoll_wait系统调用函数
- 遍历epollevent数组也就是就绪的fd,调用netpollready,找到对应的goroutine,并将其状态从pdWait修改为pdReady
- 最后返回pdReady状态的 goroutine列表 gList
- 接下来将就绪的 goroutine 加入到调度队列中,等待调度运行