go 进阶 多路复用支持: 二. Accept/Read/Write

2023-11-17

一. 通过httpServer服务端引用Accept

  1. 参考go 进阶 http标准库相关: 三. HttpServer 服务启动到Accept等待接收连接

二. Listener.Accept 等待连接

  1. Listener.Accept方法最终会调用到netFD的accept方法
    在这里插入图片描述
  2. netFD下的accept()内部
  1. 首先会调用到FD的Accept接收新的 socket 连接,并返回新的socket对应的fd,
  2. 然后调用newFD构造一个新的netfd,
  3. 并通过init 方法完成初始化: netFD.init() --> poll.FD.Init() --> poll.pollDesc.init(),比如在pollDesc.init()中

3.1 执行poll_runtime_pollServerInit:通过该函数最终调用到netpollinit(),封装一个epoll文件描述符实例epollevent,并且使用sync.Once封装保证程序中只会创建一个
3.2 执行poll_runtime_pollOpen: 调用alloc()初始化总大小约为 4KB的pollDesc结构体,调用netpollopen(),将可读,可写,对端断开,边缘触发 的监听事件注册到epollevent中

func (fd *netFD) accept() (netfd *netFD, err error) {
	// 调用netfd.FD的Accept接受新的 socket 连接,返回 socket 的 fd
	d, rsa, errcall, err := fd.pfd.Accept()
	...
	// 构造一个新的netfd
	if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
		poll.CloseFunc(d)
		return nil, err
	}
	// 调用 netFD 的 init 方法完成初始化
	if err = netfd.init(); err != nil {
		netfd.Close()
		return nil, err
	}
	lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
	netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
	return netfd, nil
}
  1. FD.Accept方法会使用 linux 系统调用 accept 接收新连接,创建对应的 socket
  1. 开启了一个无限for循环, 循环内会执行系统的比如linux的accept等待接收用户请求,创建socket
  2. 注意,在调用系统的accep()函数前,会执行一个Accept4Func()将当前fd设置为非阻塞的,所以accept方法会直接返回,
  3. accept()返回的 err 为nil 则表示正常建立新连接,不为nil,则判断err如果等于 syscall.EAGAIN当前 Socket 没有准备好并且"fd.pd.pollable() == true"当前的文件描述符是可轮询的,进入 pollDesc.waitRead 方法
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
	...
	for {
		// 使用 linux 系统调用 accept 接收新连接,创建对应的 socket
		s, rsa, errcall, err := accept(fd.Sysfd)
		//因为listener fd在创建时已经设置成了非阻塞
		//所以accept方法会直接返回,不管有没有新连接到来;如果 err == nil 则表示正常建立新连接,直接返回
		if err == nil {
			return s, rsa, "", err
		}
		//如果 err != nil,则判断 err == syscall.EAGAIN,符合条件则进入 pollDesc.waitRead 方法
		switch err {
		case syscall.EINTR:
			continue
		case syscall.EAGAIN: //syscall.EAGAIN 表示当前 Socket 没有准备好
			if fd.pd.pollable() { //为true表示当前文件描述符是可轮询的
				// 如果当前没有发生期待的 I/O 事件,那么 waitRead 会通过 park goroutine 让逻辑 block 在这里
				//挂起,直到可读就会被唤醒
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		case syscall.ECONNABORTED:
			continue
		}
		return -1, nil, errcall, err
	}
}

// 使用 linux 的 accept 系统调用接收新连接并把这个 socket fd 设置成非阻塞 I/O
ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
  1. 在pollDesc的 waitRead方法中会执行一个runtime_pollWait()函数(这个函数实际执行的是poll_runtime_pollWait())
func (pd *pollDesc) waitRead(isFile bool) error {
    return pd.wait('r', isFile)
}func (pd *pollDesc) wait(mode int, isFile bool) error {
    if pd.runtimeCtx == 0 {
        return errors.New("waiting for unsupported file type")
    }
    //runtime_pollWait 实际上就是 src/runtime/netpoll.go 中的poll_runtime_pollWait 函数
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res, isFile)
}
  1. poll_runtime_pollWait 方法会检查当前 pollDesc 对象的 netFD 对应的文件描述符是否有 I/O 事件发生
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    errcode := netpollcheckerr(pd, int32(mode))
    if errcode != pollNoError {
        return errcode
    }
    // As for now only Solaris, illumos, and AIX use level-triggered IO.
    if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
        netpollarm(pd, mode)
    }
    //通过for循环调用netpollblock()检查pollDesc 对象的 rg/wg 信号量
    //这里的 for 循环是为了一直等到io事件
    for !netpollblock(pd, int32(mode), false) {
        errcode = netpollcheckerr(pd, int32(mode))
        if errcode != pollNoError {
            return errcode
        }
  
    }
    return pollNoError
}
  1. netpollblock 函数检查 pollDesc 对象的 rg/wg 信号量是否处于 pdReady 就绪状态,如果不是将netpollblockcommit()传递给gopark(),执行gopark()把对应的g保存到pollDesc 的 rg或wg 当中挂起, 并持续等待 I/O 事件发生,也就是如果当前socket没有处于就绪状态,会把goroutinue park住,使其不使用cpu资源进行空转之类的操作
  1. 根据mode获取对应的信号量地址 gpp,判断当前是否pdReady。
  2. 判断当gpp的值如果等于 0 时,将gpp的值更替为pdWait,该操作属于原子操作且内部实现了自旋锁。
  3. 当值为pdWait之后,防止此时可能会有其他的并发操作修改 pd 里的内容,所以需要再次检查错误状态。gopark将当前 goroutine 置于等待状态并等待下一次的调度,但gopark仍有可能因为超时或者关闭会立即返回
  4. 通过原子操作将gpp的值设置为 0,返回修改前的值并判断是否pdReady。
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	// set the gpp semaphore to pdWait
	for {
		//根据mode获取对应的信号量地址 gpp,判断当前是否pdReady
		old := *gpp
		if old == pdReady {
			*gpp = 0
			return true
		}
		if old != 0 {
			throw("runtime: double wait")
		}
		//这段代码的逻辑是当gpp的值如果等于 0 时,将gpp的值更替为pdWait,该操作属于原子操作且内部实现了自旋锁
		if atomic.Casuintptr(gpp, 0, pdWait) {
			break
		}
	}

	//当值为pdWait之后,防止此时可能会有其他的并发操作修改 pd 里的内容,
	//所以需要再次检查错误状态。gopark将当前 goroutine 置于等待状态并等待下一次的调度,
	//但gopark仍有可能因为超时或者关闭会立即返回
	if waitio || netpollcheckerr(pd, mode) == 0 {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
	}
	// 通过原子操作将gpp的值设置为 0,返回修改前的值并判断是否pdReady
	old := atomic.Xchguintptr(gpp, 0)
	if old > pdWait {
		throw("runtime: corrupted polldesc")
	}
	return old == pdReady
}
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
    r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
    if r {
        atomic.Xadd(&netpollWaiters, 1)
    }
    return r
}
  1. 查看 gopark()源码,实现了挂起协程的效果,实际是基于GMP模型,在 Go 的运行时系统中,每个 goroutine 都有一个对应的 M和 P,当调用 gopark 函数时,它会将自己从 M 中移除,并标记为等待状态(下方有专门讲解)
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
	if reason != waitReasonSleep {
		checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
	}
	// 获取当前的m
	mp := acquirem()
	// 当前的g
	gp := mp.curg
	// 获取当前g的状态
	status := readgstatus(gp)
	if status != _Grunning && status != _Gscanrunning {
		throw("gopark: bad g status")
	}
	// 设置变量
	mp.waitlock = lock
	mp.waitunlockf = unlockf
	gp.waitreason = reason
	mp.waittraceev = traceEv
	mp.waittraceskip = traceskip
 
    // 释放m
	releasem(mp)
 
    // 切换到g0栈操作
	// can't do anything that might move the G between Ms here.
	mcall(park_m)
}

