介绍
gRPC 是开源的远程过程调用(RPC)框架,可在任何环境运行。使用 gRPC 可以有效地连接数据中心内和跨数据中心的服务,gRPC 具有可插拔的负载均衡、追踪、健康检查和身份验证支持。它也适用于将设备、移动应用程序和浏览器连接到后端服务的分布式计算的最后一英里。
概览
在 gRPC 中,客户端应用程序可以直接调用部署在不同机器上的服务端应用程序中的方法,就好像它是本地对象一样,使用 gRPC 可以更容易地创建分布式应用程序和服务。与许多 RPC 系统一样,gRPC 基于定义服务的思想,指定可以通过参数和返回类型远程调用的方法。在服务端侧,服务端实现接口,运行 gRPC 服务,处理客户端调用。在客户端侧,客户端拥有存根(Stub,在某些语言中称为客户端),它提供与服务端相同的方法。
gRPC 客户端和服务端可以在各种环境中运行和相互通信 - 从 Google 内部的服务器到你自己的桌面 - 并且可以使用 gRPC 支持的任何语言编写。因此,比如,你可以轻松地用 Java 创建 gRPC 服务端,使用 Go、Python 或 Ruby 创建客户端。此外,最新的 Google API 将包含 gRPC 版本的接口,使你轻松地将 Google 功能构建到你的应用程序中。
默认,gRPC 使用 Protocol Buffers(https://developers.google.com/protocol-buffers/docs/overview),Google 的成熟的用于序列化结构化数据的开源机制(尽管可以使用其它数据格式,比如 JSON)。下面将快速介绍如何使用它。
message Person {
string name = 1;
int32 id = 2;
bool has_ponycopter = 3;
}
在指定数据结构后,可以使用 Protocol Buffer 编译器 protoc 从 proto 定义中生成所选语言的数据访问类。它们为每个字段提供简单的访问器,比如 name() 和 set_name(),以及用于将整个结构序列化成原始字节流和从原始字节流解析整个结构的方法。因此,比如,如果你选择的语言是 C++,那么在上面的示例上运行编译器将生成名为 Person 的类。然后可以在应用程序中使用该类填充、序列化和检索 Person Protocol Buffer 消息。
在普通的 proto 文件中定义 gRPC 服务,RPC 方法参数和返回类型被指定为 Protocol Buffer 消息:
// The greeter service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
gRPC 使用带特殊的 gRPC 插件的 protoc 从 proto 文件生成代码:你将获得生成的 gRPC 客户端和服务端代码,以及用于填充、序列化和检索消息类型的常规 Protocol Buffer 代码。如果想要了解更多关于 Protocol Buffer 的信息,包括如何在所选语言中安装带 gRPC 插件的 protoc,请参阅 protocol buffers documentation。
虽然开源用户使用 protocol buffers 已有一段时间,但是本站点的大多数示例使用 Protocol Buffer 版本 3(proto3),它拥有略微简化的语法、一些有用的新特性,以及支持更多语言。Proto3 目前可用于 Java、C++、Dart、Python、Objective-C、C#、lite-runtime(Android Java)、Ruby 和来自 Protocol Buffer Github 仓库的 JavaScript,以及来自 golang/protobuf 官方包的 Go 语言生成器,更多语言正在开发中。你可以在 Proto3 语言指南和每种语言的可用参考文档中找到更多信息。参考文档也包含 .proto 文件格式的正式规范。
核心概念
与许多 RPC 系统一样,gRPC 基于定义服务的思想,指定可以通过参数和返回类型远程调用的方法。默认情况下,gRPC 使用 Protocol Buffer 作为接口定义语言(IDL),来描述服务接口和负载消息的结构。如果需要,也可以使用其它替代品。
service HelloService {
rpc SayHello (HelloRequest) returns (HelloResponse);
}
message HelloRequest {
string greeting = 1;
}
message HelloResponse {
string reply = 1;
}
gRPC 支持定义四种服务方法:
rpc SayHello(HelloRequest) returns (HelloResponse);
服务端流 RPC,客户端向服务端发送一个请求,取回用于读取消息序列的流。客户端从返回的流中读取,直到没有更多消息。gRPC 保证单个 RPC 调用中的消息顺序。
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
双向流 RPC,两侧都使用读-写流发送消息序列。这两个流独立运行,因此客户端和服务端可以按照它们喜欢的任何顺序进行读写:比如,服务端可以在写入响应之前等待接收所有客户端消息,或者交替地读消息,然后写消息,或者执行某些其它读写组合。每个流中的消息顺序保持不变。
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
同步 RPC 调用阻塞直到服务端的响应到达,这与 RPC 所期望的过程调用的抽象最接近。另一方面,网络本质上是异步的,在许多情况下,能够在不阻塞当前线程的情况下启动 RPC 非常有用。
首先看最简单的 RPC 类型,客户端发送单个请求,取回单个响应。
1. 当客户端调用存根(Stub)方法时,服务端将收到通知,客户端使用用于本次调用的元数据(metadata)、方法名称和指定的截止时间(deadline)调用 RPC。
2. 然后,服务端可以直接发送回它自己的初始元数据(必须在任何响应之前发送),也可以等待客户端的请求消息。先发生哪个,是特定于应用程序的。
3. 一旦服务端拥有客户端的请求消息,它就会执行创建和填充响应所需的所有工作。然后将响应(如果成功)连同状态详细信息(状态代码和可选的状态消息)和可选的尾随元数据一起返回到客户端。
4. 如果响应状态是 OK,那么客户端获取响应,在客户端侧完成响应。
服务端流 RPC 与一元 RPC 类似,不同之处在于服务端返回消息流,以响应客户端的请求。在发送所有消息后,服务端的状态详情(状态码和可选的状态消息)和可选的尾随元数据被发送到客户端。这完成服务端侧的处理。客户端在拥有服务端的所有消息后完成处理。
客户端流 RPC 与一元 RPC 类似,不同之处在于客户端向服务端发送消息流,而不是单个消息。服务端响应单条消息(以及它的状态详细信息和可选的尾随元数据),通常(但不一定)在它接收到所有客户端消息之后。
在双向流 RPC 中,调用由调用方法的客户端发起,服务端接收客户端的元数据、方法名和截止时间。服务端可以选择发送回它的初始元数据,或者等待客户端开始流消息。
客户端侧和服务端侧流处理是特定于语言的,由于这两个流是独立的,客户端和服务端可以以任意顺序读写消息。例如,服务端可以等待到收到客户端的所有消息后,再写它的消息,或者服务端和客户端可以玩“乒乓”游戏 - 服务器接收请求,然后发送回响应,然后客户端根据响应发送另一个请求,以此类推。
gRPC 允许客户端指定在 RPC 以 DEADLINE_EXCEEDED 错误终止之前,他们愿意等待多久。在服务端,服务可以查询特定的 RPC 是否超时,或者还剩下多少时间来完成该 RPC。
指定截止日期或超时是特定于语言的:一些语言 API 根据超时(持续时间)工作,一些语言 API 根据截止时间(固定的时间点)工作,并且可能有也可能没有默认截止时间。
在 gRPC 中,客户端和服务端都对调用的成功做出独立的本地决定,并且它们的结论可能不匹配。这意味着,比如,你可以有一个在服务端成功完成(“我已经发送所有响应!”),但在客户端失败(“响应在截止时间之后到达!”)的 RPC。服务端也可以在客户端发送所有请求之前决定完成。
客户端和服务端都可以随时取消 RPC,取消将立即终止 RPC,因此不会执行进一步的工作。
警告 在取消前发生的变更不会回滚。
元数据是键-值对列表形式的关于特定 RPC 调用(比如身份验证详细信息)的信息,其中键是字符串,值通常是字符串,但也可以是二进制数据。
键不区分大小写,由 ASCII 字母、数字和特殊字符 -、_ 和 . 组成,并且不能以 grpc- 开头(为 gRPC 本身保留)。二进制值的键以 -bin 结尾,而 ASCII 值的键则不是。
用户定义的元数据不会被 gRPC 使用,它允许客户端向服务端提供与本次调用相关的信息,反之亦然。
对元数据的访问依赖于语言。
gRPC 通道提供到指定主机和端口上的 gRPC 服务的连接。在创建客户端存根时使用。客户端可以指定通道参数来修改 gRPC 的默认行为,比如打开或关闭消息压缩。通道有状态,包括已连接和空闲。
快速入门 Go gRPC
操作系统:macOS 12.6
Go:go version go1.19 darwin/amd64
Protobuf:3.21.5
mkdir hellogrpc
cd hellogrpc
go mod init hellogrpc
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
$ go install google.golang.org/grpc/cmd/protoc-gen-go[email protected].2
2. 更新 PATH,以便 protoc 编译器可以找到插件:
$ export PATH="$PATH:$(go env GOPATH)/bin"
$ go get google.golang.org/grpc
hellogrpc % tree .
.
├── go.mod
├── go.sum
├── greeter_client
│ └── main.go
├── greeter_server
│ └── main.go
└── hellogrpc
└── hellogrpc.proto
3 directories, 5 files
go.mod:
module hellogrpc
go 1.19
require google.golang.org/grpc v1.51.0
require (
github.com/golang/protobuf v1.5.2 // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/text v0.4.0 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
google.golang.org/protobuf v1.27.1 // indirect
)
greeter_client/main.go:
package main
import (
"context"
"flag"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "hellogrpc/hellogrpc"
)
const (
defaultName = "world"
)
var (
addr = flag.String("addr", "localhost:50051", "the address to connect to")
name = flag.String("name", defaultName, "Name to greet")
)
func main() {
flag.Parse()
// Set up a connection to the server.
conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
// Contact the server and print out its response.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.GetMessage())
r, err = c.SayHelloAgain(ctx, &pb.HelloRequest{Name: *name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.GetMessage())
}
greeter_server/main.go:
package main
import (
"context"
"flag"
"fmt"
"log"
"net"
"google.golang.org/grpc"
pb "hellogrpc/hellogrpc"
)
var (
port = flag.Int("port", 50051, "The server port")
)
// server is used to implement helloworld.GreeterServer.
type server struct {
pb.UnimplementedGreeterServer
}
// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("Received: %v", in.GetName())
return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}
func (s *server) SayHelloAgain(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{Message: "Hello again " + in.GetName()}, nil
}
func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
syntax = "proto3";
option go_package = "hellogrpc/hellogrpc";
package hellogrpc;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
rpc SayHelloAgain (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
$ protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
hellogrpc/hellogrpc.proto
这将生成 hellogrpc.pb.go 和 hellogrpc_grpc.pb.go 文件,它们包含:
用于填充、序列化和检索 HelloRequest 和 HelloReply 消息类型的代码。
生成的客户端和服务端代码。
$ go run greeter_server/main.go
$ go run greeter_client/main.go --name=Tim
将看到如下输出(时间已被省略):
Greeting: Hello Tim
Greeting: Hello again Tim
基础教程 Go gRPC
通过该示例,你将学习如何:
在 .proto 文件中定义 service。
使用 Protocol Buffer 编译器生成服务端和客户端代码。
为什么使用 gRPC?
获取代码
示例代码是 grpc-go (https://github.com/grpc/grpc-go)仓库的一部分。
1. 下载 zip 文件形式的仓库(https://github.com/grpc/grpc-go/archive/v1.50.0.zip),然后解压缩,或克隆仓库:
$ git clone -b v1.50.0 --depth 1 https://github.com/grpc/grpc-go
2. 切换到示例目录:
$ cd grpc-go/examples/route_guide
定义服务
第一步是使用 protocol buffers (https://developers.google.com/protocol-buffers/docs/overview)定义 gRPC 服务和方法 request 和 response 类型。查看 routeguide/route_guide.proto(https://github.com/grpc/grpc-go/blob/master/examples/route_guide/routeguide/route_guide.proto) 获取完整的 .proto 文件。
要定义服务,需要在 .proto 文件中指定一个命名 service:
service RouteGuide {
...
}
然后在服务定义里面定义 rpc 方法,指定它们的请求和响应类型。gRPC 支持定义四种类型的服务方法:
// Obtains the feature at a given position.
rpc GetFeature(Point) returns (Feature) {}
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
.proto 文件还包含我们的服务方法中使用的所有请求和响应类型的 Protocol Buffer 消息类型定义 - 比如,下面是 Point 消息类型:
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
生成客户端和服务端代码
接下来我们需要从 .proto 服务定义生成 gRPC 客户端和服务端接口。使用带特殊的 gRPC Go 插件的 Protocol Buffer 编译器 protoc 完成这项工作。这与我们在快速入门中做的事情类似。
在 examples/route_guide 目录,运行如下命令:
$ protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
routeguide/route_guide.proto
运行该命令将在 route_guide 目录生成如下文件:
route_guide.pb.go,其中包含用于填充、序列化和检索请求和响应消息类型的所有 Protocol Buffer 代码。
route_guide_grpc.pb.go,包含:
供客户端调用的接口类型(或存根),该接口类型拥有 RouteGuide 服务中定义的方法。
供服务端实现的接口类型,它也拥有 RouteGuide 服务中定义的方法。
创建服务端
要使我们的 RouteGuide 服务发挥作用,需要做两部分工作:
实现从服务定义生成的服务接口:完成服务的实际“工作”。
运行 gRPC 服务器,监听来自客户端的请求,将它们分派到正确的服务实现。
你可以在 server/server.go (https://github.com/grpc/grpc-go/tree/master/examples/route_guide/server/server.go)找到 RouteGuide 示例服务。
如你所见,我们的服务有一个实现生成的 routeGuideServer 接口的 routeGuideServer 结构体类型:
type routeGuideServer struct {
...
}
...
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
...
}
...
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
...
}
...
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
...
}
...
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
...
}
...
routeGuideServer 实现所有服务方法。我们首先看最简单的类型,GetFeature,它仅从客户端获取一个 Point,然后从数据库中返回 Feature 里的相应特性信息。
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
return feature, nil
}
}
// No feature was found, return an unnamed feature
return &pb.Feature{Location: point}, nil
}
该方法接收用于 RPC 的上下文对象和客户端的 Point Protocol Buffer 请求。它返回带有响应信息的 Feature Protocol Buffer 对象,以及一个 error 对象。在该方法中,我们使用相应信息填充 Feature,然后连同 nil 错误一起返回它,告诉 gRPC 我们已经完成 RPC 处理,该 Feature 可被返回给客户端。
ListFeatures 是一个服务端流 RPC,因此我们需要向客户端发送回多个 Feature。
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
for _, feature := range s.savedFeatures {
if inRange(feature.Location, rect) {
if err := stream.Send(feature); err != nil {
return err
}
}
}
return nil
}
如你所见,我们的方法参数没有获取简单的请求和响应对象,而是获取一个请求对象(客户端想要在其中寻找 Feature 的 Rectangle)和用于写响应的 RouteGuide_ListFeaturesServer 特殊对象。
在该方法中,我们填充多个我们需要返回的 Feature 对象,使用 RouteGuide_ListFeaturesServer 的 Send() 方法将它们写到 RouteGuide_ListFeaturesServer。最后,就像在简单 RPC 中一样,我们返回 nil 错误,告诉 gRPC 我们已经完成写响应。如果在这个调用中发生任何错误,我们返回非 nil 错误;gRPC 层将其转换为适当的 RPC 状态,并且发送到网络上。
现在我们看客户端流方法 RecordRoute,我们从客户端获取 Point 流,返回包含行程信息的单个 RouteSummary。如你所见,这次方法根本没有请求参数。而是获取 RouteGuide_RecordRouteServer 流,服务端可以使用它读写消息 - 可以使用它的 Recv() 方法接收客户端消息,使用它的 SendAndClose() 返回单个响应。
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
var pointCount, featureCount, distance int32
var lastPoint *pb.Point
startTime := time.Now()
for {
point, err := stream.Recv()
if err == io.EOF {
endTime := time.Now()
return stream.SendAndClose(&pb.RouteSummary{
PointCount: pointCount,
FeatureCount: featureCount,
Distance: distance,
ElapsedTime: int32(endTime.Sub(startTime).Seconds()),
})
}
if err != nil {
return err
}
pointCount++
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
featureCount++
}
}
if lastPoint != nil {
distance += calcDistance(lastPoint, point)
}
lastPoint = point
}
}
在方法体中,我们使用 RouteGuide_RecordRouteServer 的 Recv() 方法反复将客户端的请求读入到请求对象(在本例中是 Point)中,直到没有更多的消息:服务器需要在每次调用后检查 Recv() 返回的错误。如果为 nil,那么流仍然是好的,可以继续读取;如果为 io.EOF,那么消息流已经结束,服务端可以返回它的 RouteSummary。如果为任何其它值,我们将“原样”返回错误,以便 gRPC 层将其转换为 RPC 状态。
最后我们看双向流 RPC RouteChat()。
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
key := serialize(in.Location)
... // look for notes to be sent to client
for _, note := range s.routeNotes[key] {
if err := stream.Send(note); err != nil {
return err
}
}
}
}
这次我们获取 RouteGuide_RouteChatServer 流,与客户端流示例一样,可使用它读写消息。但是,这次当客户端仍在向它们的消息流写消息时,我们通过方法的流返回值。
这里读写的语法与客户端流方法非常相似,不同之处是服务端使用流的 Send() 方法而不是 SendAndClose(),因为它要写入多个响应。
尽管每一方总是按照对方的写入顺序获得消息,但客户端和服务端都可以以任何顺序读取和写入消息 - 流完全独立地运行。
在实现所有方法后,我们还需要启动 gRPC 服务,以便客户端可以实际使用我们的服务。下面的代码片段展示我们如何为 RouteGuide 服务做到这一点:
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
var opts []grpc.ServerOption
...
grpcServer := grpc.NewServer(opts...)
pb.RegisterRouteGuideServer(grpcServer, newServer())
grpcServer.Serve(lis)
为构建和启动服务,我们:
1. 指定我们想要使用的端口,监听客户端请求:lis, err := net.Listen(...)。
2. 使用 grpc.NewServer(...) 创建 gRPC 服务实例。
3. 使用端口详细信息,调用服务上的 Serve(),来进行阻塞等待,直到进程被杀死或调用 Stop()。
创建客户端
接下来,我们看如何为 RouteGuide 服务创建 Go 客户端。你可以在 grpc-go/examples/route_guide/client/client.go 查看完整的示例客户端代码。
为调用服务方法,我们首先需要创建服务端进行通信的 gRPC 通道。我们通过将服务端地址和端口号传递给 grpc.Dial() 的方式,创建它,如下所示:
var opts []grpc.DialOption
...
conn, err := grpc.Dial(*serverAddr, opts...)
if err != nil {
...
}
defer conn.Close()
如果服务需要认证,那么使用 DialOptions 在 grpc.Dial 中设置认证凭据(例如 TLS、GCE 凭据或 JWT 凭据)。RouteGuide 服务不需要任何凭据。
设置 gRPC 通道后,我们需要客户端存根来执行 RPC。我们使用由示例 .proto 文件生成的 pb 包提供的 NewRouteGuideClient 方法来获取它。
接下来我们看如何调用服务方法。注意,在 gRPC-Go 中,RPC 以阻塞/同步模式运行,这意味着 RPC 调用等待服务端响应,并且将返回响应或错误。
调用简单 RPC GetFeature 几乎和调用本地方法一样简单。
feature, err := client.GetFeature(context.Background(), &pb.Point{409146138, -746188906})
if err != nil {
...
}
如你所见,我们调用存根上的方法。在方法参数中,我们创建及填充请求 Protocol Buffer 对象(在本例中为 Point)。我们也传递一个 context.Context 对象,它使我们必要时改变 RPC 的行为,比如超时/取消运行中的 RPC。如果调用未返回错误,那么可以从服务端的第一个返回值中读取响应信息。
log.Println(feature)
下面是我们调用服务端流方法 ListFeatures 的地方,该方法返回地理 Feature 流。在客户端和服务端,流 RPC 的实现方式类似:
rect := &pb.Rectangle{ ... } // initialize a pb.Rectangle
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
...
}
for {
feature, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
}
log.Println(feature)
}
与简单 RPC 中一样,我们给方法传递 context 和 request。但是,我们取回 RouteGuide_ListFeaturesClient 实例,而不是响应对象。客户端使用 RouteGuide_ListFeaturesClient 流读取服务端的响应。
我们使用 RouteGuide_ListFeaturesClient 的 Recv() 方法反复地将服务端的响应读入到响应 Protocol Buffer 对象(在本例中为 Feature),直到没有更多的消息:客户端需要在每次调用后检查 Recv() 返回的错误 err。如果为 nil,那么流仍然是好的,可以继续读取;如果为 io.EOF,那么消息流已终止;否则,必须通过 err 传递 RPC 错误。
客户端流方法 RecordRoute 与服务端方法类似,不同之处在于我们只给方法传递上下文对象,并且取回 RouteGuide_RecordRouteClient 流,我们使用该流读写消息。
// Create a random number of random points
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
stream, err := client.RecordRoute(context.Background())
if err != nil {
log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
}
for _, point := range points {
if err := stream.Send(point); err != nil {
log.Fatalf("%v.Send(%v) = %v", stream, point, err)
}
}
reply, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Route summary: %v", reply)
可以使用 RouteGuide_RecordRouteClient 的 Send() 方法向服务端发送请求。我们使用 Send() 完成向流写入客户端请求后,需要调用流上的 CloseAndRecv(),让 gRPC 知道,我们已经完成写,期望接收响应。我们从 CloseAndRecv() 返回的 err 获取 RPC 状态。如果状态为 nil,那么 CloseAndRecv() 的第一个返回值是有效的服务端响应。
RouteChat() 是双向流 RPC,像 RecordRoute() 一样,我们只给方法传递上下文对象,并且取回用于读写消息的流。但是,这次当服务端仍然向它们的消息流写消息时,我们通过方法的流返回值。
stream, err := client.RouteChat(context.Background())
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive a note : %v", err)
}
log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
}
}()
for _, note := range notes {
if err := stream.Send(note); err != nil {
log.Fatalf("Failed to send a note: %v", err)
}
}
stream.CloseSend()
<-waitc
读写语法与客户端流方法非常相似,不同之处在于一旦我们完成调用,我们使用流的 CloseSend() 方法。尽管每一方总是按照对方的写入顺序获得消息,但客户端和服务端都可以以任何顺序读取和写入消息 - 流完全独立地运行。
从 examples/route_guide 目录,运行如下命令:
1. 运行服务端:
$ go run server/server.go
2. 从另外一个终端,运行客户端:
$ go run client/client.go
将看到类似下面的输出(已省略时间戳):
Getting feature for point (409146138, -746188906)
name:"Berkshire Valley Management Area Trail, Jefferson, NJ, USA" location:<latitude:409146138 longitude:-746188906 >
Getting feature for point (0, 0)
location:<>
Looking for features within lo:<latitude:400000000 longitude:-750000000 > hi:<latitude:420000000 longitude:-730000000 >
name:"Patriots Path, Mendham, NJ 07945, USA" location:<latitude:407838351 longitude:-746143763 >
...
name:"3 Hasta Way, Newton, NJ 07860, USA" location:<latitude:410248224 longitude:-747127767 >
Traversing 56 points.
Route summary: point_count:56 distance:497013163
Got message First message at point(0, 1)
Got message Second message at point(0, 2)
Got message Third message at point(0, 3)
Got message First message at point(0, 1)
Got message Fourth message at point(0, 1)
Got message Second message at point(0, 2)
Got message Fifth message at point(0, 2)
Got message Third message at point(0, 3)
Got message Sixth message at point(0, 3)
参考文档
1. https://grpc.io/docs/what-is-grpc/
关于Portal Lab
星阑科技 Portal Lab 致力于前沿安全技术研究及能力工具化。主要研究方向为API 安全、应用安全、攻防对抗等领域。实验室成员研究成果曾发表于BlackHat、HITB、BlueHat、KCon、XCon等国内外知名安全会议,并多次发布开源安全工具。未来,Portal Lab将继续以开放创新的态度积极投入各类安全技术研究,持续为安全社区及企业级客户提供高质量技术输出。