gRPC实现
借助gRPC,我们可以在一个.proto文件中定义一次服务,并用gRPC支持的任何语言生成客户端和服务器代码;这些代码可以运行在从大型数据中心的服务器到个人电脑的各种环境中——gRPC会为您处理不同语言和环境之间的通信。您还可以获得使用协议缓冲区(Protocol Buffers)的所有优点,包括高效的序列化、简单的IDL和便捷的接口更新。
grpc中有四种服务类型:
简单rpc:这就是一般的rpc调用,一个请求对象对应一个返回对象
服务端流式rpc:一个请求对象,服务端可以传回多个结果对象
客户端流式rpc:客户端传入多个请求对象,服务端返回一个响应结果
双向流式rpc:结合客户端流式rpc和服务端流式rpc,可以传入多个对象,返回多个响应对象
gRPC通过流式方式发送/接收多个消息实例,可用来实现类似不定长数组的入参和出参
准备工作
创建Maven项目grpc,以及用于存放proto接口文件的项目grpc-protos
grpc-protos项目
在grpc-protos下创建接口文件route_guide.proto,内容如下
// Use proto3 syntax.
syntax = "proto3";
// Generate each top-level message/service into its own Java file.
option java_multiple_files = true;
// Java package for the generated classes.
option java_package = "io.grpc.examples.routeguide";
// Name of the generated outer (wrapper) class.
option java_outer_classname = "RouteGuideProto";
// Class-name prefix for generated Objective-C code.
option objc_class_prefix = "RTG";
// Proto package name.
package routeguide;
// RouteGuide RPC service definition.
// The rpc names below are what the Java code generator turns into the
// lowerCamelCase methods (getSimpleRpcRes, listServerToClientStreamRpcRes, ...)
// that the server implementation overrides and the client stubs expose.
service RouteGuide {
  // Unary RPC: takes one SimpleRpcReq and returns one SimpleRpcRes.
  rpc GetSimpleRpcRes(SimpleRpcReq) returns (SimpleRpcRes) {}
  // Server-to-client streaming RPC: takes one ServerToClientStreamRpcReq and
  // returns a stream of ServerToClientStreamRpcRes messages.
  rpc ListServerToClientStreamRpcRes(ServerToClientStreamRpcReq) returns (stream ServerToClientStreamRpcRes) {}
  // Client-to-server streaming RPC: takes a stream of ClientToServerStreamRpcReq
  // messages and returns a single ClientToServerStreamRpcRes summary.
  rpc GetClientToServerStreamRpcRes(stream ClientToServerStreamRpcReq) returns (ClientToServerStreamRpcRes) {}
  // Bidirectional streaming RPC: takes a stream of TwoWayStreamRpcReq messages
  // and returns a stream of TwoWayStreamRpcRes messages.
  rpc ListTwoWayStreamRpcRes(stream TwoWayStreamRpcReq) returns (stream TwoWayStreamRpcRes) {}
}
/** Unary RPC messages: begin **/
// Request of the unary RPC: a latitude/longitude pair (same fields as Point).
message SimpleRpcReq {
int32 latitude = 1;
int32 longitude = 2;
}
// Response of the unary RPC: a name plus a location (same fields as Feature).
message SimpleRpcRes {
string name = 1;
Point location = 2;
}
/** Unary RPC messages: end **/
/** Server-to-client streaming RPC messages: begin **/
// Request of the server-streaming RPC: two points, lo and hi.
message ServerToClientStreamRpcReq {
Point lo = 1;
Point hi = 2;
}
// One element of the streamed response: a name plus a location (same fields as Feature).
message ServerToClientStreamRpcRes {
string name = 1;
Point location = 2;
}
/** Server-to-client streaming RPC messages: end **/
/** Client-to-server streaming RPC messages: begin **/
// One element of the streamed request: a single point. The fields are
// latitude/longitude (same as Point) because the Java client builds this
// message with setLatitude(...)/setLongitude(...).
message ClientToServerStreamRpcReq {
  int32 latitude = 1;
  int32 longitude = 2;
}
// Single summary response returned once the client has finished streaming.
message ClientToServerStreamRpcRes {
  int32 point_count = 1;
  int32 feature_count = 2;
  int32 distance = 3;
  int32 elapsed_time = 4;
}
/** Client-to-server streaming RPC messages: end **/
/** Bidirectional streaming RPC messages: begin **/
// One element of the request stream: a location plus a text message.
message TwoWayStreamRpcReq {
Point location = 1;
string message = 2;
}
// One element of the response stream: a location plus a text message.
message TwoWayStreamRpcRes {
Point location = 1;
string message = 2;
}
/** Bidirectional streaming RPC messages: end **/
// A position expressed as integer latitude and longitude; embedded by the
// request/response messages above.
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
// A named location. Not referenced by any rpc or other message in this file.
message Feature {
string name = 1;
Point location = 2;
}
grpc项目
pom文件内容如下
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the grpc module: pulls in the gRPC runtime and generates
     Java sources from the .proto files in the sibling grpc-protos directory. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.starnet</groupId>
    <artifactId>grpc</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <java.version>11</java.version>
        <!-- Plain Maven does not read java.version; maven-compiler-plugin takes its
             language level from the two properties below. -->
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <!-- The sources contain non-ASCII comments, so pin the encoding instead of
             relying on the platform default. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <grpc.version>1.29.0</grpc.version>
        <protobuf.version>3.11.0</protobuf.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty-shaded</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency> <!-- necessary for Java 9+ -->
            <groupId>org.apache.tomcat</groupId>
            <artifactId>annotations-api</artifactId>
            <version>6.0.53</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <extensions>
            <!-- Provides ${os.detected.classifier}, used below to pick the native
                 protoc / protoc-gen-grpc-java executables for the build machine. -->
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.5.0.Final</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
                    <!-- Directory that holds the .proto files (here: the sibling grpc-protos project). -->
                    <protoSourceRoot>../grpc-protos</protoSourceRoot>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <!-- compile: message classes; compile-custom: gRPC service stubs. -->
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
注意:protobuf插件中配置的protoSourceRoot必须指向proto接口文件所在的目录(绝对路径或相对路径均可),因为插件需要在该目录下查找proto文件来生成相关代码;如果没有设置protoSourceRoot,则默认为项目的src/main/proto目录
生成grpc protobuf相关类
在grpc目录下执行以下命令,或者通过IDEA直接生成
mvn protobuf:compile
mvn protobuf:compile-custom
在target目录下生成了对应的代码
代码实现
服务端
启动gRPC Server
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
 * Minimal gRPC server bootstrap: serves {@link RouteGuideService} on a fixed port
 * and keeps the process alive until the server terminates.
 */
