brpc之自适应限流分析

之前在“服务过载时的一些思考”中说到过载相关的问题,最后有五个小问题:一是如何从源头避免当请求的服务过载时的反应,是重试还是如何?二是服务端在过载时如何优雅的拒绝后面的请求?三是过载时如何快速恢复正常服务?四是框架是否有可改进和优化的地方?第五个问题是服务如何判断当前服务是否过载?这里就这个问题分析下brpc中的相关实现。

因为这里brpc收发消息和业务处理都在一个进程中,包括之前的skynet也是,所以这里收到消息后,只能在后续逻辑中判断要不要处理,而不是比如单独的接入层进程收发消息在接入层处理负载问题。

简单的方法可以在收到消息时对每个请求加上当前时间,逻辑那边处理该请求消息时判断下时间,过久的话直接丢弃,包括后续的请求。如果消息队列长度无限制,也可以根据当前的消息数量来决定。但这里有个问题,有些场合是需要先解析消息等一些判断,这个也是花时间的,可能处理消息的时间远远小于解析的时间?还个就是应用场景的情况,在游戏中,大部分是写请求,如果丢掉当前的,可能会有些影响。包括早期不正确的设计,比如释放一个技能,开始和结束都要广播给周围的玩家,如果没有开始的消息只有结束的,可能处理就有问题,不过这些设计后期都已经修正(优化)。

这里说的是同时在处理的请求数,而非socket连接数。当同时处理的请求数达到服务的限制时,直接给客户端回brpc::ELIMIT错误,而不会调用服务回调。看到ELIMIT错误的client应重试另一个server。

在服务收到请求时会进行[server级]的并发请求数字段累加并判断:

396         if (!server_accessor.AddConcurrency(cntl.get())) {
397             cntl->SetFailed(
398                 ELIMIT, "Reached server's max_concurrency=%d",
399                 server->options().max_concurrency);
400             break;
401         }
402 
403         if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
404             cntl->SetFailed(ELIMIT, "Too many user code to run when"
405                             " -usercode_in_pthread is on");
406             break;
407         }

 43     // Returns true if the `max_concurrency' limit is not reached.
 44     bool AddConcurrency(Controller* c) {
 45         if (_server->options().max_concurrency <= 0) {
 46             return true;
 47         }
 48         c->add_flag(Controller::FLAGS_ADDED_CONCURRENCY);
 49         return (butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, 1)
 50                 <= _server->options().max_concurrency);
 51     }

接着对[method级]进行判断是否达到并发数上限:

440         method_status = mp->status;
441         if (method_status) {
442             int rejected_cc = 0;
443             if (!method_status->OnRequested(&rejected_cc)) {
444                 cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
445                                 mp->method->full_name().c_str(), rejected_cc);
446                 break;
447             }
448         }

 93 inline bool MethodStatus::OnRequested(int* rejected_cc) {
 94     const int cc = _nconcurrency.fetch_add(1, butil::memory_order_relaxed) + 1;
 95     if (NULL == _cl || _cl->OnRequested(cc)) {
 96         return true;
 97     } 
 98     if (rejected_cc) {   
 99         *rejected_cc = cc;
100     }
101     return false;
102 }

 90 bool AutoConcurrencyLimiter::OnRequested(int current_concurrency) {
 91     return current_concurrency <= _max_concurrency;
 92 }

这个类AutoConcurrencyLimiter后面分析。以上是请求到达时,先进行server层的限流,然后再进行具体的method层的限流。收到请求时_nconcurrency增加一,返回响应时减一,当返回响应时由concurrency_remover析构时进行减操作:

152     ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);

153 ConcurrencyRemover::~ConcurrencyRemover() {
154     if (_status) {
155         _status->OnResponded(_c->ErrorCode(), butil::cpuwide_time_us() - _received_us);
156         _status = NULL;
157     }   
158     ServerPrivateAccessor(_c->server()).RemoveConcurrency(_c);
159 }

104 inline void MethodStatus::OnResponded(int error_code, int64_t latency) {
105     _nconcurrency.fetch_sub(1, butil::memory_order_relaxed);
106     if (0 == error_code) {
107         _latency_rec << latency;
108     } else {
109         _nerror_bvar << 1;
110     }
111     if (NULL != _cl) {
112         _cl->OnResponded(error_code, latency);
113     }   
114 }

 53     void RemoveConcurrency(const Controller* c) {
 54         if (c->has_flag(Controller::FLAGS_ADDED_CONCURRENCY)) {
 55             butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, -1);
 56         }
 57     }

以上实现简单的判断是否超过并发数限制,而上面并没有给出如何达到限制数,只是简单的判断current_concurrency <= _max_concurrency。

