反应器模式
在以前的博文《设计模式概述:反应器(Reactor)模式》中介绍过相关的概念和流程,当时使用了 Python,但从结果上看并没有起到很明显的效果。最近在处理有关 proxy 的项目时,刚好涉及到性能问题,故本文探索一下 Go 下反应器模式的实现与优化过程。当前比较知名的项目有两个:evio 和 gnet,都是反应器模式很好的实现范例,特别是 gnet 在反应器模式上还加入了协程池,从而比 evio 性能更好。本文就从头开始探索如何一步步优化改进。
go原生服务流程
package main
import (
"fmt"
"io"
"log"
"net"
)
func worker(conn net.Conn) {
defer conn.Close()
b := make([]byte, 512)
for {
size, err := conn.Read(b)
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received %v bytes from %v\n", size, conn.RemoteAddr())
size, err = conn.Write(b[0:size])
if err != nil {
log.Fatal(err)
}
fmt.Printf("Written %v bytes to %v\n", size, conn.RemoteAddr())
}
}
// main starts a TCP echo server on 127.0.0.1:1234 using the standard
// library's one-goroutine-per-connection model (the baseline this post
// later benchmarks against).
//
// FIX: corrected the "Listering" typo in the startup message.
func main() {
	listener, err := net.Listen("tcp", "127.0.0.1:1234")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Listening on %v\n", listener.Addr())
	for {
		conn, err := listener.Accept()
		if err != nil {
			// Accept only fails here when the listener itself is broken.
			log.Fatal(err)
		}
		fmt.Printf("Accepted connection to %v from %v\n", conn.LocalAddr(), conn.RemoteAddr())
		go worker(conn)
	}
}
此时用 go run main.go 运行该程序,并在新开的终端中直接使用 telnet 127.0.0.1 1234。运行效果就是发送的内容与返回的内容相同。下面我们先探索一下原生 net 库的处理流程。
net流程-Listen
由于环境是mac环境故源码跳转的都是对应到了posix的相关接口下。
// Listen announces on the local network address by delegating to a
// zero-value ListenConfig; context.Background() means no cancellation.
func Listen(network, address string) (Listener, error) {
	var lc ListenConfig
	return lc.Listen(context.Background(), network, address)
}
listen直接返回了ListenConfig通过Listen方法创建的实例。
// Listen resolves the address list for the requested network, then
// dispatches to the TCP or Unix-domain listener based on the first
// matching address type.
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
	addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil) // resolve the listen address
	if err != nil {
		return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
	}
	sl := &sysListener{
		ListenConfig: *lc,
		network:      network,
		address:      address,
	}
	var l Listener
	la := addrs.first(isIPv4) // pick the first address, preferring IPv4
	switch la := la.(type) {
	case *TCPAddr:
		l, err = sl.listenTCP(ctx, la) // TCP listener
	case *UnixAddr:
		l, err = sl.listenUnix(ctx, la) // Unix-domain socket listener
	default:
		return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
	}
	if err != nil {
		return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer
	}
	return l, nil // hand back the created listener
}
此时可以看出整个流程跟平常的都是相同,先是解析监听地址,然后判断是否TCP或者Unix监听的类型。
// listenTCP creates and binds the listening socket for a TCP address and
// wraps it in a TCPListener.
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
	fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control) // create the socket fd
	if err != nil {
		return nil, err
	}
	return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}
