【手写一个RPC框架】simpleRPC-03

2023-11-18

在这里插入图片描述

本项目所有代码可见:https://github.com/weiyu-zeng/SimpleRPC

前言

我们将新写一个服务接口:通过某个id查询blog信息

我们本次改进希望重构服务端server的代码,使得server可以容纳多个service接口的调用。我们将为此通过HashMap构建一个service的映射关系。

另外原来的server处理请求是用BIO的监听,捕获消息之后直接new一个线程来处理,我们这里将使用一个线程池来代替直接new 一个线程,实现线程的有效管理。

实现

项目创建

我们创建一个名为 simpleRPC-03 的module:

在这里插入图片描述

依赖配置

跟simpleRPC-02一样:

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>SimpleRPC</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>simpleRPC-03</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>
</project>

common

RPCRequest.java和simpleRPC-02一样:

package com.rpc.common;


import lombok.Builder;
import lombok.Data;

import java.io.Serializable;


/**
 * 客户端请求的抽象(接口名,方法名,参数,参数类型)
 *
 * 在上个例子中,我们的Request仅仅只发送了一个id参数过去,这显然是不合理的,
 * 因为服务端不会只有一个服务一个方法,因此只传递参数服务端不会知道调用那个方法
 *
 * 因此一个RPC请求中,client发送应该是需要调用的Service接口名,方法名,参数,参数类型
 * 这样服务端就能根据这些信息根据反射调用相应的方法
 * 使用java自带的序列化方式(实现接口)
 */
@Data
@Builder
public class RPCRequest implements Serializable {
    // 服务类名,客户端只知道接口名,在服务端中用接口名指向实现类
    private String interfaceName;
    // 方法名
    private String methodName;
    // 参数列表
    private Object[] params;
    // 参数类型
    private Class<?>[] paramsTypes;
}

RPCResponse.java和simpleRPC-02一样:

package com.rpc.common;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;

/**
 * 服务器端回应的抽象
 *
 * 上个例子中response传输的是User对象,显然在一个应用中我们不可能只传输一种类型的数据
 * 由此我们将传输对象抽象成为Object
 * RPC需要经过网络传输,有可能失败,类似于http,引入状态码和状态信息表示服务调用成功还是失败
 */
@Data
@Builder
public class RPCResponse implements Serializable {

    // 状态信息
    private int code;

    private String message;
    // 具体数据
    private Object data;

    public static RPCResponse success(Object data) {
        return RPCResponse.builder().code(200).data(data).build();
    }

    public static RPCResponse fail() {
        return RPCResponse.builder().code(500).message("服务器发生错误").build();
    }
}

User.java和simpleRPC-02一样:

package com.rpc.common;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;


/**
 * @author zwy
 *
 * 定义简单User信息
 *
 * 要使用lombok,IDEA必须也安装lombok插件,否则用不了。
 */

@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
    // 客户端和服务端共有的
    private Integer id;
    private String userName;
    private Boolean sex;
}

我们添加一个新的实例:Blog.java

package com.rpc.common;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;


@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Blog implements Serializable {
    private Integer id;
    private Integer useId;
    private String title;
}

service

UserService.java和simpleRPC-02一样:

package com.rpc.service;

import com.rpc.common.User;

/**
 * @author zwy
 *
 * 服务器端提供服务的方法的接口
 */
public interface UserService {

    // 客户端通过这个接口调用服务端的实现类
    User getUserByUserId(Integer id);

    // 给这个服务增加一个功能
    Integer insertUserId(User user);
}

UserServiceImpl.java和simpleRPC-02:

package com.rpc.service;

import com.rpc.common.User;


/**
 * @author zwy
 *
 * 服务器端提供服务的方法
 */
public class UserServiceImpl implements UserService {

    @Override
    public User getUserByUserId(Integer id) {
        // 模拟从数据库中取用户的行为
        User user = User.builder()
                        .id(id)
                        .userName("he2121")
                        .sex(true).build();
        System.out.println("客户端查询了" + id + "的用户");
        return user;
    }

    @Override
    public Integer insertUserId(User user) {
        System.out.println("插入数据成功: " + user);
        return 1;
    }
}

添加新的服务:

