评价系统(7)

添加redis与singleflight,并使用es查询

首先添加redis配置项与es地址。由于之前的conf/conf.proto已经有了redis,此时只需补充es配置即可。


message Bootstrap {
  Server server = 1;
  Data data = 2;
  Snowflake snowflake = 3;
  Elasticsearch elasticsearch = 4;
}

message Elasticsearch {
  repeated string addresses = 1;
}

之后生成配置,修改config.yaml

elasticsearch:
  addresses:
    - "http://127.0.0.1:9200"

接着以 ListReviewByStoreID为例编写流程。
首先添加service


func (s *ReviewService) ListReviewByStoreID(ctx context.Context, req *pb.ListReviewByStoreIDRequest) (*pb.ListReviewByStoreIDReply, error) {
    fmt.Printf("[service] ListReviewByStoreID req:%#v\n", req)
    reviewList, err := s.uc.ListReviewByStoreID(ctx, req.StoreID, int(req.Page), int(req.Size))
    if err != nil {
        return nil, err
    }
    // format
    list := make([]*pb.ReviewInfo, 0, len(reviewList))
    for _, r := range reviewList {
        list = append(list, &pb.ReviewInfo{
            ReviewID:     r.ReviewID,
            UserID:       r.UserID,
            OrderID:      r.OrderID,
            Score:        r.Score,
            ServiceScore: r.ServiceScore,
            ExpressScore: r.ExpressScore,
            Content:      r.Content,
            PicInfo:      r.PicInfo,
            VideoInfo:    r.VideoInfo,
            Status:       r.Status,
        })
    }

    return &pb.ListReviewByStoreIDReply{List: list}, nil
}

再添加biz

type ReviewRepo interface {

    ListReviewByStoreID(ctx context.Context, storeID int64, offset, limit int) ([]*MyReviewInfo, error)
}

// ListReviewByStoreID 根据storeID分页查询评价
func (uc ReviewUsecase) ListReviewByStoreID(ctx context.Context, storeID int64, page, size int) ([]*MyReviewInfo, error) {
    if page <= 0 {
        page = 1
    }
    if size <= 0 || size > 50 {
        size = 10
    }
    offset := (page - 1) * size
    limit := size
    uc.log.WithContext(ctx).Debugf("[biz] ListReviewByStoreID storeID:%v", storeID)
    return uc.repo.ListReviewByStoreID(ctx, storeID, offset, limit)
}

type MyReviewInfo struct {
    *model.ReviewInfo
    CreateAt     MyTime `json:"create_at"` // 创建时间
    UpdateAt     MyTime `json:"update_at"` // 创建时间
    Anonymous    int32  `json:"anonymous,string"`
    Score        int32  `json:"score,string"`
    ServiceScore int32  `json:"service_score,string"`
    ExpressScore int32  `json:"express_score,string"`
    HasMedia     int32  `json:"has_media,string"`
    Status       int32  `json:"status,string"`
    IsDefault    int32  `json:"is_default,string"`
    HasReply     int32  `json:"has_reply,string"`
    ID           int64  `json:"id,string"`
    Version      int32  `json:"version,string"`
    ReviewID     int64  `json:"review_id,string"`
    OrderID      int64  `json:"order_id,string"`
    SkuID        int64  `json:"sku_id,string"`
    SpuID        int64  `json:"spu_id,string"`
    StoreID      int64  `json:"store_id,string"`
    UserID       int64  `json:"user_id,string"`
}

type MyTime time.Time

// UnmarshalJSON json.Unmarshal 的时候会自动调用这个方法
func (t *MyTime) UnmarshalJSON(data []byte) error {
    // data = "\"2023-12-17 14:20:18\""
    s := strings.Trim(string(data), `"`)
    tmp, err := time.Parse(time.DateTime, s)
    if err != nil {
        return err
    }
    *t = MyTime(tmp)
    return nil
}

在代码中可以看到设计了一个MyReviewInfo一个新结构体,该结构体中自定义了type MyTime time.Time,和int类型,这是因为在es中获得的信息都是string类型需要将其转换。
最后编写data层,首先是/data/data.go需要定义redis,es客户端的连接,方便添加缓存和查找。然后编写/data/review.go添加逻辑。

type Data struct {
    // db *gorm.DB
    query *query.Query
    log   *log.Helper
    es    *elasticsearch.TypedClient // github.com/elastic/go-elasticsearch/v8
    rdb   *redis.Client
}

// NewESClient ES Client 的构造函数
func NewESClient(cfg *conf.Elasticsearch) (*elasticsearch.TypedClient, error) {
    // ES 配置
    c := elasticsearch.Config{
        Addresses: cfg.GetAddresses(),
    }

    // 创建客户端连接
    return elasticsearch.NewTypedClient(c)
}
func NewRedisClient(cfg *conf.Data) *redis.Client {
    return redis.NewClient(&redis.Options{
        Addr:         cfg.Redis.Addr,
        WriteTimeout: cfg.Redis.WriteTimeout.AsDuration(),
        ReadTimeout:  cfg.Redis.ReadTimeout.AsDuration(),
    })
}

// ListReviewByStoreID 根据storeID 分页查询评价
func (r *reviewRepo) ListReviewByStoreID(ctx context.Context, storeID int64, offset, limit int) ([]*biz.MyReviewInfo, error) {
    // return r.getData1(ctx, storeID, offset, limit) // 第一版直接查ES
    return r.getData2(ctx, storeID, offset, limit) // 第二版增加缓存和singleflight

}

