反应器(Reactor)模式-golang探索

2023-11-09

反应器模式

在以前的博文模式设计概述:反应器(Reactor)模式介绍过相关的概念和流程,当时使用了python但是从结果上来看并没有起到很明显的效果。最近在处理有关proxy的项目中,刚刚好涉及到有关性能的问题,故本文探索一下go的反应器模式的探索过程,当前比较知名的项目有两个一个是eviognet,都是反应器模式的很好的实现范例,特别是gnet在反应器模式上还加入了协程池从而比evio性能更好,本文就从头开始探索如何一步步优化改进。

go原生服务流程

package main

import (
	"fmt"
	"io"
	"log"
	"net"
)

func worker(conn net.Conn) {
	defer conn.Close()
	b := make([]byte, 512)
	for {
		size, err := conn.Read(b)
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}

		fmt.Printf("Received %v bytes from %v\n", size, conn.RemoteAddr())
		size, err = conn.Write(b[0:size])
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("Written %v bytes to %v\n", size, conn.RemoteAddr())
	}
}

func main() {
	listener, err := net.Listen("tcp", "127.0.0.1:1234")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Listering on %v\n", listener.Addr())
	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("Accepted connection to %v from %v\n", conn.LocalAddr(), conn.RemoteAddr())
		go worker(conn)
	}
}

此时go run main.go运行该程序,并在新开的终端中直接使用telnet 127.0.0.1 1234。运行效果就是发送的内容与返回内容相同。此时我们先探索一下原生的net库是的流程如何。

net流程-Listen

由于环境是mac环境故源码跳转的都是对应到了posix的相关接口下。

func Listen(network, address string) (Listener, error) {
	var lc ListenConfig
	return lc.Listen(context.Background(), network, address)
}

listen直接返回了ListenConfig通过Listen方法创建的实例。

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
	addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil) // 解析地址
	if err != nil {
		return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
	}
	sl := &sysListener{
		ListenConfig: *lc,
		network:      network,
		address:      address,
	}
	var l Listener
	la := addrs.first(isIPv4)    	// 检查监听的地址类型
	switch la := la.(type) {
	case *TCPAddr:
		l, err = sl.listenTCP(ctx, la)   	// 监听的是TCP
	case *UnixAddr:
		l, err = sl.listenUnix(ctx, la)   // 监听的是套接字
	default:
		return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
	}
	if err != nil {
		return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer
	}
	return l, nil    // 返回创建的监听实例
}

此时可以看出整个流程跟平常的都是相同,先是解析监听地址,然后判断是否TCP或者Unix监听的类型。

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
	fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)   // 初始化fd
	if err != nil {
		return nil, err
	}
	return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}


...

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
	if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd" || runtime.GOOS == "nacl") && mode == "dial" && raddr.isWildcard() {
		raddr = raddr.toLocal(net)
	}
	family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
	return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)  // 根据不同平台设置地址并初始化
}


...

// socket returns a network file descriptor that is ready for
// asynchronous I/O using the network poller.
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
	s, err := sysSocket(family, sotype, proto)  // 初始化socket
	if err != nil {
		return nil, err
	}
	if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {  // 设置opt
		poll.CloseFunc(s)
		return nil, err
	}
	if fd, err = newFD(s, family, sotype, net); err != nil {  // 获取fd
		poll.CloseFunc(s)
		return nil, err
	}

	// This function makes a network file descriptor for the
	// following applications:
	//
	// - An endpoint holder that opens a passive stream
	//   connection, known as a stream listener
	//
	// - An endpoint holder that opens a destination-unspecific
	//   datagram connection, known as a datagram listener
	//
	// - An endpoint holder that opens an active stream or a
	//   destination-specific datagram connection, known as a
	//   dialer
	//
	// - An endpoint holder that opens the other connection, such
	//   as talking to the protocol stack inside the kernel
	//
	// For stream and datagram listeners, they will only require
	// named sockets, so we can assume that it's just a request
	// from stream or datagram listeners when laddr is not nil but
	// raddr is nil. Otherwise we assume it's just for dialers or
	// the other connection holders.

	if laddr != nil && raddr == nil {
		switch sotype {
		case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:     // 根据不同类型生成不同的监听类型
			if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
				fd.Close()
				return nil, err
			}
			return fd, nil
		case syscall.SOCK_DGRAM:
			if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
				fd.Close()
				return nil, err
			}
			return fd, nil
		}
	}
	if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {  // 设置对应的地址信息
		fd.Close()
		return nil, err
	}
	return fd, nil
}

至此,如果参数等都正确就初始化完成了一个监听的fd。接下来就是接受请求。

net流程-Accept
// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
	fd *netFD   					// 初始化好的fd
	lc ListenConfig  			// 监听的配置
}