三. Conn.Read读数据/Conn.Write写数据

  1. 参考net/http,在接收到请求后,accept向下执行,会封装一个Conn结构体变量, Conn是一个接口,内部提供了读写数据的方法,在读写数据时最终会执行到此处
  2. 我们先看一下 Conn.Read 方法是如何实现的,原理其实和 Listener.Accept 是一样的,具体调用链: 首先调用 conn 的 netFD.Read ,然后内部再调用 poll.FD.Read ,最后使用 Linux 的系统调用 read: syscall.Read 完成数据读取,
  3. 查看FD.Read方法
  1. 执行系统上的syscall.Read(), 因为 socket 在被 listener accept 的时候设置成了非阻塞 I/O,这里不管有没有可读的数据同样也是直接返回,然后根据一个error判断是否获取到需要的事件,如果error不为空表示没有执行pollDesc.waitRead挂起当前协程
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
	if !c.ok() {
		return 0, syscall.EINVAL
	}
	//调用netFD下的read方法
	n, err := c.fd.Read(b)
	if err != nil && err != io.EOF {
		err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
	}
	return n, err
}
 
func (fd *netFD) Read(p []byte) (n int, err error) {
	//调用FD下的Read方法
	n, err = fd.pfd.Read(p)
	runtime.KeepAlive(fd)
	return n, wrapSyscallError("read", err)
}
 
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
	if err := fd.readLock(); err != nil {
		return 0, err
	}
	defer fd.readUnlock()
	if len(p) == 0 {
		return 0, nil
	}
	if err := fd.pd.prepareRead(fd.isFile); err != nil {
		return 0, err
	}
	if fd.IsStream && len(p) > maxRW {
		p = p[:maxRW]
	}
	for {
		// 尝试从该 socket 读取数据,因为 socket 在被 listener accept 的时候设置成
		// 了非阻塞 I/O,所以这里同样也是直接返回,不管有没有可读的数据
		n, err := syscall.Read(fd.Sysfd, p)
		if err != nil {
			n = 0
			// err == syscall.EAGAIN 表示当前没有期待的 I/O 事件发生,也就是 socket 不可读
			if err == syscall.EAGAIN && fd.pd.pollable() {
				// 如果当前没有发生期待的 I/O 事件,那么 waitRead 
				// 会通过 park goroutine 让逻辑 block 在这里
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
 
			// On MacOS we can see EINTR here if the user
			// pressed ^Z.  See issue #22838.
			if runtime.GOOS == "darwin" && err == syscall.EINTR {
				continue
			}
		}
		err = fd.eofError(n, err)
		return n, err
	}
}
  1. conn.Write 和 conn.Read 的原理是一致的,它也是通过类似 pollDesc.waitRead 的 pollDesc.waitWrite 来 park 住 goroutine 直至期待的 I/O 事件发生才返回恢复执行
  2. 查看pollDesc.waitRead 内部调用了 poll.runtime_pollWait 也就是 runtime.poll_runtime_pollWait 来达成无 I/O 事件时 park 住 goroutine 的目的
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	err := netpollcheckerr(pd, int32(mode))
	if err != pollNoError {
		return err
	}
	// As for now only Solaris, illumos, and AIX use level-triggered IO.
	if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
		netpollarm(pd, mode)
	}
	// 进入 netpollblock 并且判断是否有期待的 I/O 事件发生,
	// 这里的 for 循环是为了一直等到 io ready
	for !netpollblock(pd, int32(mode), false) {
		err = netpollcheckerr(pd, int32(mode))
		if err != 0 {
			return err
		}
	}
	return 0
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
   	// gpp 保存的是 goroutine 的数据结构 g,这里会根据 mode 的值决定是 rg 还是 wg,
	// 前面提到过,rg 和 wg 是用来保存等待 I/O 就绪的 gorouine 的,后面调用 gopark 之后,
  	// 会把当前的 goroutine 的抽象数据结构 g 存入 gpp 这个指针,也就是 rg 或者 wg
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}
 
	// 这个 for 循环是为了等待 io ready 或者 io wait
	for {
		old := *gpp
		// gpp == pdReady 表示此时已有期待的 I/O 事件发生,
		// 可以直接返回 unblock 当前 goroutine 并执行响应的 I/O 操作
		if old == pdReady {
			*gpp = 0
			return true
		}
		if old != 0 {
			throw("runtime: double wait")
		}
		// 如果没有期待的 I/O 事件发生,则通过原子操作把 gpp 的值置为 pdWait 并退出 for 循环
		if atomic.Casuintptr(gpp, 0, pdWait) {
			break
		}
	}
 
	// waitio 此时是 false,netpollcheckerr 方法会检查当前 pollDesc 对应的 fd 是否是正常的,
	// 通常来说  netpollcheckerr(pd, mode) == 0 是成立的,所以这里会执行 gopark 
	// 把当前 goroutine 给 park 住,直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,
	// 然后 unpark 返回,在 gopark 内部会把当前 goroutine 的抽象数据结构 g 存入
	// gpp(pollDesc.rg/pollDesc.wg) 指针里,以便在后面的 netpoll 函数取出 pollDesc 之后,
	// 把 g 添加到链表里返回,接着重新调度 goroutine
	if waitio || netpollcheckerr(pd, mode) == 0 {
		// 注册 netpollblockcommit 回调给 gopark,在 gopark 内部会执行它,保存当前 goroutine 到 gpp
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
	}
	// be careful to not lose concurrent READY notification
	old := atomic.Xchguintptr(gpp, 0)
	if old > pdWait {
		throw("runtime: corrupted polldesc")
	}
	return old == pdReady
}
  1. pollDesc.waitWrite 的内部实现原理和 pollDesc.waitRead 是一样的,都是基于 poll.runtime_pollWait --> runtime.poll_runtime_pollWait,这里就不再赘述

四. gopark() 阻塞

  1. 在accept()等待接收连接时,或者读写数据时,都会调用netpollblock()函数,函数内,检查 pollDesc 对象的 rg/wg 信号量是否处于 pdReady 就绪状态,如果不是,会将netpollblockcommit()传递给gopark(),执行gopark()把对应的g保存到pollDesc 的 rg或wg 当中实现挂起效果,(可以也要好好看一下netpollblock)
  2. gopark()有一个func类型的入参unlockf,在accept()等待接收连接或者读写数据时,传递的是netpollblockcommit()
  3. 查看 gopark()源码,基于GMP模型实现了挂起协程的效果,在 Go 的运行时系统中,每个 goroutine 都有一个对应的 M和 P,当一个 goroutine 调用 gopark 函数时,它会将自己从 M 中移除,并标记为等待状态
  1. 首先,通过 acquirem() 获取当前 M 和 G 。
  2. 然后,通过 releasem() 释放当前 M,这样当前的 goroutine 就不处于 M 的控制下,在调用 releasem() 之后,当前的 goroutine 依然占用着当前的 P,并尚未从 P 中移除,所以在下面还会调用park_m()将当前 goroutine 从 M 和 P 中移除
  3. 接着,该函数通过 mcall() 执行 park_m() ,在park_m()中首先会将当前g设置为_Gwaiting 等待运行状态 , 然后执行传递进来的netpollblockcommit() 尝试将当前协程g保存到 pollDesc 的 rg 或者 wg 指针里,如果失败返回false,会重新将协程标记为_Grunnable可运行状态等待下一次调度运行, 最终会执行" schedule()"进行调度执行其他可运行的 goroutine
