brpc 自适应限流分析

1、注册并发设置

GlobalInitializeOrDieImpl中

注册名称和对应的并发限制类。

    // Concurrency Limiters
    // Each limiter object is registered under a string name ("auto",
    // "constant"); RegisterOrDie is called once per name.
    ConcurrencyLimiterExtension()->RegisterOrDie("auto", &g_ext->auto_cl);
    ConcurrencyLimiterExtension()->RegisterOrDie("constant", &g_ext->constant_cl);

2、设置并发使用的类

StartInternal中
    // Walk every registered method and install a ConcurrencyLimiter on its
    // status object. Returns -1 from the enclosing function if a limiter
    // cannot be created.
    for (MethodMap::iterator it = _method_map.begin();
        it != _method_map.end(); ++it) {
        if (it->second.is_builtin_service) {
            // Builtin services are given no limiter.
            it->second.status->SetConcurrencyLimiter(NULL);
        } else {
            // Start from the method's own setting; when that is UNLIMITED,
            // fall back to _options.method_max_concurrency.
            const AdaptiveMaxConcurrency* amc = &it->second.max_concurrency;
            if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED()) {
                amc = &_options.method_max_concurrency;
            }
            ConcurrencyLimiter* cl = NULL;
            if (!CreateConcurrencyLimiter(*amc, &cl)) {    // [1] create the limiter object
                LOG(ERROR) << "Fail to create ConcurrencyLimiter for method";
                return -1;
            }
            it->second.status->SetConcurrencyLimiter(cl);  // [2] install the limiter object
        }
    }
CreateConcurrencyLimiter实现:
static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
                                     ConcurrencyLimiter** out) {
    if (amc.type() == AdaptiveMaxConcurrency::UNLIMITED()) {
        *out = NULL;
        return true;
    }
    const ConcurrencyLimiter* cl =
        ConcurrencyLimiterExtension()->Find(amc.type().c_str()); 【3】根据名称获取并发配置
    if (cl == NULL) {
        LOG(ERROR) << "Fail to find ConcurrencyLimiter by `" << amc.value() << "'";
        return false;
    }
    ConcurrencyLimiter* cl_copy = cl->New(amc);
    if (cl_copy == NULL) {
        LOG(ERROR) << "Fail to new ConcurrencyLimiter";
        return false;
    }
    *out = cl_copy;
    return true;
}

3、默认协议baidu_std中自适应并发控制的更新

SendRpcResponse中

// Local object whose destructor (~ConcurrencyRemover) reports the response
// through method_status->OnResponded.
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);

~ConcurrencyRemover调用OnResponded

// Destructor: if a status object is still attached, reports the finished
// request to it, then removes the controller's concurrency from its server.
ConcurrencyRemover::~ConcurrencyRemover() {
    if (_status) {
        // The latency passed on is the current cpuwide time minus the stored
        // receive timestamp.
        _status->OnResponded(_c->ErrorCode(), butil::cpuwide_time_us() - _received_us);
        // Clear the pointer once the response has been reported.
        _status = NULL;
    }
    ServerPrivateAccessor(_c->server()).RemoveConcurrency(_c);
}

MethodStatus::OnResponded 中通过 _cl 调用 AutoConcurrencyLimiter::OnResponded

// Bookkeeping for one finished request.
//   error_code: 0 is treated as success, anything else as an error.
//   latency:    latency of the request as measured by the caller.
inline void MethodStatus::OnResponded(int error_code, int64_t latency) {
    // One fewer request in flight for this method.
    _nconcurrency.fetch_sub(1, butil::memory_order_relaxed);
    if (0 == error_code) {
        // Only successful requests feed the latency recorder.
        _latency_rec << latency;
    } else {
        // Failed requests are counted instead.
        _nerror_bvar << 1;
    }
    // Forward the outcome to the concurrency limiter when one is installed.
    if (NULL != _cl) {
        _cl->OnResponded(error_code, latency);
    }
}

AutoConcurrencyLimiter::OnResponded调用AddSample

