gRPC流模式

79次阅读

共计 3756 个字符,预计需要花费 10 分钟才能阅读完成。

gRPC 中有四种数据流,分别是简单模式、服务端数据流模式、客户端数据流模式、双向数据流模式。

简单模式:客户端发起一次请求,服务端返回一个响应。

服务端数据流模式:客户端发起一次请求,服务端返回一段连续的数据流,如获取股票实时数据。

客户端数据流模式:客户端不断像服务器发送数据流,发送结束后,由服务端返回一个响应。如物联网终端向服务器发送数据。

双向数据流模式:客户端和服务端都可以向对方发送数据流,如实时聊天。

Proto 文件

syntax = "proto3";

package proto;

option go_package = "/stream;stream";

message RequestInfo {string data = 1;}

message ResponseInfo {string data = 1;}

service Stream {rpc ServerStream (RequestInfo) returns (stream ResponseInfo) {} // 服务端数据流模式
  rpc ClientStream (stream RequestInfo) returns (ResponseInfo) {} // 客户端数据流模式
  rpc AllStream (stream RequestInfo) returns (stream ResponseInfo) {} // 双向数据流模式}

使用 stream 声明数据流模式。

服务端数据流模式

服务端

package main

import (
    "fmt"
    "google.golang.org/grpc"
    "net"
    "test/stream"
    "time"
)

type Stream struct {stream.UnimplementedStreamServer}

func (s *Stream) ServerStream(req *stream.RequestInfo, res stream.Stream_ServerStreamServer) error {fmt.Println(req.Data)
    for i := 0; i < 10; i++ {
        // 往客户端发送数据
        _ = res.Send(&stream.ResponseInfo{Data: fmt.Sprintf("%v", time.Now().Unix()),
        })
        time.Sleep(time.Second)
    }
    return nil
}

func main() {
    // 监听
    listen, _ := net.Listen("tcp", ":8080")
    // 实例化 grpc 服务
    s := grpc.NewServer()
    // 注册服务
    stream.RegisterStreamServer(s, &Stream{})
    // 启动
    s.Serve(listen)
}

客户端

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "test/stream"
)

func main() {
    // 建立连接
    conn, _ := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))

    // 实例化客户端
    client := stream.NewStreamClient(conn)

    // 调用服务
    res, _ := client.ServerStream(context.Background(), &stream.RequestInfo{Data: "I am coming!",})
    for {
        // 接受服务端传送的数据
        data, err := res.Recv()
        if err != nil {break}
        fmt.Println(data)
    }
}

客户端数据流模式

服务端

package main

import (
    "fmt"
    "google.golang.org/grpc"
    "net"
    "test/stream"
    "time"
)

type Stream struct {stream.UnimplementedStreamServer}

func (s *Stream) ClientStream(cli stream.Stream_ClientStreamServer) error {
    for {
        // 接收客户端传输的数据
        if res, err := cli.Recv(); err != nil {fmt.Println(err)
            break
        } else {fmt.Println(res.Data)
        }
    }
    return nil
}

func main() {
    // 监听
    listen, _ := net.Listen("tcp", ":8080")
    // 实例化 grpc 服务
    s := grpc.NewServer()
    // 注册服务
    stream.RegisterStreamServer(s, &Stream{})
    // 启动
    s.Serve(listen)
}

客户端

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "test/stream"
)

func main() {
    // 建立连接
    conn, _ := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))

    // 实例化客户端
    client := stream.NewStreamClient(conn)

    // 调用服务
    res, _ := client.ClientStream(context.Background())
    for i := 0; i < 10; i++ {
        // 往服务端发送数据
        _ = res.Send(&stream.RequestInfo{Data: fmt.Sprintf("%v", time.Now().Unix()),
        })

        time.Sleep(time.Second)
    }
}

双向数据流模式

服务端

package main

import (
    "fmt"
    "google.golang.org/grpc"
    "net"
    "sync"
    "test/stream"
    "time"
)

type Stream struct {stream.UnimplementedStreamServer}

func (s *Stream) AllStream(all stream.Stream_AllStreamServer) error {wg := sync.WaitGroup{}
    wg.Add(2)
    go func() {defer wg.Done()
        for {
            // 接收客户端消息
            if res, err := all.Recv(); err != nil {fmt.Println(err)
                break
            } else {fmt.Println(res.Data)
            }
        }
    }()
    go func() {defer wg.Done()
        for i := 0; i < 10; i++ {
            // 往客户端发送消息
            _ = all.Send(&stream.ResponseInfo{Data: fmt.Sprintf(" 服务端消息:%v", time.Now().Unix()),
            })
            time.Sleep(time.Second)
        }
    }()
    wg.Wait()
    return nil
}

func main() {
    // 监听
    listen, _ := net.Listen("tcp", ":8080")
    // 实例化 grpc 服务
    s := grpc.NewServer()
    // 注册服务
    stream.RegisterStreamServer(s, &Stream{})
    // 启动
    s.Serve(listen)
}

客户端

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "test/stream"
)

func main() {
    // 建立连接
    conn, _ := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))

    // 实例化客户端
    client := stream.NewStreamClient(conn)

    // 调用服务
    wg := sync.WaitGroup{}
    wg.Add(2)
    all, _ := client.AllStream(context.Background())
    go func() {defer wg.Done()
        for {
            // 接收服务端消息
            if res, err := all.Recv(); err != nil {fmt.Println(err)
                break
            } else {fmt.Println(res.Data)
            }
        }
    }()
    go func() {defer wg.Done()
        for i := 0; i < 10; i++ {
            // 往服务端发送消息
            _ = all.Send(&stream.RequestInfo{Data: fmt.Sprintf(" 客户端消息:%v", time.Now().Unix()),
            })
            time.Sleep(time.Second)
        }
    }()
    wg.Wait()}
正文完
 
dkp
版权声明:本站原创文章,由 dkp 2024-01-04发表,共计3756字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。