// gopark 会停住当前的 goroutine 并且调用传递进来的回调函数 unlockf,
//从上面accept等待连接和读写数据的源码中我们可以知道当前这个函数是netpollblockcommit
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    // 检查是否有定时器到期,有则执行回调
    if reason != waitReasonSleep {
        checkTimeouts()
    }
    // 获得当前 M 和 G 的信息
    mp := acquirem()
    gp := mp.curg
    status := readgstatus(gp)
    // 检查当前 G 的状态是否可以进行 park 等待
    if status != _Grunning && status != _Gscanrunning {
        throw("gopark: bad g status")
    }
    // 将等待状态记录到当前 G 的状态里
    mp.waitlock = lock
    mp.waitunlockf = unlockf
    gp.waitreason = reason
    mp.waittraceev = traceEv
    mp.waittraceskip = traceskip
    // 释放当前 M(在调用 releasem() 之后,当前的 goroutine 依然占用着当前的 P,并尚未被从 P 中移除,
    //所以在下面还会调用park_m()将当前 goroutine 从 M 和 P 中移除,并标记为等待状态_Gwaiting)
    releasem(mp)
    // 进行阻塞(切换到g0栈操作)
    // gopark 最终会调用 park_m,在这个函数内部会调用 unlockf,也就是 netpollblockcommit,
	// 然后会把当前的 goroutine,也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
    mcall(park_m)
}

// park_m 暂停当前 goroutine 的执行,并从 M 和 P 中移除,标记为等待状态_Gwaiting
func park_m(gp *g) {
	_g_ := getg()
 
	if trace.enabled {
		traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
	}
 
	// 将当前 goroutine 从 _Grunning 运行状态切换为 _Gwaiting 等待状态 
	casgstatus(gp, _Grunning, _Gwaiting)
	dropg()
 
	//判断是否存在 waitunlockf 函数
	if fn := _g_.m.waitunlockf; fn != nil {
		//存在则调用,实际就是调用上方注册的netpollblockcommit(),
		ok := fn(gp, _g_.m.waitlock)
		_g_.m.waitunlockf = nil
		_g_.m.waitlock = nil
		//如果netpollblockcommit()返回false,说明当前g阻塞条件不满足,不能阻塞
		//则表示等待结束,
		//把当前的 goroutine, 也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
		if !ok {
			if trace.enabled {
				traceGoUnpark(gp, 2)
			}

			//重新将该 goroutine 标记为_Grunnable可运行状态,并在下一次调度时唤醒它。 
			casgstatus(gp, _Gwaiting, _Grunnable)
			execute(gp, true) // Schedule it back, never returns.
		}
	}

	// 进行调度,执行其他可运行的 goroutine。
	schedule()
}
  1. netpollblockcommit() 函数内会通过cas也就是atomic.Casuintptr(),将当前协程g保存到 pollDesc 的 rg 或者 wg 指针里,进入阻塞状态并等待网络 i/o 事件的到来,如果通过atomic.Casuintptr()设置失败,说明 pollDesc 阻塞的 Goroutine 已经被其他线程唤醒或者当前 Goroutine 发生了异常,需要在下一次循环中重新尝试阻塞,此时函数会返回 false(注意我这里一直没看太懂,可能不太正确,慎重参考)
// netpollblockcommit 在 gopark 函数里被调用
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
	// 通过原子操作把当前 goroutine 抽象的数据结构 g,也就是这里的参数 gp 存入 gpp 指针,
	// 此时 gpp 的值是 pollDesc 的 rg 或者 wg 指针
	r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
	if r {
		//计数器累加,主要用于记录当前等待网络 i/o 事件的 Goroutine 数量,为网络轮询器的运行提供支持
		atomic.Xadd(&netpollWaiters, 1)
	}
	return r
}
  1. netpollblockcommit()不同版本实现方式上可能有所区别,某些版本通过该函数好像还会把需要阻塞的协程g保存到 epoll 的 “interest list” 里,是个由红黑树实现的 eventpoll.rbr,此时 G 的状态由 _Grunning为_Gwaitting ,因此 G 必须被手动唤醒 协程是如何被唤醒的(注意我这里一直没看太懂,也没找到相关的文章,可能不太正确,慎重参考)

五. netpoll 唤醒等待队列中挂起的协程

  1. 在接收用户请求时会调用Accept()
  1. 创建初始化: 首先Accept中会创建pollDesc,创建FD文件描述符,调用一系列的init()进行初始化,并将关注的事件注册到epollevent中
  2. 因为fd在创建时已经设置成了非阻塞,所以accept方法会直接返回,accept()返回的 err 为nil 则表示正常建立新连接,不为nil,则判断err如果等于 syscall.EAGAIN当前 Socket 没有准备好并且"fd.pd.pollable() == true"当前的文件描述符是可轮询的,进入 pollDesc.waitRead 方法
  3. pollDesc 下的 waitRead 最终会调用到一个netpollblock 函数检查 pollDesc 对象的 rg/wg 信号量是否处于 pdReady 就绪状态,如果不是将netpollblockcommit()传递给gopark(),执行gopark()把对应的g保存到pollDesc 的 rg或wg 当中挂起, 并持续等待 I/O 事件发生,也就是如果当前socket没有处于就绪状态,会把goroutinue park住
  1. 接收到连接请求Accept()方法返回会拿到一个net.Conn连接,可以理解为拿到了一个基于TCP的HTTP连接,并将conn连接的状态标志为StateNew,Conn是一个接口,内部提供了:Read读取连接数据,Write发生数据,Close关闭连接等方法
  1. 查看Conn.Write 与 Conn.Read底层,原理大致相同,与accept也有点类似,以Read为例,底层会调用netFD的Read()方法—>FD的Read()方法,该
  2. 在Read方法中,也会判断是否有关注的事件发生,根据判断也会进入进入pollDesc.waitRead函数中,最终调用netpollblock()函数,检查 pollDesc 对象的 rg/wg 信号量是否处于 pdReady 就绪状态,如果不是,会将netpollblockcommit()传递给gopark(),执行gopark()把对应的g保存到pollDesc 的 rg或wg 当中挂起, 并持续等待 I/O 事件发生,
  1. 也就是在执行了Accept/Read/Write 操作后,如果没有关注的事件发生,会执行 gopark挂起goroutine,将当前协程保存到pollDesc 对象的 rg或wg属性上,某些版本好像会保存到 epoll 的 “interest list” 里,所以被阻塞的协程如何唤醒?
  2. 核心逻辑在runtime.netpoll函数中:

什么时候会调用 runtime.netpoll

  1. 在执行了Accept/Read/Write 操作后,如果没有关注的事件发生,会执行 gopark挂起goroutine,在gopark()函数执行到park_m(),该函数末尾会会调用一个scheduler()调度器函数
  2. 在启动go项目时底层执行runtime.rt0_go,该函数内会调用runtime.schedinit(SB) 初始化调度相关设置,调用runtime·mstart(SB)启动调度循环,在mstart()中也会调用schedule()开启调度
  3. runtime.schedule()是调度器运行的核心方法
  1. 执行getg()获取g0
  2. 执行sched.gcwaiting != 0,判断当前是不是正在GC,如果是则调用gcstopm休眠当前的M,挂起结束后继续执行
  3. 避免全局队列中的g被饿死,有一个if判断每隔61次调度轮回执行globrunqget(g.m.p.ptr(), 1)从全局队列找获取可执行g
  4. 如果上面获取不到,执行runqget()函数,在p.runnext获取g,如果获取不到则通过p的本地队列中获取,先获取队头的
  5. 如果没有获取到可运行的G,再调用 findrunnable()函数,从全局队列、epoll、别的 P 里获取获取g,找不到的话就将m休眠,等待唤醒
  6. 当找到一个g后,就会调用 execute 去执行g
  1. runtime.schedule()中会调用一个 runtime.findrunable() 方法,从全局队列、epoll、别的 P 里获取可运行的G,详细步骤:
  1. 调用 runqget():尝试从P本地队列中获取G,获取到返回
  2. 调用 globrunqget (): 尝试从全局队列中获取G,获取到返回
  3. 调用netpollinited()从网络IO轮询器中找到就绪的G,把这个G变为可运行的G
  4. 如果还是未获取到可运行的G, 会判断是否有其它非空闲的p,如果有,执行runqsteal()函数随机选一个P,尝试从这个P中allp[enum.position()]中偷去一半的G,注意有一个遍历,最多尝试四次
  5. 如果当前没有非空闲P,会判断当前M是否运行进入自旋状态,如果运行则进入自旋
  6. 未获取到可运行G,并且M没有进入自旋状态时跳入stop位置,判断此时P是否处于 GC mark 阶段,同时该 P 上存在一个后台 GC 标记工作的协程时,此时有待处理的标记任务,就会将该等待执行的标记协程转化成 runnable 状态,执行这个 Mark 任务
  7. 再次尝试从全局队列中,检查所有的P获取G,网络IO轮询器中获取可运行的G
  8. 还是获取不到会将P和M解绑,调用 stopm() 来休眠该M,那何时会重新唤醒该M呢?这就要看 wakep 函数了。一般来说新建一个goroutine或者有个goroutine准备好时,会调用 wakep 来唤醒M或者新建M
