基于Netty手撕RPC框架
1.项目结构
2.api
定义一个接口,实现服务调用者与提供者的契约
package com.lagou.rpc.api;
import com.lagou.rpc.pojo.User;
/**
* User service contract shared by the RPC consumer and the RPC provider.
*/
public interface IUserService {
/**
* Looks up a user by ID.
*
* @param id ID of the user to look up
* @return the matching user; the result for an unknown ID is defined by the implementation
*/
User getById(int id);
}
封装User对象
package com.lagou.rpc.pojo;
import lombok.Data;
/**
* User data object exchanged between consumer and provider.
* Accessors are generated by Lombok's {@code @Data}.
*/
@Data
public class User {
// User ID
private int id;
// User name
private String name;
}
封装请求对象和返回对象
请求对象:rpc远程调用需要用到的信息:调用的接口类名,方法名,方法参数类型,方法参数列表,另外再加上一个请求id
package com.lagou.rpc.common;
import lombok.Data;
/**
* Request envelope carrying everything needed for a remote call.
*/
@Data
public class RpcRequest {
/**
* ID of this request
*/
private String requestId;
/**
* Class name of the target service
*/
private String className;
/**
* Name of the method to invoke
*/
private String methodName;
/**
* Parameter types of the method to invoke
*/
private Class<?>[] parameterTypes;
/**
* Call arguments; Object needs no generic parameter
*/
private Object[] parameters;
}
返回对象:响应id,错误信息,返回结果
package com.lagou.rpc.common;
import lombok.Data;
/**
* Response envelope returned to the caller of a remote call.
*/
@Data
public class RpcResponse {
/**
* Response ID (stored under the name {@code requestId})
*/
private String requestId;
/**
* Error message
*/
private String error;
/**
* Result of the call
*/
private Object result;
}
3.provider
provider为一个springboot工程,其中的主要结构有:
- 实现共有接口的UserServiceImpl
- 负责响应请求的NettyServer
- 负责处理Netty的Handler
本案例加入了自定义注解,使用自定义注解标定需要暴露的接口
1.自定义注解
package com.lagou.rpc.provider.anno;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marks a service whose interface should be exposed to remote callers.
*/
@Target(ElementType.TYPE) // placed on type declarations
@Retention(RetentionPolicy.RUNTIME)// retained so it can be read at runtime
public @interface RpcService {
}
2.实现IUserService接口
package com.lagou.rpc.provider.service;
import com.lagou.rpc.api.IUserService;
import com.lagou.rpc.pojo.User;
import com.lagou.rpc.provider.anno.RpcService;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
/**
 * In-memory {@link IUserService} implementation, marked with {@code @RpcService}
 * so that it is exposed to remote callers.
 */
@RpcService
@Service
public class UserServiceImpl implements IUserService {
    // Demo data keyed by user ID. It is filled once in the constructor and only
    // read afterwards, so concurrent getById calls never observe a partially
    // populated map (the previous lazy fill inside getById could).
    Map<Object, User> userMap = new HashMap<>();

    /**
     * Seeds the two demo users.
     */
    public UserServiceImpl() {
        register(newUser(1, "张三"));
        register(newUser(2, "李四"));
    }

    /**
     * Looks up a user by ID.
     *
     * @param id ID of the user to look up
     * @return the user stored under {@code id}, or {@code null} when there is none
     */
    @Override
    public User getById(int id) {
        return userMap.get(id);
    }

    // Stores a user under its own ID.
    private void register(User user) {
        userMap.put(user.getId(), user);
    }

    // Builds a user with the given ID and name.
    private static User newUser(int id, String name) {
        User user = new User();
        user.setId(id);
        user.setName(name);
        return user;
    }
}
3.Netty服务端NettyRpcServer
此类的主要特征点:
- 实现spring的DisposableBean接口,在容器销毁时关闭bossGroup和workerGroup,释放Netty资源
- NettyServer的创建流程
package com.lagou.rpc.provider.server;
import com.lagou.rpc.provider.handler.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Netty server side of the RPC provider.
 * Binds the listening address and releases the event loop groups when the server
 * stops or the Spring container destroys this bean.
 */
