使用 Go kit 构建的服务分为三层:
传输层(Transport layer)
端点层(Endpoint layer)
服务层(Service layer)
请求在第1层进入服务,向下流到第3层,响应则相反。
Transports
传输域绑定到具体的传输协议,如 HTTP 或 gRPC。在一个微服务可能支持一个或多个传输协议的世界中,这是非常强大的:你可以在单个微服务中支持原有的 HTTP API 和新增的 RPC 服务。
Endpoints
端点就像控制器上的动作/处理程序; 它是安全性和抗脆弱性逻辑的所在。如果实现两种传输(HTTP 和 gRPC) ,则可能有两种将请求发送到同一端点的方法。
Services
服务(指Go kit中的service层)是实现所有业务逻辑的地方。服务层通常将多个端点粘合在一起。在 Go kit 中,服务层通常被抽象为接口,这些接口的实现包含业务逻辑。Go kit 服务层应该努力遵守整洁架构或六边形架构。也就是说,业务逻辑不需要了解端点(尤其是传输域)概念:你的服务层不应该关心HTTP 头或 gRPC 错误代码。
Middlewares
Go kit 试图通过使用中间件(或装饰器)模式来执行严格的关注分离(separation of concerns)。中间件可以包装端点或服务以添加功能,比如日志记录、速率限制、负载平衡或分布式跟踪。围绕一个端点或服务链接多个中间件是很常见的。
接下来通过一个AddServer的例子来简单了解
逻辑层
首先定义一个借口类型,接口设定了要实现的方法。
// AddService 把两个东西加到一起
type AddService interface {
Sum(ctx context.Context, a, b int) (int, error)
Concat(ctx context.Context, a, b string) (string, error)
}
紧接着定义实现
type addService struct{} // 创建结构体定义方法,实现接口
const maxLen = 10
var (
// ErrTwoZeroes Sum方法的业务规则不能对两个0求和
ErrTwoZeroes = errors.New("can't sum two zeroes")
// ErrIntOverflow Sum参数越界
ErrIntOverflow = errors.New("integer overflow")
// ErrTwoEmptyStrings Concat方法业务规则规定参数不能是两个空字符串.
ErrTwoEmptyStrings = errors.New("can't concat two empty strings")
// ErrMaxSizeExceeded Concat方法的参数超出范围
ErrMaxSizeExceeded = errors.New("result exceeds maximum size")
)
// Sum 对两个数字求和,实现AddService。
func (s addService) Sum(_ context.Context, a, b int) (int, error) {
if a == 0 && b == 0 {
return 0, ErrTwoZeroes
}
if (b > 0 && a > (math.MaxInt-b)) || (b < 0 && a < (math.MinInt-b)) {
return 0, ErrIntOverflow
}
return a + b, nil
}
// Concat 连接两个字符串,实现AddService。
func (s addService) Concat(_ context.Context, a, b string) (string, error) {
if a == "" && b == "" {
return "", ErrTwoEmptyStrings
}
if len(a)+len(b) > maxLen {
return "", ErrMaxSizeExceeded
}
return a + b, nil
}
端点层
在 Go kit 中,主要的消息模式是 RPC。因此,我们接口中的每个方法都将被建模为一个远程过程调用。对于每个方法,我们定义请求和响应结构体,分别捕获所有的输入和输出参数。
📌 结构体放在 endpoint 层,可以:
让 transport 用它来解码请求
让 service 接受 unpack 后的参数
并且定义请求和响应结构体
这样做有以下好处:
因此定义请求响应结构体
// SumRequest Sum方法的参数.
type SumRequest struct {
A int `json:"a"`
B int `json:"b"`
}
// SumResponse Sum方法的响应
type SumResponse struct {
V int `json:"v"`
Err string `json:"err,omitempty"`
}
// ConcatRequest Concat方法的参数.
type ConcatRequest struct {
A string `json:"a"`
B string `json:"b"`
}
// ConcatResponse Concat方法的响应.
type ConcatResponse struct {
V string `json:"v"`
Err string `json:"err,omitempty"`
}
import "github.com/go-kit/kit/endpoint"
//定义endpoint.Endpoint,endpoint.Endpoint 将所有请求和响应全都抽象为request和response
func makeSumEndpoint(svc AddService) endpoint.Endpoint {
//这里传入的是接口,这样以后所有实现接口的结构体都可以调用此方法
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(SumRequest) //将接口进行类型断言
v, err := svc.Sum(ctx, req.A, req.B)
if err != nil {
return SumResponse{V: v, Err: err.Error()}, nil
}
return SumResponse{V: v}, nil
}
}
func makeConcatEndpoint(svc AddService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(ConcatRequest)
v, err := svc.Concat(ctx, req.A, req.B)
if err != nil {
return ConcatResponse{V: v, Err: err.Error()}, nil
}
return ConcatResponse{V: v}, nil
}
}
接下来是传输层,在传输层中需要定义传输的协议即编解码方法。
import httptransport "github.com/go-kit/kit/transport/http"
func decodeSumRequest(_ context.Context, r *http.Request) (interface{}, error) {
var request SumRequest
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
return nil, err
}
return request, nil
}
func decodeCountRequest(_ context.Context, r *http.Request) (interface{}, error) {
var request ConcatRequest
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
return nil, err
}
return request, nil
}
func encodeResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
return json.NewEncoder(w).Encode(response)
}
func main() {
svc := addService{}
sumHandler := httptransport.NewServer(
makeSumEndpoint(svc),
decodeSumRequest,
encodeResponse,
)
concatHandler := httptransport.NewServer(
makeConcatEndpoint(svc),
decodeCountRequest,
encodeResponse,
)
http.Handle("/sum", sumHandler)
http.Handle("/concat", concatHandler)
log.Fatal(http.ListenAndServe(":8080", nil))
}
GRPC版本
先写proto文件
syntax = "proto3";
package pb;
option go_package="addsrv/pb";
service Add {
// Sum 对两个数字求和
rpc Sum (SumRequest) returns (SumResponse) {}
// Concat 方法拼接两个字符串
rpc Concat (ConcatRequest) returns (ConcatResponse) {}
}
// Sum方法的请求参数
message SumRequest {
int64 a = 1;
int64 b = 2;
}
// Sum方法的响应
message SumResponse {
int64 v = 1;
string err = 2;
}
// Concat方法的请求参数
message ConcatRequest {
string a = 1;
string b = 2;
}
// Concat方法的响应
message ConcatResponse {
string v = 1;
string err = 2;
}
生成后在传输层编写结构体,不同于传统的grpc,go-kit 中的grpc需要编写Handler,他的作用是解耦“传输层”与“业务逻辑层”,是架构中的中介桥梁。
📌 它的职责主要是三件事:
- 请求解码(Request Decoder)
把 gRPC/HTTP 等协议的请求结构 req 转换为通用的 endpoint 输入结构。
- 调用 Endpoint
endpoint 是业务逻辑的抽象封装,它与传输协议无关(对内只处理标准的输入输出结构)。
- 响应编码(Response Encoder)
将 endpoint 的返回结果再次封装成 gRPC/HTTP 等协议需要的响应结构 resp。
import grpctransport "github.com/go-kit/kit/transport/grpc"
//我们自定义了一个结构体 grpcServer,它实现了 pb.AddServer 接口,以便作为 gRPC 的服务端注册到 grpc.Server 中。
type grpcServer struct {
pb.UnimplementedAddServer
sum grpctransport.Handler
concat grpctransport.Handler
}
//虽然 sum 和 concat 已经封装好了 handler 逻辑,但由于 grpcServer 结构体要作为服务端被 gRPC 注册,
//必须实现 pb.AddServer 接口中定义的方法,将 gRPC 的调用交由已经封装好的 Handler 处理,自己只做一个调用的转发器。
func (s *grpcServer) Sum(ctx context.Context, req *pb.SumRequest) (*pb.SumResponse, error) {
_, resp, err := s.sum.ServeGRPC(ctx, req)
if err != nil {
return nil, err
}
return resp.(*pb.SumResponse), nil
}
func (s *grpcServer) Concat(ctx context.Context, req *pb.ConcatRequest) (*pb.ConcatResponse, error) {
_, resp, err := s.concat.ServeGRPC(ctx, req)
if err != nil {
return nil, err
}
return resp.(*pb.ConcatResponse), nil
}
/ decodeGRPCSumRequest 将Sum方法的gRPC请求参数转为内部的SumRequest
func decodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*pb.SumRequest)
return SumRequest{A: int(req.A), B: int(req.B)}, nil
}
// decodeGRPCConcatRequest 将Concat方法的gRPC请求参数转为内部的ConcatRequest
func decodeGRPCConcatRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*pb.ConcatRequest)
return ConcatRequest{A: req.A, B: req.B}, nil
}
// encodeGRPCSumResponse 封装Sum的gRPC响应
func encodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
resp := response.(SumResponse)
return &pb.SumResponse{V: int64(resp.V), Err: resp.Err}, nil
}
// encodeGRPCConcatResponse 封装Concat的gRPC响应
func encodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) {
resp := response.(ConcatResponse)
return &pb.ConcatResponse{V: resp.V, Err: resp.Err}, nil
}
// NewGRPCServer grpcServer构造函数
//构造函数 NewGRPCServer 完成了整个传输层的依赖注入,核心逻辑是将 endpoint 与编解码器绑定在一起,生成 handler:
//transport 层完全不依赖于业务实现细节,只需要传入接口 AddService 即可
func NewGRPCServer(svc AddService) pb.AddServer {
return &grpcServer{
sum: grpctransport.NewServer(
makeSumEndpoint(svc),
decodeGRPCSumRequest,
encodeGRPCSumResponse,
),
concat: grpctransport.NewServer(
makeConcatEndpoint(svc),
decodeGRPCConcatRequest,
encodeGRPCConcatResponse,
),
}
}
代码分层
service
service层负责我们业务逻辑的实现。
在项目下新建service.go文件,将与业务逻辑相关的代码保存至service.go文件中。
// service.go
import (
"context"
"errors"
)
// AddService 列出当前服务所有RPC方法的接口类型
type AddService interface {
Sum(ctx context.Context, a, b int) (int, error)
Concat(ctx context.Context, a, b string) (string, error)
}
// addService 实现AddService接口
type addService struct {
// ...
}
var (
// ErrEmptyString 两个参数都是空字符串的错误
ErrEmptyString = errors.New("两个参数都是空字符串")
)
// Sum 返回两个数的和
func (addService) Sum(_ context.Context, a, b int) (int, error) {
// 业务逻辑
return a + b, nil
}
// Concat 拼接两个字符串
func (addService) Concat(_ context.Context, a, b string) (string, error) {
if a == "" && b == "" {
return "", ErrEmptyString
}
return a + b, nil
}
// NewService 创建一个add service
func NewService() AddService {
return &addService{}
}
endpoint
endpoint层负责存放我们项目中对外暴露的RPC方法。
将以下代码存放在项目目录下的endpoint.go文件中。
// endpoint.go
import (
"context"
"github.com/go-kit/kit/endpoint"
)
type SumRequest struct {
A int `json:"a"`
B int `json:"b"`
}
type SumResponse struct {
V int `json:"v"`
Err string `json:"err,omitempty"`
}
type ConcatRequest struct {
A string `json:"a"`
B string `json:"b"`
}
type ConcatResponse struct {
V string `json:"v"`
Err string `json:"err,omitempty"`
}
func makeSumEndpoint(srv AddService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(SumRequest)
v, err := srv.Sum(ctx, req.A, req.B) // 方法调用
if err != nil {
return SumResponse{V: v, Err: err.Error()}, nil
}
return SumResponse{V: v}, nil
}
}
func makeConcatEndpoint(srv AddService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(ConcatRequest)
v, err := srv.Concat(ctx, req.A, req.B) // 方法调用
if err != nil {
return ConcatResponse{V: v, Err: err.Error()}, nil
}
return ConcatResponse{V: v}, nil
}
}
transport
transport层表示项目对外通信相关的部分,包括对外支持的协议等内容。
将项目中与网络传输相关的代码保存至项目目录下的transport.go文件中。
// transport.go
import (
"context"
"encoding/json"
"net/http"
grpctransport "github.com/go-kit/kit/transport/grpc"
httptransport "github.com/go-kit/kit/transport/http"
"github.com/gorilla/mux"
"addsrv/pb"
)
// gRPC的请求与响应
// decodeGRPCSumRequest 将Sum方法的gRPC请求参数转为内部的SumRequest
func decodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*pb.SumRequest)
return SumRequest{A: int(req.A), B: int(req.B)}, nil
}
// decodeGRPCConcatRequest 将Concat方法的gRPC请求参数转为内部的ConcatRequest
func decodeGRPCConcatRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*pb.ConcatRequest)
return ConcatRequest{A: req.A, B: req.B}, nil
}
// encodeGRPCSumResponse 封装Sum的gRPC响应
func encodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
resp := response.(SumResponse)
return &pb.SumResponse{V: int64(resp.V), Err: resp.Err}, nil
}
// encodeGRPCConcatResponse 封装Concat的gRPC响应
func encodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) {
resp := response.(ConcatResponse)
return &pb.ConcatResponse{V: resp.V, Err: resp.Err}, nil
}
// gRPC
type grpcServer struct {
pb.UnimplementedAddServer
sum grpctransport.Handler
concat grpctransport.Handler
}
func (s grpcServer) Sum(ctx context.Context, req *pb.SumRequest) (*pb.SumResponse, error) {
_, resp, err := s.sum.ServeGRPC(ctx, req)
if err != nil {
return nil, err
}
return resp.(*pb.SumResponse), nil
}
func (s grpcServer) Concat(ctx context.Context, req *pb.ConcatRequest) (*pb.ConcatResponse, error) {
_, resp, err := s.concat.ServeGRPC(ctx, req)
if err != nil {
return nil, err
}
return resp.(*pb.ConcatResponse), nil
}
// NewGRPCServer 构造函数
func NewGRPCServer(svc AddService) pb.AddServer {
return &grpcServer{
sum: grpctransport.NewServer(
makeSumEndpoint(svc), // endpoint
decodeGRPCSumRequest,
encodeGRPCSumResponse,
),
concat: grpctransport.NewServer(
makeConcatEndpoint(svc),
decodeGRPCConcatRequest,
encodeGRPCConcatResponse,
),
}
}
// HTTP
func decodeSumRequest(ctx context.Context, r *http.Request) (interface{}, error) {
var request SumRequest
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
return nil, err
}
return request, nil
}
func decodeConcatRequest(ctx context.Context, r *http.Request) (interface{}, error) {
var request ConcatRequest
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
return nil, err
}
return request, nil
}
func encodeResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
return json.NewEncoder(w).Encode(response)
}
// HTTP Server
func NewHTTPServer(svc AddService) http.Handler {
sumHandler := httptransport.NewServer(
makeSumEndpoint(svc),
decodeSumRequest,
encodeResponse,
)
concatHandler := httptransport.NewServer(
makeConcatEndpoint(svc),
decodeConcatRequest,
encodeResponse,
)
// use github.com/gorilla/mux
r := mux.NewRouter()
r.Handle("/sum", sumHandler).Methods("POST")
r.Handle("/concat", concatHandler).Methods("POST")
// use gin
// r := gin.Default()
// r.POST("/sum", gin.WrapH(sumHandler))
// r.POST("/concat", gin.WrapH(concatHandler))
return r
}