func schedule() {
	...
  if gp == nil {
		gp, inheritTime = findrunnable() // blocks until work is available
	}
	...
}
 
func findrunnable() (gp *g, inheritTime bool) {
  ...
  
  	// Poll network.
	if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
		atomic.Store64(&sched.pollUntil, uint64(pollUntil))
		if _g_.m.p != 0 {
			throw("findrunnable: netpoll with p")
		}
		if _g_.m.spinning {
			throw("findrunnable: netpoll with spinning")
		}
		if faketime != 0 {
			// When using fake time, just poll.
			delta = 0
		}
		list := netpoll(delta) // 同步阻塞调用 netpoll,直至有可用的 goroutine
		atomic.Store64(&sched.pollUntil, 0)
		atomic.Store64(&sched.lastpoll, uint64(nanotime()))
		if faketime != 0 && list.empty() {
			// Using fake time and nothing is ready; stop M.
			// When all M's stop, checkdead will call timejump.
			stopm()
			goto top
		}
		lock(&sched.lock)
		_p_ = pidleget() // 查找是否有空闲的 P 可以来就绪的 goroutine
		unlock(&sched.lock)
		if _p_ == nil {
			injectglist(&list) // 如果当前没有空闲的 P,则把就绪的 goroutine 放入全局调度队列等待被执行
		} else {
			// 如果当前有空闲的 P,则 pop 出一个 g,返回给调度器去执行,
			// 并通过调用 injectglist 把剩下的 g 放入全局调度队列或者当前 P 本地调度队列
			acquirep(_p_)
			if !list.empty() {
				gp := list.pop()
				injectglist(&list)
				casgstatus(gp, _Gwaiting, _Grunnable)
				if trace.enabled {
					traceGoUnpark(gp, 0)
				}
				return gp, false
			}
			if wasSpinning {
				_g_.m.spinning = true
				atomic.Xadd(&sched.nmspinning, 1)
			}
			goto top
		}
	} else if pollUntil != 0 && netpollinited() {
		pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
		if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
			netpollBreak()
		}
	}
	stopm()
	goto top
}
  1. 另外go项目在运行时,会单独启动一个sysmon监控线程,用来做抢占式调度等操作, 在循环执行过程中检查距离上一次 runtime.netpoll 被调用是否超过了 10ms,若是则会去调用它拿到可运行的 goroutine 列表并通过调用 injectglist 把 g 列表放入全局调度队列或者当前 P 本地调度队列等待被执行
//go:nowritebarrierrec
func sysmon() {
		...
		// poll network if not polled for more than 10ms
		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
			atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
			list := netpoll(0) // non-blocking - returns list of goroutines
			if !list.empty() {
			
				incidlelocked(-1)
				injectglist(&list)
				incidlelocked(1)
			}
		}
  
  ...
}

runtime.netpoll 返回一组已经准备就绪的 Goroutine

  1. 查看 runtime.netpoll源码
  1. 创建size=128的epollevent数组
  2. 调用epollwait()从 epoll 的 eventpoll.rdllist双向列表中获取就绪的fd列表,到epollevent数组中,底层依赖epoll_wait系统调用函数
  3. 遍历epollevent数组也就是就绪的fd,调用netpollready,找到对应的goroutine,并将其状态从pdWait修改为pdReady
  4. 最后返回pdReady状态的 goroutine列表 gList
  5. 接下来将就绪的 goroutine 加入到调度队列中,等待调度运行
// netpoll检查是否有网络连接
//返回可运行的goroutine列表
// delay < 0:无限期阻塞
// delay == 0:不阻塞,只是轮询
// delay > 0:阻塞的时间不超过该纳秒
func netpoll(delay int64) gList {
	if epfd == -1 {
		return gList{}
	}
 
	// 根据特定的规则把 delay 值转换为 epollwait 的 timeout 值
	var waitms int32
	if delay < 0 {
		waitms = -1
	} else if delay == 0 {
		waitms = 0
	} else if delay < 1e6 {
		waitms = 1
	} else if delay < 1e15 {
		waitms = int32(delay / 1e6)
	} else {
		//定时器等待时间的任意上限
		// 1e9 ms == ~11.5天
		waitms = 1e9
	}
	var events [128]epollevent
retry:
	// 超时等待就绪的 fd 读写事件
	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
	if n < 0 {
		if n != -_EINTR {
			println("runtime: epollwait on fd", epfd, "failed with", -n)
			throw("runtime: netpoll failed")
		}
		//如果定时睡眠被打断,请返回到
		//重新计算我们现在应该睡多久
		if waitms > 0 {
			return gList{}
		}
		goto retry
	}
 
	// toRun 是一个 g 的链表,存储要恢复的 goroutines,最后返回给调用方
	var toRun gList
	for i := int32(0); i < n; i++ {
		ev := &events[i]
		if ev.events == 0 {
			continue
		}
 
		// Go scheduler 在调用 findrunnable() 寻找 goroutine 去执行的时候,
		// 在调用 netpoll 之时会检查当前是否有其他线程同步阻塞在 netpoll,
		// 若是,则调用 netpollBreak 来唤醒那个线程,避免它长时间阻塞
		if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
			if ev.events != _EPOLLIN {
				println("runtime: netpoll: break fd ready for", ev.events)
				throw("runtime: netpoll: break fd ready for something unexpected")
			}
			if delay != 0 {
				// netpollBreak could be picked up by a
				// nonblocking poll. Only read the byte
				// if blocking.
				var tmp [16]byte
				read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
				atomic.Store(&netpollWakeSig, 0)
			}
			continue
		}
 
		// 判断发生的事件类型,读类型或者写类型等,然后给 mode 复制相应的值,
    	// mode 用来决定从 pollDesc 里的 rg 还是 wg 里取出 goroutine
		var mode int32
		if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'r'
		}
		if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'w'
		}
		if mode != 0 {
			// 取出保存在 epollevent 里的 pollDesc
			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
			pd.everr = false
			if ev.events == _EPOLLERR {
				pd.everr = true
			}
			// 调用 netpollready,传入就绪 fd 的 pollDesc,
			// 把 fd 对应的 goroutine 添加到链表 toRun 中
			netpollready(&toRun, pd, mode)
		}
	}
	//toRun是一个 g 的链表,存储要恢复的 goroutines
	return toRun
}
 
// netpollready 调用 netpollunblock 返回就绪 fd 对应的 goroutine 的抽象数据结构 g
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
	var rg, wg *g
	//获取对应的g的指针
	if mode == 'r' || mode == 'r'+'w' {
		rg = netpollunblock(pd, 'r', true)
	}
	if mode == 'w' || mode == 'r'+'w' {
		wg = netpollunblock(pd, 'w', true)
	}
	//将对应的g加入到toRun列表中
	if rg != nil {
		toRun.push(rg)
	}
	if wg != nil {
		toRun.push(wg)
	}
}
 
