1、注册并发设置
GlobalInitializeOrDieImpl中
注册名称和对应的并发限制类。
// Concurrency Limiters
// Registers the limiter instances held in g_ext under the names "auto" and
// "constant". CreateConcurrencyLimiter later retrieves an instance by name
// through ConcurrencyLimiterExtension()->Find().
ConcurrencyLimiterExtension()->RegisterOrDie("auto", &g_ext->auto_cl);
ConcurrencyLimiterExtension()->RegisterOrDie("constant", &g_ext->constant_cl);
2、设置并发使用的类
StartInternal中
// Walk every entry of _method_map and install its ConcurrencyLimiter.
for (MethodMap::iterator it = _method_map.begin();
     it != _method_map.end(); ++it) {
    // Builtin services get no limiter at all.
    if (it->second.is_builtin_service) {
        it->second.status->SetConcurrencyLimiter(NULL);
        continue;
    }
    // Prefer the method's own setting; when that one is unlimited, fall back
    // to the setting stored in _options.
    const AdaptiveMaxConcurrency* const amc =
        (it->second.max_concurrency.type() == AdaptiveMaxConcurrency::UNLIMITED())
            ? &_options.method_max_concurrency
            : &it->second.max_concurrency;
    // [1] Obtain the limiter instance that matches the chosen setting.
    ConcurrencyLimiter* limiter = NULL;
    if (!CreateConcurrencyLimiter(*amc, &limiter)) {
        LOG(ERROR) << "Fail to create ConcurrencyLimiter for method";
        return -1;
    }
    // [2] Hand the limiter to the method's status object.
    it->second.status->SetConcurrencyLimiter(limiter);
}
CreateConcurrencyLimiter实现:
static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
ConcurrencyLimiter** out) {
if (amc.type() == AdaptiveMaxConcurrency::UNLIMITED()) {
*out = NULL;
return true;
}
const ConcurrencyLimiter* cl =
ConcurrencyLimiterExtension()->Find(amc.type().c_str()); 【3】根据名称获取并发配置
if (cl == NULL) {
LOG(ERROR) << "Fail to find ConcurrencyLimiter by `" << amc.value() << "'";
return false;
}
ConcurrencyLimiter* cl_copy = cl->New(amc);
if (cl_copy == NULL) {
LOG(ERROR) << "Fail to new ConcurrencyLimiter";
return false;
}
*out = cl_copy;
return true;
}
3、默认协议baidu_std中自适应并发控制的更新
SendRpcResponse中
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
~ConcurrencyRemover调用OnResponded
// On destruction, reports the finished request to the method status (if one
// is attached) and then removes the controller's concurrency from the server.
ConcurrencyRemover::~ConcurrencyRemover() {
    if (_status) {
        // Latency is the time elapsed since _received_us.
        const int error_code = _c->ErrorCode();
        const int64_t latency_us = butil::cpuwide_time_us() - _received_us;
        _status->OnResponded(error_code, latency_us);
        _status = NULL;
    }
    ServerPrivateAccessor accessor(_c->server());
    accessor.RemoveConcurrency(_c);
}
OnResponded中调用 AutoConcurrencyLimiter::OnResponded
// Bookkeeping for a request that has just been answered.
//
// error_code: 0 for success, anything else is recorded as an error.
// latency:    latency of the request, recorded only on success.
//
// The attached ConcurrencyLimiter, if any, is notified with the same values.
inline void MethodStatus::OnResponded(int error_code, int64_t latency) {
    // This request no longer counts towards the method's concurrency.
    _nconcurrency.fetch_sub(1, butil::memory_order_relaxed);
    if (error_code != 0) {
        _nerror_bvar << 1;
    } else {
        _latency_rec << latency;
    }
    // No limiter attached: nothing further to notify.
    if (NULL == _cl) {
        return;
    }
    _cl->OnResponded(error_code, latency);
}
AutoConcurrencyLimiter::OnResponded调用AddSample
// Excerpt: statements before and after the AddSample() call are elided as
// `...`. The error code and latency of the response, together with
// now_time_us (set in the elided part), are passed to AddSample().
// NOTE(review): the returned flag presumably tells whether this sample
// closed a sampling window - confirm against AddSample.
void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
...
bool sample_window_submitted = AddSample(error_code, latency_us,
now_time_us);
...
}
AddSample中调用UpdateMaxConcurrency
// Excerpt from AddSample: recompute the concurrency limit for this sampling time.
UpdateMaxConcurrency(sampling_time_us);
UpdateMaxConcurrency实现
本次只分析调用关系;这里的算法与说明文档中算法的对应关系,暂时没有仔细分析。
// Recomputes _max_concurrency from the statistics gathered in the sampling
// window _sw, using `sampling_time_us` as the end of that window.
//
// Steps visible in this function:
//   1. avg_latency = ceil((_sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio
//                          + _sw.total_succ_us) / _sw.succ_count)
//   2. qps = 1000000.0 * _total_succ_req / (sampling_time_us - _sw.start_time_us)
//   3. Both values are passed to UpdateMinLatency() and UpdateQps().
//   4. The next limit is _ema_max_qps * _min_latency_us / 1000000, scaled by
//      the remeasure reduce ratio or by (1 + _explore_ratio).
//
// NOTE(review): time values appear to be microseconds (`_us` suffix and the
// 1000000 factor) - confirm.
// NOTE(review): this divides by _sw.succ_count and by the window duration;
// presumably the caller guarantees both are non-zero - verify in AddSample.
void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
// Relaxed read of the successful-request counter.
int32_t total_succ_req = _total_succ_req.load(butil::memory_order_relaxed);
// Time spent on failed requests, weighted by the punish-ratio flag.
double failed_punish = _sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
int64_t avg_latency =
std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
double qps = 1000000.0 * total_succ_req / (sampling_time_us - _sw.start_time_us);
// NOTE(review): presumably these refresh _min_latency_us and _ema_max_qps,
// which are read below - confirm in their definitions.
UpdateMinLatency(avg_latency);
UpdateQps(qps);
int next_max_concurrency = 0;
// Remeasure min_latency at regular intervals
// This branch is taken once sampling_time_us has reached _remeasure_start_us.
if (_remeasure_start_us <= sampling_time_us) {
const double reduce_ratio = FLAGS_auto_cl_reduce_ratio_while_remeasure;
// Two average latencies after the end of this window.
_reset_latency_us = sampling_time_us + avg_latency * 2;
// Limit scaled by reduce_ratio, rounded up.
next_max_concurrency =
std::ceil(_ema_max_qps * _min_latency_us / 1000000 * reduce_ratio);
} else {
const double change_step = FLAGS_auto_cl_change_rate_of_explore_ratio;
const double max_explore_ratio = FLAGS_auto_cl_max_explore_ratio;
const double min_explore_ratio = FLAGS_auto_cl_min_explore_ratio;
const double correction_factor = FLAGS_auto_cl_latency_fluctuation_correction_factor;
// Raise _explore_ratio by change_step (capped at max_explore_ratio) when the
// average latency stays within the corrected margin above _min_latency_us,
// or when qps is at most _ema_max_qps / (1 + min_explore_ratio). Otherwise
// lower it by change_step (floored at min_explore_ratio).
if (avg_latency <= _min_latency_us * (1.0 + min_explore_ratio * correction_factor) ||
qps <= _ema_max_qps / (1.0 + min_explore_ratio)) {
_explore_ratio = std::min(max_explore_ratio, _explore_ratio + change_step);
} else {
_explore_ratio = std::max(min_explore_ratio, _explore_ratio - change_step);
}
// No std::ceil here, unlike the remeasure branch: the result is converted
// to int implicitly on assignment.
next_max_concurrency =
_min_latency_us * _ema_max_qps / 1000000 * (1 + _explore_ratio);
}
// Write the limit only when the value actually changes.
if (next_max_concurrency != _max_concurrency) {
_max_concurrency = next_max_concurrency;
}
}
4、默认协议中自适应并发的使用
如果当前并发数超过设置的_max_concurrency,则拒绝当前请求。
ProcessRpcRequest中
// Ask the method's status object whether this request may proceed. On
// refusal, fail the RPC with ELIMIT; the message reports the method's full
// name and the value that OnRequested() stored in rejected_cc. The `break`
// leaves the enclosing construct, which is not shown in this excerpt.
if (!method_status->OnRequested(&rejected_cc)) {
cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
mp->method->full_name().c_str(), rejected_cc);
break;
}
OnRequested实现
// Admission check: returns true while `current_concurrency` does not exceed
// _max_concurrency, false once it is strictly above the limit.
bool AutoConcurrencyLimiter::OnRequested(int current_concurrency) {
    if (current_concurrency > _max_concurrency) {
        return false;
    }
    return true;
}