rpc-02-grpc

rpc-02-grpc

前言

  • RPC 框架的目标就是让远程服务调用更加简单、透明,RPC 框架负责屏蔽底层的传输方式(TCP 或者 UDP)、序列化方式(XML/Json/ 二进制)和通信细节。服务调用者可以像调用本地接口一样调用远程的服务提供者,而不需要关心底层通信细节和调用过程。

业界主流的 RPC 框架整体上分为三类:

  • 支持多语言的 RPC 框架,比较成熟的有 Google 的 gRPCApache(Facebook)的 Thrift;
  • 只支持特定语言的 RPC 框架,例如新浪微博的 Motan;
  • 支持服务治理等服务化特性的分布式服务框架,其底层内核仍然是 RPC 框架, 例如阿里的 Dubbo。

1. grpc 简介

  • gRPC 是一个高性能、开源和通用的 RPC 框架,面向服务端和移动端,基于 HTTP/2 设计。
1
2
3
4
5
6
特点

1. 语言中立,支持多种语言;
2. 基于 IDL 文件定义服务,通过 proto3 工具生成指定语言的数据结构、服务端接口以及客户端 Stub;
3. 通信协议基于标准的 HTTP/2 设计,支持双向流、消息头压缩、单 TCP 的多路复用、服务端推送等特性,这些特性使得 gRPC 在移动端设备上更加省电和节省网络流量;
4. 序列化支持 PB(Protocol Buffer)和 JSON,PB 是一种语言无关的高性能序列化框架,基于 HTTP/2 + PB, 保障了 RPC 调用的高性能。
  • 但是,通常我们不会去单独使用gRPC,而是将gRPC作为一个部件进行使用
    • 这是因为在生产环境,我们面对大并发的情况下,需要使用分布式系统来去处理,而gRPC并没有提供分布式系统相关的一些必要组件。
    • 而且,真正的线上服务还需要提供包括负载均衡,限流熔断,监控报警,服务注册和发现等等必要的组件。不过,这就不属于本篇文章讨论的主题了,我们还是先继续看下如何使用gRPC。

2. gRpc 原理分析

2.1 服务端创建流程

  • gRPC 服务端创建采用 Build 模式,对底层服务绑定、transportServerNettyServer s的创建和实例化做了封装和屏蔽,让服务调用者不用关心 RPC 调用细节,整体上分为三个过程:

    • 创建 Netty HTTP/2 s服务端;

    • 将需要调用的服务端接口实现类注册到内部的 Registry 中,RPC 调用时,可以根据 RPC 请求消息中的服务定义信息查询到服务接口实现类;

    • 创建 gRPC Server ,它是 gRPC 服务端的抽象,聚合了各种 Listener,用于 RPC 消息的统一调度和处理。

  • gRPC 服务端关键创建流程分析:

    • NettyServer 实例创建:gRPC 服务端创建,首先需要初始化 NettyServer,它是 gRPC 基于 Netty 4.1 HTTP/2 协议栈之上封装的 HTTP/2 服务端。NettyServer 实例由 NettyServerBuilder 的 buildTransportServer 方法构建,NettyServer 构建完成之后,监听指定的 Socket 地址,即可实现基于 HTTP/2 协议的请求消息接入。

    • 绑定 IDL 定义的服务接口实现类:gRPC 与其它一些 RPC 框架的差异点是服务接口实现类的调用并不是通过动态代理和反射机制,而是通过 proto 工具生成代码,在服务端启动时,将服务接口实现类实例注册到 gRPC 内部的服务注册中心上。请求消息接入之后,可以根据服务名和方法名,直接调用启动时注册的服务实例,而不需要通过反射的方式进行调用,性能更优。

    • gRPC 服务实例(ServerImpl )构建:ServerImpl 负责整个 gRPC 服务端消息的调度和处理,创建ServerImpl实例过程中,会对服务端依赖的对象进行初始化,例如 Netty 的线程池资源、gRPC 的线程池、内部的服务注册类(InternalHandlerRegistry)等,

      • ServerImpl 初始化完成之后,就可以调用 NettyServer的 start 方法启动 HTTP/2 s服务端,接收 gRPC 客户端的服务调用请求
  • 服务端的调用流程
    • gRPC 的客户端请求消息由 Netty Http2ConnectionHandler接入,由 gRPC 负责将 PB 消息(或者 JSON)反序列化为 POJO 对象,然后通过服务定义查询到该消息对应的接口实例,发起本地 Java 接口调用,调用完成之后,将响应消息反序列化为 PB(或者 JSON),通过 HTTP2 Frame 发送给客户端。
    • 整个 service 调用可以划分为如下四个过程:
      • gRPC 请求消息接入;
      • gRPC 消息头和消息体处理;
      • 内部的服务路由和调用;
      • 响应消息发送。
  1. gRPC 请求消息接入;
1
2
3
4
5
6
7
8
gRPC 的请求消息由 Netty HTTP/2 协议栈接入,通过 gRPC 注册的 Http2FrameListener,将解码成功之后的 HTTP Header 和 HTTP Body 发送到 gRPC 的 NettyServerHandler 中,实现基于 HTTP/2 的 RPC 请求消息接入。