BlogService.java

package com.rpc.service;

import com.rpc.common.Blog;

public interface BlogService {

    Blog getBlogById(Integer id);
}


BlogServiceImpl.java

package com.rpc.service;

import com.rpc.common.Blog;

public class BlogServiceImpl implements BlogService {

    @Override
    public Blog getBlogById(Integer id) {
        Blog blog = Blog.builder()
                        .id(id)
                        .title("我的博客")
                        .useId(22).build();
        System.out.println("客户端查询了" + id + "博客");
        return blog;
    }
}

ServiceProvider.java

package com.rpc.service;


import java.util.HashMap;
import java.util.Map;

/**
 * @author zwy
 *
 * ServiceProvider 存放服务接口名与服务端对应的实现类(本质是hashmap),服务启动时要暴露其相关的实现类
 * 根据request中的interface调用服务端中相关实现类。
 */
public class ServiceProvider {
    /**
     * 一个实现类可能实现多个接口
     */
    private Map<String, Object> interfaceProvider;

    // 构造函数,初始化一个空的 hashmap 赋给 Map<String, Object> interfaceProvider
    public ServiceProvider() {
        this.interfaceProvider = new HashMap<>();
    }

    public void provideServiceInterface(Object service) {
        // 反射,.getClass().getInterfaces()得到class的interface,按照interfaces name(key)和object(value)存入map
        Class<?>[] interfaces = service.getClass().getInterfaces();
        for (Class clazz : interfaces) {
            interfaceProvider.put(clazz.getName(), service);
        }
    }

    public Object getService(String interfaceName) {
        return interfaceProvider.get(interfaceName);  // 通过interface name得到object
    }
}

client

client在simpleRPC-03中基本没改动,只是在RPCClient.java中增加了调用的方法。

IOClient.java

package com.rpc.client;


import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;


/**
 * @author zwy
 *
 * IO Client:底层的通信
 * 通过Socket和输出流把 RPCRequest 传给服务器端,接收到服务器端传来的 RPCResponse,返回这个 RPCResponse
 */
public class IOClient {

    public static RPCResponse sendRequest(String host, int port, RPCRequest request) throws IOException, ClassNotFoundException {

        // 老样子,创建Socket对象,定义host和port
        Socket socket = new Socket(host, port);

        // 定义输入输出流对象
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
        ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
        System.out.println("request: " + request);

        // 输出流写入request对对象,刷新输出流
        objectOutputStream.writeObject(request);
        objectOutputStream.flush();

        // 通过输入流的readObject方法,得到服务器端传来的RPCResponse,并返回RPCResponse对象
        RPCResponse response = (RPCResponse) objectInputStream.readObject();

        return response;

    }
}

ClientProxy.java

package com.rpc.client;


import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;
import lombok.AllArgsConstructor;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * @author zwy
 *
 * 客户端代理:把动态代理封装request对象
 *
 * '@AllArgsConstructor':它是lombok中的注解。使用后添加一个构造函数,该构造函数含有所有已声明字段属性参数
 *                        (这也就是为什么ClientProxy明明没定义构造函数,但RPCClient还可以再创建ClientProxy时,
 *                        通过构造函数传参给 host 和 port。)
 * java动态代理机制中有两个重要的类和接口InvocationHandler(接口)和Proxy(类):也是实现动态代理的核心
 * InvocationHandler接口:是proxy代理实例的调用处理程序实现的一个接口,每一个proxy代理实例都有一个关联的调用处理程序,
 *                        在代理实例调用方法时,方法调用被编码分派到调用处理程序的invoke方法。
 *                        每一个动态代理类的调用处理程序都必须实现InvocationHandler接口,并且每个代理类的实例都关联到了
 *                        实现该接口的动态代理类调用处理程序中,当我们通过动态代理对象调用一个方法时候,这个方法的调用
 *                        就会被转发到实现InvocationHandler接口类的invoke方法来调用
 * Proxy:该类用于动态生成代理类,只需传入目标接口、目标接口的类加载器以及InvocationHandler便可为目标接口生成代理类及代理对象
 * Proxy.newProxyInstance:该方法用于为指定类装载器、一组接口及调用处理器生成动态代理类实例
 */
