添加redis与singleflight,并使用es查询
首先添加redis配置项与es地址。由于之前的conf/conf.proto
已经有了redis,此时只需补充es配置即可。
message Bootstrap {
Server server = 1;
Data data = 2;
Snowflake snowflake = 3;
Elasticsearch elasticsearch = 4;
}
message Elasticsearch {
repeated string addresses = 1;
}
之后生成配置,修改config.yaml
elasticsearch:
addresses:
- "http://127.0.0.1:9200"
接着以 ListReviewByStoreID
为例编写流程。
首先添加service
层
func (s *ReviewService) ListReviewByStoreID(ctx context.Context, req *pb.ListReviewByStoreIDRequest) (*pb.ListReviewByStoreIDReply, error) {
fmt.Printf("[service] ListReviewByStoreID req:%#v\n", req)
reviewList, err := s.uc.ListReviewByStoreID(ctx, req.StoreID, int(req.Page), int(req.Size))
if err != nil {
return nil, err
}
// format
list := make([]*pb.ReviewInfo, 0, len(reviewList))
for _, r := range reviewList {
list = append(list, &pb.ReviewInfo{
ReviewID: r.ReviewID,
UserID: r.UserID,
OrderID: r.OrderID,
Score: r.Score,
ServiceScore: r.ServiceScore,
ExpressScore: r.ExpressScore,
Content: r.Content,
PicInfo: r.PicInfo,
VideoInfo: r.VideoInfo,
Status: r.Status,
})
}
return &pb.ListReviewByStoreIDReply{List: list}, nil
}
再添加biz
层
type ReviewRepo interface {
ListReviewByStoreID(ctx context.Context, storeID int64, offset, limit int) ([]*MyReviewInfo, error)
}
// ListReviewByStoreID 根据storeID分页查询评价
func (uc ReviewUsecase) ListReviewByStoreID(ctx context.Context, storeID int64, page, size int) ([]*MyReviewInfo, error) {
if page <= 0 {
page = 1
}
if size <= 0 || size > 50 {
size = 10
}
offset := (page - 1) * size
limit := size
uc.log.WithContext(ctx).Debugf("[biz] ListReviewByStoreID storeID:%v", storeID)
return uc.repo.ListReviewByStoreID(ctx, storeID, offset, limit)
}
type MyReviewInfo struct {
*model.ReviewInfo
CreateAt MyTime `json:"create_at"` // 创建时间
UpdateAt MyTime `json:"update_at"` // 创建时间
Anonymous int32 `json:"anonymous,string"`
Score int32 `json:"score,string"`
ServiceScore int32 `json:"service_score,string"`
ExpressScore int32 `json:"express_score,string"`
HasMedia int32 `json:"has_media,string"`
Status int32 `json:"status,string"`
IsDefault int32 `json:"is_default,string"`
HasReply int32 `json:"has_reply,string"`
ID int64 `json:"id,string"`
Version int32 `json:"version,string"`
ReviewID int64 `json:"review_id,string"`
OrderID int64 `json:"order_id,string"`
SkuID int64 `json:"sku_id,string"`
SpuID int64 `json:"spu_id,string"`
StoreID int64 `json:"store_id,string"`
UserID int64 `json:"user_id,string"`
}
type MyTime time.Time
// UnmarshalJSON json.Unmarshal 的时候会自动调用这个方法
func (t *MyTime) UnmarshalJSON(data []byte) error {
// data = "\"2023-12-17 14:20:18\""
s := strings.Trim(string(data), `"`)
tmp, err := time.Parse(time.DateTime, s)
if err != nil {
return err
}
*t = MyTime(tmp)
return nil
}
在代码中可以看到设计了一个MyReviewInfo
一个新结构体,该结构体中自定义了type MyTime time.Time
,和int
类型,这是因为在es
中获得的信息都是string
类型需要将其转换。
最后编写data
层,首先是/data/data.go
需要定义redis,es
客户端的连接,方便添加缓存和查找。然后编写/data/review.go
添加逻辑。
type Data struct {
// db *gorm.DB
query *query.Query
log *log.Helper
es *elasticsearch.TypedClient // github.com/elastic/go-elasticsearch/v8
rdb *redis.Client
}
// NewESClient ES Client 的构造函数
func NewESClient(cfg *conf.Elasticsearch) (*elasticsearch.TypedClient, error) {
// ES 配置
c := elasticsearch.Config{
Addresses: cfg.GetAddresses(),
}
// 创建客户端连接
return elasticsearch.NewTypedClient(c)
}
func NewRedisClient(cfg *conf.Data) *redis.Client {
return redis.NewClient(&redis.Options{
Addr: cfg.Redis.Addr,
WriteTimeout: cfg.Redis.WriteTimeout.AsDuration(),
ReadTimeout: cfg.Redis.ReadTimeout.AsDuration(),
})
}
// ListReviewByStoreID 根据storeID 分页查询评价
func (r *reviewRepo) ListReviewByStoreID(ctx context.Context, storeID int64, offset, limit int) ([]*biz.MyReviewInfo, error) {
// return r.getData1(ctx, storeID, offset, limit) // 第一版直接查ES
return r.getData2(ctx, storeID, offset, limit) // 第二版增加缓存和singleflight
}
func (r *reviewRepo) getData1(ctx context.Context, storeID int64, offset, limit int) ([]*biz.MyReviewInfo, error) {
// 去ES里面查询评价
resp, err := r.data.es.Search().
Index("review").
From(offset).
Size(limit).
Query(&types.Query{
Bool: &types.BoolQuery{
Filter: []types.Query{
{
Term: map[string]types.TermQuery{
"store_id": {Value: storeID},
},
},
},
},
}).
Do(ctx)
fmt.Printf("--> es search: %v %v\n", resp, err)
if err != nil {
return nil, err
}
fmt.Printf("es result total:%v\n", resp.Hits.Total.Value)
// 反序列化数据
// resp.Hits.Hits[0].Source_(json.RawMessage) ==> model.ReviewInfo
list := make([]*biz.MyReviewInfo, 0, resp.Hits.Total.Value) // ?
// list := make([]*model.ReviewInfo) // ?
for _, hit := range resp.Hits.Hits {
tmp := &biz.MyReviewInfo{}
if err := json.Unmarshal(hit.Source_, tmp); err != nil {
r.log.Errorf("json.Unmarshal(hit.Source_, tmp) failed, err:%v", err)
continue
}
list = append(list, tmp)
}
return list, nil
}
var g singleflight.Group
// 升级版带缓存版本的查询函数
func (r *reviewRepo) getData2(ctx context.Context, storeID int64, offset, limit int) ([]*biz.MyReviewInfo, error) {
// 取数据
// 1. 先查询Redis缓存
// 2. 缓存没有则查询ES
// 3. 通过singleflight 合并短时间内大量的并发查询
key := fmt.Sprintf("review:%d:%d:%d", storeID, offset, limit)
b, err := r.getDataBySingleflight(ctx, key)
if err != nil {
return nil, err
}
hm := new(types.HitsMetadata)
if err := json.Unmarshal(b, hm); err != nil {
return nil, err
}
// 反序列化
// 反序列化数据
// resp.Hits.Hits[0].Source_(json.RawMessage) ==> model.ReviewInfo
list := make([]*biz.MyReviewInfo, 0, hm.Total.Value) // ?
// list := make([]*model.ReviewInfo) // ?
for _, hit := range hm.Hits {
tmp := &biz.MyReviewInfo{}
if err := json.Unmarshal(hit.Source_, tmp); err != nil {
r.log.Errorf("json.Unmarshal(hit.Source_, tmp) failed, err:%v", err)
continue
}
list = append(list, tmp)
}
return list, nil
}
// key review:76089:1:10 --> "[{},{},{}]"
// json.Unmarshal([]byte)
func (r *reviewRepo) getDataBySingleflight(ctx context.Context, key string) ([]byte, error) {
v, err, shared := g.Do(key, func() (interface{}, error) {
// 查缓存
data, err := r.getDataFromCache(ctx, key)
r.log.Debugf("r.getDataFromCache(ctx, key) data:%s, err:%v\n", data, err)
if err == nil {
return data, nil
}
// 只有在缓存中没有这个key的错误时才查ES
if errors.Is(err, redis.Nil) {
// 缓存中没有这个key,说明缓存失效了,需要查ES
data, err := r.getDataFromES(ctx, key)
if err == nil {
// 设置缓存
return data, r.setCache(ctx, key, data)
}
return nil, err
}
// 查缓存失败了,直接返回错误,不继续向下传导压力
return nil, err
})
r.log.Debugf("singleflight ret: v:%v err:%v shared:%v\n", v, err, shared)
if err != nil {
return nil, err
}
return v.([]byte), nil
}
// getDataFromCache 读缓存
func (r *reviewRepo) getDataFromCache(ctx context.Context, key string) ([]byte, error) {
r.log.Debugf("getDataFromCache key:%v\n", key)
return r.data.rdb.Get(ctx, key).Bytes()
}
// setCache 设置缓存
func (r *reviewRepo) setCache(ctx context.Context, key string, data []byte) error {
return r.data.rdb.Set(ctx, key, data, time.Second*10).Err()
}
// getDataFromES 从ES查询
func (r *reviewRepo) getDataFromES(ctx context.Context, key string) ([]byte, error) {
values := strings.Split(key, ":")
if len(values) < 4 {
return nil, errors.New("invalid key")
}
index, storeID, offsetStr, limitStr := values[0], values[1], values[2], values[3]
offset, err := strconv.Atoi(offsetStr)
if err != nil {
return nil, err
}
limit, err := strconv.Atoi(limitStr)
if err != nil {
return nil, err
}
resp, err := r.data.es.Search().
Index(index).
From(offset).
Size(limit).
Query(&types.Query{
Bool: &types.BoolQuery{
Filter: []types.Query{
{
Term: map[string]types.TermQuery{
"store_id": {Value: storeID},
},
},
},
},
}).
Do(ctx)
if err != nil {
return nil, err
}
return json.Marshal(resp.Hits)
}