以太坊 p2p Server 原理及实现

2023-11-01

以太坊p2p原理与实现

区块链技术的去中心依赖于底层组网技术,以太坊的底层实现了p2pServer,大约可以分为这样三层。

  • 底层路由表。封装了kad路由,节点的数据结构以及计算记录,节点搜索,验证等功能。
  • 中层peer抽象,message开放发送接口,server对外提供peer检测,初始化,事件订阅,peer状态查询,启动,停止等功能
  • 以太坊最上层peer,peerset再封装,通过协议的Run函数,在中层启动peer时,获取peer,最终通过一个循环截取稳定peer,包装在peerset中使用。

底层路由表

这里简化问题仅讨论Node Discovery Protocol。 这一层维护了一个buckets桶,总共有17个桶,每个桶有16个节点和10个替换节点。 Node放入时先要计算hash和localNode的距离。再按距离选择一个桶放进去,取的时候逐个计算target和每个桶中对象的举例,详细参考closest函数,后面会贴出来。

距离公式满足:f(x,y)=256-8*n-map(x[n+1]^y[n+1]) 注:n为相同节点数量 map为一个负相关的映射关系。

简单来说就是相似越多,值越小。细节参考Node.go的logdist函数。 这里需要了解算法Kademlia,

.
├── database.go         //封装node数据库相关操作
├── node.go             //节点数据结构
├── ntp.go              //同步时间  
├── table.go            //路由表
├── udp.go              //网络相关操作

其中最重要的就是table对象,table公共方法有:

  • newTable 实例创建
  • Self local节点获取
  • ReadRandomNodes 随机读取几个节点
  • Close 关闭
  • Resolve 在周边查找某个节点
  • Lookup 查找某个节点的邻近节点

逐个来分析这些方法:

newTable

  • 1:生成对象实例(获取数据库客户端,LocalNode etc)
    // If no node database was given, use an in-memory one
    db, err := newNodeDB(nodeDBPath, Version, ourID)
    if err != nil {
        return nil, err
    }
    tab := &Table{
        net:        t,
        db:         db,
        self:       NewNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)),
        bonding:    make(map[NodeID]*bondproc),
        bondslots:  make(chan struct{}, maxBondingPingPongs),
        refreshReq: make(chan chan struct{}),
        initDone:   make(chan struct{}),
        closeReq:   make(chan struct{}),
        closed:     make(chan struct{}),
        rand:       mrand.New(mrand.NewSource(0)),
        ips:        netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
    }
  • 2:载入引导节点,初始化k桶。
    if err := tab.setFallbackNodes(bootnodes); err != nil {
        return nil, err
    }
    for i := 0; i < cap(tab.bondslots); i++ {
        tab.bondslots <- struct{}{}
    }
    for i := range tab.buckets {
        tab.buckets[i] = &bucket{
            ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
        }
    }
  • 3:将节点放入到桶里,生成一条协程用于刷新,验证节点。
    tab.seedRand()
    tab.loadSeedNodes(false)  //载入种子节点
    // Start the background expiration goroutine after loading seeds so that the search for
    // seed nodes also considers older nodes that would otherwise be removed by the
    // expiration.
    tab.db.ensureExpirer()
    go tab.loop()

载入种子节点

    func (tab *Table) loadSeedNodes(bond bool) {
        seeds := tab.db.querySeeds(seedCount, seedMaxAge)
        //数据库中的种子节点和引导节点合并
        seeds = append(seeds, tab.nursery...) 
        if bond {
            seeds = tab.bondall(seeds)   //节点验证
        }
        for i := range seeds {
            seed := seeds[i]
            age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.bondTime(seed.ID)) }}
            log.Debug("Found seed node in database", "id", seed.ID, "addr", seed.addr(), "age", age)
            tab.add(seed)               //节点入桶
        }
    }

节点入桶,同时也要检查ip等限制。

    func (tab *Table) add(new *Node) {
        tab.mutex.Lock()
        defer tab.mutex.Unlock()

        b := tab.bucket(new.sha)   //获取当前节点对应的桶
        if !tab.bumpOrAdd(b, new) {
            // Node is not in table. Add it to the replacement list.
            tab.addReplacement(b, new)
        }
    }

桶的选择

    func (tab *Table) bucket(sha common.Hash) *bucket {
        d := logdist(tab.self.sha, sha)  //计算hash举例
        if d <= bucketMinDistance {
            //这里按算法来看,只要hash前三位相等就会到第一个buckets
            return tab.buckets[0]
        }
        return tab.buckets[d-bucketMinDistance-1]
    }

Resolve

根据Node的Id查找Node,先在当前的桶里面查找,查找一遍之后没找到就在周边的节点里面搜索一遍再找。

    // Resolve searches for a specific node with the given ID.
    // It returns nil if the node could not be found.
    func (tab *Table) Resolve(targetID NodeID) *Node {
        // If the node is present in the local table, no
        // network interaction is required.
        hash := crypto.Keccak256Hash(targetID[:])
        tab.mutex.Lock()
        //查找最近节点
        cl := tab.closest(hash, 1)
        tab.mutex.Unlock()
        if len(cl.entries) > 0 && cl.entries[0].ID == targetID {
            return cl.entries[0]
        }
        // Otherwise, do a network lookup.
        //不存在 搜索邻居节点
        result := tab.Lookup(targetID)
        for _, n := range result {
            if n.ID == targetID {
                return n
            }
        }
        return nil
    }