@Component
public class NettyRpcServer implements DisposableBean {
    @Autowired
    NettyServerHandler nettyServerHandler;
    // volatile: start() blocks on its calling thread while destroy() is invoked
    // separately by the container, so the groups must be visible across threads.
    volatile EventLoopGroup bossGroup = null;
    volatile EventLoopGroup workerGroup = null;

    /**
     * Starts the server and blocks until the server channel is closed.
     * The event loop groups are always shut down before this method returns,
     * whether it ends normally, by interruption, or because binding failed.
     *
     * @param host address to bind
     * @param port port to bind
     */
    public void start(String host, int port) {
        try {
            // 1. Create the boss group (accepts connections) and the worker group
            bossGroup = new NioEventLoopGroup(1);
            workerGroup = new NioEventLoopGroup();
            // 2. Create the bootstrap helper
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 3. Configure it
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // String codec.
                            // NOTE(review): no frame decoder precedes StringDecoder, so a
                            // request split across reads would reach the handler in pieces;
                            // consider adding one on both client and server.
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new StringEncoder());
                            // Business handler
                            ch.pipeline().addLast(nettyServerHandler);
                        }
                    });
            // Bind address and port; sync() rethrows a bind failure
            ChannelFuture channelFuture = bootstrap.bind(host, port).sync();
            System.out.println("======Netty服务端启动成功======");
            // Block until the server channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Keep the interrupt visible to whoever owns this thread
            Thread.currentThread().interrupt();
        } finally {
            // Release the event loops on every exit path, not only on interruption
            shutdownEventLoops();
        }
    }

    /**
     * Releases the event loop groups when the bean is destroyed.
     *
     * @throws Exception declared by {@link DisposableBean}; not thrown by this implementation
     */
    @Override
    public void destroy() throws Exception {
        shutdownEventLoops();
    }

    // Shuts down whichever groups have been created; safe to call more than once.
    private void shutdownEventLoops() {
        EventLoopGroup boss = bossGroup;
        if (boss != null) {
            boss.shutdownGracefully();
        }
        EventLoopGroup worker = workerGroup;
        if (worker != null) {
            worker.shutdownGracefully();
        }
    }
}
4.NettyHandler的实现
此类的专注点:
- NettyServerHandler需要继承自SimpleChannelInboundHandler
- 实现ApplicationContextAware接口,用于获取并缓存标注了@RpcService注解的bean
- 标注@ChannelHandler.Sharable,设置通道共享
- 通道的读取方法为 channelRead0
package com.lagou.rpc.provider.handler;
import com.alibaba.fastjson.JSON;
import com.lagou.rpc.common.RpcRequest;
import com.lagou.rpc.common.RpcResponse;
import com.lagou.rpc.provider.anno.RpcService;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.beans.BeansException;
import org.springframework.cglib.reflect.FastClass;
import org.springframework.cglib.reflect.FastMethod;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
 * Server-side business handler.
 * 1. Caches every bean annotated with {@code @RpcService}
 * 2. Receives a client request
 * 3. Looks the target bean up in the cache by the class name in the request
 * 4. Invokes the requested method on that bean
 * 5. Writes the response back to the client
 */
@Component
@ChannelHandler.Sharable // one instance is shared by all channels
public class NettyServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware {
    // Interface name -> service bean; filled in setApplicationContext
    static final Map<String, Object> SERVICE_INSTANCE_MAP = new HashMap<>();