func (r *reviewRepo) getData1(ctx context.Context, storeID int64, offset, limit int) ([]*biz.MyReviewInfo, error) {
    // 去ES里面查询评价
    resp, err := r.data.es.Search().
        Index("review").
        From(offset).
        Size(limit).
        Query(&types.Query{
            Bool: &types.BoolQuery{
                Filter: []types.Query{
                    {
                        Term: map[string]types.TermQuery{
                            "store_id": {Value: storeID},
                        },
                    },
                },
            },
        }).
        Do(ctx)
    fmt.Printf("--> es search: %v %v\n", resp, err)
    if err != nil {
        return nil, err
    }
    fmt.Printf("es result total:%v\n", resp.Hits.Total.Value)
    // 反序列化数据
    // resp.Hits.Hits[0].Source_(json.RawMessage)  ==>  model.ReviewInfo
    list := make([]*biz.MyReviewInfo, 0, resp.Hits.Total.Value) // ?
    // list := make([]*model.ReviewInfo)                           // ?

    for _, hit := range resp.Hits.Hits {
        tmp := &biz.MyReviewInfo{}
        if err := json.Unmarshal(hit.Source_, tmp); err != nil {
            r.log.Errorf("json.Unmarshal(hit.Source_, tmp) failed, err:%v", err)
            continue
        }
        list = append(list, tmp)
    }

    return list, nil
}

var g singleflight.Group

// 升级版带缓存版本的查询函数
func (r *reviewRepo) getData2(ctx context.Context, storeID int64, offset, limit int) ([]*biz.MyReviewInfo, error) {
    // 取数据
    // 1. 先查询Redis缓存
    // 2. 缓存没有则查询ES
    // 3. 通过singleflight 合并短时间内大量的并发查询

    key := fmt.Sprintf("review:%d:%d:%d", storeID, offset, limit)
    b, err := r.getDataBySingleflight(ctx, key)
    if err != nil {
        return nil, err
    }
    hm := new(types.HitsMetadata)
    if err := json.Unmarshal(b, hm); err != nil {
        return nil, err
    }

    // 反序列化
    // 反序列化数据
    // resp.Hits.Hits[0].Source_(json.RawMessage)  ==>  model.ReviewInfo
    list := make([]*biz.MyReviewInfo, 0, hm.Total.Value) // ?
    // list := make([]*model.ReviewInfo)                           // ?

    for _, hit := range hm.Hits {
        tmp := &biz.MyReviewInfo{}
        if err := json.Unmarshal(hit.Source_, tmp); err != nil {
            r.log.Errorf("json.Unmarshal(hit.Source_, tmp) failed, err:%v", err)
            continue
        }
        list = append(list, tmp)
    }
    return list, nil
}

// key review:76089:1:10  --> "[{},{},{}]"
// json.Unmarshal([]byte)

func (r *reviewRepo) getDataBySingleflight(ctx context.Context, key string) ([]byte, error) {
    v, err, shared := g.Do(key, func() (interface{}, error) {
        // 查缓存
        data, err := r.getDataFromCache(ctx, key)
        r.log.Debugf("r.getDataFromCache(ctx, key) data:%s, err:%v\n", data, err)
        if err == nil {
            return data, nil
        }
        // 只有在缓存中没有这个key的错误时才查ES
        if errors.Is(err, redis.Nil) {
            // 缓存中没有这个key,说明缓存失效了,需要查ES
            data, err := r.getDataFromES(ctx, key)
            if err == nil {
                // 设置缓存
                return data, r.setCache(ctx, key, data)
            }
            return nil, err
        }
        // 查缓存失败了,直接返回错误,不继续向下传导压力
        return nil, err
    })
    r.log.Debugf("singleflight ret: v:%v err:%v shared:%v\n", v, err, shared)
    if err != nil {
        return nil, err
    }
    return v.([]byte), nil
}

// getDataFromCache 读缓存
func (r *reviewRepo) getDataFromCache(ctx context.Context, key string) ([]byte, error) {
    r.log.Debugf("getDataFromCache key:%v\n", key)
    return r.data.rdb.Get(ctx, key).Bytes()
}

// setCache 设置缓存
func (r *reviewRepo) setCache(ctx context.Context, key string, data []byte) error {
    return r.data.rdb.Set(ctx, key, data, time.Second*10).Err()
}

// getDataFromES 从ES查询
func (r *reviewRepo) getDataFromES(ctx context.Context, key string) ([]byte, error) {
    values := strings.Split(key, ":")
    if len(values) < 4 {
        return nil, errors.New("invalid key")
    }
    index, storeID, offsetStr, limitStr := values[0], values[1], values[2], values[3]

    offset, err := strconv.Atoi(offsetStr)
    if err != nil {
        return nil, err
    }
    limit, err := strconv.Atoi(limitStr)
    if err != nil {
        return nil, err
    }
    resp, err := r.data.es.Search().
        Index(index).
        From(offset).
        Size(limit).
        Query(&types.Query{
            Bool: &types.BoolQuery{
                Filter: []types.Query{
                    {
                        Term: map[string]types.TermQuery{
                            "store_id": {Value: storeID},
                        },
                    },
                },
            },
        }).
        Do(ctx)
    if err != nil {
        return nil, err
    }
    return json.Marshal(resp.Hits)
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容