这里需要理解的函数是 closest,遍历所有桶的所有节点,查找最近的一个

    // closest returns the n nodes in the table that are closest to the
    // given id. The caller must hold tab.mutex.
    func (tab *Table) closest(target common.Hash, nresults int) *nodesByDistance {
        // This is a very wasteful way to find the closest nodes but
        // obviously correct. I believe that tree-based buckets would make
        // this easier to implement efficiently.
        close := &nodesByDistance{target: target}
        for _, b := range tab.buckets {
            for _, n := range b.entries {
                close.push(n, nresults)
            }
        }
        return close
    }

    func (h *nodesByDistance) push(n *Node, maxElems int) {
        ix := sort.Search(len(h.entries), func(i int) bool {
            return distcmp(h.target, h.entries[i].sha, n.sha) > 0
        })
        if len(h.entries) < maxElems {
            h.entries = append(h.entries, n)
        }
        if ix == len(h.entries) {
            // farther away than all nodes we already have.
            // if there was room for it, the node is now the last element.
        } else {
            // slide existing entries down to make room
            // this will overwrite the entry we just appended.
            //近的靠前边
            copy(h.entries[ix+1:], h.entries[ix:])
            h.entries[ix] = n
        }
    }

ReadRandomNodes

整体思路是先拷贝出来,再逐个桶的抽最上面的一个,剩下空桶移除,剩下的桶合并后,下一轮再抽桶的第一个节点,直到填满给定数据或者桶全部空掉。最后返回填到数组里面的数量。

    // ReadRandomNodes fills the given slice with random nodes from the
    // table. It will not write the same node more than once. The nodes in
    // the slice are copies and can be modified by the caller.
    func (tab *Table) ReadRandomNodes(buf []*Node) (n int) {
        if !tab.isInitDone() {
            return 0
        }
        tab.mutex.Lock()
        defer tab.mutex.Unlock()

        // Find all non-empty buckets and get a fresh slice of their entries.
        var buckets [][]*Node
        //拷贝节点
        for _, b := range tab.buckets {
            if len(b.entries) > 0 {
                buckets = append(buckets, b.entries[:])
            }
        }
        if len(buckets) == 0 {
            return 0
        }
        // Shuffle the buckets.
        for i := len(buckets) - 1; i > 0; i-- {
            j := tab.rand.Intn(len(buckets))
            buckets[i], buckets[j] = buckets[j], buckets[i]
        }
        // Move head of each bucket into buf, removing buckets that become empty.
        var i, j int
        for ; i < len(buf); i, j = i+1, (j+1)%len(buckets) {
            b := buckets[j]
            buf[i] = &(*b[0])  //取第一个节点
            buckets[j] = b[1:] //移除第一个
            if len(b) == 1 {
                //空桶移除
                buckets = append(buckets[:j], buckets[j+1:]...)  
            }
            if len(buckets) == 0 {
                break          
            }
        }
        return i + 1
    }

Lookup

lookup会要求已知节点查找邻居节点,查找的邻居节点又递归的找它周边的节点

    for {
        // ask the alpha closest nodes that we haven't asked yet
        for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
            n := result.entries[i]
            if !asked[n.ID] {
                asked[n.ID] = true
                pendingQueries++   
                go func() {
                    // Find potential neighbors to bond with
                    r, err := tab.net.findnode(n.ID, n.addr(), targetID)
                    if err != nil {
                        // Bump the failure counter to detect and evacuate non-bonded entries
                        fails := tab.db.findFails(n.ID) + 1
                        tab.db.updateFindFails(n.ID, fails)
                        log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails)

                        if fails >= maxFindnodeFailures {
                            log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
                            tab.delete(n)
                        }
                    }
                    reply <- tab.bondall(r)
                }()
            }
        }
        if pendingQueries == 0 {
            // we have asked all closest nodes, stop the search
            break
        }
        // wait for the next reply
        for _, n := range <-reply {    //此处会阻塞请求
            if n != nil && !seen[n.ID] {
                seen[n.ID] = true
                result.push(n, bucketSize)
            }
        }
        pendingQueries--
    }

桶的维护

桶初始化完成后会进入一个循环逻辑,其中通过三个timer控制调整周期。

  • 验证timer 间隔 10s左右
  • 刷新timer 间隔 30 min
  • 持久化timer 间隔 30s
    revalidate     = time.NewTimer(tab.nextRevalidateTime())
    refresh        = time.NewTicker(refreshInterval)
    copyNodes      = time.NewTicker(copyNodesInterval)