public class GRpcServer {
    private Server server;

    /**
     * Builds and starts the server on port 50051 and registers a JVM shutdown hook
     * that stops it again.
     *
     * @throws IOException if the server cannot be started
     */
    private void start() throws IOException {
        // The port on which the server should run.
        int port = 50051;
        // Additional services can be registered with further addService(...) calls.
        Server built = ServerBuilder.forPort(port)
                .addService(new RouteGuideService())
                .build();
        server = built.start();
        System.out.println("Server started, listening on " + port);

        Thread shutdownHook = new Thread(() -> {
            // Use stderr here since the logger may have been reset by its JVM shutdown hook.
            System.err.println("shutting down gRPC server since JVM is shutting down");
            try {
                stop();
            } catch (InterruptedException e) {
                e.printStackTrace(System.err);
            }
            System.err.println("server shut down");
        });
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    /**
     * Initiates shutdown and waits up to 30 seconds for the server to terminate.
     * Does nothing when the server was never started.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    private void stop() throws InterruptedException {
        if (server == null) {
            return;
        }
        server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
    }

    /**
     * Blocks the calling thread until the server has terminated.
     * Returns immediately when the server was never started.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    private void blockUntilShutdown() throws InterruptedException {
        if (server == null) {
            return;
        }
        server.awaitTermination();
    }

    /** Starts the server and waits for it to terminate. */
    public static void main(String[] args) throws IOException, InterruptedException {
        GRpcServer instance = new GRpcServer();
        instance.start();
        instance.blockUntilShutdown();
    }
}
gRPC服务端处理
import io.grpc.examples.routeguide.*;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
/**
 * Server-side implementation of the RouteGuide service, with one handler for each
 * gRPC call type: unary, server streaming, client streaming and bidirectional streaming.
 */
public class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {

    /**
     * Unary RPC: answers with a feature named "fuzhou" located at the requested coordinates.
     *
     * @param request          coordinates supplied by the client
     * @param responseObserver receives the single response, then completion
     */
    @Override
    public void getSimpleRpcRes(SimpleRpcReq request, StreamObserver<SimpleRpcRes> responseObserver) {
        Point location = Point.newBuilder()
                .setLatitude(request.getLatitude())
                .setLongitude(request.getLongitude())
                .build();
        SimpleRpcRes simpleRpcRes = SimpleRpcRes
                .newBuilder()
                .setName("fuzhou")
                .setLocation(location)
                .build();
        // Send the response to the client.
        responseObserver.onNext(simpleRpcRes);
        // Mark the server side of the call as finished.
        responseObserver.onCompleted();
    }

    /**
     * Server-to-client streaming RPC: sends one response per point of the request
     * (lo first, then hi), named "fuzhou1" and "fuzhou2".
     *
     * @param request          carries the two points to echo back
     * @param responseObserver receives the streamed responses, then completion
     */
    @Override
    public void listServerToClientStreamRpcRes(ServerToClientStreamRpcReq request, StreamObserver<ServerToClientStreamRpcRes> responseObserver) {
        List<Point> pointList = new ArrayList<>();
        pointList.add(request.getLo());
        pointList.add(request.getHi());
        int i = 1;
        for (Point point : pointList) {
            ServerToClientStreamRpcRes res = ServerToClientStreamRpcRes.newBuilder()
                    .setName("fuzhou" + i++)
                    .setLocation(point)
                    .build();
            // Each onNext call pushes one more element of the stream to the client.
            responseObserver.onNext(res);
        }
        responseObserver.onCompleted();
    }

    /**
     * Client-to-server streaming RPC: counts the messages the client streams in and,
     * once the client completes, replies with a single summary holding that count.
     *
     * @param responseObserver receives the single summary response, then completion
     * @return the observer that gRPC feeds the client's streamed requests into
     */
    @Override
    public StreamObserver<ClientToServerStreamRpcReq> getClientToServerStreamRpcRes(StreamObserver<ClientToServerStreamRpcRes> responseObserver) {
        return new StreamObserver<ClientToServerStreamRpcReq>() {
            private int pointCount;

            // Invoked once per message streamed by the client.
            @Override
            public void onNext(ClientToServerStreamRpcReq point) {
                System.out.println("point is " + point);
                pointCount++;
            }

            @Override
            public void onError(Throwable t) {
                // Include the cause so a failed or cancelled stream can be diagnosed.
                System.out.println("client to server stream rpc is cancelled: " + t);
            }

            // Invoked when the client calls requestObserver.onCompleted().
            @Override
            public void onCompleted() {
                // Send the summary, then mark the server side of the call as finished.
                responseObserver.onNext(ClientToServerStreamRpcRes.newBuilder().setPointCount(pointCount).build());
                responseObserver.onCompleted();
            }
        };
    }

    /**
     * Bidirectional streaming RPC: answers every incoming message immediately with a
     * response whose message is prefixed with "res" and whose coordinates are shifted by 100.
     *
     * @param responseObserver receives one response per incoming request, then completion
     * @return the observer that gRPC feeds the client's streamed requests into
     */
    @Override
    public StreamObserver<TwoWayStreamRpcReq> listTwoWayStreamRpcRes(StreamObserver<TwoWayStreamRpcRes> responseObserver) {
        return new StreamObserver<TwoWayStreamRpcReq>() {
            // Invoked once per message streamed by the client.
            @Override
            public void onNext(TwoWayStreamRpcReq request) {
                Point shifted = Point.newBuilder()
                        .setLatitude(request.getLocation().getLatitude() + 100)
                        .setLongitude(request.getLocation().getLongitude() + 100)
                        .build();
                responseObserver.onNext(TwoWayStreamRpcRes.newBuilder()
                        .setMessage("res" + request.getMessage())
                        .setLocation(shifted)
                        .build());
            }

            @Override
            public void onError(Throwable t) {
                // Name the right call type and include the cause.
                System.out.println("two way stream rpc is cancelled: " + t);
            }

            // Invoked when the client calls requestObserver.onCompleted().
            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }
}
客户端
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.examples.routeguide.*;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Demo client that exercises the four RouteGuide call types (unary, server streaming,
 * client streaming, bidirectional streaming) against a server on a plaintext channel.
 */
public class GRpcClient {
    /** Upper bound, in seconds, on how long a streaming call waits for the server to finish. */
    private static final long STREAM_TIMEOUT_SECONDS = 60;