    /**
     * 1. Caches every bean annotated with {@code @RpcService} under the name of
     * the first interface it implements.
     *
     * @param applicationContext context to search for annotated beans
     * @throws BeansException declared by {@link ApplicationContextAware}
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 1.1 Collect the annotated beans
        Map<String, Object> serviceMap = applicationContext.getBeansWithAnnotation(RpcService.class);
        // 1.2 Register each of them
        for (Object serviceBean : serviceMap.values()) {
            Class<?>[] interfaces = serviceBean.getClass().getInterfaces();
            if (interfaces.length == 0) {
                throw new RuntimeException("对外暴露的服务必须实现接口");
            }
            // The first interface name is used as the cache key
            SERVICE_INSTANCE_MAP.put(interfaces[0].getName(), serviceBean);
            System.out.println(SERVICE_INSTANCE_MAP);
        }
    }

    /**
     * Handles one inbound message: parses the request, invokes the service and
     * always writes a response, including when parsing or the invocation fails.
     *
     * @param ctx channel context used to write the response
     * @param msg JSON text of an {@link RpcRequest}
     * @throws Exception declared by the superclass
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        RpcResponse rpcResponse = new RpcResponse();
        try {
            // 2. Parse the request inside the try so bad input still gets a response
            RpcRequest rpcRequest = JSON.parseObject(msg, RpcRequest.class);
            rpcResponse.setRequestId(rpcRequest.getRequestId());
            // 3 + 4. Dispatch to the service
            rpcResponse.setResult(handler(rpcRequest));
        } catch (InvocationTargetException e) {
            // Report the exception thrown by the service method, not the reflective wrapper
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            cause.printStackTrace();
            rpcResponse.setError(describe(cause));
        } catch (Exception e) {
            e.printStackTrace();
            rpcResponse.setError(describe(e));
        }
        // 5. Respond to the client
        ctx.writeAndFlush(JSON.toJSONString(rpcResponse));
    }

    /**
     * Finds the service bean named in the request and invokes the requested method.
     *
     * @param rpcRequest request describing the class, method, parameter types and arguments
     * @return whatever the invoked method returns
     * @throws InvocationTargetException when the invoked method itself throws
     */
    private Object handler(RpcRequest rpcRequest) throws InvocationTargetException {
        // 3. Look the bean up by the class name sent by the client
        Object serviceBean = SERVICE_INSTANCE_MAP.get(rpcRequest.getClassName());
        if (serviceBean == null) {
            throw new RuntimeException("服务端没有找到服务");
        }
        // 4. Invoke the method through cglib's FastClass
        FastClass proxyClass = FastClass.create(serviceBean.getClass());
        FastMethod method = proxyClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
        return method.invoke(serviceBean, rpcRequest.getParameters());
    }

    // Returns a non-null description of the failure, so that a response carrying
    // an error can never be mistaken for a successful one with a null error field.
    private static String describe(Throwable failure) {
        String message = failure.getMessage();
        return (message == null || message.isEmpty()) ? failure.toString() : message;
    }
}
5.启动类
CommandLineRunner的用法
package com.lagou.rpc.provider;
import com.lagou.rpc.provider.server.NettyRpcServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Provider entry point. Once the Spring application has started, the Netty RPC
 * server is launched on its own thread so that {@code run} returns immediately.
 */
@SpringBootApplication
public class ServerBootstrapApplication implements CommandLineRunner {
    @Autowired
    NettyRpcServer rpcServer;

    public static void main(String[] args) {
        SpringApplication.run(ServerBootstrapApplication.class, args);
    }

    /**
     * Starts the RPC server on 127.0.0.1:8899 in a separate thread.
     *
     * @param args command line arguments (unused)
     * @throws Exception declared by {@link CommandLineRunner}
     */
    @Override
    public void run(String... args) throws Exception {
        Thread serverThread = new Thread(() -> rpcServer.start("127.0.0.1", 8899));
        serverThread.start();
    }
}
4.consumer
1.自定义注解
使用自定义注解@RpcReference标注需要注入远程服务代理对象的字段
package com.lagou.rpc.consumer.anno;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marks a field that should receive a proxy object.
*/
@Target(ElementType.FIELD)// placed on fields
@Retention(RetentionPolicy.RUNTIME) // retained so it can be read at runtime
public @interface RpcReference {
}
2.创建client
client的功能有:
- 连接服务端:实现spring的InitializingBean接口,在afterPropertiesSet()中建立连接
- 关闭资源:实现spring的DisposableBean接口
- 提供发送消息的方法send():使用线程池实现该功能,先将需要发送的消息set到nettyHandler中,再把handler作为任务提交给线程池,由handler完成发送并等待响应。
这种写法有点组合设计模式的意思:把各个需要的功能组合到一个类中处理。将消息发送的具体实现放到handler中的原因是:handler需要继承SimpleChannelInboundHandler类,该类中有接收服务端返回消息的channelRead0方法,以及通道建立连接时可以获取ChannelHandlerContext的channelActive方法,通过这两个方法即可实现消息的发送与接收。send()对消息的发送与接收进行封装,供其他地方调用。另外,handler实现了Callable接口,便于提交给线程池执行。
package com.rpc.consumer.client;
import com.rpc.consumer.handler.NettyRpcClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * Netty client.
 * 1. Connects to the server
 * 2. Releases resources
 * 3. Offers a blocking method for sending a message
 */