// SyscallConn returns a raw network connection.
// This implements the syscall.Conn interface.
//
// The returned RawConn only supports calling Control. Read and
// Write return an error.
func (l *TCPListener) SyscallConn() (syscall.RawConn, error) {
	if !l.ok() {
		return nil, syscall.EINVAL
	}
	return newRawListener(l.fd)
}

// AcceptTCP accepts the next incoming call and returns the new
// connection.
func (l *TCPListener) AcceptTCP() (*TCPConn, error) {
	if !l.ok() {
		return nil, syscall.EINVAL
	}
	c, err := l.accept()
	if err != nil {
		return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
	}
	return c, nil
}

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
	if !l.ok() {
		return nil, syscall.EINVAL
	}
	c, err := l.accept()   // 接受请求
	if err != nil {
		return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
	}
	return c, nil
}

// Close stops listening on the TCP address.
// Already Accepted connections are not closed.
func (l *TCPListener) Close() error {  // 关闭请求
	if !l.ok() {
		return syscall.EINVAL
	}
	if err := l.close(); err != nil {
		return &OpError{Op: "close", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
	}
	return nil
}

// Addr returns the listener's network address, a *TCPAddr.
// The Addr returned is shared by all invocations of Addr, so
// do not modify it.
func (l *TCPListener) Addr() Addr { return l.fd.laddr }  // 获取地址

// SetDeadline sets the deadline associated with the listener.
// A zero time value disables the deadline.
func (l *TCPListener) SetDeadline(t time.Time) error {  // 设置超时时间
	if !l.ok() {
		return syscall.EINVAL
	}
	if err := l.fd.pfd.SetDeadline(t); err != nil {
		return &OpError{Op: "set", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
	}
	return nil
}

// File returns a copy of the underlying os.File.
// It is the caller's responsibility to close f when finished.
// Closing l does not affect f, and closing f does not affect l.
//
// The returned os.File's file descriptor is different from the
// connection's. Attempting to change properties of the original
// using this duplicate may or may not have the desired effect.
func (l *TCPListener) File() (f *os.File, err error) {   // 获取文件描述符
	if !l.ok() {
		return nil, syscall.EINVAL
	}
	f, err = l.file()
	if err != nil {
		return nil, &OpError{Op: "file", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
	}
	return
}

通过Accept、Close相关方法查看,都是调用了进一步封装的函数并且通过层层嵌套和不同系统的封装,其中会调用到如下函数。

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
	if !l.ok() {
		return nil, syscall.EINVAL
	}
	c, err := l.accept()
	if err != nil {
		return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
	}
	return c, nil
}

...

func (ln *TCPListener) accept() (*TCPConn, error) {
	fd, err := ln.fd.accept()
	if err != nil {
		return nil, err
	}
	tc := newTCPConn(fd)
	if ln.lc.KeepAlive >= 0 {
		setKeepAlive(fd, true)
		ka := ln.lc.KeepAlive
		if ln.lc.KeepAlive == 0 {
			ka = defaultTCPKeepAlive
		}
		setKeepAlivePeriod(fd, ka)
	}
	return tc, nil
}

...

func (fd *netFD) accept() (netfd *netFD, err error) {
	d, rsa, errcall, err := fd.pfd.Accept()
	if err != nil {
		if errcall != "" {
			err = wrapSyscallError(errcall, err)
		}
		return nil, err
	}

	if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
		poll.CloseFunc(d)
		return nil, err
	}
	if err = netfd.init(); err != nil {   // 将该fd添加到系统的Poll中,监控是否有读写的事件
		fd.Close()
		return nil, err
	}
	lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
	netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
	return netfd, nil
}

...

func accept(s int) (int, syscall.Sockaddr, string, error) {
	// See ../syscall/exec_unix.go for description of ForkLock.
	// It is probably okay to hold the lock across syscall.Accept
	// because we have put fd.sysfd into non-blocking mode.
	// However, a call to the File method will put it back into
	// blocking mode. We can't take that risk, so no use of ForkLock here.
	ns, sa, err := AcceptFunc(s)
	if err == nil {
		syscall.CloseOnExec(ns)
	}
	if err != nil {
		return -1, nil, "accept", err
	}
	if err = syscall.SetNonblock(ns, true); err != nil {   // 接受完成的连接默认设置成了非阻塞
		CloseFunc(ns)
		return -1, nil, "setnonblock", err
	}
	return ns, sa, "", nil
}

...

// AcceptFunc is used to hook the accept call.
var AcceptFunc func(int) (int, syscall.Sockaddr, error) = syscall.Accept

...

//syscall/syscall_bsd.go
func Accept(fd int) (nfd int, sa Sockaddr, err error) {
	var rsa RawSockaddrAny
	var len _Socklen = SizeofSockaddrAny
	nfd, err = accept(fd, &rsa, &len)  // 接受连接 通过封装不同系统的syscall来接受连接
	if err != nil {
		return
	}
	if runtime.GOOS == "darwin" && len == 0 {
		// Accepted socket has no address.
		// This is likely due to a bug in xnu kernels,
		// where instead of ECONNABORTED error socket
		// is accepted, but has no address.
		Close(nfd)
		return 0, nil, ECONNABORTED
	}
	sa, err = anyToSockaddr(&rsa)
	if err != nil {
		Close(nfd)
		nfd = 0
	}
	return
}

...

func accept(s int, rsa *RawSockaddrAny, addrlen *_Socklen) (fd int, err error) {
	r0, _, e1 := syscall(funcPC(libc_accept_trampoline), uintptr(s), uintptr(unsafe.Pointer(rsa)), uintptr(unsafe.Pointer(addrlen)))  // 系统调用访问libc_accept_trampoline函数并传入参数
	fd = int(r0)
	if e1 != 0 {
		err = errnoErr(e1)
	}
	return
}

从Accept的流程可知,通过了层层的嵌套,最终分发到了syscall中的针对不同平台的accept的系统调用的封装,并且其他的函数也是如此封装调用。其中接受的请求通过netfd.init()函数默认的加入到系统的poll中,从而完成接受的事件通过go系统中的poll来进行事件通知与驱动。

性能对比-go原生与evio

go原生
// Copyright 2017 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.

package main

import (
	"flag"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"strconv"
	"strings"
	"time"
)

var res string

type request struct {
	proto, method string
	path, query   string
	head, body    string
	remoteAddr    string
}



func worker(conn net.Conn) {
	defer conn.Close()
	data := make([]byte, 512)
	for {
		_, err := conn.Read(data)
		if err == io.EOF {
			break
		}
		if err != nil {
			//log.Fatal(err)
			return
		}
		var req request
		var out []byte
		leftover, err := parsereq(data, &req)
		if err != nil {
			// bad thing happened
			out = appendresp(out, "500 Error", "", err.Error()+"\n")
			break
		} else if len(leftover) == len(data) {
			// request not ready, yet
			break
		}
		// handle the request
		req.remoteAddr = conn.RemoteAddr().String()

		out = appendhandle(out, &req)
		data = leftover

		_, err = conn.Write(out)
		if err != nil {
			//log.Fatal(err)
		}
	}
}


func main() {
	var port int
	var loops int
	var aaaa bool
	var noparse bool
	var unixsocket string
	var stdlib bool

	flag.StringVar(&unixsocket, "unixsocket", "", "unix socket")
	flag.IntVar(&port, "port", 8080, "server port")
	flag.BoolVar(&aaaa, "aaaa", false, "aaaaa....")
	flag.BoolVar(&noparse, "noparse", true, "do not parse requests")
	flag.BoolVar(&stdlib, "stdlib", false, "use stdlib")
	flag.IntVar(&loops, "loops", 0, "num loops")
	flag.Parse()

	if os.Getenv("NOPARSE") == "1" {
		noparse = true
	}

	if aaaa {
		res = strings.Repeat("a", 1024)
	} else {
		res = "Hello World!\r\n"
	}


	listener, err := net.Listen("tcp", "127.0.0.1:8080")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Listering on %v\n", listener.Addr())
	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Fatal(err)
		}
		go worker(conn)
	}
}

