脚本之家,脚本语言编程技术及教程分享平台!
分类导航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服务器之家 - 脚本之家 - Golang - Go gRPC服务端流式RPC教程示例

Go gRPC服务端流式RPC教程示例

2022-10-21 11:47烟花易冷人憔悴 Golang

这篇文章主要为大家介绍了Go gRPC服务端流式RPC教程示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

前言

上一篇介绍了简单模式RPC,当数据量大或者需要不断传输数据时候,我们应该使用流式RPC,它允许我们边处理边传输数据。本篇先介绍服务端流式RPC。

服务端流式RPC:客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。

情景模拟:实时获取股票走势

1.客户端要获取某原油股的实时走势,客户端发送一个请求

2.服务端实时返回该股票的走势

新建proto文件

新建server_stream.proto文件

1.定义发送信息

?
1
2
3
4
5
6
// 定义发送请求信息
message SimpleRequest{
    // 定义发送的参数,采用驼峰命名方式,小写加下划线,如:student_name
    // 请求参数
    string data = 1;
}

2.定义接收信息

?
1
2
3
4
5
// 定义流式响应信息
message StreamResponse{
    // 流式响应数据
    string stream_value = 1;
}

3.定义服务方法ListValue

服务端流式rpc,只要在响应数据前添加stream即可

?
1
2
3
4
5
// 定义我们的服务(可定义多个服务,每个服务可定义多个接口)
service StreamServer{
    // 服务端流式rpc,在响应数据前添加stream
    rpc ListValue(SimpleRequest)returns(stream StreamResponse){};
}

4.编译proto文件

进入server_stream.proto所在目录,运行指令:

?
1
protoc --go_out=plugins=grpc:./ ./server_stream.proto

创建Server端

1.定义我们的服务,并实现ListValue方法

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// SimpleService 定义我们的服务
type StreamService struct{}
// ListValue 实现ListValue方法
func (s *StreamService) ListValue(req *pb.SimpleRequest, srv pb.StreamServer_ListValueServer) error {
    for n := 0; n < 5; n++ {
        // 向流中发送消息, 默认每次send送消息最大长度为`math.MaxInt32`bytes
        err := srv.Send(&pb.StreamResponse{
            StreamValue: req.Data + strconv.Itoa(n),
        })
        if err != nil {
            return err
        }
    }
    return nil
}

初学者可能觉得比较迷惑,ListValue的参数和返回值是怎样确定的。其实这些都是编译proto时生成的.pb.go文件中有定义,我们只需要实现就可以了。

Go gRPC服务端流式RPC教程示例

2.启动gRPC服务器

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
const (
    // Address 监听地址
    Address string = ":8000"
    // Network 网络通信协议
    Network string = "tcp"
)
 
func main() {
    // 监听本地端口
    listener, err := net.Listen(Network, Address)
    if err != nil {
        log.Fatalf("net.Listen err: %v", err)
    }
    log.Println(Address + " net.Listing...")
    // 新建gRPC服务器实例
    // 默认单次接收最大消息长度为`1024*1024*4`bytes(4M),单次发送消息最大长度为`math.MaxInt32`bytes
    // grpcServer := grpc.NewServer(grpc.MaxRecvMsgSize(1024*1024*4), grpc.MaxSendMsgSize(math.MaxInt32))
    grpcServer := grpc.NewServer()
    // 在gRPC服务器注册我们的服务
    pb.RegisterStreamServerServer(grpcServer, &StreamService{})
 
    //用服务器 Serve() 方法以及我们的端口信息区实现阻塞等待,直到进程被杀死或者 Stop() 被调用
    err = grpcServer.Serve(listener)
    if err != nil {
        log.Fatalf("grpcServer.Serve err: %v", err)
    }
}

运行服务端

?
1
2
go run server.go
:8000 net.Listing...

创建Client端

1.创建调用服务端ListValue方法

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// listValue 调用服务端的ListValue方法
func listValue() {
    // 创建发送结构体
    req := pb.SimpleRequest{
        Data: "stream server grpc ",
    }
    // 调用我们的服务(ListValue方法)
    stream, err := grpcClient.ListValue(context.Background(), &req)
    if err != nil {
        log.Fatalf("Call ListStr err: %v", err)
    }
    for {
        //Recv() 方法接收服务端消息,默认每次Recv()最大消息长度为`1024*1024*4`bytes(4M)
        res, err := stream.Recv()
        // 判断消息流是否已经结束
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("ListStr get stream err: %v", err)
        }
        // 打印返回值
        log.Println(res.StreamValue)
    }
}

2.启动gRPC客户端

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Address 连接地址
const Address string = ":8000"
 
var grpcClient pb.StreamServerClient
 
func main() {
    // 连接服务器
    conn, err := grpc.Dial(Address, grpc.WithInsecure())
    if err != nil {
        log.Fatalf("net.Connect err: %v", err)
    }
    defer conn.Close()
 
    // 建立gRPC连接
    grpcClient = pb.NewStreamServerClient(conn)
    route()
    listValue()
}

运行客户端