    // Channel to the remote server; owns the connection life cycle.
    private final ManagedChannel channel;
    // Blocking stub, used for the unary and server-streaming calls.
    private final RouteGuideGrpc.RouteGuideBlockingStub blockingStub;
    // Asynchronous stub, used for the client-streaming and bidirectional calls.
    private final RouteGuideGrpc.RouteGuideStub stub;
    // Not used by any active code in this class.
    Random random = new Random();

    /**
     * Opens a plaintext channel to the given address and creates both stubs on it.
     *
     * @param host server host name or address
     * @param port server port
     */
    public GRpcClient(String host, int port) {
        channel = ManagedChannelBuilder.forAddress(host, port)
                .usePlaintext()
                .build();
        blockingStub = RouteGuideGrpc.newBlockingStub(channel);
        stub = RouteGuideGrpc.newStub(channel);
    }

    /**
     * Shuts the channel down and waits up to 5 seconds for it to terminate.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    /** Runs the four demo calls against 127.0.0.1:50051 and always closes the channel afterwards. */
    public static void main(String[] args) throws Exception {
        GRpcClient client = new GRpcClient("127.0.0.1", 50051);
        try {
            // Unary call.
            System.out.println("simple rpc start!!!!!!");
            SimpleRpcRes simpleRpcRes = client.getSimpleRpcRes(12, 16);
            System.out.println("simple rpc response: ");
            System.out.println(simpleRpcRes);
            System.out.println("simple rpc over!!!!!!");
            System.out.println("\n===================================================\n");

            // Server-to-client streaming call.
            System.out.println("server to client stream rpc start!!!!!!");
            Iterator<ServerToClientStreamRpcRes> serverToClientStreamRpcResIterator = client.listServerToClientStreamRpcRes(11, 22, 666, 777);
            System.out.println("server to client stream rpc response: ");
            while (serverToClientStreamRpcResIterator.hasNext()) {
                System.out.println(serverToClientStreamRpcResIterator.next());
            }
            System.out.println("server to client stream rpc over!!!!!!");
            System.out.println("\n===================================================\n");

            // Client-to-server streaming call.
            System.out.println("client to server stream rpc start!!!!!!");
            client.getClientToServerStreamRpcRes(10);
            System.out.println("client to server stream rpc over!!!!!!");
            System.out.println("\n===================================================\n");

            // Bidirectional streaming call.
            System.out.println("two way stream rpc start!!!!!!");
            client.listTwoWayStreamRpcRes(5);
            System.out.println("two way stream rpc over!!!!!!");
        } finally {
            // Close the channel even when one of the calls above throws.
            client.shutdown();
        }
    }

    /**
     * Performs the unary call through the blocking stub.
     *
     * @param lat latitude to send
     * @param lon longitude to send
     * @return the server's response
     */
    public SimpleRpcRes getSimpleRpcRes(int lat, int lon) {
        SimpleRpcReq request = SimpleRpcReq.newBuilder().setLatitude(lat).setLongitude(lon).build();
        return blockingStub.getSimpleRpcRes(request);
    }

    /**
     * Performs the server-streaming call through the blocking stub.
     *
     * @param lowLag   latitude of the "lo" point
     * @param lowLon   longitude of the "lo" point
     * @param highLag  latitude of the "hi" point
     * @param highLong longitude of the "hi" point
     * @return iterator over the responses streamed by the server
     */
    public Iterator<ServerToClientStreamRpcRes> listServerToClientStreamRpcRes(int lowLag, int lowLon, int highLag, int highLong) {
        ServerToClientStreamRpcReq request = ServerToClientStreamRpcReq.newBuilder()
                .setLo(Point.newBuilder().setLatitude(lowLag).setLongitude(lowLon).build())
                .setHi(Point.newBuilder().setLatitude(highLag).setLongitude(highLong).build()).build();
        return blockingStub.listServerToClientStreamRpcRes(request);
    }

