Go-kit 框架学习

使用 Go kit 构建的服务分为三层:

  • 传输层(Transport layer)

  • 端点层(Endpoint layer)

  • 服务层(Service layer)

请求在第1层进入服务,向下流到第3层,响应则相反。

Transports
传输域绑定到具体的传输协议,如 HTTP 或 gRPC。在一个微服务可能支持一个或多个传输协议的世界中,这是非常强大的:你可以在单个微服务中支持原有的 HTTP API 和新增的 RPC 服务。
Endpoints
端点就像控制器上的动作/处理程序; 它是安全性和抗脆弱性逻辑的所在。如果实现两种传输(HTTP 和 gRPC) ,则可能有两种将请求发送到同一端点的方法。

Services
服务(指Go kit中的service层)是实现所有业务逻辑的地方。服务层通常将多个端点粘合在一起。在 Go kit 中,服务层通常被抽象为接口,这些接口的实现包含业务逻辑。Go kit 服务层应该努力遵守整洁架构或六边形架构。也就是说,业务逻辑不需要了解端点(尤其是传输域)概念:你的服务层不应该关心HTTP 头或 gRPC 错误代码。

Middlewares
Go kit 试图通过使用中间件(或装饰器)模式来执行严格的关注分离(separation of concerns)。中间件可以包装端点或服务以添加功能,比如日志记录、速率限制、负载平衡或分布式跟踪。围绕一个端点或服务链接多个中间件是很常见的。

接下来通过一个AddServer的例子来简单了解

逻辑层

首先定义一个借口类型,接口设定了要实现的方法。

// AddService 把两个东西加到一起
type AddService interface {
    Sum(ctx context.Context, a, b int) (int, error)
    Concat(ctx context.Context, a, b string) (string, error)
}

紧接着定义实现

type addService struct{} // 创建结构体定义方法,实现接口

const maxLen = 10

var (
    // ErrTwoZeroes  Sum方法的业务规则不能对两个0求和
    ErrTwoZeroes = errors.New("can't sum two zeroes")

    // ErrIntOverflow Sum参数越界
    ErrIntOverflow = errors.New("integer overflow")

    // ErrTwoEmptyStrings Concat方法业务规则规定参数不能是两个空字符串.
    ErrTwoEmptyStrings = errors.New("can't concat two empty strings")

    // ErrMaxSizeExceeded Concat方法的参数超出范围
    ErrMaxSizeExceeded = errors.New("result exceeds maximum size")
)

// Sum 对两个数字求和,实现AddService。
func (s addService) Sum(_ context.Context, a, b int) (int, error) {
    if a == 0 && b == 0 {
        return 0, ErrTwoZeroes
    }
    if (b > 0 && a > (math.MaxInt-b)) || (b < 0 && a < (math.MinInt-b)) {
        return 0, ErrIntOverflow
    }
    return a + b, nil
}

// Concat 连接两个字符串,实现AddService。
func (s addService) Concat(_ context.Context, a, b string) (string, error) {
    if a == "" && b == "" {
        return "", ErrTwoEmptyStrings
    }
    if len(a)+len(b) > maxLen {
        return "", ErrMaxSizeExceeded
    }
    return a + b, nil
}

端点层

在 Go kit 中,主要的消息模式是 RPC。因此,我们接口中的每个方法都将被建模为一个远程过程调用。对于每个方法,我们定义请求和响应结构体,分别捕获所有的输入和输出参数。



📌 结构体放在 endpoint 层,可以:

  • 让 transport 用它来解码请求

  • 让 service 接受 unpack 后的参数

并且定义请求和响应结构体
这样做有以下好处:





因此定义请求响应结构体

// SumRequest Sum方法的参数.
type SumRequest struct {
    A int `json:"a"`
    B int `json:"b"`
}

// SumResponse Sum方法的响应
type SumResponse struct {
    V   int    `json:"v"`
    Err string `json:"err,omitempty"`
}

// ConcatRequest Concat方法的参数.
type ConcatRequest struct {
    A string `json:"a"`
    B string `json:"b"`
}

// ConcatResponse  Concat方法的响应.
type ConcatResponse struct {
    V   string `json:"v"`
    Err string `json:"err,omitempty"`
}