go run client.go
stream server grpc 0
stream server grpc 1
stream server grpc 2
stream server grpc 3
stream server grpc 4

客户端不断从服务端获取数据

思考

假如服务端不停发送数据,类似获取股票走势实时数据,客户端能自己停止获取数据吗?

答案:可以的

1.我们把服务端的ListValue方法稍微修改

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// ListValue 实现ListValue方法
func (s *StreamService) ListValue(req *pb.SimpleRequest, srv pb.StreamServer_ListValueServer) error {
    for n := 0; n < 15; n++ {
        // 向流中发送消息, 默认每次send送消息最大长度为`math.MaxInt32`bytes
        err := srv.Send(&pb.StreamResponse{
            StreamValue: req.Data + strconv.Itoa(n),
        })
        if err != nil {
            return err
        }
        log.Println(n)
        time.Sleep(1 * time.Second)
    }
    return nil
}

2.再把客户端调用ListValue方法的实现稍作修改,就可以得到结果了

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// listValue 调用服务端的ListValue方法
func listValue() {
    // 创建发送结构体
    req := pb.SimpleRequest{
        Data: "stream server grpc ",
    }
    // 调用我们的服务(Route方法)
    // 同时传入了一个 context.Context ,在有需要时可以让我们改变RPC的行为,比如超时/取消一个正在运行的RPC
    stream, err := grpcClient.ListValue(context.Background(), &req)
    if err != nil {
        log.Fatalf("Call ListStr err: %v", err)
    }
    for {
        //Recv() 方法接收服务端消息,默认每次Recv()最大消息长度为`1024*1024*4`bytes(4M)
        res, err := stream.Recv()
        // 判断消息流是否已经结束
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("ListStr get stream err: %v", err)
        }
        // 打印返回值
        log.Println(res.StreamValue)
        break
    }
    //可以使用CloseSend()关闭stream,这样服务端就不会继续产生流消息
    //调用CloseSend()后,若继续调用Recv(),会重新激活stream,接着之前结果获取消息
    stream.CloseSend()
}

只需要调用CloseSend()方法,就可以关闭服务端的stream,让它停止发送数据。值得注意的是,调用CloseSend()后,若继续调用Recv(),会重新激活stream,接着当前的结果继续获取消息。

这能完美解决客户端暂停->继续获取数据的操作。

总结

本篇介绍了服务端流式RPC的简单使用,客户端发起一个请求,服务端不停返回数据,直到服务端停止发送数据或客户端主动停止接收数据为止。下篇将介绍客户端流式RPC。

教程源码地址:https://github.com/Bingjian-Zhu/go-grpc-example

参考:gRPC官方文档中文版

更多关于Go gRPC服务端流式RPC的资料请关注服务器之家其它相关文章!

原文链接:https://www.cnblogs.com/FireworksEasyCool/p/12693749.html

延伸 · 阅读

精彩推荐
  • Golang一篇文章带你了解Go语言基础之变量

    一篇文章带你了解Go语言基础之变量

    简单点说,我们写的程序默认数据都是保存在内存条中的,我们不可能直接通过地址找到这个变量,因为地址太长了,而且不容易记。...

    Go语言进阶学习5872021-09-30
  • GolangGolang 1.16中Module机制更新

    Golang 1.16中Module机制更新

    最近,Golang发布了一个新版本1.16。版本引入了很多新功能,其中在模块方面的改进,今天我们就一起来深入学习一下。 ...

    虫虫安全18502021-02-22
  • GolangGoLang 逃逸分析的机制详解

    GoLang 逃逸分析的机制详解

    这篇文章主要介绍了GoLang-逃逸分析的机制详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随...

    帘外五更风4742020-06-03
  • GolangGO制作微信机器人的流程分析

    GO制作微信机器人的流程分析

    这篇文章主要介绍了利用go制作微信机器人,本文主要包括项目基础配置及详细代码讲解,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考...

    Code Writers5252022-08-01
  • GolangGo语言空结构体详解

    Go语言空结构体详解

    本文主要介绍了Go语言空结构体详解,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    漫漫Coding路9902022-08-31
  • Golang详解Go 语言中的比较操作符

    详解Go 语言中的比较操作符

    这篇文章专注于 6 个操作符,==,!=,<,<=,> 和 >=。我们将深入探讨它们的语法和用法的细微差别,感兴趣的朋友跟随脚本之家小编一起看看吧 ...

    themoonbear6192020-05-17
  • Golanggolang 熔断器的实现过程

    golang 熔断器的实现过程

    这篇文章主要介绍了golang 熔断器的实现过程,Go 项目中使用熔断技术提高系统容错性。接下俩就来给打家介绍 go 熔断器和其使用,需要的朋友可以参考一下...

    运维派10352022-08-29
  • GolangGolang中切片的用法与本质详解

    Golang中切片的用法与本质详解

    Go的切片类型为处理同类型数据序列提供一个方便而高效的方式,下面这篇文章就来给大家介绍了关于Golang中切片的用法与本质的相关资料,文中通过示例...

    hgqxjj5292020-05-17