Dubbo通信模型

2023-11-01

Dubbo和通信结合

通信实现

服务的发布过程使用通信功能: 
Protocol.export()时会为每个服务创建一个Server

服务的引用过程使用通信功能: 
Protocol.refer()时会创建一个Client

整个类结构及调用关系如下:

这里写图片描述

从图中可以看出,Dubbo的Transporter层完成通信功能,底层的Netty和Mina委托给统一的ChannelHandler来完成具体的功能

编解码(Codec)

Socket是对TCP/IP的封装和应用,TCP/IP都有一个报文头结构定义,作用非常大,例如解决粘包问题。Dubbo借助Netty已经将这样一部分工作委托出去了,不过还是有些工作需要Dubbo来完成,我们来看一张官方提供的报文头定义: 
这里写图片描述
只有搞清楚了报文头定义,才能完成报文体的编码解码,交给底层通信框架去收发

序列化(Serialization)

Dubbo本身支持多种序列化方式,具体使用哪种序列化方式需要由业务场景来决定,详见Dubbo官网

NIO通信层

Dubbo已经集成的有Netty、Mina,重点分析下Netty,详见Netty系列之Netty线程模型

服务器端

NettyServer的启动流程: 首先创建出NettyHandler,用户的连接请求的处理全部交给NettyHandler来处理,NettyHandler又会委托ChannelHandler接口做Dubbo具体的事情。

至此就将所有底层不同的通信实现全部转化到了外界传递进来的ChannelHandler接口的实现上了。

而上述Server接口的另一个分支实现HeaderExchangeServer则充当一个装饰器的角色,为所有的Server实现增添了如下功能:

向该Server所有的Channel依次进行心跳检测:

  • 如果当前时间减去最后的读取时间大于heartbeat时间或者当前时间减去最后的写时间大于heartbeat时间,则向该Channel发送一次心跳检测
  • 如果当前时间减去最后的读取时间大于heartbeatTimeout,则服务器端要关闭该Channel,如果是客户端的话则进行重新连接(客户端也会使用这个心跳检测任务)

看下ChannelHandler接口的实现情况:

这里写图片描述

看下Server接口实现情况:

这里写图片描述

客户端

看下Client接口实现情况: 
这里写图片描述

NettyClient在使用Netty的API开启客户端之后,仍然使用NettyHandler来处理,还是最终委托给ChannelHandler接口实现上

我们可以发现,这样集成完成之后,就完全屏蔽了底层通信细节,将逻辑全部交给了ChannelHandler

同步调用和异步调用的实现

该部分主要在Client端,调用过程DubboProtocol.refer()->DubboInvoker,

来看下DubboInvoker的具体实现:

  @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout) ;
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

如果不需要返回值,直接使用send方法,发送出去,设置当期和线程绑定RpcContext的future为null

  1. 如果需要异步通信,使用request方法构建一个ResponseFuture,然后设置到和线程绑定的RpcContext中
  2. 如果需要同步通信,使用request方法构建一个ResponseFuture,阻塞等待请求完成

另外官方文档有说明(Dubbo协议):Dubbo协议采用单一长连接和NIO异步通讯(默认Netty,Netty使用Socket(通信是全双工的方式,可以更方便的使用TCP/IP协议栈)完成通信) 
适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况

Dubbo协议线程说明

这里写图片描述

Dubbo协议:

  • 连接个数:单连接
  • 连接方式:长连接
  • 传输协议:TCP
  • 传输方式:NIO异步传输
  • 序列化:Hessian
  • 适用范围:入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用dubbo协议传输大文件或超大字符串

同步调用

我们首先看第3种情况,对于当前线程来说,将请求发送出去,暂停等结果回来后再执行。于是这里出现了2个问题:

  • 当前线程怎么让它“暂停,等结果回来后,再执行?
  • 正如前面所说,Socket通信是一个全双工的方式,如果有多个线程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是乱七八糟的,server处理完结果后,将结果消息发送给client,client收到很多消息,怎么知道哪个消息结果是原先哪个线程调用的?

我们从代码上找些痕迹 
调用路径:HeaderExchangeClient.request()->HeaderExchangeChannel.request()

public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        //客户端并发请求线程阻塞的对象
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try{
            channel.send(req);//非阻塞调用
        }catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }


注意这个方法返回的ResponseFuture对象,当前客户端请求的线程在经过一系列调用后,会拿到ResponseFuture对象,最终该线程会阻塞在这个对象的下面这个方法调用上,如下:

public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (! isDone()) {//无限连
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (! isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (! isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }


上面我已经看到请求线程已经阻塞,那么又是如何被唤醒的呢? 

上文提到过Client端的处理最终转化成ChannelHandler接口实现上,我们看HeaderExchangeHandler.received()

 public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                    //服务端处理请求
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
            //这里就是作为消费者的dubbo客户端在接收到响应后,触发通知对应等待线程的起点
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }


熟悉的身影:DefaultFuture,继续看received()方法

public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at " 
                            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 
                            + ", response " + response 
                            + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 
                                + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }


留一下我们之前提到的id的作用,这里可以看到它已经开始发挥作用了。通过id,DefaultFuture.FUTURES可以拿到具体的那个DefaultFuture对象,它就是上面我们提到的,阻塞请求线程的那个对象。好,找到目标后,调用它的doReceived方法,唤醒阻塞的线程,拿到返回结果

 private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }


现在前面2个问题已经有答案了

  • 当前线程怎么让它“暂停”,等结果回来后,再向后执行? 
    答:先生成一个对象ResponseFuture,在一个全局map里put(ID,Future)存放起来,使用ResponseFuture的ReentrantLock.lock()让当前线程处于等待状态,然后另一消息监听线程等到服务端结果来了后,再map.get(ID)找到ResponseFuture,调用ResponseFuture.unlock()唤醒前面处于等待状态的线程。

  • 正如前面所说,Socket通信是一个全双工的方式,如果有多个线程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是乱七八糟的,server处理完结果后,将结果消息发送给client,client收到很多消息,怎么知道哪个消息结果是原先哪个线程调用的? 
    答:使用一个ID,让其唯一,然后传递给服务端,再服务端又回传回来,这样就知道结果是原先哪个线程的了。

异步调用

官方给出了异步调用的文档 
异步调用先返回一个ResponseFuture对象,然后设置到和线程绑定的RpcContext中去

此时我们会发现一个问题,当某个线程多次发送异步请求时,都会将返回的DefaultFuture对象设置到当前线程绑定(ThreadLocal是个静态常量)的RpcContext中,就会造成了覆盖问题,如下调用方式:

//RpcContext.getContext().setFuture()
String result1 = helloService.hello("World");
//RpcContext.getContext().setFuture()
String result2 = helloService.hello("java");
System.out.println("result :"+result1);
System.out.println("result :"+result2);
System.out.println("result : "+RpcContext.getContext().getFuture().get());
System.out.println("result : "+RpcContext.getContext().getFuture().get());

即异步调用了hello方法,再次异步调用,则前一次的结果就被冲掉了,则就无法获取前一次的结果了。必须要调用一次就立马将DefaultFuture对象获取走,以免被冲掉。即这样写:

String result1 = helloService.hello("World");
Future<String> result1Future=RpcContext.getContext().getFuture();
String result2 = helloService.hello("java");
Future<String> result2Future=RpcContext.getContext().getFuture();
System.out.println("result :"+result1);
System.out.println("result :"+result2);
System.out.println("result : "+result1Future.get());
System.out.println("result : "+result2Future.get());


http://blog.csdn.net/qq418517226/article/details/51906357

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Dubbo通信模型 的相关文章

  • 深拷贝与浅拷贝的简单理解

    首先说下什么是简单数据类型 什么是复杂数据类型 简单数据类型有 number 数字型 null undfande booler 布尔值 string 字符串 复杂数据类型 有array 数组 object 对象 function 函数 等等