import "github.com/go-kit/kit/endpoint"
//定义endpoint.Endpoint,endpoint.Endpoint 将所有请求和响应全都抽象为request和response
func makeSumEndpoint(svc AddService) endpoint.Endpoint {
//这里传入的是接口,这样以后所有实现接口的结构体都可以调用此方法
    return func(ctx context.Context, request interface{}) (interface{}, error) {
        req := request.(SumRequest) //将接口进行类型断言
        v, err := svc.Sum(ctx, req.A, req.B)
        if err != nil {
            return SumResponse{V: v, Err: err.Error()}, nil
        }
        return SumResponse{V: v}, nil
    }
}

func makeConcatEndpoint(svc AddService) endpoint.Endpoint {
    return func(ctx context.Context, request interface{}) (interface{}, error) {
        req := request.(ConcatRequest)
        v, err := svc.Concat(ctx, req.A, req.B)
        if err != nil {
            return ConcatResponse{V: v, Err: err.Error()}, nil
        }
        return ConcatResponse{V: v}, nil
    }
}

接下来是传输层,在传输层中需要定义传输的协议即编解码方法。

import httptransport "github.com/go-kit/kit/transport/http"

func decodeSumRequest(_ context.Context, r *http.Request) (interface{}, error) {
    var request SumRequest
    if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
        return nil, err
    }
    return request, nil
}

func decodeCountRequest(_ context.Context, r *http.Request) (interface{}, error) {
    var request ConcatRequest
    if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
        return nil, err
    }
    return request, nil
}

func encodeResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
    return json.NewEncoder(w).Encode(response)
}

func main() {
    svc := addService{}

    sumHandler := httptransport.NewServer(
        makeSumEndpoint(svc),
        decodeSumRequest,
        encodeResponse,
    )

    concatHandler := httptransport.NewServer(
        makeConcatEndpoint(svc),
        decodeCountRequest,
        encodeResponse,
    )

    http.Handle("/sum", sumHandler)
    http.Handle("/concat", concatHandler)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

GRPC版本

先写proto文件

syntax = "proto3";

package pb;

option go_package="addsrv/pb";


service Add {
  // Sum 对两个数字求和
  rpc Sum (SumRequest) returns (SumResponse) {}

  // Concat 方法拼接两个字符串
  rpc Concat (ConcatRequest) returns (ConcatResponse) {}
}


// Sum方法的请求参数
message SumRequest {
  int64 a = 1;
  int64 b = 2;
}

// Sum方法的响应
message SumResponse {
  int64 v = 1;
  string err = 2;
}

// Concat方法的请求参数
message ConcatRequest {
  string a = 1;
  string b = 2;
}

// Concat方法的响应
message ConcatResponse {
  string v = 1;
  string err = 2;
}

生成后在传输层编写结构体,不同于传统的grpc,go-kit 中的grpc需要编写Handler,他的作用是解耦“传输层”与“业务逻辑层”,是架构中的中介桥梁。
📌 它的职责主要是三件事:

  • 请求解码(Request Decoder)

把 gRPC/HTTP 等协议的请求结构 req 转换为通用的 endpoint 输入结构。

  • 调用 Endpoint

endpoint 是业务逻辑的抽象封装,它与传输协议无关(对内只处理标准的输入输出结构)。

  • 响应编码(Response Encoder)

将 endpoint 的返回结果再次封装成 gRPC/HTTP 等协议需要的响应结构 resp。

import grpctransport "github.com/go-kit/kit/transport/grpc"

//我们自定义了一个结构体 grpcServer,它实现了 pb.AddServer 接口,以便作为 gRPC 的服务端注册到 grpc.Server 中。
type grpcServer struct {
    pb.UnimplementedAddServer
    sum    grpctransport.Handler
    concat grpctransport.Handler
}
//虽然 sum 和 concat 已经封装好了 handler 逻辑,但由于 grpcServer 结构体要作为服务端被 gRPC 注册,
//必须实现 pb.AddServer 接口中定义的方法,将 gRPC 的调用交由已经封装好的 Handler 处理,自己只做一个调用的转发器。
func (s *grpcServer) Sum(ctx context.Context, req *pb.SumRequest) (*pb.SumResponse, error) {
    _, resp, err := s.sum.ServeGRPC(ctx, req)
    if err != nil {
        return nil, err
    }
    return resp.(*pb.SumResponse), nil
}

func (s *grpcServer) Concat(ctx context.Context, req *pb.ConcatRequest) (*pb.ConcatResponse, error) {
    _, resp, err := s.concat.ServeGRPC(ctx, req)
    if err != nil {
        return nil, err
    }
    return resp.(*pb.ConcatResponse), nil
}
/ decodeGRPCSumRequest 将Sum方法的gRPC请求参数转为内部的SumRequest
func decodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
    req := grpcReq.(*pb.SumRequest)
    return SumRequest{A: int(req.A), B: int(req.B)}, nil
}