// appendhandle handles the incoming request and appends the response to
// the provided bytes, which is then returned to the caller.
func appendhandle(b []byte, req *request) []byte {
	return appendresp(b, "200 OK", "", res)
}

// appendresp will append a valid http response to the provide bytes.
// The status param should be the code plus text such as "200 OK".
// The head parameter should be a series of lines ending with "\r\n" or empty.
func appendresp(b []byte, status, head, body string) []byte {
	b = append(b, "HTTP/1.1"...)
	b = append(b, ' ')
	b = append(b, status...)
	b = append(b, '\r', '\n')
	b = append(b, "Server: evio\r\n"...)
	b = append(b, "Date: "...)
	b = time.Now().AppendFormat(b, "Mon, 02 Jan 2006 15:04:05 GMT")
	b = append(b, '\r', '\n')
	if len(body) > 0 {
		b = append(b, "Content-Length: "...)
		b = strconv.AppendInt(b, int64(len(body)), 10)
		b = append(b, '\r', '\n')
	}
	b = append(b, head...)
	b = append(b, '\r', '\n')
	if len(body) > 0 {
		b = append(b, body...)
	}
	return b
}

// parsereq is a very simple http request parser. This operation
// waits for the entire payload to be buffered before returning a
// valid request.
func parsereq(data []byte, req *request) (leftover []byte, err error) {
	sdata := string(data)
	var i, s int
	var top string
	var clen int
	var q = -1
	// method, path, proto line
	for ; i < len(sdata); i++ {
		if sdata[i] == ' ' {
			req.method = sdata[s:i]
			for i, s = i+1, i+1; i < len(sdata); i++ {
				if sdata[i] == '?' && q == -1 {
					q = i - s
				} else if sdata[i] == ' ' {
					if q != -1 {
						req.path = sdata[s:q]
						req.query = req.path[q+1 : i]
					} else {
						req.path = sdata[s:i]
					}
					for i, s = i+1, i+1; i < len(sdata); i++ {
						if sdata[i] == '\n' && sdata[i-1] == '\r' {
							req.proto = sdata[s:i]
							i, s = i+1, i+1
							break
						}
					}
					break
				}
			}
			break
		}
	}
	if req.proto == "" {
		return data, fmt.Errorf("malformed request")
	}
	top = sdata[:s]
	for ; i < len(sdata); i++ {
		if i > 1 && sdata[i] == '\n' && sdata[i-1] == '\r' {
			line := sdata[s : i-1]
			s = i + 1
			if line == "" {
				req.head = sdata[len(top)+2 : i+1]
				i++
				if clen > 0 {
					if len(sdata[i:]) < clen {
						break
					}
					req.body = sdata[i : i+clen]
					i += clen
				}
				return data[i:], nil
			}
			if strings.HasPrefix(line, "Content-Length:") {
				n, err := strconv.ParseInt(strings.TrimSpace(line[len("Content-Length:"):]), 10, 64)
				if err == nil {
					clen = int(n)
				}
			}
		}
	}
	// not enough data
	return data, nil
}

