什么是RPC并实现一个简单的RPC

2023-11-18

1. 基本的RPC模型

主要介绍RPC是什么,基本的RPC代码,RPC与REST的区别,gRPC的使用

1.1 基本概念

  • RPC(Remote Procedure Call)远程过程调用,简单的理解是一个节点请求另一个节点提供的服务
  • 本地过程调用:如果需要将本地student对象的age+1,可以实现一个addAge()方法,将student对象传入,对年龄进行更新之后返回即可,本地方法调用的函数体通过函数指针来指定。
  • 远程过程调用:上述操作的过程中,如果addAge()这个方法在服务端,执行函数的函数体在远程机器上,如何告诉机器需要调用这个方法呢?
  1. 首先客户端需要告诉服务器,需要调用的函数,这里函数和进程ID存在一个映射,客户端远程调用时,需要查一下函数,找到对应的ID,然后执行函数的代码。
  2. 客户端需要把本地参数传给远程函数,本地调用的过程中,直接压栈即可,但是在远程调用过程中不再同一个内存里,无法直接传递函数的参数,因此需要客户端把参数转换成字节流,传给服务端,然后服务端将字节流转换成自身能读取的格式,是一个序列化和反序列化的过程。
  3. 数据准备好了之后,如何进行传输?网络传输层需要把调用的ID和序列化后的参数传给服务端,然后把计算好的结果序列化传给客户端,因此TCP层即可完成上述过程,gRPC中采用的是HTTP2协议。
    总结一下上述过程:
// Client端 
//    Student student = Call(ServerAddr, addAge, student)
1. 将这个调用映射为Call ID。
2. 将Call ID,student(params)序列化,以二进制形式打包
3. 把2中得到的数据包发送给ServerAddr,这需要使用网络传输层
4. 等待服务器返回结果
5. 如果服务器调用成功,那么就将结果反序列化,并赋给student,年龄更新

// Server端
 - 在本地维护一个Call ID到函数指针的映射call_id_map,可以用Map<String, Method> callIdMap
 - 等待客户端请求
 - 得到一个请求后,将其数据包反序列化,得到Call ID
 - 通过在callIdMap中查找,得到相应的函数指针
 - 将student(params)反序列化后,在本地调用addAge()函数,得到结果
 - 将student结果序列化后通过网络返回给Client

在这里插入图片描述

  • 在微服务的设计中,一个服务A如果访问另一个Module下的服务B,可以采用HTTP
    REST传输数据,并在两个服务之间进行序列化和反序列化操作,服务B把执行结果返回过来。

在这里插入图片描述

  • 由于HTTP在应用层中完成,整个通信的代价较高,远程过程调用中直接基于TCP进行远程调用,数据传输在传输层TCP层完成,更适合对效率要求比较高的场景,RPC主要依赖于客户端和服务端之间建立Socket链接进行,底层实现比REST更复杂。

1.2 rpc demo

系统类图
系统调用过程

客户端:

public class RPCClient<T> {
    public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {
        // 1.将本地的接口调用转换成JDK的动态代理,在动态代理中实现接口的远程调用
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        Socket socket = null;
                        ObjectOutputStream output = null;
                        ObjectInputStream input = null;
                        try{
                            // 2.创建Socket客户端,根据指定地址连接远程服务提供者
                            socket = new Socket();
                            socket.connect(addr);

                            // 3.将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者
                            output = new ObjectOutputStream(socket.getOutputStream());
                            output.writeUTF(serviceInterface.getName());
                            output.writeUTF(method.getName());
                            output.writeObject(method.getParameterTypes());
                            output.writeObject(args);

                            // 4.同步阻塞等待服务器返回应答,获取应答后返回
                            input = new ObjectInputStream(socket.getInputStream());
                            return input.readObject();
                        }finally {
                            if (socket != null){
                                socket.close();
                            }
                            if (output != null){
                                output.close();
                            }
                            if (input != null){
                                input.close();
                            }
                        }
                    }
                });
    }
}

服务端:

public class ServiceCenter implements Server {

    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static final HashMap<String, Class> serviceRegistry = new HashMap<String, Class>();

    private static boolean isRunning = false;

    private static int port;


    public ServiceCenter(int port){
        ServiceCenter.port = port;
    }


    @Override
    public void start() throws IOException {
        ServerSocket server = new ServerSocket();
        server.bind(new InetSocketAddress(port));
        System.out.println("Server Start .....");
        try{
            while(true){
                executor.execute(new ServiceTask(server.accept()));
            }
        }finally {
            server.close();
        }
    }

    @Override
    public void register(Class serviceInterface, Class impl) {
        serviceRegistry.put(serviceInterface.getName(), impl);
    }

    @Override
    public boolean isRunning() {
        return isRunning;
    }

    @Override
    public int getPort() {
        return port;
    }