对于一个请求,会设置它的达到时间,处理完后算出latency并更新AutoConcurrencyLimiter实例的相关数据,前者继承自ConcurrencyLimiter类,有两个virtual接口OnRequested和OnResponded用于在请求到来时判断是否要处理和返回响应时再设置:

 32     // This method should be called each time a request comes in. It returns
 33     // false when the concurrency reaches the upper limit, otherwise it 
 34     // returns true. Normally, when OnRequested returns false, you should 
 35     // return an ELIMIT error directly.
 36     virtual bool OnRequested(int current_concurrency) = 0;
 37 
 38     // Each request should call this method before responding.
 39     // `error_code' : Error code obtained from the controller, 0 means success.
 40     // `latency' : Microseconds taken by RPC.
 41     // NOTE: Even if OnRequested returns false, after sending ELIMIT, you 
 42     // still need to call OnResponded.
 43     virtual void OnResponded(int error_code, int64_t latency_us) = 0;
 94 void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
 95     if (0 == error_code) {
 96         _total_succ_req.fetch_add(1, butil::memory_order_relaxed);
 97     } else if (ELIMIT == error_code) {
 98         return;
 99     }
100 
101     const int64_t now_time_us = butil::gettimeofday_us();
102     int64_t last_sampling_time_us =
103         _last_sampling_time_us.load(butil::memory_order_relaxed);
104 
105     if (last_sampling_time_us == 0 ||
106         now_time_us - last_sampling_time_us >=
107             FLAGS_auto_cl_sampling_interval_ms * 1000) {
108         bool sample_this_call = _last_sampling_time_us.compare_exchange_strong(
109             last_sampling_time_us, now_time_us, butil::memory_order_relaxed);
110         if (sample_this_call) {
111             bool sample_window_submitted = AddSample(error_code, latency_us,
112                                                      now_time_us);
113             if (sample_window_submitted) {
114                 //more code...
123         }
124     }
125 }

这里根据处理的返回值来更新采样数据,对返回值ELIMIT的情况不统计,其中auto_cl_sampling_interval_ms是采样间隔,默认是0.1毫秒。如果该间隔内处理成功的请求越多,则_total_succ_req越大,当超过采样间隔时,则更新AddSample:

138 bool AutoConcurrencyLimiter::AddSample(int error_code,
139                                        int64_t latency_us,
140                                        int64_t sampling_time_us) {
141     std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);
142     if (_reset_latency_us != 0) {
143         // min_latency is about to be reset soon.
144         if (_reset_latency_us > sampling_time_us) {
145             // ignoring samples during waiting for the deadline.
146             return false;
147         }
148         // Remeasure min_latency when concurrency has dropped to low load
149         _min_latency_us = -1;
150         _reset_latency_us = 0;
151         _remeasure_start_us = NextResetTime(sampling_time_us);
152         ResetSampleWindow(sampling_time_us);
153     }
154 
155     if (_sw.start_time_us == 0) {
156         _sw.start_time_us = sampling_time_us;
157     }
158 
159     if (error_code != 0 && FLAGS_auto_cl_enable_error_punish) {
160         ++_sw.failed_count;
161         _sw.total_failed_us += latency_us;
162     } else if (error_code == 0) {
163         ++_sw.succ_count;
164         _sw.total_succ_us += latency_us;
165     }
167     if (_sw.succ_count + _sw.failed_count < FLAGS_auto_cl_min_sample_count) {
168         if (sampling_time_us - _sw.start_time_us >=
169             FLAGS_auto_cl_sample_window_size_ms * 1000) {
170             // If the sample size is insufficient at the end of the sampling 
171             // window, discard the entire sampling window
172             ResetSampleWindow(sampling_time_us);
173         }
174         return false;
175     }
176     if (sampling_time_us - _sw.start_time_us <
177         FLAGS_auto_cl_sample_window_size_ms * 1000 &&
178         _sw.succ_count + _sw.failed_count < FLAGS_auto_cl_max_sample_count) {
179         return false;
180     }
181 
182     if(_sw.succ_count > 0) {
183         UpdateMaxConcurrency(sampling_time_us);
184     } else {
185         // All request failed
186         _max_concurrency /= 2;
187     }
188     ResetSampleWindow(sampling_time_us);
189     return true;
190 }

其中_sw为采样窗口数据统计,每一个auto_cl_sample_window_size_ms周期内,对于处理失败的情况且有惩罚值则会统计总失败个数和total_failed_us和。auto_cl_sample_window_size_ms为一次采样的周期时间。总请求个数小于auto_cl_min_sample_count时,如果超过采样周期则重置_sw,否则继续统计,之后要么超过采样周期,要么达到auto_cl_max_sample_count最大请求个数,窗口中积累足够多的样本数据,那么本次采样结束,此时会更新_max_concurrency,对于所有请求都失败的情况直接把_max_concurrency降为一半。否则会UpdateMaxConcurrency:

