示例源码:dubbo-go-samples/streaming。
Streaming 流式通信是 Dubbo3 新提供的一种 RPC 数据传输模式,适用于需要在一次调用中分批或持续收发多条消息的场景。
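Streaming 流式通信类型分为以下三种:客户端流(Client Stream,客户端发送多条请求,服务端返回一条响应)、服务端流(Server Stream,客户端发送一条请求,服务端返回多条响应)和双向流(Bidirectional Stream,客户端与服务端均可发送多条消息)。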
本文档演示如何在 Dubbo-go 中使用流式通信。
在 proto 文件中,为需要流式通信的方法的请求类型或响应类型添加 `stream` 关键字,然后使用 `protoc-gen-go-triple` 生成对应代码。
// GreetService declares one unary RPC and one RPC for each streaming shape.
service GreetService {
// Unary: a single request and a single response.
rpc Greet(GreetRequest) returns (GreetResponse) {}
// Bidirectional stream: both the request and the response are marked `stream`.
rpc GreetStream(stream GreetStreamRequest) returns (stream GreetStreamResponse) {}
// Client stream: only the request is marked `stream`.
rpc GreetClientStream(stream GreetClientStreamRequest) returns (GreetClientStreamResponse) {}
// Server stream: only the response is marked `stream`.
rpc GreetServerStream(GreetServerStreamRequest) returns (stream GreetServerStreamResponse) {}
}
编写服务端 handler 文件
源文件路径:dubbo-go-samples/streaming/go-server/cmd/server.go
// GreetTripleServer is the stateless receiver type on which the server-side
// handlers of GreetService are defined.
type GreetTripleServer struct{}
// Greet handles the unary call: it copies the request's Name into the
// response's Greeting and never returns an error.
func (srv *GreetTripleServer) Greet(ctx context.Context, req *greet.GreetRequest) (*greet.GreetResponse, error) {
	return &greet.GreetResponse{Greeting: req.Name}, nil
}
// GreetStream handles the bidirectional stream. Each received request is
// answered with one response whose Greeting is the request's Name. The handler
// returns nil once Recv reports an error for which triple.IsEnded is true; any
// other receive error, or any send error, is returned with a short prefix.
func (srv *GreetTripleServer) GreetStream(ctx context.Context, stream greet.GreetService_GreetStreamServer) error {
	for {
		msg, recvErr := stream.Recv()
		switch {
		case recvErr == nil:
			// A message arrived; echo it below.
		case triple.IsEnded(recvErr):
			return nil
		default:
			return fmt.Errorf("triple BidiStream recv error: %s", recvErr)
		}

		reply := &greet.GreetStreamResponse{Greeting: msg.Name}
		if sendErr := stream.Send(reply); sendErr != nil {
			return fmt.Errorf("triple BidiStream send error: %s", sendErr)
		}
	}
}
// GreetClientStream handles the client stream. It collects the Name of every
// received request until Recv returns false, then replies once with all names
// joined by commas. A stream error for which triple.IsEnded is false is
// returned with a short prefix instead of a response.
func (srv *GreetTripleServer) GreetClientStream(ctx context.Context, stream greet.GreetService_GreetClientStreamServer) (*greet.GreetClientStreamResponse, error) {
	var names []string
	for stream.Recv() {
		names = append(names, stream.Msg().Name)
	}

	if recvErr := stream.Err(); recvErr != nil && !triple.IsEnded(recvErr) {
		return nil, fmt.Errorf("triple ClientStream recv err: %s", recvErr)
	}

	return &greet.GreetClientStreamResponse{Greeting: strings.Join(names, ",")}, nil
}
// GreetServerStream handles the server stream. It sends five responses, each
// carrying the request's Name as its Greeting, and stops at the first send
// error, which is returned with a short prefix.
func (srv *GreetTripleServer) GreetServerStream(ctx context.Context, req *greet.GreetServerStreamRequest, stream greet.GreetService_GreetServerStreamServer) error {
	const replies = 5
	for sent := 0; sent < replies; sent++ {
		reply := &greet.GreetServerStreamResponse{Greeting: req.Name}
		if sendErr := stream.Send(reply); sendErr != nil {
			return fmt.Errorf("triple ServerStream send err: %s", sendErr)
		}
	}
	return nil
}
编写客户端 client 文件
源文件路径:dubbo-go-samples/streaming/go-client/cmd/client.go
// main creates a client pointed at tri://127.0.0.1:20000, builds the
// GreetService stub on top of it and runs every call style through TestClient.
// It panics if either the client or the stub cannot be created.
func main() {
	tripleClient, err := client.NewClient(client.WithClientURL("tri://127.0.0.1:20000"))
	if err != nil {
		panic(err)
	}

	greeter, err := greet.NewGreetService(tripleClient)
	if err != nil {
		panic(err)
	}

	TestClient(greeter)
}
// TestClient runs the unary, bidirectional-stream, client-stream and
// server-stream scenarios in that order. A failing scenario is logged and does
// not prevent the remaining ones from running.
func TestClient(cli greet.GreetService) {
	scenarios := []func(greet.GreetService) error{
		testUnary,
		testBidiStream,
		testClientStream,
		testServerStream,
	}
	for _, run := range scenarios {
		if err := run(cli); err != nil {
			logger.Error(err)
		}
	}
}
// testUnary issues a single Greet call with the name "triple" and logs the
// returned greeting. The call's error, if any, is returned unchanged.
func testUnary(cli greet.GreetService) error {
	logger.Info("start to test TRIPLE unary call")

	reply, callErr := cli.Greet(context.Background(), &greet.GreetRequest{Name: "triple"})
	if callErr != nil {
		return callErr
	}

	logger.Infof("TRIPLE unary call resp: %s", reply.Greeting)
	return nil
}
// testBidiStream opens the bidirectional stream, sends one request named
// "triple", logs the first response, and then closes the request side followed
// by the response side. The first error encountered is returned.
func testBidiStream(cli greet.GreetService) error {
	logger.Info("start to test TRIPLE bidi stream")
	stream, err := cli.GreetStream(context.Background())
	if err != nil {
		return err
	}
	if sendErr := stream.Send(&greet.GreetStreamRequest{Name: "triple"}); sendErr != nil {
		// Return the send error itself: the outer err is nil at this point, so
		// returning it would report a failed send as success.
		return sendErr
	}
	resp, err := stream.Recv()
	if err != nil {
		return err
	}
	logger.Infof("TRIPLE bidi stream resp: %s", resp.Greeting)
	if err := stream.CloseRequest(); err != nil {
		return err
	}
	return stream.CloseResponse()
}
// testClientStream opens the client stream, sends five requests named
// "triple", then closes the stream and logs the single response returned by
// CloseAndRecv. The first error encountered is returned.
func testClientStream(cli greet.GreetService) error {
	logger.Info("start to test TRIPLE client stream")
	stream, err := cli.GreetClientStream(context.Background())
	if err != nil {
		return err
	}
	for i := 0; i < 5; i++ {
		if sendErr := stream.Send(&greet.GreetClientStreamRequest{Name: "triple"}); sendErr != nil {
			// Return the send error itself: the outer err is nil at this point,
			// so returning it would report a failed send as success.
			return sendErr
		}
	}
	resp, err := stream.CloseAndRecv()
	if err != nil {
		return err
	}
	logger.Infof("TRIPLE client stream resp: %s", resp.Greeting)
	return nil
}
// testServerStream opens the server stream with a request named "triple" and
// logs every response until Recv returns false. An error recorded on the
// stream is returned; otherwise the stream is closed and the result of Close
// is returned.
func testServerStream(cli greet.GreetService) error {
	logger.Info("start to test TRIPLE server stream")
	stream, err := cli.GreetServerStream(context.Background(), &greet.GreetServerStreamRequest{Name: "triple"})
	if err != nil {
		return err
	}
	for stream.Recv() {
		logger.Infof("TRIPLE server stream resp: %s", stream.Msg().Greeting)
	}
	if recvErr := stream.Err(); recvErr != nil {
		// Return the stream's own error: the outer err is nil at this point, so
		// returning it would hide a failed receive.
		return recvErr
	}
	return stream.Close()
}
流式调用也可以通过生成的 stream API 读写 Triple metadata:
| 位置 | API | 用途 |
|---|---|---|
| 客户端流和双向流 client | RequestHeader() | 在发送第一条消息前写入请求 metadata |
| 服务端流和双向流 client | ResponseHeader() | 读取服务端返回的响应 header |
| 服务端流和双向流 client | ResponseTrailer() | 响应结束后读取服务端返回的 trailer |
| Provider 侧 stream handler | RequestHeader() | 读取客户端发送的请求 metadata |
| Provider 侧 stream handler | ResponseHeader() / ResponseTrailer() | 设置响应 header 和 trailer |
这些 metadata 使用 http.Header 表示,可以用 Set、Add、Get、Values 等标准方法读写。
例如,在 client 侧可以在发送第一条消息前写入请求 metadata:
// Open the bidirectional stream; nothing has been sent on it yet.
stream, err := cli.GreetStream(context.Background())
if err != nil {
return err
}
// Write request metadata before the first message is sent.
stream.RequestHeader().Set("x-sample-token", "demo-token")
在服务端 stream handler 中,也可以返回响应 metadata:
// GreetServerStream shows how a stream handler sets response metadata: it
// writes "x-stream-header" to the response header and "x-stream-trailer" to the
// response trailer, sends no messages, and returns nil.
func (srv *GreetTripleServer) GreetServerStream(
	ctx context.Context,
	req *greet.GreetServerStreamRequest,
	stream greet.GreetService_GreetServerStreamServer,
) error {
	header := stream.ResponseHeader()
	header.Set("x-stream-header", "ready")

	trailer := stream.ResponseTrailer()
	trailer.Set("x-stream-trailer", "done")

	return nil
}
运行服务端和客户端,可以看到请求正常返回:
[start to test TRIPLE unary call]
TRIPLE unary call resp: [triple]
[start to test TRIPLE bidi stream]
TRIPLE bidi stream resp: [triple]
[start to test TRIPLE client stream]
TRIPLE client stream resp: [triple,triple,triple,triple,triple]
[start to test TRIPLE server stream]
TRIPLE server stream resp: [triple]
TRIPLE server stream resp: [triple]
TRIPLE server stream resp: [triple]
TRIPLE server stream resp: [triple]
TRIPLE server stream resp: [triple]