压测结果

 wrk -t8 -c200 -d60s --latency  http://127.0.0.1:8080
Running 1m test @ http://127.0.0.1:8080
  8 threads and 200 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     8.43ms   18.64ms 365.62ms   90.07%
    Req/Sec     9.13k     5.54k   40.76k    70.50%
  Latency Distribution
     50%    1.51ms
     75%    4.95ms
     90%   26.88ms
     99%   90.95ms
  4244898 requests in 1.00m, 421.02MB read
  Socket errors: connect 0, read 353878, write 30, timeout 0
Requests/sec:  70651.03
Transfer/sec:      7.01MB
evio代码
// Copyright 2017 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.

package main

import (
	"bytes"
	"flag"
	"fmt"
	"log"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/tidwall/evio"
)

var res string

type request struct {
	proto, method string
	path, query   string
	head, body    string
	remoteAddr    string
}

func main() {
	var port int
	var loops int
	var aaaa bool
	var noparse bool
	var unixsocket string
	var stdlib bool

	flag.StringVar(&unixsocket, "unixsocket", "", "unix socket")
	flag.IntVar(&port, "port", 8080, "server port")
	flag.BoolVar(&aaaa, "aaaa", false, "aaaaa....")
	flag.BoolVar(&noparse, "noparse", true, "do not parse requests")
	flag.BoolVar(&stdlib, "stdlib", false, "use stdlib")
	flag.IntVar(&loops, "loops", 0, "num loops")
	flag.Parse()

	if os.Getenv("NOPARSE") == "1" {
		noparse = true
	}

	if aaaa {
		res = strings.Repeat("a", 1024)
	} else {
		res = "Hello World!\r\n"
	}

	var events evio.Events
	events.NumLoops = loops
	events.Serving = func(srv evio.Server) (action evio.Action) {
		log.Printf("http server started on port %d (loops: %d)", port, srv.NumLoops)
		if unixsocket != "" {
			log.Printf("http server started at %s", unixsocket)
		}
		if stdlib {
			log.Printf("stdlib")
		}
		return
	}

	events.Opened = func(c evio.Conn) (out []byte, opts evio.Options, action evio.Action) {
		c.SetContext(&evio.InputStream{})
		//log.Printf("opened: laddr: %v: raddr: %v", c.LocalAddr(), c.RemoteAddr())
		return
	}

	events.Closed = func(c evio.Conn, err error) (action evio.Action) {
		//log.Printf("closed: %s: %s", c.LocalAddr().String(), c.RemoteAddr().String())
		return
	}

	events.Data = func(c evio.Conn, in []byte) (out []byte, action evio.Action) {
		if in == nil {
			return
		}
		is := c.Context().(*evio.InputStream)
		data := is.Begin(in)
		if noparse && bytes.Contains(data, []byte("\r\n\r\n")) {
			// for testing minimal single packet request -> response.
			out = appendresp(nil, "200 OK", "", res)
			return
		}
		// process the pipeline
		var req request
		for {
			leftover, err := parsereq(data, &req)
			if err != nil {
				// bad thing happened
				out = appendresp(out, "500 Error", "", err.Error()+"\n")
				action = evio.Close
				break
			} else if len(leftover) == len(data) {
				// request not ready, yet
				break
			}
			// handle the request
			req.remoteAddr = c.RemoteAddr().String()
			out = appendhandle(out, &req)
			data = leftover
		}
		is.End(data)
		return
	}
	var ssuf string
	if stdlib {
		ssuf = "-net"
	}
	// We at least want the single http address.
	addrs := []string{fmt.Sprintf("tcp"+ssuf+"://:%d", port)}
	if unixsocket != "" {
		addrs = append(addrs, fmt.Sprintf("unix"+ssuf+"://%s", unixsocket))
	}
	// Start serving!
	log.Fatal(evio.Serve(events, addrs...))
}