@Component
public class NettyRpcClient implements InitializingBean, DisposableBean {
    private static final String SERVER_HOST = "127.0.0.1";
    private static final int SERVER_PORT = 8899;

    EventLoopGroup group = null;
    Channel channel = null;
    @Autowired
    NettyRpcClientHandler nettyRpcClientHandler;
    ExecutorService service = Executors.newCachedThreadPool();

    /**
     * 1. Connects to the server. A connection failure is printed and the
     * partially created resources are released; later {@link #send} calls then
     * fail with an {@link IllegalStateException}.
     *
     * @throws Exception declared by {@link InitializingBean}
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        try {
            // 1.1 Create the event loop group
            group = new NioEventLoopGroup();
            // 1.2 Create the client bootstrap helper
            Bootstrap bootstrap = new Bootstrap();
            // 1.3 Configure it
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // String codec
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new StringEncoder());
                            // Business handler
                            ch.pipeline().addLast(nettyRpcClientHandler);
                        }
                    });
            // 1.4 Connect
            channel = bootstrap.connect(SERVER_HOST, SERVER_PORT).sync().channel();
        } catch (Exception e) {
            e.printStackTrace();
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the caller
                Thread.currentThread().interrupt();
            }
            closeConnection();
        }
    }

    /**
     * 2. Closes the channel, the event loop group and the executor used by
     * {@link #send}, so that no worker thread outlives the bean.
     *
     * @throws Exception declared by {@link DisposableBean}
     */
    @Override
    public void destroy() throws Exception {
        closeConnection();
        // Interrupts any task still waiting for a response
        service.shutdownNow();
    }

    /**
     * 3. Sends a message and blocks until the handler returns the response.
     * The method is synchronized because the request text is stored on the
     * shared handler: without serialization, concurrent callers would overwrite
     * each other's request before it is written.
     *
     * @param msg message to send
     * @return the response produced by the handler
     * @throws IllegalStateException when the channel is not connected
     * @throws ExecutionException    when the handler task fails
     * @throws InterruptedException  when the calling thread is interrupted while waiting
     */
    public synchronized Object send(String msg) throws ExecutionException, InterruptedException {
        Channel current = channel;
        if (current == null || !current.isActive()) {
            throw new IllegalStateException(
                    "RPC channel to " + SERVER_HOST + ":" + SERVER_PORT + " is not connected");
        }
        nettyRpcClientHandler.setReqMsg(msg);
        Future<?> pending = service.submit(nettyRpcClientHandler);
        return pending.get();
    }

    // Closes the channel and the event loop group if they were created.
    private void closeConnection() {
        if (channel != null) {
            channel.close();
        }
        if (group != null) {
            group.shutdownGracefully();
        }
    }
}
3.创建handler
该类主要实现具体功能。
- 继承SimpleChannelInboundHandler类:该类的channelRead0方法用于接收服务端返回的消息,拿到返回消息后在call()方法中返回;channelActive方法在通道建立连接时被调用,可以在其中获取并保存ChannelHandlerContext,之后通过context.writeAndFlush(reqMsg)将消息发送到服务端,reqMsg通过set方法赋值;
- 实现Callable接口,便于使用线程池
- 注意消息发送以后,线程要阻塞等待(wait),直到收到响应后被唤醒(notify);
package com.rpc.consumer.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.stereotype.Component;
import java.util.concurrent.Callable;
/**
 * Client-side business handler.
 * Sends the stored request from {@link #call()} and blocks until
 * {@link #channelRead0} delivers the server's reply.
 * Only one request/response cycle may be in flight at a time; callers are
 * expected to serialize their use of {@link #setReqMsg} and {@link #call()}.
 */
