In the earlier post "服务过载时的一些思考" (some thoughts on service overload) I discussed overload-related problems and left five small questions at the end: first, how to avoid the problem at the source — when the service a request depends on is overloaded, should the caller retry, or do something else? Second, how does a server gracefully reject further requests while it is overloaded? Third, how does a service recover to normal quickly after an overload? Fourth, is there anything in the framework that can be improved or optimized? The fifth question is: how does a service decide whether it is currently overloaded? This post analyzes how brpc implements an answer to that last question.
In brpc, message I/O and business processing happen in the same process (as with skynet discussed earlier), so whether a received message should be handled can only be decided in the subsequent logic, rather than in, say, a separate access-layer process that receives messages and deals with load there.
A simple approach is to stamp each request with the current time when it is received; when the logic side later gets to the message it checks the timestamp and directly drops requests that have waited too long, as well as the ones queued behind them. If the message queue is unbounded, the decision can also be based on the current queue length. There is a catch, though: in some scenarios the message must first be parsed and examined before such a decision can be made, and parsing also costs time — the actual processing may even take far less time than the parsing. Another issue is the application scenario: in games most requests are writes, so dropping the current one can have side effects. There were also some early, incorrect designs, e.g. casting a skill broadcasts both a start and an end message to nearby players; if only the end message arrives without the start, processing may go wrong. These designs were all fixed (optimized) later.
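A minimal sketch of the timestamp-based dropping described above (Message, kMaxQueuedUs, HandleMessage and DrainQueue are made-up names for illustration, not brpc or skynet APIs):

#include <chrono>
#include <cstdint>
#include <deque>

// Hypothetical message type; the I/O thread stamps it on arrival.
struct Message {
    int64_t enqueue_us = 0;  // time the request was received
    // ... payload ...
};

// Assumed queueing budget: requests older than this are not worth answering.
constexpr int64_t kMaxQueuedUs = 200 * 1000;

inline int64_t NowUs() {
    using namespace std::chrono;
    return duration_cast<microseconds>(
        steady_clock::now().time_since_epoch()).count();
}

// Logic-thread loop: shed requests that have already waited too long
// instead of spending time on answers nobody is waiting for anymore.
void DrainQueue(std::deque<Message>& queue) {
    while (!queue.empty()) {
        const Message& msg = queue.front();
        if (NowUs() - msg.enqueue_us > kMaxQueuedUs) {
            queue.pop_front();   // too stale, drop it (and likely the ones behind it)
            continue;
        }
        // HandleMessage(msg);   // normal processing path (omitted)
        queue.pop_front();
    }
}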
What is limited here is the number of requests being processed simultaneously, not the number of socket connections. When the number of concurrently processed requests reaches the server's limit, the server replies to the client with a brpc::ELIMIT error directly, without invoking the service callback. A client that sees ELIMIT should retry another server.
When a request arrives, the [server-level] concurrency counter is incremented and checked first:
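On the client side this simply means inspecting the controller's error code after the call; ShouldRetryOnAnotherServer below is a hypothetical helper, while brpc::Controller and brpc::ELIMIT are the real symbols mentioned above:

#include <brpc/controller.h>
#include <brpc/errno.pb.h>

// ELIMIT means this particular server instance is saturated; the sensible
// reaction is to try another instance instead of hammering the same one.
bool ShouldRetryOnAnotherServer(const brpc::Controller& cntl) {
    return cntl.Failed() && cntl.ErrorCode() == brpc::ELIMIT;
}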
if (!server_accessor.AddConcurrency(cntl.get())) {
    cntl->SetFailed(
        ELIMIT, "Reached server's max_concurrency=%d",
        server->options().max_concurrency);
    break;
}

if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
    cntl->SetFailed(ELIMIT, "Too many user code to run when"
                    " -usercode_in_pthread is on");
    break;
}
// Returns true if the `max_concurrency' limit is not reached.
bool AddConcurrency(Controller* c) {
    if (_server->options().max_concurrency <= 0) {
        return true;
    }
    c->add_flag(Controller::FLAGS_ADDED_CONCURRENCY);
    return (butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, 1)
            <= _server->options().max_concurrency);
}
Next comes the [method-level] check of whether that method's concurrency limit has been reached:
method_status = mp->status;
if (method_status) {
    int rejected_cc = 0;
    if (!method_status->OnRequested(&rejected_cc)) {
        cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
                        mp->method->full_name().c_str(), rejected_cc);
        break;
    }
}
inline bool MethodStatus::OnRequested(int* rejected_cc) {
    const int cc = _nconcurrency.fetch_add(1, butil::memory_order_relaxed) + 1;
    if (NULL == _cl || _cl->OnRequested(cc)) {
        return true;
    }
    if (rejected_cc) {
        *rejected_cc = cc;
    }
    return false;
}
bool AutoConcurrencyLimiter::OnRequested(int current_concurrency) {
    return current_concurrency <= _max_concurrency;
}
The AutoConcurrencyLimiter class is analyzed later. So far: when a request arrives, the server-level limit is applied first, then the limit of the specific method. _nconcurrency is incremented by one when a request is received and decremented by one when the response is returned; the decrement is done when concurrency_remover is destructed at response time:
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);

ConcurrencyRemover::~ConcurrencyRemover() {
    if (_status) {
        _status->OnResponded(_c->ErrorCode(), butil::cpuwide_time_us() - _received_us);
        _status = NULL;
    }
    ServerPrivateAccessor(_c->server()).RemoveConcurrency(_c);
}
inline void MethodStatus::OnResponded(int error_code, int64_t latency) {
    _nconcurrency.fetch_sub(1, butil::memory_order_relaxed);
    if (0 == error_code) {
        _latency_rec << latency;
    } else {
        _nerror_bvar << 1;
    }
    if (NULL != _cl) {
        _cl->OnResponded(error_code, latency);
    }
}
void RemoveConcurrency(const Controller* c) {
    if (c->has_flag(Controller::FLAGS_ADDED_CONCURRENCY)) {
        butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, -1);
    }
}
The code above only implements the simple check of whether the concurrency limit is exceeded; it does not yet show how the limit itself is determined — it merely tests current_concurrency <= _max_concurrency.
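For reference, the limits checked above are what users configure. A usage sketch based on brpc's documented options (the method name is a placeholder and the exact call sites may vary by version):

#include <brpc/server.h>

void ConfigureConcurrencyLimits(brpc::Server& server, brpc::ServerOptions& options) {
    // Server-level limit: once this many requests are in flight,
    // further requests are rejected with ELIMIT before the service callback runs.
    options.max_concurrency = 1024;

    // Method-level limit, typically set after AddService() and before Start():
    // either a fixed number, or "auto" to enable the adaptive
    // AutoConcurrencyLimiter analyzed below.
    server.MaxConcurrencyOf("example.EchoService.Echo") = 200;
    // server.MaxConcurrencyOf("example.EchoService.Echo") = "auto";
}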
For each request, its arrival time is recorded; after processing finishes the latency is computed and the relevant data of the AutoConcurrencyLimiter instance is updated. AutoConcurrencyLimiter inherits from ConcurrencyLimiter, which has two virtual interfaces: OnRequested, called when a request arrives to decide whether it should be processed, and OnResponded, called when the response is returned to feed back the result (a small illustrative sketch follows the interface below):
// This method should be called each time a request comes in. It returns
// false when the concurrency reaches the upper limit, otherwise it
// returns true. Normally, when OnRequested returns false, you should
// return an ELIMIT error directly.
virtual bool OnRequested(int current_concurrency) = 0;

// Each request should call this method before responding.
// `error_code' : Error code obtained from the controller, 0 means success.
// `latency' : Microseconds taken by RPC.
// NOTE: Even if OnRequested returns false, after sending ELIMIT, you
// still need to call OnResponded.
virtual void OnResponded(int error_code, int64_t latency_us) = 0;
void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
    if (0 == error_code) {
        _total_succ_req.fetch_add(1, butil::memory_order_relaxed);
    } else if (ELIMIT == error_code) {
        return;
    }

    const int64_t now_time_us = butil::gettimeofday_us();
    int64_t last_sampling_time_us =
        _last_sampling_time_us.load(butil::memory_order_relaxed);

    if (last_sampling_time_us == 0 ||
        now_time_us - last_sampling_time_us >=
            FLAGS_auto_cl_sampling_interval_ms * 1000) {
        bool sample_this_call = _last_sampling_time_us.compare_exchange_strong(
            last_sampling_time_us, now_time_us, butil::memory_order_relaxed);
        if (sample_this_call) {
            bool sample_window_submitted = AddSample(error_code, latency_us,
                                                     now_time_us);
            if (sample_window_submitted) {
                // more code...
            }
        }
    }
}
Here the sampling data is updated according to the error code of the handled request; responses that failed with ELIMIT are not counted. auto_cl_sampling_interval_ms is the sampling interval, 0.1 ms by default. The more requests succeed, the larger _total_succ_req grows; once the sampling interval has elapsed, the call that wins the compare_exchange submits a sample via AddSample:
bool AutoConcurrencyLimiter::AddSample(int error_code,
                                       int64_t latency_us,
                                       int64_t sampling_time_us) {
    std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);
    if (_reset_latency_us != 0) {
        // min_latency is about to be reset soon.
        if (_reset_latency_us > sampling_time_us) {
            // ignoring samples during waiting for the deadline.
            return false;
        }
        // Remeasure min_latency when concurrency has dropped to low load
        _min_latency_us = -1;
        _reset_latency_us = 0;
        _remeasure_start_us = NextResetTime(sampling_time_us);
        ResetSampleWindow(sampling_time_us);
    }

    if (_sw.start_time_us == 0) {
        _sw.start_time_us = sampling_time_us;
    }

    if (error_code != 0 && FLAGS_auto_cl_enable_error_punish) {
        ++_sw.failed_count;
        _sw.total_failed_us += latency_us;
    } else if (error_code == 0) {
        ++_sw.succ_count;
        _sw.total_succ_us += latency_us;
    }

    if (_sw.succ_count + _sw.failed_count < FLAGS_auto_cl_min_sample_count) {
        if (sampling_time_us - _sw.start_time_us >=
            FLAGS_auto_cl_sample_window_size_ms * 1000) {
            // If the sample size is insufficient at the end of the sampling
            // window, discard the entire sampling window
            ResetSampleWindow(sampling_time_us);
        }
        return false;
    }
    if (sampling_time_us - _sw.start_time_us <
        FLAGS_auto_cl_sample_window_size_ms * 1000 &&
        _sw.succ_count + _sw.failed_count < FLAGS_auto_cl_max_sample_count) {
        return false;
    }

    if (_sw.succ_count > 0) {
        UpdateMaxConcurrency(sampling_time_us);
    } else {
        // All request failed
        _max_concurrency /= 2;
    }
    ResetSampleWindow(sampling_time_us);
    return true;
}
_sw holds the statistics of the current sample window, and auto_cl_sample_window_size_ms is the length of one sampling window. Within each window, if error punishment is enabled, failed requests are counted and their latencies accumulated into total_failed_us. While the total number of samples is still below auto_cl_min_sample_count, the window is discarded and reset if the window period has already elapsed, otherwise sampling simply continues. Eventually either the window period expires or the sample count reaches auto_cl_max_sample_count; once the window has accumulated enough samples, this round of sampling ends and _max_concurrency is updated. If every request in the window failed, _max_concurrency is directly halved; otherwise UpdateMaxConcurrency is called:
void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
    int32_t total_succ_req = _total_succ_req.load(butil::memory_order_relaxed);
    double failed_punish = _sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
    int64_t avg_latency =
        std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
    double qps = 1000000.0 * total_succ_req / (sampling_time_us - _sw.start_time_us);
    UpdateMinLatency(avg_latency);
    UpdateQps(qps);

    int next_max_concurrency = 0;
    // Remeasure min_latency at regular intervals
    if (_remeasure_start_us <= sampling_time_us) {
        const double reduce_ratio = FLAGS_auto_cl_reduce_ratio_while_remeasure;
        _reset_latency_us = sampling_time_us + avg_latency * 2;
        next_max_concurrency =
            std::ceil(_ema_max_qps * _min_latency_us / 1000000 * reduce_ratio);
    } else {
        const double change_step = FLAGS_auto_cl_change_rate_of_explore_ratio;
        const double max_explore_ratio = FLAGS_auto_cl_max_explore_ratio;
        const double min_explore_ratio = FLAGS_auto_cl_min_explore_ratio;
        const double correction_factor = FLAGS_auto_cl_latency_fluctuation_correction_factor;
        if (avg_latency <= _min_latency_us * (1.0 + min_explore_ratio * correction_factor) ||
            qps <= _ema_max_qps / (1.0 + min_explore_ratio)) {
            _explore_ratio = std::min(max_explore_ratio, _explore_ratio + change_step);
        } else {
            _explore_ratio = std::max(min_explore_ratio, _explore_ratio - change_step);
        }
        next_max_concurrency =
            _min_latency_us * _ema_max_qps / 1000000 * (1 + _explore_ratio);
    }

    if (next_max_concurrency != _max_concurrency) {
        _max_concurrency = next_max_concurrency;
    }
}
void AutoConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) {
    const double ema_factor = FLAGS_auto_cl_alpha_factor_for_ema;
    if (_min_latency_us <= 0) {
        _min_latency_us = latency_us;
    } else if (latency_us < _min_latency_us) {
        _min_latency_us = latency_us * ema_factor + _min_latency_us * (1 - ema_factor);
    }
}
void AutoConcurrencyLimiter::UpdateQps(double qps) {
    const double ema_factor = FLAGS_auto_cl_alpha_factor_for_ema / 10;
    if (qps >= _ema_max_qps) {
        _ema_max_qps = qps;
    } else {
        _ema_max_qps = qps * ema_factor + _ema_max_qps * (1 - ema_factor);
    }
}
avg_latency is computed over the sample window as: the total latency of failed requests multiplied by the punish ratio, plus the total latency of successful requests, divided by the number of successful requests (rounded up).
In UpdateMinLatency, to reduce the impact that jitter in individual windows has on the limiting algorithm while keeping the computational cost low, min_latency is smoothed with an EMA (exponential moving average); UpdateQps does the same for the maximum qps.
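Written out with the names used in the code above, the two quantities fed into the EMA updates are:

avg_latency = ceil((_sw.total_failed_us * auto_cl_fail_punish_ratio + _sw.total_succ_us) / _sw.succ_count)
qps = 1000000 * total_succ_req / (sampling_time_us - _sw.start_time_us)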
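Concretely, with alpha = auto_cl_alpha_factor_for_ema, the smoothing in the two functions above is:

min_latency = alpha * latency + (1 - alpha) * min_latency            (only when the new latency is smaller)
ema_max_qps = (alpha / 10) * qps + (1 - alpha / 10) * ema_max_qps    (only when the new qps is smaller; a larger qps replaces ema_max_qps directly)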
Quoted from brpc's "自适应限流" (adaptive concurrency limiting) documentation:
If it took too long to ramp up from startup to full qps, a lot of traffic would be lost in the meantime. Two measures are taken here:
1. On the sampling side, once enough requests have been sampled, the current sample window is submitted immediately instead of waiting until the window's time is up.
2. In the formula, when current_qps > the stored max_qps, max_qps is updated directly without smoothing.
With these two measures, qps can be ramped to full within about 2 seconds in the vast majority of cases.
How long a computed _max_concurrency stays valid is determined by _reset_latency_us, i.e. the current time pushed back by twice avg_latency. Samples arriving within this period are not collected; once the deadline passes, _min_latency_us and _max_concurrency are re-measured. _max_concurrency is updated at the end of every sample window, while _min_latency_us and _ema_max_qps are smoothed. When _remeasure_start_us has been reached:
next_max_concurrency = std::ceil(_ema_max_qps * _min_latency_us / 1000000 * reduce_ratio);
Otherwise _max_concurrency may be scaled up or down by a certain ratio, depending on avg_latency and qps:
if (avg_latency <= _min_latency_us * (1.0 + min_explore_ratio * correction_factor) ||
    qps <= _ema_max_qps / (1.0 + min_explore_ratio)) {
    _explore_ratio = std::min(max_explore_ratio, _explore_ratio + change_step);
} else {
    _explore_ratio = std::max(min_explore_ratio, _explore_ratio - change_step);
}
next_max_concurrency =
    _min_latency_us * _ema_max_qps / 1000000 * (1 + _explore_ratio);
When latency is close to min_latency, the formula yields a relatively high max_concurrency to accommodate fluctuations in concurrency and thereby minimize "accidental kills". At the same time, as latency rises, max_concurrency decreases gradually, protecting the service from overload.
max_concurrency = max_qps * ((2+alpha) * min_latency - latency)
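A rough sanity check with made-up numbers: take max_qps = 2000/s, min_latency = 5 ms and alpha = 0.3. At latency = 5 ms the formula gives 2000 × (2.3 × 0.005 − 0.005) = 13 concurrent requests; at latency = 8 ms it drops to 2000 × (2.3 × 0.005 − 0.008) = 7, i.e. rising latency quickly pulls the limit down.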
The code itself is easy to follow, but fully understanding why it is designed this way, and how the various ratio parameters were chosen, requires quite a bit of theoretical background.