// appendhandle handles the incoming request and appends the response to
// the provided bytes, which is then returned to the caller.
func appendhandle(b []byte, req *request) []byte {
	return appendresp(b, "200 OK", "", res)
}

// appendresp will append a valid http response to the provide bytes.
// The status param should be the code plus text such as "200 OK".
// The head parameter should be a series of lines ending with "\r\n" or empty.
func appendresp(b []byte, status, head, body string) []byte {
	b = append(b, "HTTP/1.1"...)
	b = append(b, ' ')
	b = append(b, status...)
	b = append(b, '\r', '\n')
	b = append(b, "Server: evio\r\n"...)
	b = append(b, "Date: "...)
	b = time.Now().AppendFormat(b, "Mon, 02 Jan 2006 15:04:05 GMT")
	b = append(b, '\r', '\n')
	if len(body) > 0 {
		b = append(b, "Content-Length: "...)
		b = strconv.AppendInt(b, int64(len(body)), 10)
		b = append(b, '\r', '\n')
	}
	b = append(b, head...)
	b = append(b, '\r', '\n')
	if len(body) > 0 {
		b = append(b, body...)
	}
	return b
}

// parsereq is a very simple http request parser. This operation
// waits for the entire payload to be buffered before returning a
// valid request.
func parsereq(data []byte, req *request) (leftover []byte, err error) {
	sdata := string(data)
	var i, s int
	var top string
	var clen int
	var q = -1
	// method, path, proto line
	for ; i < len(sdata); i++ {
		if sdata[i] == ' ' {
			req.method = sdata[s:i]
			for i, s = i+1, i+1; i < len(sdata); i++ {
				if sdata[i] == '?' && q == -1 {
					q = i - s
				} else if sdata[i] == ' ' {
					if q != -1 {
						req.path = sdata[s:q]
						req.query = req.path[q+1 : i]
					} else {
						req.path = sdata[s:i]
					}
					for i, s = i+1, i+1; i < len(sdata); i++ {
						if sdata[i] == '\n' && sdata[i-1] == '\r' {
							req.proto = sdata[s:i]
							i, s = i+1, i+1
							break
						}
					}
					break
				}
			}
			break
		}
	}
	if req.proto == "" {
		return data, fmt.Errorf("malformed request")
	}
	top = sdata[:s]
	for ; i < len(sdata); i++ {
		if i > 1 && sdata[i] == '\n' && sdata[i-1] == '\r' {
			line := sdata[s : i-1]
			s = i + 1
			if line == "" {
				req.head = sdata[len(top)+2 : i+1]
				i++
				if clen > 0 {
					if len(sdata[i:]) < clen {
						break
					}
					req.body = sdata[i : i+clen]
					i += clen
				}
				return data[i:], nil
			}
			if strings.HasPrefix(line, "Content-Length:") {
				n, err := strconv.ParseInt(strings.TrimSpace(line[len("Content-Length:"):]), 10, 64)
				if err == nil {
					clen = int(n)
				}
			}
		}
	}
	// not enough data
	return data, nil
}

启动过程中配置go run main_http_evio.go -loops=3,启动三个协程来进行处理。

通过wrk工具进行压测,测试命令wrk -t8 -c200 -d60s --latency http://127.0.0.1:8080

wrk -t8 -c200 -d60s --latency  http://127.0.0.1:8080
Running 1m test @ http://127.0.0.1:8080
  8 threads and 200 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     6.12ms   17.23ms 291.61ms   93.60%
    Req/Sec    13.83k     4.13k   46.02k    77.73%
  Latency Distribution
     50%    1.43ms
     75%    2.44ms
     90%   11.82ms
     99%   91.58ms
  6564343 requests in 1.00m, 651.07MB read