// Excerpt only: the "..." lines mark code left out of this listing. The part
// shown passes the response's error code and latency, together with
// now_time_us, to AddSample.
void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
...
            bool sample_window_submitted = AddSample(error_code, latency_us, 
                                                     now_time_us);
...
}

AddSample中调用UpdateMaxConcurrency

// Call site inside AddSample; the rest of AddSample is not shown here.
UpdateMaxConcurrency(sampling_time_us);

UpdateMaxConcurrency实现
本次只分析调用关系;这里的算法与说明文档中算法的对应关系,暂时没有仔细分析。

// Recomputes _max_concurrency from the statistics in the sample window _sw.
//   sampling_time_us: timestamp of the sample that triggered this update
//                     (the 1000000 factors below suggest microseconds).
void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
    int32_t total_succ_req = _total_succ_req.load(butil::memory_order_relaxed);
    // Time spent in failed requests is weighted by the punish ratio before it
    // is mixed into the average latency.
    double failed_punish = _sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
    // NOTE(review): divides by _sw.succ_count; presumably the caller
    // guarantees it is non-zero -- confirm against AddSample.
    int64_t avg_latency = 
        std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
    // Successful requests per second over the span since the window started.
    double qps = 1000000.0 * total_succ_req / (sampling_time_us - _sw.start_time_us);
    UpdateMinLatency(avg_latency);
    UpdateQps(qps);

    int next_max_concurrency = 0;
    // Remeasure min_latency at regular intervals
    if (_remeasure_start_us <= sampling_time_us) {
        // Remeasure branch: the limit is set to _ema_max_qps * _min_latency_us
        // scaled down by reduce_ratio, and _reset_latency_us is scheduled two
        // average latencies after this sample.
        const double reduce_ratio = FLAGS_auto_cl_reduce_ratio_while_remeasure;
        _reset_latency_us = sampling_time_us + avg_latency * 2;
        next_max_concurrency = 
            std::ceil(_ema_max_qps * _min_latency_us / 1000000 * reduce_ratio);
    } else {
        const double change_step = FLAGS_auto_cl_change_rate_of_explore_ratio;
        const double max_explore_ratio = FLAGS_auto_cl_max_explore_ratio;
        const double min_explore_ratio = FLAGS_auto_cl_min_explore_ratio;
        const double correction_factor = FLAGS_auto_cl_latency_fluctuation_correction_factor;
        // Raise the explore ratio by one step (capped at the max) when the
        // average latency is within the tolerated margin of _min_latency_us,
        // or when qps is at or below _ema_max_qps / (1 + min_explore_ratio);
        // otherwise lower it by one step (floored at the min).
        if (avg_latency <= _min_latency_us * (1.0 + min_explore_ratio * correction_factor) || 
            qps <= _ema_max_qps / (1.0 + min_explore_ratio)) {
            _explore_ratio  = std::min(max_explore_ratio, _explore_ratio + change_step); 
        } else {
            _explore_ratio = std::max(min_explore_ratio, _explore_ratio - change_step);
        }
        // Unlike the remeasure branch there is no std::ceil here; the result
        // is converted to int by the assignment.
        next_max_concurrency = 
            _min_latency_us * _ema_max_qps / 1000000 *  (1 + _explore_ratio);
    }

    // Only write _max_concurrency when the value actually changes.
    if (next_max_concurrency != _max_concurrency) {
        _max_concurrency = next_max_concurrency;
    }
}

4、默认协议中自适应并发的使用

如果当前并发数超过设置的 _max_concurrency,则拒绝当前请求。

ProcessRpcRequest中

            // Ask the method status whether this request may proceed. On
            // refusal the RPC is failed with ELIMIT and the loop/switch is
            // left via break. rejected_cc is passed by pointer and printed as
            // the concurrency value -- presumably filled in by OnRequested.
            if (!method_status->OnRequested(&rejected_cc)) {
                cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
                                mp->method->full_name().c_str(), rejected_cc);
                break;
            }

OnRequested实现

// Admission check: returns true while current_concurrency does not exceed
// _max_concurrency (a value equal to the limit is still admitted), false
// otherwise.
bool AutoConcurrencyLimiter::OnRequested(int current_concurrency) {
    return current_concurrency <= _max_concurrency;
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。