刷新逻辑:重新加载种子节点,查找周边节点,随机三个节点,并查找这三个节点的周围节点。

    func (tab *Table) doRefresh(done chan struct{}) {
        defer close(done)

        tab.loadSeedNodes(true)

        tab.lookup(tab.self.ID, false)

        for i := 0; i < 3; i++ {
            var target NodeID
            crand.Read(target[:])
            tab.lookup(target, false)
        }
    }

验证逻辑:验证每个桶的最末尾节点,如果该节点通过验证则放到队首(验证过程是本地节点向它发送ping请求,如果回应pong则通过)

    last, bi := tab.nodeToRevalidate()  //取最后一个节点
    if last == nil {
        // No non-empty bucket found.
        return
    }

    // Ping the selected node and wait for a pong.
    err := tab.ping(last.ID, last.addr())   //通信验证

    tab.mutex.Lock()
    defer tab.mutex.Unlock()
    b := tab.buckets[bi]
    if err == nil {
        // The node responded, move it to the front.
        log.Debug("Revalidated node", "b", bi, "id", last.ID)
        b.bump(last)    //提到队首
        return
    }

Peer/Server

相关文件

.
├── dial.go          //封装一个任务生成处理结构以及三种任务结构中(此处命名不太精确)
├── message.go       //定义一些数据的读写接口,以及对外的Send/SendItem函数
├── peer.go          //封装了Peer 包括消息读取  
├── rlpx.go          //内部的握手协议
├── server.go        //初始化,维护Peer网络,还有一些对外的接口

这一层会不断的从路由中提取节点,提取出来的节点要经过身份验证,协议检查之后加入到peer里面,紧接着如果没有人使用这个peer,这个peer就会被删除,再重新选择一些节点出来继续这个流程,peer再其中是随生随销,这样做是为了平均的使用所有的节点,而不是仅仅依赖于特定的几个节点。因而这里从Server开始入手分析整个流程

    Peers()                             //peer对象
    PeerCount()                         //peer数量
    AddPeer(node *discover.Node)        //添加节点
    RemovePeer(node *discover.Node)     //删除节点
    SubscribeEvents(ch chan *PeerEvent) //订阅内部的事件(节点的增加,删除)
    //以上四个属于对外的接口,不影响内部逻辑
    Start()                             //server开始工作
    SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node)  //启动一个连接,经过两次验证之后,如果通过则加入到peer之中。

Start初始化

Start做了三件事,生成路由表于建立底层网络。生成DialState用于驱动维护本地peer的更新与死亡,监听本地接口用于信息应答。这里主要分析peer的维护过程。函数是run函数。

    func (srv *Server) Start() (err error) {
        
        //**************初始化代码省略
        if !srv.NoDiscovery && srv.DiscoveryV5 {
            unhandled = make(chan discover.ReadPacket, 100)
            sconn = &sharedUDPConn{conn, unhandled}
        }

        // node table
        if !srv.NoDiscovery {
            //路由表生成
            cfg := discover.Config{
                PrivateKey:   srv.PrivateKey,
                AnnounceAddr: realaddr,
                NodeDBPath:   srv.NodeDatabase,
                NetRestrict:  srv.NetRestrict,
                Bootnodes:    srv.BootstrapNodes,
                Unhandled:    unhandled,
            }
            ntab, err := discover.ListenUDP(conn, cfg)
            if err != nil {
                return err
            }
            srv.ntab = ntab
        }

        if srv.DiscoveryV5 {
            //路由表生成
            var (
                ntab *discv5.Network
                err  error
            )
            if sconn != nil {
                ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
            } else {
                ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
            }
            if err != nil {
                return err
            }
            if err := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil {
                return err
            }
            srv.DiscV5 = ntab
        }

        dynPeers := srv.maxDialedConns()
        //newDialState 对象生成,这个对象包含Peer的实际维护代码
        dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)

        // handshake  协议加载
        srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)}
        for _, p := range srv.Protocols {
            srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
        }
        // listen/dial
        //监听本地端口
        if srv.ListenAddr != "" {
            if err := srv.startListening(); err != nil {
                return err
            }
        }
        if srv.NoDial && srv.ListenAddr == "" {
            srv.log.Warn("P2P server will be useless, neither dialing nor listening")
        }

        srv.loopWG.Add(1)
        //重要的一句,开个协程,在其中做peer的维护
        go srv.run(dialer)
        srv.running = true
        return nil
    }

run 开始peer的生成

该函数中定义了两个队列

    runningTasks []task //正在执行的任务
    queuedTasks  []task //尚未执行的任务

定义了三个匿名函数

    //从正在执行任务中删除任务
	delTask := func(t task) {
		for i := range runningTasks {
			if runningTasks[i] == t {
				runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
				break
			}
		}
	}
	//开始一批任务
	startTasks := func(ts []task) (rest []task) {
		i := 0
		for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
			t := ts[i]
			srv.log.Trace("New dial task", "task", t)
			go func() {
				 t.Do(srv); taskdone <- t  
			}()
			runningTasks = append(runningTasks, t)
		}
		return ts[i:]
	}
    //启动开始一批任务再调用dialstate的newTasks函数生成一批任务,加载到任务队列里面
	scheduleTasks := func() {
		// Start from queue first.
		queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
		// Query dialer for new tasks and start as many as possible now.
		if len(runningTasks) < maxActiveDialTasks {
			nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
			queuedTasks = append(queuedTasks, startTasks(nt)...)
		}
	}