@Component
public class NettyRpcClientHandler extends SimpleChannelInboundHandler<String> implements Callable<Object> {
    // volatile: written in channelActive, read in call()
    volatile ChannelHandlerContext context;
    private String reqMsg; // message to send
    private String respMsg; // message received; null while a reply is pending

    public void setReqMsg(String reqMsg) {
        this.reqMsg = reqMsg;
    }

    /**
     * Stores the server's message and wakes the thread waiting in {@link #call()}.
     *
     * @param ctx channel context
     * @param msg message received from the server
     * @throws Exception declared by the superclass
     */
    @Override
    protected synchronized void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        respMsg = msg;
        // Wake the waiting thread
        notify();
    }

    /**
     * Remembers the channel context once the connection is ready, so that
     * {@link #call()} can write to it.
     *
     * @param ctx channel context
     * @throws Exception declared by the superclass
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        context = ctx;
    }

    /**
     * Sends the stored request to the server and waits for the reply.
     *
     * @return the message received from the server
     * @throws IllegalStateException when the channel has not become active yet
     * @throws Exception             in particular {@link InterruptedException} when
     *                               interrupted while waiting
     */
    @Override
    public synchronized Object call() throws Exception {
        ChannelHandlerContext ctx = context;
        if (ctx == null) {
            throw new IllegalStateException("channel is not active; cannot send request");
        }
        // Drop the previous reply so it cannot be returned for this request
        respMsg = null;
        ctx.writeAndFlush(reqMsg);
        // Wait in a loop: Object.wait() may return without a matching notify()
        while (respMsg == null) {
            wait();
        }
        return respMsg;
    }
}
4.创建proxy
此功能主要创建代理类,这个代理类封装了从消息发送到消息返回的过程
package com.rpc.consumer.proxy;
import com.alibaba.fastjson.JSON;
import com.rpc.common.RpcRequest;
import com.rpc.common.RpcResponse;
import com.rpc.consumer.client.NettyRpcClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
 * Client-side proxy factory. Each proxy turns an interface method call into an
 * {@link RpcRequest}, sends it through {@link NettyRpcClient} and converts the
 * {@link RpcResponse} back into the method's return type.
 */
@Component
public class RpcClientProxy {
    @Autowired
    NettyRpcClient rpcClient;
    // Service interface -> proxy instance
    Map<Class<?>, Object> SERVICE_PROXY = new HashMap<>();

    /**
     * Returns the proxy for the given service interface, creating and caching it
     * on first use.
     *
     * @param serviceClass service interface to proxy
     * @return proxy implementing {@code serviceClass}
     */
    public Object getProxy(Class<?> serviceClass) {
        return SERVICE_PROXY.computeIfAbsent(serviceClass, this::createProxy);
    }

    // Creates a JDK dynamic proxy that forwards interface calls to the server.
    private Object createProxy(Class<?> serviceClass) {
        return Proxy.newProxyInstance(this.getClass().getClassLoader(),
                new Class<?>[]{serviceClass},
                (proxy, method, args) -> {
                    // equals/hashCode/toString also reach the handler; answer them
                    // locally, since the server only knows service interfaces.
                    if (method.getDeclaringClass() == Object.class) {
                        return invokeObjectMethod(serviceClass, proxy, method, args);
                    }
                    return invokeRemote(method, args);
                });
    }

    // Local implementations of the java.lang.Object methods dispatched to the handler.
    private static Object invokeObjectMethod(Class<?> serviceClass, Object proxy, Method method, Object[] args) {
        switch (method.getName()) {
            case "equals":
                return proxy == args[0];
            case "hashCode":
                return System.identityHashCode(proxy);
            case "toString":
                return "RpcProxy(" + serviceClass.getName() + ")";
            default:
                throw new UnsupportedOperationException(method.toString());
        }
    }

