远程调用
我们的业务逻辑通常会依赖其他微服务,需要通过RPC调用其他微服务。go-kit 提供传输中间件来解决出现的许多问题。
现在,假设addService服务需要调用另外一个微服务——trim_service来实现Concat方法。
trim_service服务是一个去除空格的服务,具体功能是接收一个字符串参数,将其中所有的空格去掉后再返回。
此时需要编写另一个微服务
其中trim.proto内容如下:
syntax = "proto3";
package pb;
option go_package="trim_service/pb";
service Trim {
rpc TrimSpace (TrimRequest) returns (TrimResponse) {}
}
// Trim方法的请求参数
message TrimRequest {
string s = 1;
}
// Trim方法的响应
message TrimResponse {
string s = 1;
}
注册服务
// main.go
package main
import (
"context"
"flag"
"fmt"
"google.golang.org/grpc"
"net"
"strings"
"trim_service/pb"
)
var port = flag.Int("port", 8975, "service port")
// trim service
type server struct {
pb.UnimplementedTrimServer
}
// TrimSpace 去除字符串参数中的空格
func (s *server) TrimSpace(_ context.Context, req *pb.TrimRequest) (*pb.TrimResponse, error) {
ov := req.GetS()
v := strings.ReplaceAll(ov, " ", "")
fmt.Printf("ov:%s v:%v\n", ov, v)
return &pb.TrimResponse{S: v}, nil
}
func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
fmt.Printf("failed to listen: %v", err)
return
}
s := grpc.NewServer()
pb.RegisterTrimServer(s, &server{})
err = s.Serve(lis)
if err != nil {
fmt.Printf("failed to serve: %v", err)
return
}
}
如同中间件一样
在调用方设置 中间件,这里需要注意的是endpoint
不仅可以在本地解耦还有调用外部功能的作用
type withTrimMiddleware struct {
next AddService
trimService endpoint.Endpoint // 通过它调用其他的服务
}
在endpoint
中层定义相关请求参数和响应参数。此处的endpoint与我们之前定义的endpoint有所不同,它属于客户端endpoint。之前的endpoint是我们的程序直接服务(serve)的,而TrimEndpoint是用来调用外部请求(invoke)的。
type trimRequest struct {
s string
}
type trimResponse struct {
s string
}
func makeTrimEndpoint(conn *grpc.ClientConn) endpoint.Endpoint {
return grpctransport.NewClient(
conn,
"pb.Trim",
"TrimSpace",
encodeTrimRequest,
decodeTrimResponse,
pb.TrimResponse{},
).Endpoint()
}
传输层进行解码编码
// encodeTrimRequest 将内部使用的数据编码为proto
func encodeTrimRequest(_ context.Context, response interface{}) (request interface{}, err error) {
resp := response.(trimRequest)
return &pb.TrimRequest{S: resp.s}, nil
}
// decodeTrimResponse 解析pb消息
func decodeTrimResponse(_ context.Context, in interface{}) (interface{}, error) {
resp := in.(*pb.TrimResponse)
return trimResponse{s: resp.S}, nil
}
服务层
type withTrimMiddleware struct {
next AddService
trimService endpoint.Endpoint // trim 交给这个endpoint处理
}
func NewServiceWithTrim(trimEndpoint endpoint.Endpoint, svc AddService) AddService {
return &withTrimMiddleware{
trimService: trimEndpoint,
next: svc,
}
}
func (mw withTrimMiddleware) Sum(ctx context.Context, a, b int) (res int, err error) {
return mw.Sum(ctx, a, b) // 与之前一致
}
// Concat 方法需要先发起gRPC调用外部trim service服务
func (mw withTrimMiddleware) Concat(ctx context.Context, a, b string) (res string, err error) {
// 先调用trim服务,去除字符串中可能存在的空格
respA, err := mw.trimService(ctx, trimRequest{s: a}) // 请求trim服务处理a
if err != nil {
return "", err
}
respB, err := mw.trimService(ctx, trimRequest{s: b}) // 请求trim服务处理b
if err != nil {
return "", err
}
trimA := respA.(trimResponse)
trimB := respB.(trimResponse)
return mw.next.Concat(ctx, trimA.s, trimB.s)
}