// netpollunblock 会依据传入的 mode 决定从 pollDesc 的 rg 或者 wg 取出当时 gopark 之时存入的
// goroutine 抽象数据结构 g 并返回
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	//mode == 'r' 代表当时 gopark 是为了等待读事件,而 mode == 'w' 则代表是等待写事件
	gpp := &pd.rg
	//根据传入的mode判断事件类型
	if mode == 'w' {
		gpp = &pd.wg
	}
 
	for {
		// 取出 gpp 存储的 g
		old := *gpp
		if old == pdReady {
			return nil
		}
		if old == 0 && !ioready {
			// Only set READY for ioready. runtime_pollWait
			// will check for timeout/cancel before waiting.
			return nil
		}
		var new uintptr
		if ioready {
			new = pdReady
		}
		// 重置 pollDesc 的 rg 或者 wg,将读或者写信号量转换成 pdReady
		if atomic.Casuintptr(gpp, old, new) {
      		// 如果该 goroutine 还是必须等待,则返回 nil
			if old == pdWait {
				old = 0
			}
			// 返回对应的 g指针
			return (*g)(unsafe.Pointer(old))
		}
	}
}
 
// netpollBreak 往通信管道里写入信号去唤醒 epollwait
func netpollBreak() {
	// 通过 CAS 避免重复的唤醒信号被写入管道,
	// 从而减少系统调用并节省一些系统资源
	if atomic.Cas(&netpollWakeSig, 0, 1) {
		for {
			var b byte
			n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
			if n == 1 {
				break
			}
			if n == -_EINTR {
				continue
			}
			if n == -_EAGAIN {
				return
			}
			println("runtime: netpollBreak write failed with", -n)
			throw("runtime: netpollBreak write failed")
		}
	}
}

六. 总结

复习前面

  1. 在通过net/http编写服务端时, 首先调用NewServeMux()创建多路复用器,编写对外接收请求的接口函数也就是处理器,然后调用多路复用器上的HandleFunc()方法,将接口与接口路径进行绑定,注册路由, 最后调用ListenAndServe()函数在指定端口开启监听,启动服务
  2. "net/http"下的ListenAndServe()函数,在该函数中首先会封装一个Server结构体变量,调用Server的ListenAndServe()方法,该方法中
  1. " net.Listen(“tcp”, addr)": 多路复用相关初始化,初始化socket,端口连接绑定,开启监听
  2. “srv.Serve(ln)”: 等待接收客户端连接Accept(),与接收到连接后的处理流程
  1. 查看srv.Serve(ln)内部逻辑,该方法内可以重点简化为:
  1. 方法内开启了一个无限for循环
  2. 在循环内部,调用Listener的Accept()方法,假设当前是TCP连接调用的就是TCPListener下的Accept(),阻塞监听客户端连接,是阻塞的(该方法内部有多路复用的相关逻辑,此处先不关注)
  3. 当有接收到连接请求后Accept()方法返回,拿到一个新的net.Conn连接实例,继续向下执行,封装net.Conn连接,设置连接状态为StateNew
  4. 通过协程执行连接的serve()方法,每一个连接开启一个goroutine来处理
  5. 然后for循环继续执行等待下一个连接
  1. 最后当接收到连接请求Accept()方法返回会拿到一个net.Conn连接,可以理解为拿到了一个基于TCP的HTTP连接,并将conn连接的状态标志为StateNew,Conn是一个接口,内部提供了:Read读取连接数据,Write发生数据,Close关闭连接等方法,比较重要,底层就是通过这几个方法处理请求读写的
  2. 然后开启一个goroutine执行它的serve()方法,这里对每一个连接开启一个goroutine来处理,这就是并发的体现,查看conn上的serve(),该方法内重点执行了:
  1. 首先调用newBufioReader() 封装了一个bufio.Reader
  2. 开启了一个无限for循环,循环内
  3. 调用conn的readRequest(ctx)方法读取请求的内容,比如解析HTTP请求协议,读取请求头,请求参数,封装Request和response,在解析时会读取请求头的 Content-Length,不为 0会通过TCPConn.Read() 方法读取指定长度的数据并存入请求体中,如果 Content-Length 为 0 或者没有设置,则请求体为空
  4. 封装serverHandler调用serverHandler上的ServeHTTP(w, w.req)方法进行路由匹配,找到对应的处理函数,执行我们写的业务逻辑
  5. 调用response的finishRequest()方法进行最后处理工作,当底层 bufio.Writer 缓冲区的大小达到阈值或者Flush() 被显式调用时,就会将缓冲区内的数据写入到底层连接中,并触发 Conn 的 Write() 方法将数据发送到客户端,另外finishRequest()方法还会进行一些比如异常处理,资源回收,状态更新等操作
  6. 最后调用conn的setState()设置连接状态为StateIdle,方便后续重用连接

引出后面

1. Accept 总结

  1. 首先了解 Accept 要先了解FD, 了解pollDesc,与pollDesc内部的rg,wg(前面的文档中有接收,此处不再赘述)
  2. Listener.Accept()可以分:创建初始化,与调度运行两部分去理解
  1. 创建初始化: 首先Accept中会创建pollDesc,创建FD文件描述符,调用一系列的init()进行初始化,并将关注的事件注册到epollevent中
  2. 调度运行: 当接收到连接后accept继续向下执行,会通过netpollblock()检查是否有关注的事件发生,调用gopark()把对应的g保存到pollDesc 的 rg或wg 当中挂起,持续等待关注的事件发生
  1. Listener.Accept() 创建初始化详解: 首先Listener.Accept() 方法内会调用netFD下的accept(),该函数中
  1. 首先会调用到FD的Accept接收新的 socket 连接,函数内部

1.1 开启了一个无限for循环, 循环内会执行系统的比如linux的accept等待接收用户请求,创建socket
1.2 注意,在调用系统的accep()函数前,会执行一个Accept4Func()将当前fd设置为非阻塞的,所以accept方法会直接返回
1.3 accept()返回的 err 为nil 则表示正常建立新连接,不为nil,则判断err如果等于 syscall.EAGAIN表示当前 Socket 没有准备好并且"fd.pd.pollable() == true"当前的文件描述符是可轮询的,进入 pollDesc.waitRead 方法
1.4 pollDesc 下的 waitRead 中会判断是否有需要的事件发生,如果没有会通过 park goroutine 让逻辑 block 在这里,直到接收到需要的io事件为止

  1. 然后调用newFD()构造一个新的netfd,
  2. 调用一系列的init 方法完成初始化: netFD.init() --> poll.FD.Init() --> poll.pollDesc.init(),在pollDesc.init()中

3.1 执行poll_runtime_pollServerInit:通过该函数最终调用到netpollinit(),封装一个epoll文件描述符实例epollevent,并且使用sync.Once封装保证程序中只会创建一个
3.2 执行poll_runtime_pollOpen: 调用alloc()初始化总大小约为 4KB的pollDesc结构体,调用netpollopen(),将可读,可写,对端断开,边缘触发 的监听事件注册到epollevent中

  1. Listener.Accept() 调度运行详解,netFD下的accept()调用的FD的Accept, 该函数中调用对应系统的accept()接收新的socket, 注意这个accept()是非阻塞的,当accept返回后,会判断一个error,如果是nil表示连接成功,如果不是nil,判断err如果等于syscall.EAGAIN当前 Socket 没有准备好并且"fd.pd.pollable() == true"当前的文件描述符是可轮询的,进入 pollDesc.waitRead(),在waitRead函数中调用了wait函数:
  1. 判断当前的文件描述符是否支持等待,如果不支持则返回错误;否则调用runtime_pollWait。
  2. runtime_pollWait实际上就是调用poll_runtime_pollWait函数,poll_runtime_pollWait()中会调用netpollblock()函数,检查 pollDesc 对象的 rg/wg 信号量是否处于 pdReady 就绪状态,如果不是,会将netpollblockcommit()传递给gopark(),执行gopark()把对应的g保存到pollDesc 的 rg或wg 当中挂起, 并持续等待 I/O 事件发生,也就是如果当前socket没有处于就绪状态,会把goroutinue park住,使其不使用cpu资源进行空转之类的操作
  1. gopark 与 netpollblockcommit()下方有专门讲解此处不赘述