    @Override
    public void stop() {
        isRunning = false;
        executor.shutdown();
    }
   private static class ServiceTask implements Runnable {
        Socket client = null;

        public ServiceTask(Socket client) {
            this.client = client;
        }

        @Override
        public void run() {
            ObjectInputStream input = null;
            ObjectOutputStream output = null;
            try{
                input = new ObjectInputStream(client.getInputStream());
                String serviceName = input.readUTF();
                String methodName = input.readUTF();
                Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                Object[] arguments = (Object[]) input.readObject();
                Class serviceClass = serviceRegistry.get(serviceName);
                if(serviceClass == null){
                    throw new ClassNotFoundException(serviceName + "not found!");
                }
                Method method = serviceClass.getMethod(methodName, parameterTypes);
                Object result = method.invoke(serviceClass.newInstance(), arguments);

                output = new ObjectOutputStream(client.getOutputStream());
                output.writeObject(result);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                if(output!=null){
                    try{
                        output.close();
                    }catch (IOException e){
                        e.printStackTrace();
                    }
                }
                if (input != null) {
                    try {
                        input.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (client != null) {
                    try {
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
public class ServiceProducerImpl implements ServiceProducer{
    @Override
    public String sendData(String data) {
        return "I am service producer!!!, the data is "+ data;
    }
}
public class RPCTest {
    public static void main(String[] args) throws IOException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Server serviceServer = new ServiceCenter(8088);
                    serviceServer.register(ServiceProducer.class, ServiceProducerImpl.class);
                    serviceServer.start();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        ServiceProducer service = RPCClient.getRemoteProxyObj(ServiceProducer.class, new InetSocketAddress("localhost", 8088));
        System.out.println(service.sendData("test"));
    }
}

1.3 分析

这里客户端只需要知道Server端的接口ServiceProducer即可,服务端在执行的时候,会根据具体实例调用实际的方法ServiceProducerImpl,符合面向对象过程中父类引用指向子类对象。

2. gRPC的使用

2.1. gRPC与REST

  • REST通常以业务为导向,将业务对象上执行的操作映射到HTTP动词,格式非常简单,可以使用浏览器进行扩展和传输,通过JSON数据完成客户端和服务端之间的消息通信,直接支持请求/响应方式的通信。不需要中间的代理,简化了系统的架构,不同系统之间只需要对JSON进行解析和序列化即可完成数据的传递。

  • 但是REST也存在一些弊端,比如只支持请求/响应这种单一的通信方式,对象和字符串之间的序列化操作也会影响消息传递速度,客户端需要通过服务发现的方式,知道服务实例的位置,在单个请求获取多个资源时存在着挑战,而且有时候很难将所有的动作都映射到HTTP动词。

  • 正是因为REST面临一些问题,因此可以采用gRPC作为一种替代方案,gRPC 是一种基于二进制流的消息协议,可以采用基于Protocol
    Buffer的IDL定义grpc
    API,这是Google公司用于序列化结构化数据提供的一套语言中立的序列化机制,客户端和服务端使用HTTP/2以Protocol
    Buffer格式交换二进制消息。

  • gRPC的优势是,设计复杂更新操作的API非常简单,具有高效紧凑的进程通信机制,在交换大量消息时效率高,远程过程调用和消息传递时可以采用双向的流式消息方式,同时客户端和服务端支持多种语言编写,互操作性强;不过gRPC的缺点是不方便与JavaScript集成,某些防火墙不支持该协议。

  • 注册中心:当项目中有很多服务时,可以把所有的服务在启动的时候注册到一个注册中心里面,用于维护服务和服务器之间的列表,当注册中心接收到客户端请求时,去找到该服务是否远程可以调用,如果可以调用需要提供服务地址返回给客户端,客户端根据返回的地址和端口,去调用远程服务端的方法,执行完成之后将结果返回给客户端。这样在服务端加新功能的时候,客户端不需要直接感知服务端的方法,服务端将更新之后的结果在注册中心注册即可,而且当修改了服务端某些方法的时候,或者服务降级服务多机部署想实现负载均衡的时候,我们只需要更新注册中心的服务群即可。

RPC调用过程

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

什么是RPC并实现一个简单的RPC 的相关文章

  • Apache 2.4.9 在启用 ssl 模块并设置 ssl 证书后失败

    Apache 在尝试设置 ssl 证书后抛出以下错误 ssl emerg pid 30907 AH02572 Failed to configure at least one certificate and key for localhos
  • PHPunit - 错误

    当 PHPunit 框架不希望发生的错误发生时 测试会停止 PHP 会抛出错误 但 PHPunit 不会记录这是一个错误 我如何确保 PHPunit 将其记录为错误 免责声明 我是 PHPUnit 的新手 我也试图弄清楚 发生错误时会发生什
  • 如何检查号码是否是巴基斯坦用户的手机号码而不是固定电话号码

    我所做的是从开头删除 92 或 0092 并使用以下代码检查它是否是巴基斯坦人的有效手机号码 if preg match 3 0 4 0 9 number 1 Pakistani mobile number else not a pakis
  • PHPExcel下载文件

    我想下载使用 PHPExcel 生成的 Excel 文件 我按照以下代码PHPExcel 强制下载问题 https stackoverflow com questions 26265108 phpexcel force download i
  • 从 FilterControllerEvent 监听器重定向到另一个 Symfony 路由

    我正在尝试设置一个 kernel controller 侦听器 以便在函数返回 true 时重定向到另一个路由 我有可用的路线 但无法使用此路线设置控制器 event gt setController 我收到以下错误 FilterContr
  • Laravel 5.1 中的VerifyCsrfToken.php 第 53 行:(Firefox 浏览器)中出现 TokenMismatchException?

    我试图找出为什么会出现这个错误 即使它是全新安装的 我在我的项目中遇到了这个错误 所以我用谷歌搜索 没有一个答案对我有用 所以我创建了新项目并复制了所有控制器 视图和模型 几个小时后工作正常 再次出现令牌不匹配错误 为什么在 laravel
  • Laravel - 停止并发访问记录

    在 Laravel 中 有什么方法可以停止同时与同一条记录交互 例如 如果用户 A 正在编辑一条记录 那么我同时需要阻止用户 B 编辑同一条记录 注意 我在 Laravel 5 2 中使用 SESSION DRIVER file 目前大约有
  • php表格:每行显示3个单元格[重复]

    这个问题在这里已经有答案了 我看这里 数组放入每行 5 个单元格的表格中 https stackoverflow com questions 9099568 array into a table with 5 cells in each r
  • TOMCAT 6 中的 PHP - 异常

    我一直在努力融入PHP in APACHE TOMCAT 6依照指示second answer为了QUESTION https stackoverflow com questions 779246 run a php app using t
  • php - 我应该加密电子邮件地址吗?

    当用户注册时 我应该将他们的电子邮件按原样存储在数据库中还是对其进行哈希处理 我希望稍后能够解密 那么我应该使用 md5 吗 谢谢你 No md5 is 单向哈希函数 http en wikipedia org wiki Cryptogra
  • Laravel 广播:通知与事件

    我阅读了 laravel 文档Events and Notifications 似乎我们可以触发一个事件 并从该事件中触发 使用ShouldBroadcast接口 将其广播到我理解的 laravel echo 另一方面我们可以使用通知via
  • PHP 中的 Preg_replace

    我想替换 中包含的字符串中的内容content 它是多行等 preg replace 函数应该删除整个 com 没有垫子 蒙特 尝试这个 result preg replace s replacement content subject
  • PHP 中的 NOW() 函数

    是否有 PHP 函数以与 MySQL 函数相同的格式返回日期和时间NOW 我知道如何使用date 但我想问是否有专门用于此的功能 例如 返回 2009 12 01 00 00 00 您可以使用date https www php net m
  • PHP - hash_pbkdf2 函数

    我正在尝试使用此 php 函数执行一个函数来哈希密码 http be php net manual en function hash pbkdf2 php http be php net manual en function hash pb
  • PHP中如何识别服务器IP地址

    PHP中如何识别服务器IP地址 对于服务器 ip 来说是这样的 SERVER SERVER ADDR 这是港口的 SERVER SERVER PORT
  • 使用 DOJO 自动完成文本框

    我正在寻找一种使用 DOJO 进行文本框自动建议的简单方法 我将查询的数据库表 使用 PHP 脚本 以 JSON 形式返回 有超过 100 000 条记录 因此这确实不应该采用 FilteringSelect 或 ComboBox 的形式
  • 如何从字符串中删除所有数字?

    我想删除字符串 0 9 中的所有数字 我写了这段有效的代码 words preg replace 0 words remove numbers words preg replace 1 words remove numbers words
  • mysqli bind_param 中的 NULL 是什么类型?

    我正在尝试将参数绑定到 INSERT INTO MySQLi 准备好的语句 如果该变量存在 否则插入 null 然后我知道 type variable i corresponding variable has type integer d
  • 如何使用 Google 帐户对我们网站中的用户进行身份验证

    如何在我们的网站中使用 Google 帐户对用户进行身份验证 我希望用户重定向到谷歌登录页面 然后将他重定向到我的网站 我想要这个 PHP 实现 你要OAuth http code google com apis accounts docs
  • “pdo_mysql”已禁用,我无法启用它。我在 iMac 7.1 OSX 10.6.8 上安装了 MAMP v. 3.0.4

    pdo mysql 已禁用 我无法启用它 我在 iMac 7 1 OSX 10 6 8 上安装了 MAMP v 3 0 4 在我的 phpinfo 页面上 我可以看到唯一启用的 PDO 是 sqlite 如果我查看 php 5 5 10 扩

随机推荐