gRPC 请求消息接入流程如下:
关键流程解读如下:
Netty 4.1 提供了 HTTP/2 底层协议栈,通过 Http2ConnectionHandler 及其依赖的其它类库,实现了 HTTP/2 消息的统一接入和处理。通过注册 Http2FrameListener 监听器,可以回调接收 HTTP2 协议的消息头、消息体、优先级、Ping、SETTINGS 等。gRPC 通过 FrameListener 重载 Http2FrameListener 的 onDataRead、onHeadersRead 等方法,将 Netty 的 HTTP/2 消息转发到 gRPC 的 NettyServerHandler 中。
Netty 的 HTTP/2 协议接入仍然是通过 ChannelHandler 的 CodeC 机制实现,它并不影响 NIO 线程模型。
因此,理论上各种协议、以及同一个协议的多个服务端实例可以共用同一个 NIO 线程池(NioEventLoopGroup).也可以独占。
在实践中独占模式普遍会存在线程资源占用过载问题,很容易出现句柄等资源泄漏。在 gRPC 中,为了避免该问题,默认采用共享池模式创建 NioEventLoopGroup,所有的 gRPC 服务端实例,都统一从 SharedResourceHolder 分配 NioEventLoopGroup 资源,实现 NioEventLoopGroup 的共享。
  1. gRPC 消息头和消息体处理;

gRPC 消息头的处理入口是 NettyServerHandler 的 onHeadersRead(),处理流程如下所示:

1
2
3
4
5
6
7
8
对 HTTP Header 的 Content-Type 校验,此处必须是 “application/grpc”;
从 HTTP Header 的 URL 中提取接口和方法名,以 HelloWorldServer 为例,它的 method 为:”helloworld.Greeter/SayHello”;
将 Netty 的 HTTP Header 转换成 gRPC 内部的 Metadata,Metadata 内部维护了一个键值对的二维数组 namesAndValues,以及一系列的类型转换方法:
创建 NettyServerStream 对象,它持有了 Sink 和 TransportState 类,负责将消息封装成 GrpcFrameCommand,与底层 Netty 进行交互,实现协议消息的处理;
创建 NettyServerStream 之后,会触发 ServerTransportListener 的 streamCreated 方法,在该方法中,主要完成了消息上下文和 gRPC 业务监听器的创建;
gRPC 上下文创建:CancellableContext 创建之后,支持超时取消,如果 gRPC 客户端请求消息在 Http Header 中携带了“grpc-timeout”,系统在创建 CancellableContext 的同时会启动一个延时定时任务,延时周期为超时时间,一旦该定时器成功执行,就会调用 CancellableContext.CancellationListener 的 cancel 方法,发送 CancelServerStreamCommand 指令;
JumpToApplicationThreadServerStreamListener 的创建:它是 ServerImpl 的内部类,从命名上基本可以看出它的用途,即从 ServerStream 跳转到应用线程中进行服务调用,gRPC 服务端的接口调用主要通过 JumpToApplicationThreadServerStreamListener 的 messageRead 和 halfClosed 方法完成;
将 NettyServerStream 的 TransportState 缓存到 Netty 的 Http2Stream 中,当处理请求消息体时,可以根据 streamId 获取到 Http2Stream,进而根据“streamKey”还原 NettyServerStream 的 TransportState,进行后续处理。

gRPC 消息体的处理入口是 NettyServerHandler 的 onDataRead(),处理流程如下所示:

1
2
3
4
消息体处理比较简单,下面就关键技术点进行讲解:

因为 Netty HTTP/2 协议 Http2FrameListener 分别提供了 onDataRead 和 onHeadersRead 回调方法,所以 gRPC NettyServerHandler 在处理完消息头之后需要缓存上下文,以便后续处理消息体时使用;
onDataRead 和 onHeadersRead 方法都是由 Netty 的 NIO 线程负责调度,但是在执行 onDataRead 的过程中发生了线程切换,如下所示(ServerTransportListenerImpl 类):
  1. 内部的服务路由和调用

主要包括如下几个步骤:

  • 将请求消息体反序列为 Java 的 POJO 对象,即 IDL 中定义的请求参数对象;
  • 根据请求消息头中的方法名到注册中心查询到对应的服务定义信息;
  • 通过 Java 本地接口调用方式,调用服务端启动时注册的 IDL 接口实现类。
1
2
3
4
5
6
7
中间的交互流程比较复杂,涉及的类较多,但是关键步骤主要有三个:

解码:对 HTTP/2 Body 进行应用层解码,转换成服务端接口的请求参数,解码的关键就是调用 requestMarshaller.parse(input),将 PB 码流转换成 Java 对象;

路由:根据 URL 中的方法名从内部服务注册中心查询到对应的服务实例,路由的关键是调用 registry.lookupMethod(methodName) 获取到 ServerMethodDefinition 对象;