Requests/sec: 109237.85
Transfer/sec:     10.83MB
go原生 evio
请求总数 4244898 6564343
Qps 70651.03 109237.85

简单的性能对比可以看出,evio的实现模型比原生的要高大约四五十,具体来查看一下在go下面如何使用自己的事件驱动模型。

evio流程

查看的代码基于mac操作系统。

func Serve(events Events, addr ...string) error {
	var lns []*listener
	defer func() {
		for _, ln := range lns {
			ln.close()
		}
	}()
	var stdlib bool
	for _, addr := range addr {  // 检查是否监听的多个地址
		var ln listener
		var stdlibt bool
		ln.network, ln.addr, ln.opts, stdlibt = parseAddr(addr)
		if stdlibt {
			stdlib = true
		}
		if ln.network == "unix" {
			os.RemoveAll(ln.addr)
		}
		var err error
		if ln.network == "udp" {
			if ln.opts.reusePort {
				ln.pconn, err = reuseportListenPacket(ln.network, ln.addr)
			} else {
				ln.pconn, err = net.ListenPacket(ln.network, ln.addr)
			}
		} else {
			if ln.opts.reusePort {
				ln.ln, err = reuseportListen(ln.network, ln.addr)
			} else {
				ln.ln, err = net.Listen(ln.network, ln.addr)  // 获取监听的连接
			}
		}
		if err != nil {
			return err
		}
		if ln.pconn != nil {
			ln.lnaddr = ln.pconn.LocalAddr()
		} else {
			ln.lnaddr = ln.ln.Addr()
		}
		if !stdlib {
			if err := ln.system(); err != nil {
				return err
			}
		}
		lns = append(lns, &ln)
	}
	if stdlib {
		return stdserve(events, lns)
	}
	return serve(events, lns)   // 处理连接
}

...

func serve(events Events, listeners []*listener) error {
	// figure out the correct number of loops/goroutines to use.
	numLoops := events.NumLoops   // 获取开启的工作协程数量
	if numLoops <= 0 {
		if numLoops == 0 {
			numLoops = 1
		} else {
			numLoops = runtime.NumCPU()
		}
	}

	s := &server{}
	s.events = events
	s.lns = listeners
	s.cond = sync.NewCond(&sync.Mutex{})
	s.balance = events.LoadBalance  // 复杂均衡策略
	s.tch = make(chan time.Duration)

	//println("-- server starting")
	if s.events.Serving != nil {
		var svr Server
		svr.NumLoops = numLoops
		svr.Addrs = make([]net.Addr, len(listeners))
		for i, ln := range listeners {
			svr.Addrs[i] = ln.lnaddr
		}
		action := s.events.Serving(svr)
		switch action {
		case None:
		case Shutdown:
			return nil
		}
	}

	defer func() {
		// wait on a signal for shutdown
		s.waitForShutdown()    // 等待停止信号

		// notify all loops to close by closing all listeners
		for _, l := range s.loops {
			l.poll.Trigger(errClosing)
		}

		// wait on all loops to complete reading events
		s.wg.Wait()

		// close loops and all outstanding connections
		for _, l := range s.loops {  // 关闭所有还在的连接信息
			for _, c := range l.fdconns {
				loopCloseConn(s, l, c, nil)
			}
			l.poll.Close()
		}
		//println("-- server stopped")
	}()

	// create loops locally and bind the listeners.
	for i := 0; i < numLoops; i++ {   // 初始化多个loop,每个loop就是工作的协程
		l := &loop{
			idx:     i,
			poll:    internal.OpenPoll(),
			packet:  make([]byte, 0xFFFF),
			fdconns: make(map[int]*conn),
		}
		for _, ln := range listeners {
			l.poll.AddRead(ln.fd)    			// 监听server的id来监听新进来的连接
		}
		s.loops = append(s.loops, l)
	}
	// start loops in background
	s.wg.Add(len(s.loops))
	for _, l := range s.loops {
		go loopRun(s, l)     // 运行
	}
	return nil
}

...

func loopRun(s *server, l *loop) {
	defer func() {
		//fmt.Println("-- loop stopped --", l.idx)
		s.signalShutdown()
		s.wg.Done()
	}()

	if l.idx == 0 && s.events.Tick != nil {
		go loopTicker(s, l)
	}

	//fmt.Println("-- loop started --", l.idx)
	l.poll.Wait(func(fd int, note interface{}) error {
		if fd == 0 {
			return loopNote(s, l, note)
		}
		c := l.fdconns[fd]  
		switch {
		case c == nil:
			return loopAccept(s, l, fd)   // 处理新进来的请求
		case !c.opened:
			return loopOpened(s, l, c)
		case len(c.out) > 0:
			return loopWrite(s, l, c) 		// 处理写请求
		case c.action != None:
			return loopAction(s, l, c)
		default:
			return loopRead(s, l, c) 			// 处理读请求
		}
	})
}