219 void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
220     int32_t total_succ_req = _total_succ_req.load(butil::memory_order_relaxed);
221     double failed_punish = _sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
222     int64_t avg_latency =
223         std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
224     double qps = 1000000.0 * total_succ_req / (sampling_time_us - _sw.start_time_us);
225     UpdateMinLatency(avg_latency);
226     UpdateQps(qps);
227 
228     int next_max_concurrency = 0;
229     // Remeasure min_latency at regular intervals
230     if (_remeasure_start_us <= sampling_time_us) {
231         const double reduce_ratio = FLAGS_auto_cl_reduce_ratio_while_remeasure;
232         _reset_latency_us = sampling_time_us + avg_latency * 2;
233         next_max_concurrency =
234             std::ceil(_ema_max_qps * _min_latency_us / 1000000 * reduce_ratio);
235     } else {
236         const double change_step = FLAGS_auto_cl_change_rate_of_explore_ratio;
237         const double max_explore_ratio = FLAGS_auto_cl_max_explore_ratio;
238         const double min_explore_ratio = FLAGS_auto_cl_min_explore_ratio;
239         const double correction_factor = FLAGS_auto_cl_latency_fluctuation_correction_factor;
240         if (avg_latency <= _min_latency_us * (1.0 + min_explore_ratio * correction_factor) ||
241             qps <= _ema_max_qps / (1.0 + min_explore_ratio)) {
242             _explore_ratio  = std::min(max_explore_ratio, _explore_ratio + change_step);
243         } else {
244             _explore_ratio = std::max(min_explore_ratio, _explore_ratio - change_step);
245         }
246         next_max_concurrency =
247             _min_latency_us * _ema_max_qps / 1000000 *  (1 + _explore_ratio);
248     }
249 
250     if (next_max_concurrency != _max_concurrency) {
251         _max_concurrency = next_max_concurrency;
252     }
253 }

201 void AutoConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) {
202     const double ema_factor = FLAGS_auto_cl_alpha_factor_for_ema;
203     if (_min_latency_us <= 0) {
204         _min_latency_us = latency_us;
205     } else if (latency_us < _min_latency_us) {
206         _min_latency_us = latency_us * ema_factor + _min_latency_us * (1 - ema_factor);
207     }
208 }

210 void AutoConcurrencyLimiter::UpdateQps(double qps) {
211     const double ema_factor = FLAGS_auto_cl_alpha_factor_for_ema / 10;
212     if (qps >= _ema_max_qps) {
213         _ema_max_qps = qps;
214     } else {
215         _ema_max_qps = qps * ema_factor + _ema_max_qps * (1 - ema_factor);
216     }
217 }

avg_latency为该采样窗口内,处理失败请求所花时间总和,乘以惩罚系数的时间,加上处理成功请求所花的总时间,除以成功请求总个数。
在UpdateMinLatency中,为了减少个别窗口的抖动对限流算法的影响,同时尽量降低计算开销,计算min_latency时会通过使用EMA来进行平滑处理,同理UpdateQps。

引用:自适应限流
假如从启动到打满qps的时间过长,这期间会损失大量流量。在这里我们采取的措施有两个:
1采样方面,一旦采到的请求数量足够多,直接提交当前采样窗口,而不是等待采样窗口的到时间了才提交
2计算公式方面,当current_qps > 保存的max_qps时,直接进行更新,不进行平滑处理
在进行了这两个处理之后,绝大部分情况下都能够在2秒左右将qps打满。

_max_concurrency的有效时间由_reset_latency_us决定,即以当前时间延后avg_latency的两倍。在这段时间内的样本数据不采集直到超时重新计算_min_latency_us和_max_concurrency。_max_concurrency会在每个采样窗口周期内进行更新,_min_latency_us和_ema_max_qps进行平滑处理。若_remeasure_start_us超时则:

233 next_max_concurrency = std::ceil(_ema_max_qps * _min_latency_us / 1000000 * reduce_ratio);

否则可能要以一定的比例系数增大或减小_max_concurrency,根据avg_latency和qps:

240         if (avg_latency <= _min_latency_us * (1.0 + min_explore_ratio * correction_factor) ||
241             qps <= _ema_max_qps / (1.0 + min_explore_ratio)) {
242             _explore_ratio  = std::min(max_explore_ratio, _explore_ratio + change_step);
243         } else {
244             _explore_ratio = std::max(min_explore_ratio, _explore_ratio - change_step);
245         }
246         next_max_concurrency =
247             _min_latency_us * _ema_max_qps / 1000000 *  (1 + _explore_ratio);

当latency与min_latency很接近时,根据计算公式会得到一个较高max_concurrency来适应concurrency的波动,从而尽可能的减少“误杀”。同时,随着latency的升高,max_concurrency会逐渐降低,以保护服务不会过载。

max_concurrency = max_qps * ((2+alpha) * min_latency - latency)

这块代码其实很好明白,但需要涉及过多的理论基础才能充分理解为何这么设计,以及各比例参数的确定。

自适应限流
限制最大并发

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352

推荐阅读更多精彩内容