2. 数据的读写总结

  1. 前面了解到在通过net/http编写服务端时, 首先调用NewServeMux()创建多路复用器,编写对外接收请求的接口函数也就是处理器,然后调用多路复用器上的HandleFunc()方法,将接口与接口路径进行绑定,注册路由, 最后调用ListenAndServe()函数在指定端口开启监听,启动服务
  2. "net/http"下的ListenAndServe()函数,在该函数中首先会封装一个Server结构体变量,调用Server的ListenAndServe()方法,该方法中
  1. " net.Listen(“tcp”, addr)": 多路复用相关初始化,初始化socket,端口连接绑定,开启监听
  2. “srv.Serve(ln)”: 等待接收客户端连接Accept(),与接收到连接后的处理流程
  1. 查看srv.Serve(ln)内部逻辑,该方法内可以重点简化为:
  1. 方法内开启了一个无限for循环
  2. 在循环内部,调用Listener的Accept()方法,假设当前是TCP连接调用的就是TCPListener下的Accept(),阻塞监听客户端连接,是阻塞的(该方法内部有多路复用的相关逻辑,此处先不关注)
  3. 当接收到连接请求Accept()方法返回会拿到一个net.Conn连接,可以理解为拿到了一个基于TCP的HTTP连接,并将conn连接的状态标志为StateNew,Conn是一个接口,内部提供了:Read读取连接数据,Write发生数据,Close关闭连接等方法,比较重要,底层就是通过这几个方法处理请求读写的
  4. 然后开启一个goroutine执行conn的serve()方法
  1. 通过以上我们了解到接收到请求后Accept()函数返回,会封装一个Conn连接,并将conn连接的状态标志为StateNew,Conn是一个接口,内部提供了:Read读取连接数据,Write发生数据,Close关闭连接等方法,了解到了会开启协程执行Conn上的Server()方法处理用户请求,在这个处理用户请求的server方法中
  1. 首先调用newBufioReader() 封装了一个bufio.Reader
  2. 开启了一个无限for循环,循环内
  3. 调用conn的readRequest(ctx)方法读取请求的内容,比如解析HTTP请求协议,读取请求头,请求参数,封装Request和response,在解析时会读取请求头的 Content-Length,不为 0会通过TCPConn.Read() 方法读取指定长度的数据并存入请求体中,如果 Content-Length 为 0 或者没有设置,则请求体为空
  4. 封装serverHandler调用serverHandler上的ServeHTTP(w, w.req)方法进行路由匹配,找到对应的处理函数,执行我们写的业务逻辑
  5. 调用response的finishRequest()方法进行最后处理工作,当底层 bufio.Writer 缓冲区的大小达到阈值或者Flush() 被显式调用时,就会将缓冲区内的数据写入到底层连接中,并触发 Conn 的 Write() 方法将数据发送到客户端,另外finishRequest()方法还会进行一些比如异常处理,资源回收,状态更新等操作
  6. 最后调用conn的setState()设置连接状态为StateIdle,方便后续重用连接
  1. 进而我们了解到在Conn上的Server()方法执行处理用户请求时,该方法内部会间接的调用到Conn上的:Read或Write实现数据的读写,查看Conn.Write 与 Conn.Read底层,原理大致相同,与accept也有点类似,以Read为例,底层会调用netFD的Read()方法—>FD的Read()方法,该Read方法中
  1. 开启了一个for循环,
  2. 在fd创建时就设置成了非阻塞的,执行"syscall.Read(fd.Sysfd, p)"系统的Read函数尝试通过socket读取数据,如果Read函数返回的err为空表示数据读取成功
  3. 如果err不为空,判断err为syscall.EAGAIN未准备好,并且当前fd是可轮询的fd.pd.pollable()返回true,进入pollDesc.waitRead函数中
  1. pollDesc.waitRead 内部调用了 poll.runtime_pollWait 也就是 runtime.poll_runtime_pollWait(),内部:
  1. 先调用netpollcheckerr()检查当前是否有错误发生。如果发生了错误,则返回错误的代码(比如说 EAGAIN)。
  2. 调用netpollblock()函数,检查 pollDesc 对象的 rg/wg 信号量是否处于 pdReady 就绪状态,如果不是,会将netpollblockcommit()传递给gopark(),执行gopark()把对应的g保存到pollDesc 的 rg或wg 当中挂起, 并持续等待 I/O 事件发生,也就是如果当前socket没有处于就绪状态,会把goroutinue park住,使其不使用cpu资源进行空转之类的操作

3. 阻塞总结

  1. 在accept()等待接收连接时,或者读写数据时,都会调用netpollblock()函数,函数内,检查 pollDesc 对象的 rg/wg 信号量是否处于 pdReady 就绪状态,如果不是,会将netpollblockcommit()传递给gopark(),执行gopark()把对应的g保存到pollDesc 的 rg或wg 当中实现挂起效果,(可以也要好好看一下netpollblock)
  2. gopark()有一个func类型的入参unlockf,在accept()等待接收连接或者读写数据时,传递的是netpollblockcommit()
  3. 查看 gopark()源码,基于GMP模型实现了挂起协程的效果,在 Go 的运行时系统中,每个 goroutine 都有一个对应的 M和 P,当一个 goroutine 调用 gopark 函数时,它会将自己从 M 中移除,并标记为等待状态
  1. 首先,通过 acquirem() 获取当前 M 和 G 。
  2. 然后,通过 releasem() 释放当前 M,这样当前的 goroutine 就不处于 M 的控制下,在调用 releasem() 之后,当前的 goroutine 依然占用着当前的 P,并尚未从 P 中移除,所以在下面还会调用park_m()将当前 goroutine 从 M 和 P 中移除
  3. 接着,该函数通过 mcall() 执行 park_m() ,在park_m()中首先会将当前g设置为_Gwaiting 等待运行状态 , 然后执行传递进来的netpollblockcommit() 尝试将当前协程g保存到 pollDesc 的 rg 或者 wg 指针里,如果失败返回false,会重新将协程标记为_Grunnable可运行状态等待下一次调度运行, 最终会执行" schedule()"进行调度执行其他可运行的 goroutine
  1. netpollblockcommit() 函数内会通过cas也就是atomic.Casuintptr(),将当前协程g保存到 pollDesc 的 rg 或者 wg 指针里,进入阻塞状态并等待网络 i/o 事件的到来,如果通过atomic.Casuintptr()设置失败,说明 pollDesc 阻塞的 Goroutine 已经被其他线程唤醒或者当前 Goroutine 发生了异常,需要在下一次循环中重新尝试阻塞,此时函数会返回 false