...

// Poll ...
type Poll struct {
	fd      int
	changes []syscall.Kevent_t   // mac下面的事件驱动模型
	notes   noteQueue
}

// OpenPoll ...
func OpenPoll() *Poll {
	l := new(Poll)
	p, err := syscall.Kqueue()    // 生成事件实例
	if err != nil {
		panic(err)
	}
	l.fd = p
	_, err = syscall.Kevent(l.fd, []syscall.Kevent_t{{
		Ident:  0,
		Filter: syscall.EVFILT_USER,
		Flags:  syscall.EV_ADD | syscall.EV_CLEAR,
	}}, nil, nil)    // 添加server的读请求
	if err != nil {
		panic(err)
	}

	return l
}

// Close ...
func (p *Poll) Close() error {
	return syscall.Close(p.fd)   // 关闭fd
}

// Trigger ...
func (p *Poll) Trigger(note interface{}) error {
	p.notes.Add(note)
	_, err := syscall.Kevent(p.fd, []syscall.Kevent_t{{
		Ident:  0,
		Filter: syscall.EVFILT_USER,
		Fflags: syscall.NOTE_TRIGGER,
	}}, nil, nil)
	return err
}

// Wait ...
func (p *Poll) Wait(iter func(fd int, note interface{}) error) error {
	events := make([]syscall.Kevent_t, 128)
	for {
		n, err := syscall.Kevent(p.fd, p.changes, events, nil)  // 循环获取当前触发的事件
		if err != nil && err != syscall.EINTR {
			return err
		}
		p.changes = p.changes[:0] 			// 清空当前事件 
		if err := p.notes.ForEach(func(note interface{}) error {
			return iter(0, note) 
		}); err != nil {
			return err
		}
		for i := 0; i < n; i++ {
			if fd := int(events[i].Ident); fd != 0 {
				if err := iter(fd, nil); err != nil {   // 根据回调函数依次处理触发的事件循环
					return err
				}
			}
		}
	}
}

// AddRead ...
func (p *Poll) AddRead(fd int) {
	p.changes = append(p.changes,
		syscall.Kevent_t{
			Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_READ,
		},
	)
}
...

从evio的整个流程可知,evio完全将网络io的所有事情自行处理,完全剥离了网络事件,从而自行实现了多sub-reactor的反应器模式,这也印证了为啥evio的性能比原生高的一个特点。

总结

本文只是简单的探索了一下go中的反应器模式的现状,原生的go服务是共用了go的netpoll(后续有机会会深入学习一下),所有的事件驱动与其他的协程调度等都在一起,而evio则是自己实现了事件驱动,将所有的事件驱动全部自己管理与调度,从而更加高效的响应请求,根据evio的思路大家有兴趣也可以尝试实现一个反应器模式的事件驱动器。由于本人才疏学浅,如有错误请批评指正。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

反应器(Reactor)模式-golang探索 的相关文章

  • Java EE 企业级应用 复习 Spring中Bean的管理

    Bean的实例化 什么是Bean的实例化 Spring容器自动地帮助我们生成对应的Bean对象 Bean的实例化方法 构造方法实例化 静态工厂实例化 实例工厂实例化 构造方法实例化 package com itheima public cl