定义了一个循环,分不同的chanel执行对应的逻辑

	for {
        //调度开始找生成任务
		scheduleTasks()

		select {
		case <-srv.quit://退出
			break running
		case n := <-srv.addstatic: 
            //增加一个节点  该节点最终会生成一个dialTask 
            //并在newTasks的时候加入到读列
			srv.log.Debug("Adding static node", "node", n)
			dialstate.addStatic(n)
		case n := <-srv.removestatic:
            //直接删除该节点 节点不再参与维护,很快就会死掉了
			dialstate.removeStatic(n)
			if p, ok := peers[n.ID]; ok {
				p.Disconnect(DiscRequested)
			}
		case op := <-srv.peerOp:
			//  Peers 和 PeerCount 两个外部接口,只是读取peer信息
			op(peers)
			srv.peerOpDone <- struct{}{}
		case t := <-taskdone:
		    //task完成后会根据不同的任务类型进行相应的处理
			srv.log.Trace("Dial task done", "task", t)
			dialstate.taskDone(t, time.Now())
			delTask(t)
		case c := <-srv.posthandshake:
			//身份验证通过 
			if trusted[c.id] {
				// Ensure that the trusted flag is set before checking against MaxPeers.
				c.flags |= trustedConn
			}
			select {
			case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c):
			case <-srv.quit:
				break running
			}
		case c := <-srv.addpeer:
			//身份协议验证通过 加入队列
			err := srv.protoHandshakeChecks(peers, inboundCount, c)
			if err == nil {
				// The handshakes are done and it passed all checks.
				p := newPeer(c, srv.Protocols)
				// If message events are enabled, pass the peerFeed
				// to the peer
				if srv.EnableMsgEvents {
					p.events = &srv.peerFeed
				}
				name := truncateName(c.name)
				srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
				go srv.runPeer(p)   //触发事件 此处是最上层截取peer的位置,如果此物没有外部影响,那么这个peer很快就被销毁了
				peerAdd++
				fmt.Printf("--count %d--- add %d-- del %d--\n",len(peers),peerAdd,peerDel)
				
				peers[c.id] = p
				if p.Inbound() {
					inboundCount++
				}
			}
			// The dialer logic relies on the assumption that
			// dial tasks complete after the peer has been added or
			// discarded. Unblock the task last.
			select {
			case c.cont <- err:
			case <-srv.quit:
				break running
			}
		case pd := <-srv.delpeer:
			//移除peer
			d := common.PrettyDuration(mclock.Now() - pd.created)
			pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err)
			delete(peers, pd.ID())
			peerDel++
			fmt.Printf("--count %d--- add %d-- del %d--\n",len(peers),peerAdd,peerDel)
			if pd.Inbound() {
				inboundCount--
			}
		}
	}

记住上面的代码,再来逐个的看:

scheduleTasks

scheduleTasks调度生成任务,生成的任务中有一种dialTask的任务,该任务结构如下

    type dialTask struct {
        flags        connFlag
        dest         *discover.Node
        lastResolved time.Time
        resolveDelay time.Duration
    }

    func (t *dialTask) Do(srv *Server) {
        if t.dest.Incomplete() {
            if !t.resolve(srv) {
                return
            }
        }
        err := t.dial(srv, t.dest)  //此处会调用到setupConn函数
        if err != nil {
            log.Trace("Dial error", "task", t, "err", err)
            // Try resolving the ID of static nodes if dialing failed.
            if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
                if t.resolve(srv) {
                    t.dial(srv, t.dest)
                }
            }
        }
    }

dial最终回调用到setupConn函数,函数只保留重点的几句,篇幅有点长了

    func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error {

        //身份验证码 获取设备,标识等信息
        if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != 
        //此处会往chanel中添加连接对象,最终触发循环中的posthandshake分支
        err = srv.checkpoint(c, srv.posthandshake)  
        //协议验证
        phs, err := c.doProtoHandshake(srv.ourHandshake)
        c.caps, c.name = phs.Caps, phs.Name
        //此处会往chanel中添加连接对象 最终触发循环中的addpeer分支
        err = srv.checkpoint(c, srv.addpeer)
    }