...
// internetSocket picks the address family for the request and creates
// the platform socket.
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
	if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd" || runtime.GOOS == "nacl") && mode == "dial" && raddr.isWildcard() {
		raddr = raddr.toLocal(net)
	}
	family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
	return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn) // per-platform socket creation and setup
}
...
// socket returns a network file descriptor that is ready for
// asynchronous I/O using the network poller.
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
	s, err := sysSocket(family, sotype, proto) // create the raw socket
	if err != nil {
		return nil, err
	}
	if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil { // apply default socket options
		poll.CloseFunc(s)
		return nil, err
	}
	if fd, err = newFD(s, family, sotype, net); err != nil { // wrap the raw socket in a netFD
		poll.CloseFunc(s)
		return nil, err
	}
	// This function makes a network file descriptor for the
	// following applications:
	//
	// - An endpoint holder that opens a passive stream
	// connection, known as a stream listener
	//
	// - An endpoint holder that opens a destination-unspecific
	// datagram connection, known as a datagram listener
	//
	// - An endpoint holder that opens an active stream or a
	// destination-specific datagram connection, known as a
	// dialer
	//
	// - An endpoint holder that opens the other connection, such
	// as talking to the protocol stack inside the kernel
	//
	// For stream and datagram listeners, they will only require
	// named sockets, so we can assume that it's just a request
	// from stream or datagram listeners when laddr is not nil but
	// raddr is nil. Otherwise we assume it's just for dialers or
	// the other connection holders.
	if laddr != nil && raddr == nil {
		switch sotype {
		case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET: // stream sockets become stream listeners
			if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
				fd.Close()
				return nil, err
			}
			return fd, nil
		case syscall.SOCK_DGRAM: // datagram sockets become datagram listeners
			if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
				fd.Close()
				return nil, err
			}
			return fd, nil
		}
	}
	if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil { // dialer path: bind/connect the addresses
		fd.Close()
		return nil, err
	}
	return fd, nil
}
至此,如果参数等都正确就初始化完成了一个监听的fd。接下来就是接受请求。
net流程-Accept
// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
	fd *netFD       // the listening socket, already initialized
	lc ListenConfig // configuration the listener was created with
}
// SyscallConn returns a raw network connection.
// This implements the syscall.Conn interface.
//
// The returned RawConn only supports calling Control. Read and
// Write return an error.
func (l *TCPListener) SyscallConn() (syscall.RawConn, error) {
	if !l.ok() {
		return nil, syscall.EINVAL
	}
	return newRawListener(l.fd)
}
// AcceptTCP accepts the next incoming call and returns the new
// connection.
func (l *TCPListener) AcceptTCP() (*TCPConn, error) {
	if !l.ok() {
		return nil, syscall.EINVAL
	}
	c, err := l.accept() // same unexported accept as Accept, typed result
	if err != nil {
		return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
	}
	return c, nil
}
// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
	if !l.ok() {
		return nil, syscall.EINVAL
	}
	c, err := l.accept() // accept the next connection
	if err != nil {
		return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
	}
	return c, nil
}
// Close stops listening on the TCP address.
// Already Accepted connections are not closed.
func (l *TCPListener) Close() error { // stop accepting new connections
	if !l.ok() {
		return syscall.EINVAL
	}
	if err := l.close(); err != nil {
		return &OpError{Op: "close", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
	}
	return nil
}
// Addr returns the listener's network address, a *TCPAddr.
// The Addr returned is shared by all invocations of Addr, so
// do not modify it.
func (l *TCPListener) Addr() Addr { return l.fd.laddr } // the bound local address
// SetDeadline sets the deadline associated with the listener.
// A zero time value disables the deadline.
func (l *TCPListener) SetDeadline(t time.Time) error { // deadline applies to pending/future Accept calls
	if !l.ok() {
		return syscall.EINVAL
	}
	if err := l.fd.pfd.SetDeadline(t); err != nil {
		return &OpError{Op: "set", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
	}
	return nil
}
// File returns a copy of the underlying os.File.
// It is the caller's responsibility to close f when finished.
// Closing l does not affect f, and closing f does not affect l.
//
// The returned os.File's file descriptor is different from the
// connection's. Attempting to change properties of the original
// using this duplicate may or may not have the desired effect.
func (l *TCPListener) File() (f *os.File, err error) { // dup the listener's descriptor into an os.File
	if !l.ok() {
		return nil, syscall.EINVAL
	}
	f, err = l.file()
	if err != nil {
		return nil, &OpError{Op: "file", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
	}
	return
}
通过Accept、Close相关方法查看,都是调用了进一步封装的函数并且通过层层嵌套和不同系统的封装,其中会调用到如下函数。
// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
	if !l.ok() {
		return nil, syscall.EINVAL
	}
	c, err := l.accept() // delegate to the unexported accept below
	if err != nil {
		return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
	}
	return c, nil
}
...
// accept pulls the next connection off the netFD and applies the
// listener's keep-alive settings to it.
func (ln *TCPListener) accept() (*TCPConn, error) {
	fd, err := ln.fd.accept()
	if err != nil {
		return nil, err
	}
	tc := newTCPConn(fd)
	if ln.lc.KeepAlive >= 0 {
		setKeepAlive(fd, true)
		ka := ln.lc.KeepAlive
		if ln.lc.KeepAlive == 0 {
			ka = defaultTCPKeepAlive // 0 means "use the default period"
		}
		setKeepAlivePeriod(fd, ka)
	}
	return tc, nil
}
...
// accept waits for an incoming connection on the listening netFD and
// wraps the new descriptor in a fresh netFD registered with the runtime
// network poller.
func (fd *netFD) accept() (netfd *netFD, err error) {
	d, rsa, errcall, err := fd.pfd.Accept()
	if err != nil {
		if errcall != "" {
			err = wrapSyscallError(errcall, err)
		}
		return nil, err
	}
	if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
		poll.CloseFunc(d)
		return nil, err
	}
	if err = netfd.init(); err != nil { // add the new fd to the runtime poller so read/write readiness is event-driven
		fd.Close()
		return nil, err
	}
	lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
	netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
	return netfd, nil
}
...
// accept wraps the hookable AcceptFunc: it marks the new descriptor
// close-on-exec and switches it to non-blocking mode.
func accept(s int) (int, syscall.Sockaddr, string, error) {
	// See ../syscall/exec_unix.go for description of ForkLock.
	// It is probably okay to hold the lock across syscall.Accept
	// because we have put fd.sysfd into non-blocking mode.
	// However, a call to the File method will put it back into
	// blocking mode. We can't take that risk, so no use of ForkLock here.
	ns, sa, err := AcceptFunc(s)
	if err == nil {
		syscall.CloseOnExec(ns)
	}
	if err != nil {
		return -1, nil, "accept", err
	}
	if err = syscall.SetNonblock(ns, true); err != nil { // accepted connections are made non-blocking by default
		CloseFunc(ns)
		return -1, nil, "setnonblock", err
	}
	return ns, sa, "", nil
}
...
// AcceptFunc is used to hook the accept call.
// It defaults to syscall.Accept; tests or platforms may swap it out.
var AcceptFunc func(int) (int, syscall.Sockaddr, error) = syscall.Accept
...
// syscall/syscall_bsd.go
// Accept wraps the raw accept system call and converts the returned raw
// sockaddr into a Sockaddr value.
func Accept(fd int) (nfd int, sa Sockaddr, err error) {
	var rsa RawSockaddrAny
	var len _Socklen = SizeofSockaddrAny
	nfd, err = accept(fd, &rsa, &len) // accept via the per-OS syscall wrapper
	if err != nil {
		return
	}
	if runtime.GOOS == "darwin" && len == 0 {
		// Accepted socket has no address.
		// This is likely due to a bug in xnu kernels,
		// where instead of ECONNABORTED error socket
		// is accepted, but has no address.
		Close(nfd)
		return 0, nil, ECONNABORTED
	}
	sa, err = anyToSockaddr(&rsa)
	if err != nil {
		Close(nfd)
		nfd = 0
	}
	return
}
...
// accept performs the raw accept(2) system call through the libc
// trampoline (darwin).
func accept(s int, rsa *RawSockaddrAny, addrlen *_Socklen) (fd int, err error) {
	r0, _, e1 := syscall(funcPC(libc_accept_trampoline), uintptr(s), uintptr(unsafe.Pointer(rsa)), uintptr(unsafe.Pointer(addrlen))) // invoke libc accept with the raw arguments
	fd = int(r0)
	if e1 != 0 {
		err = errnoErr(e1)
	}
	return
}
从Accept的流程可知,通过层层的嵌套,最终分发到了syscall中针对不同平台的accept系统调用的封装,其他函数也是如此封装调用。接受到的连接在netfd.init()中默认加入到运行时的netpoll中,后续的读写事件由Go运行时的poller统一进行事件通知与驱动。
性能对比-go原生与evio
go原生
// Copyright 2017 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package main
import (
"flag"
"fmt"
"io"
"log"
"net"
"os"
"strconv"
"strings"
"time"
)
// res is the canned HTTP response body served to every request.
var res string

// request holds the pieces of a parsed HTTP request (filled by parsereq).
type request struct {
	proto, method string
	path, query   string
	head, body    string
	remoteAddr    string
}
// worker serves one connection: it reads into a fixed 512-byte buffer,
// parses the request, and writes the canned HTTP response. It returns
// (closing the connection) on EOF or any read/write/parse error.
//
// BUG FIX: on a parse error the 500 response used to be built but never
// written; it is now sent (best-effort) before the connection is dropped.
// A failed response write now also terminates the loop instead of being
// silently ignored.
//
// NOTE(review): data = leftover re-reads into the tail of the original
// buffer, so pipelined requests are handled only within the remaining
// space — kept as-is to preserve the benchmark's behavior.
func worker(conn net.Conn) {
	defer conn.Close()
	data := make([]byte, 512)
	for {
		_, err := conn.Read(data)
		if err == io.EOF {
			break
		}
		if err != nil {
			return
		}
		var req request
		var out []byte
		leftover, err := parsereq(data, &req)
		if err != nil {
			// bad thing happened: report it to the client, then close
			out = appendresp(out, "500 Error", "", err.Error()+"\n")
			conn.Write(out) // best-effort; connection is closed either way
			break
		} else if len(leftover) == len(data) {
			// request not ready, yet
			break
		}
		// handle the request
		req.remoteAddr = conn.RemoteAddr().String()
		out = appendhandle(out, &req)
		data = leftover
		if _, err = conn.Write(out); err != nil {
			// peer is gone; stop instead of looping on a dead socket
			return
		}
	}
}
// main parses the benchmark flags and starts a minimal HTTP server using
// the standard library's one-goroutine-per-connection model.
//
// BUG FIX: the listen address was hard-coded to 127.0.0.1:8080, silently
// ignoring the -port flag; the flag value is now honored. Also fixed the
// "Listering" typo in the startup message.
func main() {
	var port int
	var loops int
	var aaaa bool
	var noparse bool
	var unixsocket string
	var stdlib bool
	flag.StringVar(&unixsocket, "unixsocket", "", "unix socket")
	flag.IntVar(&port, "port", 8080, "server port")
	flag.BoolVar(&aaaa, "aaaa", false, "aaaaa....")
	flag.BoolVar(&noparse, "noparse", true, "do not parse requests")
	flag.BoolVar(&stdlib, "stdlib", false, "use stdlib")
	flag.IntVar(&loops, "loops", 0, "num loops")
	flag.Parse()
	if os.Getenv("NOPARSE") == "1" {
		noparse = true
	}
	// Response body: 1 KiB of 'a' in -aaaa mode, otherwise "Hello World!".
	if aaaa {
		res = strings.Repeat("a", 1024)
	} else {
		res = "Hello World!\r\n"
	}
	listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Listening on %v\n", listener.Addr())
	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Fatal(err)
		}
		go worker(conn)
	}
}
// appendhandle handles the incoming request and appends the response to
// the provided bytes, which is then returned to the caller.
// The parsed request itself is ignored: every request gets the canned
// 200 body held in the package-level res.
func appendhandle(b []byte, req *request) []byte {
	return appendresp(b, "200 OK", "", res)
}
// appendresp will append a valid http response to the provide bytes.
// The status param should be the code plus text such as "200 OK".
// The head parameter should be a series of lines ending with "\r\n" or empty.
func appendresp(b []byte, status, head, body string) []byte {
b = append(b, "HTTP/1.1"...)
b = append(b, ' ')
b = append(b, status...)
b = append(b, '\r', '\n')
b = append(b, "Server: evio\r\n"...)
b = append(b, "Date: "...)
b = time.Now().AppendFormat(b, "Mon, 02 Jan 2006 15:04:05 GMT")
b = append(b, '\r', '\n')
if len(body) > 0 {
b = append(b, "Content-Length: "...)
b = strconv.AppendInt(b, int64(len(body)), 10)
b = append(b, '\r', '\n')
}
b = append(b, head...)
b = append(b, '\r', '\n')
if len(body) > 0 {
b = append(b, body...)
}
return b
}
// parsereq is a very simple http request parser. This operation
// waits for the entire payload to be buffered before returning a
// valid request.
//
// NOTE(review): in the query-string branch, q is a relative offset
// (i - s) but is later used as if it were an absolute index into sdata
// (req.path = sdata[s:q]) and into req.path — looks wrong for requests
// containing '?'; verify before relying on req.path/req.query.
func parsereq(data []byte, req *request) (leftover []byte, err error) {
	sdata := string(data)
	var i, s int
	var top string
	var clen int
	var q = -1
	// method, path, proto line
	for ; i < len(sdata); i++ {
		if sdata[i] == ' ' {
			req.method = sdata[s:i]
			for i, s = i+1, i+1; i < len(sdata); i++ {
				if sdata[i] == '?' && q == -1 {
					q = i - s
				} else if sdata[i] == ' ' {
					if q != -1 {
						req.path = sdata[s:q]
						req.query = req.path[q+1 : i]
					} else {
						req.path = sdata[s:i]
					}
					for i, s = i+1, i+1; i < len(sdata); i++ {
						if sdata[i] == '\n' && sdata[i-1] == '\r' {
							req.proto = sdata[s:i]
							i, s = i+1, i+1
							break
						}
					}
					break
				}
			}
			break
		}
	}
	if req.proto == "" {
		return data, fmt.Errorf("malformed request")
	}
	top = sdata[:s]
	// header lines, terminated by an empty line
	for ; i < len(sdata); i++ {
		if i > 1 && sdata[i] == '\n' && sdata[i-1] == '\r' {
			line := sdata[s : i-1]
			s = i + 1
			if line == "" {
				// end of headers; consume the body if Content-Length was seen
				req.head = sdata[len(top)+2 : i+1]
				i++
				if clen > 0 {
					if len(sdata[i:]) < clen {
						break
					}
					req.body = sdata[i : i+clen]
					i += clen
				}
				return data[i:], nil
			}
			if strings.HasPrefix(line, "Content-Length:") {
				n, err := strconv.ParseInt(strings.TrimSpace(line[len("Content-Length:"):]), 10, 64)
				if err == nil {
					clen = int(n)
				}
			}
		}
	}
	// not enough data
	return data, nil
}
压测结果
wrk -t8 -c200 -d60s --latency http://127.0.0.1:8080
Running 1m test @ http://127.0.0.1:8080
8 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 8.43ms 18.64ms 365.62ms 90.07%
Req/Sec 9.13k 5.54k 40.76k 70.50%
Latency Distribution
50% 1.51ms
75% 4.95ms
90% 26.88ms
99% 90.95ms
4244898 requests in 1.00m, 421.02MB read
Socket errors: connect 0, read 353878, write 30, timeout 0
Requests/sec: 70651.03
Transfer/sec: 7.01MB
evio代码
// Copyright 2017 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package main
import (
"bytes"
"flag"
"fmt"
"log"
"os"
"strconv"
"strings"
"time"
"github.com/tidwall/evio"
)
// res is the canned HTTP response body served to every request.
var res string