// decodeGRPCConcatRequest 将Concat方法的gRPC请求参数转为内部的ConcatRequest
func decodeGRPCConcatRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
    req := grpcReq.(*pb.ConcatRequest)
    return ConcatRequest{A: req.A, B: req.B}, nil
}

// encodeGRPCSumResponse 封装Sum的gRPC响应 
func encodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
    resp := response.(SumResponse)
    return &pb.SumResponse{V: int64(resp.V), Err: resp.Err}, nil
}

// encodeGRPCConcatResponse 封装Concat的gRPC响应
func encodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) {
    resp := response.(ConcatResponse)
    return &pb.ConcatResponse{V: resp.V, Err: resp.Err}, nil
}

// NewGRPCServer grpcServer构造函数
//构造函数 NewGRPCServer 完成了整个传输层的依赖注入,核心逻辑是将 endpoint 与编解码器绑定在一起,生成 handler:
//transport 层完全不依赖于业务实现细节,只需要传入接口 AddService 即可
func NewGRPCServer(svc AddService) pb.AddServer {
    return &grpcServer{
        sum: grpctransport.NewServer(
            makeSumEndpoint(svc),
            decodeGRPCSumRequest,
            encodeGRPCSumResponse,
        ),
        concat: grpctransport.NewServer(
            makeConcatEndpoint(svc),
            decodeGRPCConcatRequest,
            encodeGRPCConcatResponse,
        ),
    }
}

代码分层

service
service层负责我们业务逻辑的实现。

在项目下新建service.go文件,将与业务逻辑相关的代码保存至service.go文件中。

// service.go

import (
    "context"
    "errors"
)

// AddService 列出当前服务所有RPC方法的接口类型
type AddService interface {
    Sum(ctx context.Context, a, b int) (int, error)
    Concat(ctx context.Context, a, b string) (string, error)
}

// addService 实现AddService接口
type addService struct {
    // ...
}

var (
    // ErrEmptyString 两个参数都是空字符串的错误
    ErrEmptyString = errors.New("两个参数都是空字符串")
)

// Sum 返回两个数的和
func (addService) Sum(_ context.Context, a, b int) (int, error) {
    // 业务逻辑
    return a + b, nil
}

// Concat 拼接两个字符串
func (addService) Concat(_ context.Context, a, b string) (string, error) {
    if a == "" && b == "" {
        return "", ErrEmptyString
    }
    return a + b, nil
}

// NewService 创建一个add service
func NewService() AddService {
    return &addService{}
}

endpoint
endpoint层负责存放我们项目中对外暴露的RPC方法。

将以下代码存放在项目目录下的endpoint.go文件中。

// endpoint.go

import (
    "context"

    "github.com/go-kit/kit/endpoint"
)

type SumRequest struct {
    A int `json:"a"`
    B int `json:"b"`
}

type SumResponse struct {
    V   int    `json:"v"`
    Err string `json:"err,omitempty"`
}

type ConcatRequest struct {
    A string `json:"a"`
    B string `json:"b"`
}

type ConcatResponse struct {
    V   string `json:"v"`
    Err string `json:"err,omitempty"`
}

func makeSumEndpoint(srv AddService) endpoint.Endpoint {
    return func(ctx context.Context, request interface{}) (interface{}, error) {
        req := request.(SumRequest)
        v, err := srv.Sum(ctx, req.A, req.B) // 方法调用
        if err != nil {
            return SumResponse{V: v, Err: err.Error()}, nil
        }
        return SumResponse{V: v}, nil
    }
}