posthandshake 分支仅仅做了验证,addpeer做的事情就比较多了,重要的就是执行runPeer函数

    func (srv *Server) runPeer(p *Peer) {
        // 广播 peer add
        srv.peerFeed.Send(&PeerEvent{
            Type: PeerEventTypeAdd,
            Peer: p.ID(),
        })

        // run the protocol
        remoteRequested, err := p.run() //

        // 广播 peer drop
        srv.peerFeed.Send(&PeerEvent{
            Type:  PeerEventTypeDrop,
            Peer:  p.ID(),
            Error: err.Error(),
        })
        //移除peer
        srv.delpeer <- peerDrop{p, err, remoteRequested}
    }

    func (p *Peer) run() (remoteRequested bool, err error) {
        //*************
        writeStart <- struct{}{}
        p.startProtocols(writeStart, writeErr)
        //*************
        //这一句阻塞性确保了peer的存活
        p.wg.Wait()  
    }

    func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
        p.wg.Add(len(p.running))
        for _, proto := range p.running {
            proto := proto
            proto.closed = p.closed
            proto.wstart = writeStart
            proto.werr = writeErr
            var rw MsgReadWriter = proto
            if p.events != nil {
                rw = newMsgEventer(rw, p.events, p.ID(), proto.Name)
            }
            p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))
            go func() {
                //其他的都是为这一句做准备的,在以太坊中p2p就是靠这一句对上层暴露peer对象
                err := proto.Run(p, rw)
                if err == nil {
                    p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))
                    err = errProtocolReturned
                } else if err != io.EOF {
                    p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)
                }
                p.protoErr <- err
                p.wg.Done()  
            }()
        }
    }

这样就可以可理出一条思路 scheduleTasks执行生成dialTask任务 dialTask任务执行过程中逐个填充posthandshake,addPeer这两个chanel。 addPeer执行时对上层暴露了Peer对象,完成后填充了delpeer,最后删除了Peer。

任务的生成

具体看代码中的注释

    func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task {
        if s.start.IsZero() {
            s.start = now
        }

        var newtasks []task
        //这里声明了一个添加任务的函数  
        addDial := func(flag connFlag, n *discover.Node) bool {
            if err := s.checkDial(n, peers); err != nil {
                log.Trace("Skipping dial candidate", "id", n.ID, "addr", &net.TCPAddr{IP: n.IP, Port: int(n.TCP)}, "err", err)
                return false
            }
            s.dialing[n.ID] = flag  //排除掉已经再测试的
            newtasks = append(newtasks, &dialTask{flags: flag, dest: n})
            return true
        }

        // Compute number of dynamic dials necessary at this point.
        needDynDials := s.maxDynDials     //当前系统中最大连接数目
        for _, p := range peers {        //扣除已建立链接的peer
            if p.rw.is(dynDialedConn) {
                needDynDials--
            }
        }
        for _, flag := range s.dialing {  //扣除已建立链接的peer
            if flag&dynDialedConn != 0 {
                needDynDials--
            }
        }

        //外部命令添加的节点 这种节点不占用needDynDials数目,
        //是为了保证手动加的节点能够起效
        for id, t := range s.static {
            err := s.checkDial(t.dest, peers)
            switch err {
            case errNotWhitelisted, errSelf:
                log.Warn("Removing static dial candidate", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}, "err", err)
                delete(s.static, t.dest.ID)
            case nil:
                s.dialing[id] = t.flags
                newtasks = append(newtasks, t)
            }
        }
        // If we don't have any peers whatsoever, try to dial a random bootnode. This
        // scenario is useful for the testnet (and private networks) where the discovery
        // table might be full of mostly bad peers, making it hard to find good ones.
        if len(peers) == 0 && len(s.bootnodes) > 0 && needDynDials > 0 && 
        //检查引导节点  因为引导节点比搜索到的节点更大概率靠谱 因而比较靠前
        now.Sub(s.start) > fallbackInterval {
            bootnode := s.bootnodes[0]
            s.bootnodes = append(s.bootnodes[:0], s.bootnodes[1:]...)
            s.bootnodes = append(s.bootnodes, bootnode)

            if addDial(dynDialedConn, bootnode) {
                needDynDials--
            }
        }
        //随机的从路由中抽取最大节点的二分之一
        randomCandidates := needDynDials / 2
        if randomCandidates > 0 {
            n := s.ntab.ReadRandomNodes(s.randomNodes)
            for i := 0; i < randomCandidates && i < n; i++ {
                if addDial(dynDialedConn, s.randomNodes[i]) {
                    needDynDials--
                }
            }
        }
        // 从lookupbuf中抽取
        i := 0
        for ; i < len(s.lookupBuf) && needDynDials > 0; i++ {
            if addDial(dynDialedConn, s.lookupBuf[i]) {
                needDynDials--
            }
        }
        s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])]
        // 如果还是不够,路由再去搜索节点
        if len(s.lookupBuf) < needDynDials && !s.lookupRunning {
            s.lookupRunning = true
            newtasks = append(newtasks, &discoverTask{})
        }

        // wait
        if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 {
            t := &waitExpireTask{s.hist.min().exp.Sub(now)}
            newtasks = append(newtasks, t)
        }
        return newtasks
    }

消息发送

另一个是message中的Send,SendItem函数 实现了MsgWriter的对象都可以调用这个函数写入,觉得这里没什么必要,完全可以封装到peer里面去,不过它上层做广播的时候确实是调用的这两个函数。

    func Send(w MsgWriter, msgcode uint64, data interface{}) error {
        size, r, err := rlp.EncodeToReader(data)
        if err != nil {
            return err
        }
        return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r})
    }

    func SendItems(w MsgWriter, msgcode uint64, elems ...interface{}) error {
        return Send(w, msgcode, elems)
    }