@AllArgsConstructor
public class ClientProxy implements InvocationHandler {

    private String host;

    private int port;

    /**
     * 动态代理,每一次代理对象调用方法,会经过此方法增强(反射获取request对象,socket发送至客户端)
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        // 构建RPCRequest对象,初始化其中的四个重要参数,使用了lombok中的builder。
        // 初始化interfaceName。初始化methodName,初始化params,,初始化paramsTypes
        RPCRequest request = RPCRequest.builder()
                                       .interfaceName(method.getDeclaringClass().getName())
                                       .methodName(method.getName())
                                       .params(args)
                                       .paramsTypes(method.getParameterTypes())
                                       .build();

        // 调用IOClient,通过输入输出流进行request的数据传输,并返回服务器端传来的response
        RPCResponse response = IOClient.sendRequest(host, port, request);
        System.out.println("response: " + response);

        return response.getData();  // 获取RPCResponse中的目标数据(因为RPCResponse中除了目标数据,还有状态码和状态信息这些非目标数据)
    }

    /**
     * 传入Client需要的服务的class反射对象
     */
    <T> T getProxy(Class<T> clazz) {
        // 传入目标接口的类加载器,目标接口,和InvocationHandler(的实现类,也就是本类,this),生成动态代理类实例
        Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
        return (T)o;
    }
}

RPCClient.java

package com.rpc.client;


import com.rpc.common.Blog;
import com.rpc.common.User;
import com.rpc.service.BlogService;
import com.rpc.service.UserService;

/**
 * @author weiyu_zeng
 *
 * RPC客户端:调用服务器端的方法
 */
public class RPCClient {

    public static void main(String[] args) {

        ClientProxy rpcClientProxy = new ClientProxy ("127.0.0.1", 8899);  // 初始化host和port
        UserService proxy = rpcClientProxy .getProxy(UserService.class);

        // 服务的方法1
        User userByUserId = proxy.getUserByUserId(10);
        System.out.println("从服务器端得到的user为:" + userByUserId);

        // 服务的方法2
        User user = User.builder().userName("张三").id(100).sex(true).build();
        Integer integer = proxy.insertUserId(user);
        System.out.println("向服务器端插入数据" + integer);

        // 服务的方法3
        BlogService blogService = rpcClientProxy.getProxy(BlogService.class);
        Blog blogById = blogService.getBlogById(10000);
        System.out.println("从服务端得到的blog为:" + blogById);
    }
}

server

我们规范RPCServer的方法,定义接口RPCServer.java

package com.rpc.server;

/**
 * @author zwy
 *
 * RPC服务器端:接受,解析request,封装,发送response
 *
 */
public interface RPCServer {
    void start(int port);
    void stop();
}

定义线程池版本的服务器端实现:ThreadPoolRPCRPCServer.java

package com.rpc.server;

import com.rpc.service.ServiceProvider;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


/**
 * @author zwy
 *
 * 线程池版服务端的实现
 */
public class ThreadPoolRPCRPCServer implements RPCServer {

    private final ThreadPoolExecutor threadPool;
    private ServiceProvider serviceProvider;