    /**
     * Performs the client-streaming call: streams {@code pointNum} points (i, i) for
     * i = 1..pointNum to the server and waits for its single summary response.
     *
     * @param pointNum number of points to send
     * @throws Exception if sending fails or the wait for the server is interrupted
     */
    private void getClientToServerStreamRpcRes(int pointNum) throws Exception {
        List<ClientToServerStreamRpcReq> pointList = new ArrayList<>();
        for (int i = 1; i <= pointNum; i++) {
            pointList.add(ClientToServerStreamRpcReq.newBuilder().setLatitude(i).setLongitude(i).build());
        }
        CountDownLatch finishLatch = new CountDownLatch(1);
        // Callbacks invoked as the server responds.
        StreamObserver<ClientToServerStreamRpcRes> responseObserver = new StreamObserver<ClientToServerStreamRpcRes>() {
            @Override
            public void onNext(ClientToServerStreamRpcRes res) {
                System.out.println("client to server stream rpc response: ");
                System.out.println(res);
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("client to server stream is error " + t.getMessage() + ": " + t);
                // A failed call is finished too; release the waiting caller.
                finishLatch.countDown();
            }

            @Override
            public void onCompleted() {
                System.out.println("client to server stream, server has been over");
                finishLatch.countDown();
            }
        };
        // Start the call on the async stub; the returned observer is used to write requests.
        StreamObserver<ClientToServerStreamRpcReq> requestObserver = stub.getClientToServerStreamRpcRes(responseObserver);
        try {
            for (ClientToServerStreamRpcReq point : pointList) {
                if (finishLatch.getCount() == 0) {
                    // The call already completed or failed; there is no point in sending more.
                    return;
                }
                requestObserver.onNext(point);
            }
        } catch (RuntimeException e) {
            // Cancel the call. onError must be the last call on the observer, so rethrow
            // instead of falling through to onCompleted().
            requestObserver.onError(e);
            throw e;
        }
        // Tell the server that the client has finished writing.
        requestObserver.onCompleted();
        awaitServerCompletion(finishLatch, "client to server stream");
    }

    /**
     * Performs the bidirectional call: streams {@code pointNum} requests (message "i",
     * location (i, i)) for i = 1..pointNum and prints every response the server streams back.
     *
     * @param pointNum number of requests to send
     * @throws Exception if sending fails or the wait for the server is interrupted
     */
    private void listTwoWayStreamRpcRes(int pointNum) throws Exception {
        List<TwoWayStreamRpcReq> rpcReqList = new ArrayList<>();
        for (int i = 1; i <= pointNum; i++) {
            rpcReqList.add(TwoWayStreamRpcReq.newBuilder().setMessage("" + i)
                    .setLocation(Point.newBuilder().setLatitude(i).setLongitude(i).build())
                    .build());
        }
        CountDownLatch finishLatch = new CountDownLatch(1);
        // Callbacks invoked as the server responds.
        StreamObserver<TwoWayStreamRpcRes> responseObserver = new StreamObserver<TwoWayStreamRpcRes>() {
            @Override
            public void onNext(TwoWayStreamRpcRes res) {
                System.out.println("two way stream rpc response: ");
                System.out.println(res);
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("two way stream is error " + t.getMessage() + ": " + t);
                // A failed call is finished too; release the waiting caller.
                finishLatch.countDown();
            }

            @Override
            public void onCompleted() {
                System.out.println("two way stream, server has been over");
                finishLatch.countDown();
            }
        };
        // Start the call on the async stub; the returned observer is used to write requests.
        StreamObserver<TwoWayStreamRpcReq> requestObserver = stub.listTwoWayStreamRpcRes(responseObserver);
        try {
            for (TwoWayStreamRpcReq point : rpcReqList) {
                if (finishLatch.getCount() == 0) {
                    // The call already completed or failed; there is no point in sending more.
                    return;
                }
                requestObserver.onNext(point);
            }
        } catch (RuntimeException e) {
            // Cancel the call. onError must be the last call on the observer, so rethrow
            // instead of falling through to onCompleted().
            requestObserver.onError(e);
            throw e;
        }
        // Tell the server that the client has finished writing.
        requestObserver.onCompleted();
        awaitServerCompletion(finishLatch, "two way stream");
    }

    /**
     * Waits until the response observer has signalled completion or failure, instead of
     * sleeping for a fixed time; reports when the server does not finish in time.
     */
    private static void awaitServerCompletion(CountDownLatch finishLatch, String callName) throws InterruptedException {
        if (!finishLatch.await(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            System.out.println(callName + " did not finish within " + STREAM_TIMEOUT_SECONDS + " seconds");
        }
    }
}