随机推荐

  • http-server安装成功后,提示command not found

    版权声明 本文为博主原创文章 未经博主允许不得转载 http server安装成功后 提示command not found 如图所示 解决方法 执行vim zshrc 加上红框框住的内容 然后在项目目录下执行http server就可以了
  • 操作系统-在分页式管理方式下采用位示图来表示主存分配情况,实现主存空间的分配和回收。

    实验六 一 实验题目 在分页式管理方式下采用位示图来表示主存分配情况 实现主存空间的分配和回收 二 实验内容 1 分页式存储器把主存分成大小相等的若干块 作业的信息也按块的大小分页 作业装入主存时可把作业的信息按页分散存放在主存的空闲块中
  • UIUC同学Jia-Bin Huang收集的计算机视觉代码合集(ZZ)

    转自 http www cnblogs com idaidai archive 2012 03 01 2375800 html UIUC的Jia Bin Huang同学收集了很多计算机视觉方面的代码 链接如下 https netfiles
  • django2.x报错No module named 'django.core.urlresolvers'

    解决方法就是 from django urls import reverse 最近从django1 9迁移到django2 0中出现一个意外的报错 这个报错的原因在stack overflow上有很直接的解释 但是百度上并没有直接的答案 简
  • 华为OD机试真题--解压原始报文JavaScript

    1 题目 为了提升数据传输的效率 会对传输的报文进行压缩处理 输入一个压缩后的报文 请返回它解压后的原始报文 压缩规则 n str 表示方括号内部的 str 正好重复 n 次 注意 n 为正整数 0 lt n lt 100 str只包含小写
  • Python 字符串Ⅱ

    Python 字符串格式化 Python 支持格式化字符串的输出 尽管这样可能会用到非常复杂的表达式 但最基本的用法是将一个值插入到一个有字符串格式符 s 的字符串中 在 Python 中 字符串格式化使用与 C 中 sprintf 函数一
  • Python之算法与时间复杂度

    目录 一 算法的概念 1 1 算法是计算机处理信息的本质 二 时间复杂度T n 2 1 程序执行的基本操作与时间复杂度 2 3 大O记法 2 4 常见时间复杂度 2 5 时间复杂度的几条基本计算规则 重点 2 6 python内置类型时间复
  • Palindrome(补全回文串+最长公共子序列的应用)hdu1513+poj1159+动态规划

    Palindrome Time Limit 4000 2000 MS Java Others Memory Limit 65536 32768 K Java Others Total Submission s 4277 Accepted S
  • 计算机网络基础知识归纳总结整理

    计算机网络基础 基础知识 1 网络模型 OSI分层 7层 物理层 数据链路层 网络层 传输层 会话层 表示层 应用层 TCP IP分层 4层 网络接口层 网际层 运输层 应用层 五层协议 5层 物理层 数据链路层 网络层 运输层 应用层 每
  • typeid与decltype

    C 在C 98标准中就部分支持动态类型了 C 98对动态类型支持就是C 中的运行时类型识别RTTI RTTI的机制是为每个类型产生一个type info类型的数据 程序员可以在程序中使用typeid随时查询一个变量的类型 typeid就会返
  • modelsim crack找不到文件packages on . Failed to load package info... 找不到文件 - mgls.dll 找不到文件 - mgls64.dll

    问题描述 最近下载了Modelsim SE 64 2020 4版本 按照提示操作后显示找不到文件packages on Failed to load package info 找不到文件 mgls dll 找不到文件 mgls64 dll
  • MATLAB 软件功能简介

    MATLAB 的名称源自 Matrix Laboratory 1984 年由美国 Mathworks 公司推向市场 它是一种科学计算软件 专门以矩阵的形式处理数据 MATLAB 将高性能的数值计算和可 视化集成在一起 并提供了大量的内置函数
  • imx6ull移植mplayer

    linux开发板播放许嵩的温泉 本文在imx6ul上移植mplayer 软件包 https download csdn net download qq 32605451 12510469 文件包含mplayer zlib alsa lib和
  • java包机制

    包机制是java中管理类的重要手段 开发中 我们会遇到大量同名的类 通过包我们很容易对解决类重名的问题 也可以实现对类的有效管理 包对于类 相当于文件夹对于文件的作用 我们通过package实现对类的管理 package的使用有两个要点 1
  • VS2013写代码时几个常用的快捷键

    0 查看函数具体实现 说明 组合键是同时按 非组合键是按住Ctrl依次按后面的键 1 格式化 格式化全部代码 Ctrl A K F 格式化选中的代码 Ctrl K F 2 注释代码 注释代码 Ctrl K C comment 反注释代码 C
  • Python基本函数:np.multiply()

    Python基本函数 np multiply 一 函数说明 二 函数用法 格式 np multiply a b 注意 文中用到了arange dot reshape函数以及转置 T 一 函数说明 由于multiply是ufunc函数 ufu
  • 【C++】string类浅拷贝的解决方式

    1 浅拷贝 对内存地址的复制 让目标对象指针和源对象指向同一片内存空间 最终在释放的时候造成了多次释放导致程序崩溃 如果类中设计到资源管理时 用户必须要显式实现拷贝构造函数以及赋值运算符重载 因为编译器默认是按照浅拷贝的方式生成的 2 深拷
  • MySQL错误:1146-table 'mysql.proc' doesn't exist

    出现错误原因为 误删除了mysql数据库 解决方案 运行安装程序setup exe修复 repair 博主在删除其他数据库的时候 调用命令drop 误删mysql原数据库 使用mysql安装程序的修复解决 其他mysql命令 source
  • Interview Questions : Linux Device Drivers and Linux Kernel

    本文转载至 http priyaranjan technicalzone blogspot com 2014 01 interview questions embedded system html 1 Describe different
  • 反应器(Reactor)模式-golang探索

    反应器模式 在以前的博文模式设计概述 反应器 Reactor 模式介绍过相关的概念和流程 当时使用了python但是从结果上来看并没有起到很明显的效果 最近在处理有关proxy的项目中 刚刚好涉及到有关性能的问题 故本文探索一下go的反应器