4. 唤醒

  1. 在执行了Accept/Read/Write 操作后,如果没有关注的事件发生,会执行 gopark挂起goroutine,在gopark()函数执行到park_m(),该函数末尾会会调用一个scheduler()调度器函数
  2. 在启动go项目时底层执行runtime.rt0_go,该函数内会调用runtime.schedinit(SB) 初始化调度相关设置,调用runtime·mstart(SB)启动调度循环,在mstart()中也会调用schedule()开启调度
  3. runtime.schedule()是调度器运行的核心方法
  1. 执行getg()获取g0
  2. 执行sched.gcwaiting != 0,判断当前是不是正在GC,如果是则调用gcstopm休眠当前的M,挂起结束后继续执行
  3. 避免全局队列中的g被饿死,有一个if判断每隔61次调度轮回执行globrunqget(g.m.p.ptr(), 1)从全局队列找获取可执行g
  4. 如果上面获取不到,执行runqget()函数,在p.runnext获取g,如果获取不到则通过p的本地队列中获取,先获取队头的
  5. 如果没有获取到可运行的G,再调用 findrunnable()函数,从全局队列、epoll、别的 P 里获取获取g,找不到的话就将m休眠,等待唤醒
  6. 当找到一个g后,就会调用 execute 去执行g
  1. runtime.schedule()中会调用一个 runtime.findrunable() 方法,从全局队列、epoll、别的 P 里获取可运行的G,详细步骤:
  1. 调用 runqget():尝试从P本地队列中获取G,获取到返回
  2. 调用 globrunqget (): 尝试从全局队列中获取G,获取到返回
  3. 调用netpollinited()从网络IO轮询器中找到就绪的G,把这个G变为可运行的G
  4. 如果还是未获取到可运行的G, 会判断是否有其它非空闲的p,如果有,执行runqsteal()函数随机选一个P,尝试从这个P中allp[enum.position()]中偷去一半的G,注意有一个遍历,最多尝试四次
  5. 如果当前没有非空闲P,会判断当前M是否运行进入自旋状态,如果运行则进入自旋
  6. 未获取到可运行G,并且M没有进入自旋状态时跳入stop位置,判断此时P是否处于 GC mark 阶段,同时该 P 上存在一个后台 GC 标记工作的协程时,此时有待处理的标记任务,就会将该等待执行的标记协程转化成 runnable 状态,执行这个 Mark 任务
  7. 再次尝试从全局队列中,检查所有的P获取G,网络IO轮询器中获取可运行的G
  8. 还是获取不到会将P和M解绑,调用 stopm() 来休眠该M,那何时会重新唤醒该M呢?这就要看 wakep 函数了。一般来说新建一个goroutine或者有个goroutine准备好时,会调用 wakep 来唤醒M或者新建M
  1. 另外go项目在运行时,会单独启动一个sysmon监控线程,用来做抢占式调度等操作, 在循环执行过程中检查距离上一次 runtime.netpoll 被调用是否超过了 10ms,若是则会去调用它拿到可运行的 goroutine 列表并通过调用 injectglist 把 g 列表放入全局调度队列或者当前 P 本地调度队列等待被执行
  2. 查看 runtime.netpoll源码
  1. 创建size=128的epollevent数组
  2. 调用epollwait()从 epoll 的 eventpoll.rdllist双向列表中获取就绪的fd列表,到epollevent数组中,底层依赖epoll_wait系统调用函数
  3. 遍历epollevent数组也就是就绪的fd,调用netpollready,找到对应的goroutine,并将其状态从pdWait修改为pdReady
  4. 最后返回pdReady状态的 goroutine列表 gList
  5. 接下来将就绪的 goroutine 加入到调度队列中,等待调度运行
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

go 进阶 多路复用支持: 二. Accept/Read/Write 的相关文章

  • 6类典型场景的无线AP选型和部署方案

    你们好 我的网工朋友 前段时间刚给你们来了篇解决无线频繁断网的技术文 解决无线频繁断网 这个办法值得收藏 不少朋友私聊 说想再聊聊无线AP的选型和部署方案 这不就安排上了 无线网络覆盖项目中 无线AP的合理选型和部署非常重要 在设计施工中
  • Linux中如何查看开启了哪些端口?

    在Linux中 端口是设备与外界通讯交流的出口 常用于指TCP IP协议中的端口 其按照端口号可以分为三类 分别是 公认端口 注册端口 动态端口 那么Linux中如何查看开启了哪些端口 以下是常用命令介绍 1 使用netstat命令 net
  • 自定义编写zabbix_agent脚本

    vi usr lib systemd system zabbix agent servicce Unit Description Zabbix Agent After syslog target After network target S
  • 掌握内网渗透之道,成为实战高手,看《内网渗透实战攻略》就够了

    文末送书 文末送书 今天推荐一本网络安全领域优质书籍 内网渗透实战攻略 文章目录 前言 如何阅读本书 目录 文末送书 前言 当今 网络系统面临着越来越严峻的安全挑战 在众多的安全挑战中 一种有组织 有特定目标 长时间持续的新型网络攻击日益猖
  • 使用Hypothesis生成测试数据

    Hypothesis是Python的一个高级测试库 它允许编写 测试用例 时参数化 然后生成使测试失败的简单易懂的测试数据 可以用更少的工作在代码中发现更多的bug 安装 pip install hypothesis 如何设计 测试数据 通
  • 基于成本和服务质量考虑的不确定性下,电动汽车充电网络基础设施需求预测和迭代优化的分层框架研究(Python代码实现)

    欢迎来到本博客 博主优势 博客内容尽量做到思维缜密 逻辑清晰 为了方便读者 座右铭 行百里者 半于九十 本文目录如下 目录 1 概述 2 运行结果 3 参考文献 4 Python代码 数据
  • Web 安全漏洞之 OS 命令注入

    什么是 OS 命令注入 上周我们分享了一篇 Web 安全漏洞之 SQL 注入 其原理简单来说就是因为 SQL 是一种结构化字符串语言 攻击者利用可以随意构造语句的漏洞构造了开发者意料之外的语句 而今天要讲的 OS 命令注入其实原理和 SQL
  • 5个步骤,教你瞬间明白线程和线程安全

    记得今年3月份刚来杭州面试的时候 有一家公司的技术总监问了我这样一个问题 你来说说有哪些线程安全的类 我心里一想 这我早都背好了 稀里哗啦说了一大堆 他又接着问 那你再来说说什么是线程安全 然后我就GG了 说真的 我们整天说线程安全 但是对
  • 【信道估计】【MIMO】【FBMC】未来移动通信的滤波器组多载波调制方案(Matlab代码实现)

    欢迎来到本博客 博主优势 博客内容尽量做到思维缜密 逻辑清晰 为了方便读者 座右铭 行百里者 半于九十 本文目录如下 目录 1 概述 2 运行结果 3 参考文献 4 Matlab代码及文章
  • Python爬虫实战:IP代理池助你突破限制,高效采集数据

    当今互联网环境中 为了应对反爬虫 匿名访问或绕过某些地域限制等需求 IP代理池成为了一种常用的解决方案 IP代理池是一个包含多个可用代理IP地址的集合 可以通过该代理池随机选择可用IP地址来进行网络请求 IP代理池是一组可用的代理IP地址
  • Jmeter 性能-并发量计算

    并发概念 指网站在同一时间访问的人数 人数越大瞬间带宽要求更高 服务器并发量分为 业务并发用户数 最大并发访问数 系统用户数 同时在线用户数 估算业务并发量的公式 C nL T C C 3 C的平方根 说明 C是平均的业务并发用户数 n是l
  • 「网络安全渗透」如果你还不懂CSRF?这一篇让你彻底掌握

    1 什么是 CSRF 面试的时候的著名问题 谈一谈你对 CSRF 与 SSRF 区别的看法 这个问题 如果我们用非常通俗的语言讲的话 CSRF 更像是钓鱼的举动 是用户攻击用户的 而对于 SSRF 来说 是由服务器发出请求 用户 日 服务器
  • 如何使用Imagewheel搭建一个简单的的私人图床无公网ip也能访问

    文章目录 1 前言 2 Imagewheel网站搭建 2 1 Imagewheel下载和安装 2 2 Imagewheel网页测试 2 3 cpolar的安装和注册 3 本地网页发布 3 1 Cpolar临时数据隧道
  • 基于java的物业管理系统设计与实现

    基于java的物业管理系统设计与实现 I 引言 A 研究背景和动机 物业管理系统是指对物业进行管理和服务的系统 该系统需要具备对物业信息 人员信息 财务信息等进行管理的能力 基于Java的物业管理系统设计与实现的研究背景和动机主要体现在以下
  • 基于java的物业管理系统设计与实现

    基于java的物业管理系统设计与实现 I 引言 A 研究背景和动机 物业管理系统是指对物业进行管理和服务的系统 该系统需要具备对物业信息 人员信息 财务信息等进行管理的能力 基于Java的物业管理系统设计与实现的研究背景和动机主要体现在以下
  • 揭秘网络世界的幕后密码——Wireshark网络协议分析软件

    在我们日常生活中 计算机和互联网已经成为不可或缺的一部分 然而 很少有人真正了解网络背后复杂的工作原理和通信协议 幸运的是 有一款强大而实用的软件 Wireshark 可以帮助我们深入了解网络世界的幕后密码 Wireshark是一款免费的网
  • 【go语言】AST抽象语法树详解&实践之扫描代码生成错误码文档

    背景 为了能识别出代码中抛出错误码的地址和具体的错误码值 再根据错误码文件获取到错误码的具体值和注释 方便后续的排错 这里使用AST进行语法分析获取到代码中的目标对象 一 编译过程 在开始解析代码之前先补充了解一下编译过程 编译过程是将高级
  • 光波导结构

    摘要 增强现实和混合现实 AR MR 领域的新应用引起了人们对带有光栅区域的光波导系统的越来越多的关注 这些光波导系统用于输入和输出耦合以及扩瞳目的 VirtualLab Fusion为这类系统的仿真和设计提供了几个强大的工具 其中一个是具
  • 网工内推 | 上市公司同程、科达,五险一金,年终奖,最高12k*15薪

    01 同程旅行 招聘岗位 网络工程师 职责描述 1 负责职场 门店网络规划 建设 维护 2 负责网络安全及访问控制 上网行为管理和VPN设备的日常运维 3 负责内部相关网络自动化和系统化建设 4 优化与提升网络运行质量 制定应急预案 人员培
  • 【安全】网络安全态势感知

    文章目录 一 态势感知简介 1 概念 2 形象举例 3 应具备的能力 二 为什么要态势感知 为什么网络安全态势感知很重要 三 态势感知系统的功能 四 如何评估态势感知的建设结果 五 什么是态势感知的三个层级 四 业界的态势感知产品 1 安全