func makeConcatEndpoint(srv AddService) endpoint.Endpoint {
    return func(ctx context.Context, request interface{}) (interface{}, error) {
        req := request.(ConcatRequest)
        v, err := srv.Concat(ctx, req.A, req.B) // 方法调用
        if err != nil {
            return ConcatResponse{V: v, Err: err.Error()}, nil
        }
        return ConcatResponse{V: v}, nil
    }
}

transport
transport层表示项目对外通信相关的部分,包括对外支持的协议等内容。

将项目中与网络传输相关的代码保存至项目目录下的transport.go文件中。

// transport.go

import (
    "context"
    "encoding/json"
    "net/http"

    grpctransport "github.com/go-kit/kit/transport/grpc"
    httptransport "github.com/go-kit/kit/transport/http"
    "github.com/gorilla/mux"

    "addsrv/pb"
)

// gRPC的请求与响应
// decodeGRPCSumRequest 将Sum方法的gRPC请求参数转为内部的SumRequest
func decodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
    req := grpcReq.(*pb.SumRequest)
    return SumRequest{A: int(req.A), B: int(req.B)}, nil
}

// decodeGRPCConcatRequest 将Concat方法的gRPC请求参数转为内部的ConcatRequest
func decodeGRPCConcatRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
    req := grpcReq.(*pb.ConcatRequest)
    return ConcatRequest{A: req.A, B: req.B}, nil
}

// encodeGRPCSumResponse 封装Sum的gRPC响应
func encodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
    resp := response.(SumResponse)
    return &pb.SumResponse{V: int64(resp.V), Err: resp.Err}, nil
}

// encodeGRPCConcatResponse 封装Concat的gRPC响应
func encodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) {
    resp := response.(ConcatResponse)
    return &pb.ConcatResponse{V: resp.V, Err: resp.Err}, nil
}

// gRPC
type grpcServer struct {
    pb.UnimplementedAddServer

    sum    grpctransport.Handler
    concat grpctransport.Handler
}

func (s grpcServer) Sum(ctx context.Context, req *pb.SumRequest) (*pb.SumResponse, error) {
    _, resp, err := s.sum.ServeGRPC(ctx, req)
    if err != nil {
        return nil, err
    }
    return resp.(*pb.SumResponse), nil
}

func (s grpcServer) Concat(ctx context.Context, req *pb.ConcatRequest) (*pb.ConcatResponse, error) {
    _, resp, err := s.concat.ServeGRPC(ctx, req)
    if err != nil {
        return nil, err
    }
    return resp.(*pb.ConcatResponse), nil
}

// NewGRPCServer 构造函数
func NewGRPCServer(svc AddService) pb.AddServer {
    return &grpcServer{
        sum: grpctransport.NewServer(
            makeSumEndpoint(svc), // endpoint
            decodeGRPCSumRequest,
            encodeGRPCSumResponse,
        ),
        concat: grpctransport.NewServer(
            makeConcatEndpoint(svc),
            decodeGRPCConcatRequest,
            encodeGRPCConcatResponse,
        ),
    }
}

// HTTP
func decodeSumRequest(ctx context.Context, r *http.Request) (interface{}, error) {
    var request SumRequest
    if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
        return nil, err
    }
    return request, nil
}

func decodeConcatRequest(ctx context.Context, r *http.Request) (interface{}, error) {
    var request ConcatRequest
    if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
        return nil, err
    }
    return request, nil
}

func encodeResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
    return json.NewEncoder(w).Encode(response)
}

// HTTP Server
func NewHTTPServer(svc AddService) http.Handler {
    sumHandler := httptransport.NewServer(
        makeSumEndpoint(svc),
        decodeSumRequest,
        encodeResponse,
    )

    concatHandler := httptransport.NewServer(
        makeConcatEndpoint(svc),
        decodeConcatRequest,
        encodeResponse,
    )
    // use github.com/gorilla/mux
    r := mux.NewRouter()
    r.Handle("/sum", sumHandler).Methods("POST")
    r.Handle("/concat", concatHandler).Methods("POST")

    // use gin
    // r := gin.Default()
    // r.POST("/sum", gin.WrapH(sumHandler))
    // r.POST("/concat", gin.WrapH(concatHandler))
    return r
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容