调用:调用服务端接口实现类的指定方法,实现 RPC 调用,与一些 RPC 框架不同的是,此处调用是 Java 本地接口调用,非反射调用,性能更优,它的实现关键是 UnaryRequestMethod.invoke(request, responseObserver) 方法。
  1. 响应消息的发送

响应消息的发送由 StreamObserver 的 onNext 触发,流程如下所示:

1
2
3
4
5
响应消息的发送原理如下:
1. 分别发送 gRPC HTTP/2 响应消息头和消息体,由 NettyServerStream 的 Sink 将响应消息封装成 SendResponseHeadersCommand 和 SendGrpcFrameCommand,加入到 WriteQueue 中;
WriteQueue 通过 Netty 的 NioEventLoop 线程进行消息处理,NioEventLoop 将 SendResponseHeadersCommand 和 SendGrpcFrameCommand 写入到 Netty 的 Channel 中,进而触发 DefaultChannelPipeline 的
2. write(Object msg, ChannelPromise promise) 操作;
响应消息通过 ChannelPipeline 职责链进行调度,触发 NettyServerHandler 的 sendResponseHeaders 和 sendGrpcFrame 方法,调用 Http2ConnectionEncoder 的 writeHeaders 和 writeData 方法,将响应消息通过 Netty 的 HTTP/2 协议栈发送给客户端.

2.2 源码分析

3. grpc 实现远程调用例子

gRPC的使用通常包括如下几个步骤:

  1. 通过protobuf来定义接口和数据类型
  2. 编写gRPC server端代码
  3. 编写gRPC client端代码

代码示例:

  1. pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>a.b.c</groupId>
<artifactId>grpc-demo</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.34.1</grpc.version><!-- CURRENT_GRPC_VERSION -->
<protobuf.version>3.12.0</protobuf.version>
<protoc.version>3.12.0</protoc.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
<version>${grpc.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>${protobuf.version}</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
<protoSourceRoot>src/main/resources/proto</protoSourceRoot>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

</project>
  1. 创建proto文件夹并创建.proto文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
文件夹地址 : src/main/resources/proto
文件名:add.proto

syntax = "proto3"; // 使用的语法

package mygrpc;
option java_package = "a.b.c"; // 真正的包名
option java_outer_classname = "AddServiceProto";
option java_multiple_files = true;

// AddService 服务名
// AddService 参数名
// AddReply 返回值
service AddService{
rpc add(AddRequest) returns (AddReply){}
}

// 定义参数和返回值的 数据结构类型
message AddRequest{
int32 a = 1;
int32 b = 2;
}

message AddReply{
int32 res = 1;
}
  1. maven install 后 项目会产生.proto文件
    • 使用gRPC protobuf生成工具生成对应语言的库函数
  • 其中AddServiceGrpc 是一个生成的类,里面有一个非常重要的抽象类

  • 我们需要做的就是继承AddServiceGrpc这个抽象类,并且重写里面的add方法即可

  1. addServer.java : 服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package a.b.c;

import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;

import java.io.IOException;

import static io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall;

/**
* @author Zhuuu
* @date 2021/1/9 16:24
*/

// AddServiceImplBase 抽象类并重写add方法
public class AddServer extends AddServiceGrpc.AddServiceImplBase {
public static void main(String[] args) throws IOException {
// 1. 主方法监听服务
ServerBuilder.forPort(9999)
.addService(new AddServer()) // 加入的服务是自己
.build() // 编译
.start(); // 启动
System.out.println("server start at 9999");
while (true){ // 这里便于一直监听

}
}

// 2. 重写add方法
public void add(AddRequest request, StreamObserver<AddReply> responseObserver) {
int res = myAdd(request.getA(),request.getB()); // 拿到结果
responseObserver.onNext(AddReply.newBuilder().setRes(res).build()); // 返回结果
responseObserver.onCompleted();
}

// 3. 业务代码
private int myAdd(int a, int b){
return a + b;
}
}
  1. addClient.java : 客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package a.b.c;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

/**
* @author Zhuuu
* @date 2021/1/9 16:28
*/
public class AddClient {
AddServiceGrpc.AddServiceBlockingStub stub; // 客户端代理
ManagedChannel channel; // 与服务端的连接通道

public static void main(String[] args) {
int a = 101;
int b = 102;
// 2. 客户端建立通道,并设置参数
AddClient client = new AddClient();

AddReply reply =
client.stub.add(AddRequest.newBuilder().setA(a).setB(b).build());
System.out.println(reply.getRes()); // 最后拿到调用结果
}

// 1. 与服务端建立连接
public AddClient(){
channel = ManagedChannelBuilder
.forAddress("127.0.0.1",9999)
.usePlaintext() // 纯本文的方式
.build();
stub =
AddServiceGrpc.newBlockingStub(channel); // 阻塞的代理
}
}

小结:

  • 以上模拟了一个基本的grpc + protobuf的调用案例

4. ProtoBuf

打赏
  • 版权声明: 本博客所有文章除特别声明外,均采用 Apache License 2.0 许可协议。转载请注明出处!
  • © 2019-2022 Zhuuu
  • PV: UV:

请我喝杯咖啡吧~

支付宝
微信