随机推荐

  • 【华为OD机试真题 JAVA】最多的连续胡杨棵树

    标题 最多的连续胡杨棵树 时间限制 1秒 内存限制 262144K 语言限制 不限 近些年来 我国防沙治沙取得显著成果 某沙漠新种植N棵胡杨 编号1 N 排成一排 一个月后 有M棵胡杨未能成活 现可补种胡杨K棵 请问如何补种 只能补种 不能
  • 【JS基础】通俗易懂的讲清楚去抖/防抖、节流。外加手写深度比较

    文章目录 去抖 防抖 思路解析 节流 两者在vue中结合计算属性使用 深度比较 去抖 防抖 去抖也叫防抖 为了照顾JS初学者的理解和记忆 我就简单的说明一下 我们生活中很多出现抖动的现象 都是没有规律的 例如人的发抖 树叶在风中的抖动 海浪
  • java mysql 断开连接_mysql java连接异常及断开解决秘籍

    3 The last packet sent successfully to the server was 0 milliseconds ago The driver has not received any packets from th
  • 前端一年的经验,面试官都会问一些什么问题呢?都是这样一些的问题

    面试准备阶段 学习以及复习基础知识 这一定是第一步需要做的事情 先制定规划 然后按照这一条既定的规划去学习以及复习 可分为六部分去准备 css部分 像 css这一部分 面试必问 但是它的东西很杂很多 我不知道有多少人和我感觉一样 学习前端最
  • Oracle中Delete和Commit操作的流程分析

    以后还会陆续加入其他各种操作的实现机制 1 删除 Delete 流程 Oracle读Block 数据块 到Buffer Cache 缓冲区 如果该Block在Buffer中不存在 在Redo Log Buffer 重做日志缓冲区 中记录De
  • Leetcode【DFS BFS】

    Leetcode 200 岛屿数量 题目 解题 思路 DFS解法 BFS解法 题目 给你一个由 1 陆地 和 0 水 组成的的二维网格 请你计算网格中岛屿的数量 岛屿总是被水包围 并且每座岛屿只能由水平方向和 或竖直方向上相邻的陆地连接形成
  • ES6 method写法与TypeError: is not a constructor

    公司前端最近开始强推ESlint 很多文件需要逐步修改为符合ESlint规则的形式 结果遇到了一个神奇的问题 有一段类似这样的代码 let obj init function el 此处ESlint检查提示 Expect method sh
  • k8s部署tomcat及web应用_在k8s部署tomcat

    小试牛刀 准备编排文件tomcat yaml 包含两部分 副本rc和service配置可为两个文件 不过我们此处合并为一个 rc副本相关 apiVersion extensions v1beta1 表示Deployment调度配置 kind
  • Keras默认权值初始化方式

    20230117 在最初使用Keras进行神经网络编程的时候 除了设置神经元个数 层数 或者激活函数之后 基本上对神经网络内部就不怎么管了 所以最后很多参数都是默认的 这种情况一般遇到的数据集问题 都能轻易解决 一般不是层数非常深的神经网络
  • 【华为OD统一考试A卷

    华为OD统一考试A卷 B卷 新题库说明 2023年5月份 华为官方已经将的 2022 0223Q 1 2 3 4 统一修改为OD统一考试 A卷 和OD统一考试 B卷 你收到的链接上面会标注A卷还是B卷 请注意 根据反馈 目前大部分收到的都是
  • Kali系统(Debian 10.3) 遇到的问题

    目录 问题一 Kali系统 相关技术网站 博客 文章 论坛 工具包 包跟踪 提交BUG 问题二 黑客入门 手痒地方 问题三 Kali系统 MySQL问题Can t connect to local MySQL server through
  • 边缘计算操作系统安装及测试实验报告

    边缘计算操作系统安装及测试 一 实验目的 二 实验环境 三 实验原理 1 系统组成部分 2 总体数据流程 四 实验步骤及结果 1 安装 Docker 和 Docker Compose 2 下载 EdgeX compose 文件 3 运行Ed
  • qt中clicked(bool checked)和toggled(bool checked)的区别

    先来看qt文档的解释 上面看出 共同点是 当点击按钮时 状态信号都会被发送 不同点 clicked this signal is not emitted if you call setDown setChecked or toggle to
  • 5年测试面试要20K,面试三个问题把我打发走了···

    都说金三银四 金九银十跳槽涨薪季 我是着急忙慌的准备简历 5年软件测试经验 可独立测试大型产品项目 熟悉项目测试流程 薪资要求 5年测试经验起码能要个20K吧 我加班肝了一页半简历 投出去一周 面试电话倒是不少 自信满满去面试 现场被问了这
  • Nmap源码分析(服务与版本扫描)

    Nmap源码分析 服务与版本扫描 2012年8月23日 在进行端口扫描后 Nmap可以进一步探测出运行在端口上的服务类型及应用程序的版本 目前Nmap可以识别几千种服务程序的签名 Signature 覆盖了180多种应用协议 比如 端口扫描
  • java写后端接口中mapper的一些操作

    文章目录 Mybatis Mapper的动态SQL语句问题 一 if 二 choose when otherwise 三 where 四 trim 元素来定制 where 元素的功能 五 set 动态地在行首插入 SET 关键字 六 for
  • PTA 7-4 统计学生平均成绩与及格人数 (15 分)

    本题要求编写程序 计算学生们的平均成绩 并统计及格 成绩不低于60分 的人数 题目保证输入与输出均在整型范围内 输入格式 输入在第一行中给出非负整数N 即学生人数 第二行给出N个非负整数 即这N位学生的成绩 其间以空格分隔 输出格式 按照以
  • C语言函数大全-- y 开头的函数

    y 开头的函数 1 yperror 1 1 函数说明 1 2 演示示例 2 yp match 2 1 函数说明 2 2 演示示例 3 y0 零阶第二类贝塞尔函数 3 1 函数说明 3 2 演示示例 3 3 运行结果 4 y1 一阶第二类贝塞
  • 在Vue中使用flex布局 echarts多图标不能自适应缩放问题

    前言 最近有个项目需要用到echarts绘制多个图表 需求是要支持大屏展示 还有需要支持不同比例的缩放和任意手动缩放 因此 深入学习了echarts和flex布局 虽然遇到很多问题 但都一一解决了收获良多 故此写下遇到的问题与坑 与之共勉
  • go 进阶 多路复用支持: 二. Accept/Read/Write

    目录 一 通过httpServer服务端引用Accept 二 Listener Accept 等待连接 三 Conn Read读数据 Conn Write写数据 四 gopark 阻塞 五 netpoll 唤醒等待队列中挂起的协程 什么时候