随机推荐

  • Python探索性数据分析畅销书

    探索性数据分析 探索性数据分析 EDA 是一种分析和调查数据集以了解数据特征的方法 数据集 查看数据集示例 有许多与 2009 年至 2019 年在销售的畅销书的标题和作者相关的信息 除了标题和作者之外 数据中还有其他元素 例如用户评分 评
  • 以太坊Web3.js开发基础

    简介 web3 js是一个通过RPC调用和本地以太坊节点进行通信的js库 web3 js可以与任何暴露了RPC接口的以太坊节点连接 web3中提供了eth对象 web3 eth来与以太坊区块链进行交互 在github上上获得代码 安装Tes
  • Lua--字符串操作

    str1 luaC Java str2 SQLServerOracle 一 输出字符串的长度 print str1 print string len str1 二 字符串的大小写转换 print 全大写 string upper str1
  • 超简单 STM32 RTC闹钟 时钟配置

    基于正点原子的RTC时钟 实验效果 LCD屏幕显示 年月日时分秒 设置任意时间 到时间蜂鸣器启动 直接上代码 主函数 简单说 就是初始化各个部件 然后让LED1 闪烁来提示系统的正常运行 显示屏显示实时时间 include led h in
  • winform 中 Devexpress Charts动态添加数据

    参考 Devexpress Charts动态添加数据 https www cnblogs com zhangruisoldier p 4226950 html DevExpress 图表控件 ChartControl 动态绑定数据 http
  • 打开ftp服务器显示成文件,打开ftp文件时出现与"服务器连接被重置"是怎么回事?拜托各位大神...

    满意答案 追问 我才注册的啊 回答 哦 看一下这份资料 希望能解决你的问题 分析解决首先可以排除物理连接上的问题 因为其他网络应用都是正常的 由于出现不能访问ftp服务器现象的电脑不止一台 所有的办公电脑均存在这个问题 因此只能从软件设置上
  • 自动化测试-Selenium

    一 selenium环境搭建 1 检查python环境 2 在cmd命令窗口 输入pip3 install selenium 3 浏览器驱动安装 由于执行的脚本需要浏览器驱动来驱动浏览器 所以需要安装形影的浏览器驱动 WebDriver浏览
  • 论三网融合对数据中心的影响

    近日国务院办公厅印发 三网融合推广方案 方案明确要加快在全国推进三网融合 推动信息网络基础设施互联互通和资源共享 将广电 电信业务双向进入扩大到全国范围 并实质性展开工作 三网融合其实国家政府提了好多年 是指电信网 广播电视网和互联网三网的
  • styled-components 基本用法

    styled components 基本用法 安装 npm install save styled components 或 yarn add styled components 注 如使用tsx语法请同时安装相应的 types声明文件 n
  • qt 程序中执行额外程序和脚本

    1 最简单的 我们可以通过system直接启动一个应用程序或者脚本 system helloworld system hello sh 操作简单 但是我们可以很清晰的看到弊端 虽然很顺利的匹出一个进程去执行另外一个应用 但是我们拿不到这个新
  • 新冠造成的经济崩溃对女性影响最大

    Yui Koizumi 化名 曾经过的挺不错的 大学毕业后她进入了一家广告公司 人生逐渐走上正轨 今年3月的时候 她收到了公司发来的邮件 公司暂时要关闭 不过她无须担心 因为收到了一些补偿金 一旦COVID 19疫情缓解了 公司就又会开张营
  • 23 KVM管理虚拟机-使用VNC密码登录虚拟机

    文章目录 23 KVM管理虚拟机 使用VNC密码登录虚拟机 23 1 概述 23 2 前提条件 23 3 操作步骤 23 KVM管理虚拟机 使用VNC密码登录虚拟机 本章介绍使用VNC密码登录虚拟机的方法 23 1 概述 当虚拟机操作系统安
  • IDEA安装MybatisX插件及使用

    打开idea File gt Setting gt Plugins gt Marketplace gt 搜索 mybatis 出现MybatisX选择点击Install gt Apply gt OK 提示重启即可 图示如下 在IDEA中使用
  • 机械硬盘无法弹出的问题:进程 ID 为 4 的应用程序 System 已停止删除或弹出设备

    一般的解决方法 此电脑单机右键选管理 1 计算机管理 gt 系统工具 gt 事件查看器 gt 自定义视图 gt 管理事件 2 在日期与事件进行排序找到最新的事件 3 合理的关掉这个程序 直接结束进程 保存相关文档后关闭 Word 等程序 另
  • android sdk 64bit,Android SDK不安裝在win 7 64位上。

    I am trying to install Android SDK on windows 7 64 bit but it doesn t work I keep getting this screen 我正在嘗試在windows 7 64
  • android中卡号输入框控件(每四位用空格分隔)(解决输入法跳转的问题)

    由于项目的需求 需要在卡号输入时 每四位用空间分隔 于是就写了个控件 该控件支持中间删除 中间增加 粘贴 末尾输入等 光标的位置显示正确 主要的思想就是 对于添加TextWatcher监听Text的改变 text改变后 拿到该text 将t
  • python爬取51job简历查看信息

    python 爬虫 51job简历 存储历史 效果展示 脚本实现 linux 定时任务 查看定时任务是否添加成功 查看定时任务日志 运行常见问题 1 No module named requests 解决方法 2 No module nam
  • kafka的简单实例

    关于kafka的安装 我主要是在windows下部署的 大家可以看这一篇 https blog csdn net woshixiazaizhe article details 80610432 然后后台启动这个kafka 进入到kafka的
  • 朝圣Java(问题集锦)之:The Apache Tomcat installation at this directory is version 8.5.32. A Tomcat 8.0 inst...

    最近开始学Java了 有C 底子 但是学起来Java还是很吃力 感觉别人架好了各种包 自己只要调用就行了 结果还有各种bug出现 掩面中 启动Tomcat的时候 报错The Apache Tomcat installation at thi
  • Dubbo通信模型

    Dubbo和通信结合 通信实现 服务的发布过程使用通信功能 Protocol export 时会为每个服务创建一个Server 服务的引用过程使用通信功能 Protocol refer 时会创建一个Client 整个类结构及调用关系如下 从