以太坊上层调用

Peer/PeerSet

文件:go-ethereum/eth/peer.go

定义了两个struct,Peer和PeerSet。Peer封装了底层的p2p.Peer,集成了一些和业务相关的方法,比如SendTransactions,SendNewBlock等。PeerSet是Peer的集合

    type peer struct {
        id string

        *p2p.Peer
        rw p2p.MsgReadWriter

        version  int         // Protocol version negotiated
        forkDrop *time.Timer // Timed connection dropper if forks aren't validated in time

        head common.Hash
        td   *big.Int
        lock sync.RWMutex

        knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
        knownBlocks *set.Set // Set of block hashes known to be known by this peer
    }

    type peerSet struct {
        peers  map[string]*peer
        lock   sync.RWMutex
        closed bool
    }

Peer注册/注销

文件:go-ethereum/eth/handler.go manager.handle在检查了peer后会把这个peer注册到peerset中,表示此peer可用,发生错误后peerset注销该peer,返回错误,最后再Server中销毁。

	manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
	for i, version := range ProtocolVersions {
		// Skip protocol version if incompatible with the mode of operation
		if mode == downloader.FastSync && version < eth63 {
			continue
		}
		// Compatible; initialise the sub-protocol
		version := version // Closure for the run
		manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
			Name:    ProtocolName,
			Version: version,
			Length:  ProtocolLengths[i],
			Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
				peer := manager.newPeer(int(version), p, rw)
				select {
				case manager.newPeerCh <- peer:
					manager.wg.Add(1)
					defer manager.wg.Done()
                    //此处如果顺利会进入for循环 如果失败返回错误我会销毁掉这个peer
					return manager.handle(peer)  
				case <-manager.quitSync:
					return p2p.DiscQuitting
				}
			},
			NodeInfo: func() interface{} {
				return manager.NodeInfo()
			},
			PeerInfo: func(id discover.NodeID) interface{} {
				if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
					return p.Info()
				}
				return nil
			},
		})
	}

参考

源码:https://github.com/ethereum/go-ethereum/tree/master/p2p

Kademlia算法:https://en.wikipedia.org/wiki/Kademlia

转自:(魂祭心) https://my.oschina.net/hunjixin/blog/1803029


如果你希望高效的学习以太坊DApp开发,可以访问汇智网提供的最热门在线互动教程:

1.适合区块链新手的以太坊DApp实战入门教程
2.区块链+IPFS+Node.js+MongoDB+Express去中心化以太坊电商应用开发实战

其他更多内容也可以访问这个以太坊博客

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