    // Performs one remote call: build request, send, decode response.
    private Object invokeRemote(Method method, Object[] args) throws Exception {
        // 1. Build the request
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setRequestId(UUID.randomUUID().toString());
        rpcRequest.setClassName(method.getDeclaringClass().getName());
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setParameterTypes(method.getParameterTypes());
        rpcRequest.setParameters(args);
        try {
            // 2. Send it
            Object msg = rpcClient.send(JSON.toJSONString(rpcRequest));
            // 3. Decode the response
            RpcResponse rpcResponse = JSON.parseObject(msg.toString(), RpcResponse.class);
            if (rpcResponse.getError() != null) {
                throw new RuntimeException(rpcResponse.getError());
            }
            Object result = rpcResponse.getResult();
            if (result == null) {
                return null;
            }
            // Re-serialize before converting: toString() on a plain String result
            // drops its quotes and the text would no longer be valid JSON.
            return JSON.parseObject(JSON.toJSONString(result), method.getReturnType());
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }
    }
}
5.创建controller
package com.lagou.rpc.consumer.controller;
import com.lagou.rpc.api.IUserService;
import com.lagou.rpc.consumer.anno.RpcReference;
import com.lagou.rpc.pojo.User;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller for user queries; delegates to the remote user service.
 */
@RestController
@RequestMapping("/user")
public class UserController {
    // Remote service reference, marked for proxy injection
    @RpcReference
    IUserService userService;

    /**
     * Fetches a user through the remote user service.
     *
     * @param id ID of the user to fetch
     * @return the user returned by the service
     */
    @RequestMapping("/getUserById")
    public User getUserById(int id) {
        User found = userService.getById(id);
        return found;
    }
}
6.创建processor
controller中的远程服务字段使用自定义注解@RpcReference进行标注,由下面的BeanPostProcessor在bean初始化之后为这些字段注入代理对象。
package com.rpc.consumer.processor;
import com.rpc.consumer.anno.RpcReference;
import com.rpc.consumer.proxy.RpcClientProxy;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
/**
 * Bean post-processor that injects RPC proxies into fields annotated with
 * {@code @RpcReference}.
 */
@Component
public class MyBeanPostProcessor implements BeanPostProcessor {
    @Autowired
    RpcClientProxy rpcClientProxy;

    /**
     * Injects a proxy into every {@code @RpcReference} field of the bean,
     * including fields declared on its superclasses.
     *
     * @param bean     bean that has just been initialized
     * @param beanName name of the bean
     * @return the same bean instance
     * @throws BeansException declared by {@link BeanPostProcessor}
     * @throws IllegalStateException when a proxy cannot be assigned to a field
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // getDeclaredFields() only covers one class, so walk up the hierarchy
        for (Class<?> type = bean.getClass(); type != null && type != Object.class; type = type.getSuperclass()) {
            for (Field field : type.getDeclaredFields()) {
                if (field.isAnnotationPresent(RpcReference.class)) {
                    injectProxy(bean, beanName, field);
                }
            }
        }
        return bean;
    }

    // Assigns the proxy for the field's type, failing fast instead of leaving the field null.
    private void injectProxy(Object bean, String beanName, Field field) {
        Object proxy = rpcClientProxy.getProxy(field.getType());
        try {
            field.setAccessible(true);
            field.set(bean, proxy);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(
                    "Cannot inject RPC proxy into " + beanName + "." + field.getName(), e);
        }
    }
}
7.创建启动类
正常启动
package com.rpc.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Consumer entry point; starts the Spring Boot application.
* NOTE(review): this class is declared in package com.rpc.consumer, while the
* controller shown earlier is declared under com.lagou.rpc.consumer - confirm
* that component scanning from this package actually reaches it.
*/
@SpringBootApplication
public class ClientBootStrapApplication {
public static void main(String[] args) {
SpringApplication.run(ClientBootStrapApplication.class, args);
}
}