// request holds the pieces of a parsed HTTP request (filled by parsereq).
type request struct {
	proto, method string
	path, query   string
	head, body    string
	remoteAddr    string
}
// main wires up the evio event callbacks (Serving/Opened/Closed/Data)
// and starts the evio event-loop server on the configured addresses.
func main() {
	var port int
	var loops int
	var aaaa bool
	var noparse bool
	var unixsocket string
	var stdlib bool
	flag.StringVar(&unixsocket, "unixsocket", "", "unix socket")
	flag.IntVar(&port, "port", 8080, "server port")
	flag.BoolVar(&aaaa, "aaaa", false, "aaaaa....")
	flag.BoolVar(&noparse, "noparse", true, "do not parse requests")
	flag.BoolVar(&stdlib, "stdlib", false, "use stdlib")
	flag.IntVar(&loops, "loops", 0, "num loops")
	flag.Parse()
	if os.Getenv("NOPARSE") == "1" {
		noparse = true
	}
	// Response body: 1 KiB of 'a' in -aaaa mode, otherwise "Hello World!".
	if aaaa {
		res = strings.Repeat("a", 1024)
	} else {
		res = "Hello World!\r\n"
	}
	var events evio.Events
	events.NumLoops = loops // number of event loops to run
	events.Serving = func(srv evio.Server) (action evio.Action) {
		log.Printf("http server started on port %d (loops: %d)", port, srv.NumLoops)
		if unixsocket != "" {
			log.Printf("http server started at %s", unixsocket)
		}
		if stdlib {
			log.Printf("stdlib")
		}
		return
	}
	events.Opened = func(c evio.Conn) (out []byte, opts evio.Options, action evio.Action) {
		// each connection carries its own input stream for partial reads
		c.SetContext(&evio.InputStream{})
		//log.Printf("opened: laddr: %v: raddr: %v", c.LocalAddr(), c.RemoteAddr())
		return
	}
	events.Closed = func(c evio.Conn, err error) (action evio.Action) {
		//log.Printf("closed: %s: %s", c.LocalAddr().String(), c.RemoteAddr().String())
		return
	}
	events.Data = func(c evio.Conn, in []byte) (out []byte, action evio.Action) {
		if in == nil {
			return
		}
		is := c.Context().(*evio.InputStream)
		data := is.Begin(in) // presumably joins leftover bytes with the new chunk — see evio.InputStream
		if noparse && bytes.Contains(data, []byte("\r\n\r\n")) {
			// for testing minimal single packet request -> response.
			out = appendresp(nil, "200 OK", "", res)
			return
		}
		// process the pipeline
		var req request
		for {
			leftover, err := parsereq(data, &req)
			if err != nil {
				// bad thing happened
				out = appendresp(out, "500 Error", "", err.Error()+"\n")
				action = evio.Close
				break
			} else if len(leftover) == len(data) {
				// request not ready, yet
				break
			}
			// handle the request
			req.remoteAddr = c.RemoteAddr().String()
			out = appendhandle(out, &req)
			data = leftover
		}
		is.End(data) // keep unconsumed bytes for the next Data event
		return
	}
	var ssuf string
	if stdlib {
		ssuf = "-net"
	}
	// We at least want the single http address.
	addrs := []string{fmt.Sprintf("tcp"+ssuf+"://:%d", port)}
	if unixsocket != "" {
		addrs = append(addrs, fmt.Sprintf("unix"+ssuf+"://%s", unixsocket))
	}
	// Start serving!
	log.Fatal(evio.Serve(events, addrs...))
}
// appendhandle handles the incoming request and appends the response to
// the provided bytes, which is then returned to the caller.
// The parsed request itself is ignored: every request gets the canned
// 200 body held in the package-level res.
func appendhandle(b []byte, req *request) []byte {
	return appendresp(b, "200 OK", "", res)
}
// appendresp will append a valid http response to the provide bytes.
// The status param should be the code plus text such as "200 OK".
// The head parameter should be a series of lines ending with "\r\n" or empty.
func appendresp(b []byte, status, head, body string) []byte {
b = append(b, "HTTP/1.1"...)
b = append(b, ' ')
b = append(b, status...)
b = append(b, '\r', '\n')
b = append(b, "Server: evio\r\n"...)
b = append(b, "Date: "...)
b = time.Now().AppendFormat(b, "Mon, 02 Jan 2006 15:04:05 GMT")
b = append(b, '\r', '\n')
if len(body) > 0 {
b = append(b, "Content-Length: "...)
b = strconv.AppendInt(b, int64(len(body)), 10)
b = append(b, '\r', '\n')
}
b = append(b, head...)
b = append(b, '\r', '\n')
if len(body) > 0 {
b = append(b, body...)
}
return b
}
// parsereq is a very simple http request parser. This operation
// waits for the entire payload to be buffered before returning a
// valid request.
//
// NOTE(review): in the query-string branch, q is a relative offset
// (i - s) but is later used as if it were an absolute index into sdata
// (req.path = sdata[s:q]) and into req.path — looks wrong for requests
// containing '?'; verify before relying on req.path/req.query.
func parsereq(data []byte, req *request) (leftover []byte, err error) {
	sdata := string(data)
	var i, s int
	var top string
	var clen int
	var q = -1
	// method, path, proto line
	for ; i < len(sdata); i++ {
		if sdata[i] == ' ' {
			req.method = sdata[s:i]
			for i, s = i+1, i+1; i < len(sdata); i++ {
				if sdata[i] == '?' && q == -1 {
					q = i - s
				} else if sdata[i] == ' ' {
					if q != -1 {
						req.path = sdata[s:q]
						req.query = req.path[q+1 : i]
					} else {
						req.path = sdata[s:i]
					}
					for i, s = i+1, i+1; i < len(sdata); i++ {
						if sdata[i] == '\n' && sdata[i-1] == '\r' {
							req.proto = sdata[s:i]
							i, s = i+1, i+1
							break
						}
					}
					break
				}
			}
			break
		}
	}
	if req.proto == "" {
		return data, fmt.Errorf("malformed request")
	}
	top = sdata[:s]
	// header lines, terminated by an empty line
	for ; i < len(sdata); i++ {
		if i > 1 && sdata[i] == '\n' && sdata[i-1] == '\r' {
			line := sdata[s : i-1]
			s = i + 1
			if line == "" {
				// end of headers; consume the body if Content-Length was seen
				req.head = sdata[len(top)+2 : i+1]
				i++
				if clen > 0 {
					if len(sdata[i:]) < clen {
						break
					}
					req.body = sdata[i : i+clen]
					i += clen
				}
				return data[i:], nil
			}
			if strings.HasPrefix(line, "Content-Length:") {
				n, err := strconv.ParseInt(strings.TrimSpace(line[len("Content-Length:"):]), 10, 64)
				if err == nil {
					clen = int(n)
				}
			}
		}
	}
	// not enough data
	return data, nil
}
启动过程中配置go run main_http_evio.go -loops=3,启动三个事件循环(event loop)来进行处理。
通过wrk工具进行压测,测试命令wrk -t8 -c200 -d60s --latency http://127.0.0.1:8080
wrk -t8 -c200 -d60s --latency http://127.0.0.1:8080
Running 1m test @ http://127.0.0.1:8080
8 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 6.12ms 17.23ms 291.61ms 93.60%
Req/Sec 13.83k 4.13k 46.02k 77.73%
Latency Distribution
50% 1.43ms
75% 2.44ms
90% 11.82ms
99% 91.58ms
6564343 requests in 1.00m, 651.07MB read
Requests/sec: 109237.85
Transfer/sec: 10.83MB
|
go原生 |
evio |
请求总数 |
4244898 |
6564343 |
Qps |
70651.03 |
109237.85 |
简单的性能对比可以看出,evio实现模型的吞吐量比原生大约高50%(QPS从70651提升到109237),下面具体查看一下evio在go下如何使用自己的事件驱动模型。
evio流程
查看的代码基于mac操作系统。
// Serve starts handling events for the specified addresses: it creates
// one listener per address and dispatches to either the stdlib-backed
// server or the custom event-loop server.
func Serve(events Events, addr ...string) error {
	var lns []*listener
	defer func() {
		for _, ln := range lns {
			ln.close()
		}
	}()
	var stdlib bool
	for _, addr := range addr { // one listener per requested address
		var ln listener
		var stdlibt bool
		ln.network, ln.addr, ln.opts, stdlibt = parseAddr(addr)
		if stdlibt {
			stdlib = true
		}
		if ln.network == "unix" {
			os.RemoveAll(ln.addr)
		}
		var err error
		if ln.network == "udp" {
			if ln.opts.reusePort {
				ln.pconn, err = reuseportListenPacket(ln.network, ln.addr)
			} else {
				ln.pconn, err = net.ListenPacket(ln.network, ln.addr)
			}
		} else {
			if ln.opts.reusePort {
				ln.ln, err = reuseportListen(ln.network, ln.addr)
			} else {
				ln.ln, err = net.Listen(ln.network, ln.addr) // standard-library listener
			}
		}
		if err != nil {
			return err
		}
		if ln.pconn != nil {
			ln.lnaddr = ln.pconn.LocalAddr()
		} else {
			ln.lnaddr = ln.ln.Addr()
		}
		if !stdlib {
			if err := ln.system(); err != nil { // presumably extracts the raw fd used by poll.AddRead — see listener.system
				return err
			}
		}
		lns = append(lns, &ln)
	}
	if stdlib {
		return stdserve(events, lns)
	}
	return serve(events, lns) // run the custom event loops
}
...
// serve spins up numLoops event-loop goroutines, registers every
// listener fd with each loop's poller, and arranges orderly shutdown.
func serve(events Events, listeners []*listener) error {
	// figure out the correct number of loops/goroutines to use.
	numLoops := events.NumLoops // requested number of event loops
	if numLoops <= 0 {
		if numLoops == 0 {
			numLoops = 1 // default: a single loop
		} else {
			numLoops = runtime.NumCPU() // negative: one loop per CPU
		}
	}
	s := &server{}
	s.events = events
	s.lns = listeners
	s.cond = sync.NewCond(&sync.Mutex{})
	s.balance = events.LoadBalance // load-balancing strategy across loops
	s.tch = make(chan time.Duration)
	//println("-- server starting")
	if s.events.Serving != nil {
		var svr Server
		svr.NumLoops = numLoops
		svr.Addrs = make([]net.Addr, len(listeners))
		for i, ln := range listeners {
			svr.Addrs[i] = ln.lnaddr
		}
		action := s.events.Serving(svr)
		switch action {
		case None:
		case Shutdown:
			return nil
		}
	}
	defer func() {
		// wait on a signal for shutdown
		s.waitForShutdown() // block until shutdown is requested
		// notify all loops to close by closing all listeners
		for _, l := range s.loops {
			l.poll.Trigger(errClosing)
		}
		// wait on all loops to complete reading events
		s.wg.Wait()
		// close loops and all outstanding connections
		for _, l := range s.loops { // tear down every remaining connection
			for _, c := range l.fdconns {
				loopCloseConn(s, l, c, nil)
			}
			l.poll.Close()
		}
		//println("-- server stopped")
	}()
	// create loops locally and bind the listeners.
	for i := 0; i < numLoops; i++ { // each loop is an independent worker goroutine (sub-reactor)
		l := &loop{
			idx:     i,
			poll:    internal.OpenPoll(),
			packet:  make([]byte, 0xFFFF),
			fdconns: make(map[int]*conn),
		}
		for _, ln := range listeners {
			l.poll.AddRead(ln.fd) // every loop watches the listener fd for new connections
		}
		s.loops = append(s.loops, l)
	}
	// start loops in background
	s.wg.Add(len(s.loops))
	for _, l := range s.loops {
		go loopRun(s, l) // run the event loop
	}
	return nil
}
...
// loopRun is the body of one event-loop goroutine: it blocks on the
// poller and dispatches each ready fd according to the connection state.
func loopRun(s *server, l *loop) {
	defer func() {
		//fmt.Println("-- loop stopped --", l.idx)
		s.signalShutdown()
		s.wg.Done()
	}()
	if l.idx == 0 && s.events.Tick != nil {
		go loopTicker(s, l)
	}
	//fmt.Println("-- loop started --", l.idx)
	l.poll.Wait(func(fd int, note interface{}) error {
		if fd == 0 {
			// fd 0 carries a user-triggered note (see Poll.Trigger)
			return loopNote(s, l, note)
		}
		c := l.fdconns[fd]
		switch {
		case c == nil:
			return loopAccept(s, l, fd) // unknown fd: a listener — accept the new connection
		case !c.opened:
			return loopOpened(s, l, c)
		case len(c.out) > 0:
			return loopWrite(s, l, c) // pending output: flush writes first
		case c.action != None:
			return loopAction(s, l, c)
		default:
			return loopRead(s, l, c) // otherwise handle readable data
		}
	})
}
...
// Poll wraps a kqueue instance (the kevent-based event mechanism on
// macOS/BSD) together with its pending change list and user notes.
type Poll struct {
	fd      int                // kqueue file descriptor
	changes []syscall.Kevent_t // pending kevent changes, submitted on the next Wait
	notes   noteQueue          // user notes delivered via Trigger
}
// OpenPoll creates a kqueue instance and registers an EVFILT_USER event
// (ident 0) that Trigger fires to wake a blocked Wait from userspace.
func OpenPoll() *Poll {
	l := new(Poll)
	p, err := syscall.Kqueue() // create the kqueue fd
	if err != nil {
		panic(err)
	}
	l.fd = p
	_, err = syscall.Kevent(l.fd, []syscall.Kevent_t{{
		Ident:  0,
		Filter: syscall.EVFILT_USER,
		Flags:  syscall.EV_ADD | syscall.EV_CLEAR,
	}}, nil, nil) // register the user wakeup event
	if err != nil {
		panic(err)
	}
	return l
}
// Close releases the kqueue file descriptor.
func (p *Poll) Close() error {
	return syscall.Close(p.fd) // close the kqueue fd
}
// Trigger queues a note and fires the EVFILT_USER event so a blocked
// Wait call wakes up and processes it.
func (p *Poll) Trigger(note interface{}) error {
	p.notes.Add(note)
	_, err := syscall.Kevent(p.fd, []syscall.Kevent_t{{
		Ident:  0,
		Filter: syscall.EVFILT_USER,
		Fflags: syscall.NOTE_TRIGGER,
	}}, nil, nil)
	return err
}
// Wait blocks on kevent and invokes iter for every queued note (as fd 0)
// and every ready file descriptor, looping until iter returns an error.
func (p *Poll) Wait(iter func(fd int, note interface{}) error) error {
	events := make([]syscall.Kevent_t, 128)
	for {
		n, err := syscall.Kevent(p.fd, p.changes, events, nil) // submit pending changes and fetch ready events
		if err != nil && err != syscall.EINTR {
			return err
		}
		p.changes = p.changes[:0] // changes were submitted above; reset the list
		if err := p.notes.ForEach(func(note interface{}) error {
			return iter(0, note)
		}); err != nil {
			return err
		}
		for i := 0; i < n; i++ {
			if fd := int(events[i].Ident); fd != 0 {
				if err := iter(fd, nil); err != nil { // dispatch each ready fd to the callback
					return err
				}
			}
		}
	}
}
// AddRead queues a change subscribing fd to readability events; it is
// submitted to the kernel on the next Wait iteration.
func (p *Poll) AddRead(fd int) {
	p.changes = append(p.changes,
		syscall.Kevent_t{
			Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_READ,
		},
	)
}
...
从evio的整个流程可知,evio将网络io的所有事情完全自行处理,绕开了标准库的netpoll,从而自行实现了多sub-reactor的反应器模式,这也解释了evio性能高于原生实现的原因。
总结
本文只是简单的探索了一下go中的反应器模式的现状,原生的go服务是共用了go的netpoll(后续有机会会深入学习一下),所有的事件驱动与其他的协程调度等都在一起,而evio则是自己实现了事件驱动,将所有的事件驱动全部自己管理与调度,从而更加高效的响应请求,根据evio的思路大家有兴趣也可以尝试实现一个反应器模式的事件驱动器。由于本人才疏学浅,如有错误请批评指正。