以太坊 p2p Server 原理及实现 的相关文章

  • 制作镜像并上传到阿里云

    run一个ubuntu容器 然后自动进入容器 ucsmy QKTEST21191 docker run it name node daocloud io ubuntu root 3ac09729dadb 安装geth sudo apt ge
  • FutureWarning: Criterion ‘mse‘ was deprecated in v1.0 and will be removed in version 1.2.

    出现FutureWarning Criterion mse was deprecated in v1 0 and will be removed in version 1 2 Use criterion squared error whic
  • Mysql索引原理

    Mysql索引类型及其特性 1 普通索引 最基本的索引 它没有任何限制 也是我们大多数情况下用到的索引 直接创建索引 CREATE INDEX index name ON table column length 修改表结构的方式添加索引 A
  • javaの日志级别

    最近几周给项目补日志 头都大了 项目开发接口时一定要同步日志 一定 首先 日志级别从低到高 all
  • linux学习课程从入门到精通:Centos8用户管理

    本人从事IT行业已有十多年 有着丰富的实战经验 总结了大量的学习方法 更是积累了很多的学习资料 很高兴能在这里跟大家交流学习 希望能在这里跟大家共同进步和成长 全套学习资料移步至公众号 学神来啦 更多学习资料添加扣扣资源群 66130895
  • eNSP:ospf相关实验

    一 实验要求 二 实验步骤 1 建设如下图拓扑并划分网段 2 配置R1 R4的接口和回环地址 R1 r1 int g0 0 0 r1 GigabitEthernet0 0 0 ip add 192 168 1 1 27 r1 int l 0
  • 1093: 数1的个数

    存限制 128 MB 题目描述 给定一个十进制正整数n 1 n 10000 写下从1到n的所有整数 然后数一下其中出现的数字 1 的个数 例如当n 2时 写下1 2 这样只出现了1个 1 当n 12时 写下1 2 3 4 5 6 7 8 9
  • springboot多模块打包配置问题

    工程案例结构 baidu 聚合过程 baidu web 子模块web工程 baidu service 子模块 baidu config 子模块配置工程 注意事项 配置步骤 1 baidu 聚合工程 工程下的 pom xml 文件案列如下
  • 引介

    转载自 https ethfans org posts rlp encode and decode RLP编码和解码 RLP Recursive Length Prefix 递归的长度前缀 是一种编码规则 可用于编码任意嵌套的二进制数组数据
  • IP包头&ARP协议笔记

    一 IP包头分析 1 帧中的IP包头 从版本到可选项 其中2为帧头 注 1 IP包头最小长度 20字节 即可选项以前部分 IP包头长度是可变的 2 可选项最长可以是40个字节 故IP包头最长可以是60个字节 1 版本 4 说明是IPv4 2
  • Windows实例如何通过本地安全策略限制远程登录的IP地址

    Windows实例如何通过本地安全策略限制远程登录的IP地址 阿里云 禁止所有的IP地址连接服务器的RDP端口 远程连接登录服务器 单击 开始 选择 运行 输入gpedit msc 单击 确定 打开本地组策略编辑器 在左侧依次找到 计算机配
  • Windows10如何添加开机启动项

    在日常生活中 偶尔要求其中的软件在开机时便能自动启动 比如MySQL一般被设置为自启动项 今天将为大家介绍window10中如何添加开机启动项 操作过程 1 按下win R调出运行窗口 并输入 shell startup 即可进入开机启动文
  • 是否可以连接两个或多个 WiFi Direct 组?

    我目前正在为我正在进行的一个项目尝试 WiFi Direct WiFiP2p 并想知道是否可以在组之间创建桥梁 从而将它们连接在一起 基于白皮书由 WiFi 联盟发布 这应该是可能的 尽管 P2P 规范没有描述此功能的机制 实施是特定于供应
  • UDP打洞帮助

    我正在尝试使用点对点将互联网网络添加到我的游戏中 我已经实现了仅 LAN 版本 它使用 NET 的对等类在本地网络内进行连接 我意识到我需要使用 UDP 打洞来建立与防火墙后面的其他客户端的连接 这就是我开始实施它的方式 该服务器是一个 H
  • NAT 后面的 UDP 打洞

    我正在尝试用 Java 实现 UDP Holepunching 的简单草图来测试它的概念 并稍后在我的 C C 应用程序中使用它 Concept 根据维基百科 我对这个概念的理解是这样的 假设 A 和 B 是未定义网络结构后面的客户端 C
  • 如何制作自己的 P2P 软件? [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我怎样才能制作自己的napster 这些 p2p 程序使用哪个库 我不太熟悉套接字编程的概念 你能用Qt4制作p2p程序吗 从 Napst
  • Windows Metro 应用程序中没有 P2P?

    在 BUILD 的 NET 开发人员对 Windows 8 应用程序开发的看法 会议中 讲师提到 Metro 配置文件中仅公开了客户端 WCF 功能 我们无法创建服务器 http channel9 msdn com Events BUILD
  • 具有 3 个用户连接的 WebRTC

    我现在正在实施源代码WebRTC 示例 https github com webrtc samples tree gh pages src content peerconnection audio通过使用网状拓扑成为 3 个用户连接 但是
  • 为什么我在使用 WifiP2pManager 时总是显示 BUSY?

    我正在尝试使用 Wi Fi Direct 连接两个 Android 设备 在我的 HTC 手机 One SV 上它似乎可以工作 但在我的第二台设备 LG Optimus 4xhd 上它不起作用 在我的 onResume 函数中 我启动以下线
  • 比特币客户端如何确定第一个连接的IP地址?

    据我所知 比特币是一种 p2p 协议P2P协议必须有一个专用的中央服务器 https stackoverflow com questions 310607 peer to peer methods of finding peers 但据说比

随机推荐

  • Web前端学习(HTML)学习---下(表格标签,列表标签,表单标签)案例

    作者 旧梦拾遗186 专栏 C语言编程 小比特成长日记 前言 趁年轻 余额不足可以挣 电量不足可以充 时间匆匆不再回来 趁年轻就去多付出 不攀比 不抱怨 不计较 多付出 因为有一种努力叫靠自己 前一篇文章我们着重学习了HTML中的 标题标签
  • CSS--滑动门和过渡效果

    滑动门 滑动门出现的背景 制作网页时 为了美观 常常需要为网页元素设置特殊形状的背景 比如微信导航栏 有凸起和凹下去的感觉 最大的问题是里面的字数不一样多 咋办 为了使各种特殊形状的背景能够自适应元素中文本内容的多少 出现了CSS滑动门技术
  • Blender常用快捷键整理

    物体操作快捷键 即选中物体 G键 移动物体 R键 旋转物体 S键 缩放物体 移动 旋转或缩放物体时 按下X Y或Z键 按X Y或Z轴方向移动 旋转或缩放 TAB键 切换为编辑模式 CTRL A 弹出应用菜单 物体模式旋转缩放后应用旋转与缩放
  • 人工智能在通信领域的应用

    人工智能的出现使得各个行业都有了新的发展方向 通过和人工智能结合 使得自己的行业打破传统的方式 以一种新的姿态进入人们的视线中 现在我们都离不开通信技术 很多人对于人工智能给通信领域带来什么的改变 在这篇文章中我们会详细的介绍这一问题 大家
  • 基于opencv的车道线识别 方法二(极易实现(python))

    基于opencv的车道线识别 方法二 效果图 语言 平台 所需的库 步骤及原理 1 导入库 2 二值化 3 提取感兴趣区域 4 剔除噪点 5 找出值不为零的点 即车道线 并将其绘制在原图上 完整代码 效果图 语言 python 平台 pyc
  • 关于FFmpeg里的GPL和LGPL协议

    参考博文 谢谢博主的分享 http www cnblogs com findumars p 3556883 html GPL介绍 我们很熟悉的Linux就是采用了GPL GPL协议和BSD Apache Licence等鼓励代码重用的许可很
  • python并发编程学习笔记--单线程,多线程,多进程 day06

    Python并发编程是指同时处理多个任务的技术 包括单线程 多线程和多进程三种方式 1 单线程 单线程是指在一个进程中只有一个线程在执行任务的情况 虽然只有一个线程在执行任务 但可以使用异步编程模型来实现并发操作 从而达到提高程序效率的目的
  • 置信度传播算法(Belief Propagation)

    基础知识 条件概率 Conditional Probability 相互独立时 p A B p A 贝叶斯规则 贝叶斯网络 Bayesian Network 定了一个独立的结构 一个节点的概率仅依赖于它的父节点 贝叶斯网络适用于稀疏模型 即
  • 【Angular】——无限级下拉列表框

    前言 前段时间换了新框架 将前后端分离 对Angular2有点感兴趣 所以参与一起封装组件 在小5的帮助下 学会了 好多东西 这里总结下封装的无限级下拉列表框 dropdownlist ts import Component OnInit
  • IDEA 调试小技巧

    条件断点 循环中经常用到这个技巧 比如 遍历中 想让断点停在某个特定值 见上图 在断点位置 右击会弹出一个界面 在condition中填写断点条件 在调试的时候 断点会自动在断点条件 i 6 为 true时候停下 跳过为false的条件 回
  • Java对象与byte[]数组之间的相互转化,压缩解压缩操作

    原文 http blog csdn net NsdnResponsibility article details 51028739 comments 下面介绍一下java对象之间和byte 数组之间的相互转化 并对byte 数据进行压缩操作
  • 数学的回忆(零)——傅立叶

    一 什么是频域 从我们出生 我们看到的世界都以时间贯穿 股票的走势 人的身高 汽车的轨迹都会随着时间发生改变 这种以时间作为参照来观察动态世界的方法我们称其为时域分析 而我们也想当然的认为 世间万物都在随着时间不停的改变 并且永远不会静止下
  • 如何使用KubeSphere3.0的DevOps系统构建dotnet core应用

    如何使用KubeSphere3 0的DevOps系统构建dotnet core应用 因KubeSphere的DevOps系统官方未提供 net core的ci cd解决方案 需要自己进行DIY 现把实施过程记录下来 供需要的小伙伴自取 前提
  • FISCO BCOS 六、通过Caliper进行压力测试程序(及常见问题)

    目录 1 环境要求 第一步 配置基本环境 这里我使用的是Ubuntu20 04 第二步 安装NodeJS 第三步 部署Docker 第四步 安装Docker Compose 2 Caliper部署 第一步 部署 第二步 绑定 第三步 快速体
  • Jedis使用

    Jedis Jedis是Redis官方推荐的Java连接服务工具 Java语言连接redis服务还有这些SpringData Redis Lettuce 下载地址 https mvnrepository com artifact redis
  • MySQL left join优化

    问题描述 遇到了一个需要4个表连接查询的问题 数据量不是很大 两个表大概9000条数据 另外两个表大概几百条数据 但是每次查询时间都需要50秒左右的时间 SELECT FROM gzgdm gz gd region region LEFT
  • 类组件使用mobx实现数据修改以及请求数据

    src下创建新的共享数据文件test js import observable computed action autorun runInAction configure makeAutoObservable from mobx impor
  • 处理高并发的方法

    处理高并发 六种方法 1 系统拆分 将一个系统拆分为多个子系统 用dubbo来搞 然后每个系统连一个数据库 这样本来就一个库 现在多个数据库 这样就可以抗高并发 2 缓存 大部分的高并发场景 都是读多写少 那你完全可以在数据库和缓存里都写一
  • SpringCloud整合 Oauth2+Gateway+Jwt+Nacos 实现授权码模式的服务认证

    前言 目前正在出一个SpringCloud进阶系列教程 含源码解读 篇幅会较多 喜欢的话 给个关注 前段时间拖更了 主要事情比较多和杂 不多废话了 直接给大家开整吧 本节重点是给大家介绍Oauth2 将会带大家从0到1搭建一个 Spring
  • 以太坊 p2p Server 原理及实现

    以太坊p2p原理与实现 区块链技术的去中心依赖于底层组网技术 以太坊的底层实现了p2pServer 大约可以分为这样三层 底层路由表 封装了kad路由 节点的数据结构以及计算记录 节点搜索 验证等功能 中层peer抽象 message开放发