    // 默认构造函数:函数里默认初始化ThreadPoolExecutor线程池,初始化serviceProvider
    public ThreadPoolRPCRPCServer(ServiceProvider serviceProvider) {
        threadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                                            1000, 60, TimeUnit.SECONDS,
                                            new ArrayBlockingQueue<>(100));
        this.serviceProvider = serviceProvider;
    }

    // 自定义构造函数:函数里自己初始化ThreadPoolExecutor线程池,初始化serviceProvider
    public ThreadPoolRPCRPCServer(ServiceProvider serviceProvider, int corePoolSize, int maximumPoolSize,
                                  long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.serviceProvider = serviceProvider;
    }

    @Override
    public void start(int port) {
        System.out.println("线程池服务器端启动了");
        try {
            ServerSocket serverSocket = new ServerSocket(port);
            while (true) {
                Socket socket = serverSocket.accept();
                threadPool.execute(new WorkThread(socket, serviceProvider));  // threadPool提交任务
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void stop() {
    }
}

我们实现单线程要做的任务,也是我们之前一直在写的RPC逻辑:WorkThread.java

package com.rpc.server;

import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;
import com.rpc.service.ServiceProvider;
import lombok.AllArgsConstructor;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;


/**
 * WorkThread
 *
 * 本线程专门负责解析得到的request请求,执行服务方法,返回给客户端
 * 1. 从request得到interfaceName
 * 2. 根据interfaceName在serviceProvide Map中获取服务端的实现类
 * 3. 从request中得到方法名,参数, 利用反射执行服务中的方法
 * 4. 得到结果,封装成response,写入socket
 */
@AllArgsConstructor
public class WorkThread implements Runnable {

    private Socket socket;
    private ServiceProvider serviceProvider;

    @Override
    public void run() {
        try {
            ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
            ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
            // 读取客户端传过来的request
            RPCRequest request = (RPCRequest) ois.readObject();
            // 反射调用服务方法获得返回值
            RPCResponse response = getResponse(request);
            // 写入客户端
            oos.writeObject(response);
            oos.flush();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
            System.out.println("从IO中读取错误数据");
        }
    }

    private RPCResponse getResponse(RPCRequest request) {
        // 得到服务名
        String interfaceName = request.getInterfaceName();
        // 通过serviceProvider得到服务器端相应的服务实现类
        Object service = serviceProvider.getService(interfaceName);
        // 反射调用方法
        Method method = null;

        try {
            method = service.getClass().getMethod(request.getMethodName(), request.getParamsTypes());
            Object invoke = method.invoke(service, request.getParams());
            return RPCResponse.success(invoke);
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            e.printStackTrace();
            System.out.println("方法执行错误");
            return RPCResponse.fail();
        }
    }
}

最后我们实现服务器TestServer.java

package com.rpc.server;


import com.rpc.service.*;

public class TestServer {
    public static void main(String[] args) {
        UserService userService = new UserServiceImpl();
        BlogService blogService = new BlogServiceImpl();

//        Map<String, Object> serviceProvide = new HashMap<>();
//        serviceProvide.put("com.ganghuan.myRPCVersion2.service.UserService",userService);
//        serviceProvide.put("com.ganghuan.myRPCVersion2.service.BlogService",blogService);
        ServiceProvider serviceProvider = new ServiceProvider();
        serviceProvider.provideServiceInterface(userService);  // 把userService存入 serviceProvider
        serviceProvider.provideServiceInterface(blogService);  // 把blogService存入 serviceProvider

        RPCServer RPCServer = new ThreadPoolRPCRPCServer(serviceProvider);
        RPCServer.start(8899);
    }
}


文件结构

我们文件结构如下:

在这里插入图片描述

运行

我们先运行TestServer.java
在这里插入图片描述

然后再运行RPCClient.java

在这里插入图片描述

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【手写一个RPC框架】simpleRPC-03 的相关文章

随机推荐

  • 前端vue可以左右滚动的切换的tabs tabs选项卡 滑动动画效果 自动宽度

    随着技术的发展 开发的复杂度也越来越高 传统开发方式将一个系统做成了整块应用 经常出现的情况就是一个小小的改动或者一个小功能的增加可能会引起整体逻辑的修改 造成牵一发而动全身 通过组件化开发 可以有效实现单独开发 单独维护 而且他们之间可以
  • Feign原理 (图解)

    1 1 简介 Feign远程调用的 Feign远程调用 核心就是通过一系列的封装和处理 将以JAVA注解的方式定义的远程调用API接口 最终转换成HTTP的请求形式 然后将HTTP的请求的响应结果 解码成JAVA Bean 放回给调用者 F
  • namespace命令空间

    目录 1 解决什么问题 2 基本介绍 2 1 定义 2 2 应用场景 3 使用案例 4 资源配额 5 标签 5 1 定义 5 2 pod资源打标签 5 3 查看标签 1 解决什么问题 命令空间类似于C 中的命名空间 当用户数量较多的集群 才
  • 使用docker搭建jupyter notebook/jupyterlab

    说明 由于官方镜像实在是不怎么好用 所以我自己做了一个优化过的jupyter notebook的镜像 notebook hub 使用我这个镜像搭建容器非常简单 下面就基于这个notebook hub来进行搭建 关于notebook hub
  • hive 报system:java.io.tmpdir错误解决

    Exception in thread main java lang IllegalArgumentException java net URISyntaxException Relative path in absolute URI sy
  • 2. IDEA + maven + protobuf配置(on mac)

    1 絮絮叨叨 都说懒惰是人类进步的源泉 有时候想想还真就那么回事 学习了如何使用protoc命令编译 重度依赖IDEA且已经习惯了maven的我 就在想是否能在IDEA中一键编译 proto文件 2 vscode配置protobuf编辑环境
  • pyecharts实现电影数据分析可视化

    根据电影数据 使用pyecharts进行可视化分析 数据介绍 import pandas as pd data pd read csv 电影 csv data head 前5行数据如下 需要安装的python库 pip install pa
  • 2.晶晨A311D-编译Ubuntu/Debian固件

    上面是我的微信和QQ群 欢迎新朋友的加入 参考 https docs khadas com zh cn vim3 FenixScript html 编译环境 我重新安装了ubuntu20 安装软件包 配置环境 sudo apt get in
  • 【数据结构】排序(直接插入、折半插入、希尔、冒泡、快速、直接选择、堆、归并、基数排序)

    一 什么是排序 排序 将一组杂乱无章的数据按一定规律顺次排列起来 即 将无序序列的数据节点包含多个数据域 那么排序往往是针对其中某个域而言 二 排序方法的分类 1 按数据存储介质可分为 内部排序 数据量不大 数据在内存 无需内外存交换数据
  • SQL抽取数据脚本

    sp OutputData IF EXISTS SELECT 1 FROM sys objects o WHERE object id object id N sp OutputData AND OBJECTPROPERTY object
  • vue数据劫持 ajax,Vue视图更新原理 - 数据劫持,最小量更新和DIFF算法

    什么是数据劫持 加入有一个js文件内容如下 var obj x 100 y 200 Object defineProperties obj x set console log You gonna update x the vision wi
  • python smtp发送邮件 附件 中文名乱码 问题

    重点 mime add header Content Disposition attachment filename make header file name UTF 8 encode UTF 8 完整代码可以发送多个附件 import
  • 20220801:强改jar包的一下经历

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 改jar包内容 二 使用步骤 1 首先是修改的class文件 2 如何替换jar包中的class文件 总结 前言 目标是一个dubbo的服务 我们的工程引
  • VS2008错误Error spawning 'cmd.exe'的解决方法

    解决方法 In the Options go into Projects and Solutions gt VC Directories page and place this rows SystemRoot System32 System
  • Redisson配置类

    学习记录 Redisson配置类 Bean public RedissonClient redissonClient throws IOException ResourceLoader loader new DefaultResourceL
  • Dell台式机重装win 10系统之后开机报错

    电脑品牌 戴尔 报错信息 Hard disk dirve failure 硬盘驱动器故障 trick the F1 key to continue F2 to run the setup utility 报错原因 电脑一开机出现黑屏并出现H
  • Spring面向切面编程-AOP

    前言 在软件开发中 面向切面编程 Aspect Oriented Programming AOP 是一个非常重要的编程范式 Spring AOP是Spring框架提供的AOP实现 在Spring中使用AOP实现企业应用开发已经非常普遍 本文
  • 测试基础-静态白盒测试(检查代码)

    1 静态白盒测试 检查设计和代码 静态测试 测试非运行部分 检验和审查 白盒测试 访问代码 能够查看和审查 静态白盒测试 在不执行软件的条件下有条理地仔细审查软件设计 体系结构和代码 从而找出软件缺陷的过程 有时称为结构化分析 2 正式审查
  • yolov7运行自己的VOC格式数据集

    yolov7运行VOC格式数据集 代码下载 测试开发环境 使用自己的VOC格式数据集训练 修改配置文件yolov7 yaml 修改配置文件voc yaml VOC格式数据集转换COCO格式 开始训练 重头开始 fine train BUG
  • 【手写一个RPC框架】simpleRPC-03

    目录 前言 实现 项目创建 依赖配置 common service client server 文件结构 运行 本项目所有代码可见 https github com weiyu zeng SimpleRPC 前言 我们将新写一个服务接口 通