brpc server端异步回包源码分析(二)

大家好,我是dandyhuang。上回给大家介绍了brpc从客户端到服务端整个收包的逻辑,详情可见Brpc 服务端收包源码分析(一),本次咱们介绍server端异步回包逻辑,同步直接response写数据即可。

server异步处理

// 异步回调
void helloServiceImpl::ServiceCb(brpc::Controller* cntl, ::hello::CommonResponse* response){
  LOG(INFO) << "response: " << response->ByteSize();
  if (cntl->ErrorCode() != 0) {
    LOG(ERROR) << " hello error code:" << cntl->ErrorCode() << " msg:" << cntl->ErrorText();
  }

  // std::unique_ptr makes sure cntl/response will be deleted before returning.
  std::unique_ptr<brpc::Controller> cntl_guard(cntl);
  std::unique_ptr<::hello::CommonResponse> response_guard(response);
  response_->Swap(response);
  // 回包
  done_->Run();
}

// 服务端
void helloServiceImpl::echo(::google::protobuf::RpcController* controller,
                               const ::hello::CommonRequest* request,
                               ::hello::CommonResponse* response,
                               ::google::protobuf::Closure* done) {
  // 初始化存储客户端发送的信息
  brpc::ClosureGuard done_guard(done);
  brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
  start_time_ = butil::gettimeofday_us();
  done_ = done;
  response_ = response;

    // channel初始化
  brpc::Channel channel;
  brpc::ChannelOptions options;
  options.protocol = FLAGS_protocol;
  options.connection_type = FLAGS_connection_type;
  options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
  options.max_retry = FLAGS_max_retry;
  if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
      LOG(ERROR) << "Fail to initialize channel";
      return -1;
  }

  auto cli_cntl = new brpc::Controller();
  ::hello::CommonResponse* cli_resp = new ::hello::CommonResponse();
    // 调用下游服务
  hello::CommonService_Stub stub(channel);
  auto cli_done = brpc::NewCallback(this, &helloServiceImpl::ServiceCb, cli_cntl, cli_resp);
  stub.echo(cli_cntl, request, cli_resp, cli_done);
    // 先释放、并没有立刻回包给客户端
  done_guard.release();
}

~ClosureGuard() {
  // _done为NULL,不执行run
  if (_done) {
    _done->Run();
  }
}

google::protobuf::Closure* release() {
  google::protobuf::Closure* const prev_done = _done;
  // 提前先将_done置空,析构的时候,就不会回包
  _done = NULL;
  return prev_done;
}

⚠️注意:我们这里当服务端收到数据时,我们brpc::ClosureGuard 将done节点存储起来,在调用下游服务后,再调用done_guard.release(),将done先释放,但不回包。在处理完业务逻辑后,异步回调,在调用done_->Run()。对客户端进行回包,从而达到异步的效果。

ProcessHttpRequest回包逻辑

void ProcessHttpRequest(InputMessageBase *msg) {
    Controller* cntl = new (std::nothrow) Controller;
    // cntl存储
    HttpResponseSender resp_sender(cntl);
    // resp存储
    google::protobuf::Message* res = svc->GetResponsePrototype(method).New();
    resp_sender.own_response(res);
    // 最终都会存到HttpResponseSenderAsDone
    google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
   ···
    // res都会存储到resp_sender中
    svc->CallMethod(method, cntl, req, res, done);
  
}

以http例子为,回包resdone都会存储到HttpResponseSenderAsDone中,所以业务逻辑代码中done_->Run();我们看它是如何回包的。

HttpResponseSenderAsDone->Run()

class HttpResponseSenderAsDone : public google::protobuf::Closure {
public:
    HttpResponseSenderAsDone(HttpResponseSender* s) : _sender(std::move(*s)) {}
    void Run() override { delete this; }
private:
    // 析构delete
    HttpResponseSender _sender;
};

HttpResponseSender::~HttpResponseSender() {
    // cntl信息
    Controller* cntl = _cntl.get();
    if (cntl == NULL) {
        return;
    }
    ControllerPrivateAccessor accessor(cntl);
    Span* span = accessor.span();
    if (span) {
        span->set_start_send_us(butil::cpuwide_time_us());
    }
    ConcurrencyRemover concurrency_remover(_method_status, cntl, _received_us);
    // 获取套接字信息
    Socket* socket = accessor.get_sending_socket();
    // 回包的pb msg。就是我们填充respone返回的protobuf的数据
    const google::protobuf::Message* res = _res.get();
    // 判断一下client是否连接关闭
    if (cntl->IsCloseConnection()) {
        socket->SetFailed();
        return;
    }
    // 请求的包头
    const HttpHeader* req_header = &cntl->http_request();
    // 返回http的包头
    HttpHeader* res_header = &cntl->http_response();
    res_header->set_version(req_header->major_version(),
                            req_header->minor_version());
    //后续都是设置一些http回包的包头
    const std::string* content_type_str = &res_header->content_type();
    if (content_type_str->empty()) {
        // Use request's content_type if response's is not set.
        content_type_str = &req_header->content_type();
        res_header->set_content_type(*content_type_str);
    }
    bool is_grpc_ct = false;
    const HttpContentType content_type = ParseContentType(*content_type_str, &is_grpc_ct);
    const bool is_http2 = req_header->is_http2();
    const bool is_grpc = (is_http2 && is_grpc_ct);

    if (res != NULL &&
        cntl->response_attachment().empty() && res->GetDescriptor()->field_count() > 0 &&!cntl->Failed()) { 
        butil::IOBufAsZeroCopyOutputStream wrapper(&cntl->response_attachment());
        // 对body 是json和pb的转化处理
        if (content_type == HTTP_CONTENT_PROTO) {
            if (!res->SerializeToZeroCopyStream(&wrapper)) {
                cntl->SetFailed(ERESPONSE, "Fail to serialize %s", res->GetTypeName().c_str());
            }
        } else {
            std::string err;
            json2pb::Pb2JsonOptions opt;
            opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
            opt.jsonify_empty_array = cntl->has_pb_jsonify_empty_array();
            opt.always_print_primitive_fields = cntl->has_always_print_primitive_fields();
            opt.enum_option = (FLAGS_pb_enum_as_number
                               ? json2pb::OUTPUT_ENUM_BY_NUMBER
                               : json2pb::OUTPUT_ENUM_BY_NAME);
            if (!json2pb::ProtoMessageToJson(*res, &wrapper, opt, &err)) {
                cntl->SetFailed(ERESPONSE, "Fail to convert response to json, %s", err.c_str());
            }
        }
    }
        // 判断是http1.0和1.1还是grpc等,对应的包头的设置
    if (!is_http2) {
        const std::string* res_conn = res_header->GetHeader(common->CONNECTION);
        if (res_conn == NULL || strcasecmp(res_conn->c_str(), "close") != 0) {
            const std::string* req_conn =
                req_header->GetHeader(common->CONNECTION);
            if (req_header->before_http_1_1()) {
                if (req_conn != NULL &&
                    strcasecmp(req_conn->c_str(), "keep-alive") == 0) {
                    res_header->SetHeader(common->CONNECTION, common->KEEP_ALIVE);
                }
            } else {
                if (req_conn != NULL &&
                    strcasecmp(req_conn->c_str(), "close") == 0) {
                    res_header->SetHeader(common->CONNECTION, common->CLOSE);
                }
            }
        } // else user explicitly set Connection:close, clients of
        // HTTP 1.1/1.0/0.9 should all close the connection.
    } else if (is_grpc) {
        // status code is always 200 according to grpc protocol
        res_header->set_status_code(HTTP_STATUS_OK);
    }
    
    bool grpc_compressed = false;
    // 失败处理
    if (cntl->Failed()) {
        cntl->response_attachment().clear();
        if (!is_grpc) {
            // Set status-code with default value(converted from error code)
            // if user did not set it.
            if (res_header->status_code() == HTTP_STATUS_OK) {
                res_header->set_status_code(ErrorCodeToStatusCode(cntl->ErrorCode()));
            }
            // Fill ErrorCode into header
            res_header->SetHeader(common->ERROR_CODE,
                                  butil::string_printf("%d", cntl->ErrorCode()));
            res_header->RemoveHeader(common->CONTENT_ENCODING);
            res_header->set_content_type(common->CONTENT_TYPE_TEXT);
            cntl->response_attachment().append(cntl->ErrorText());
        }
       // chunked 协议的处理
    } else if (cntl->has_progressive_writer()) {
        // Transfer-Encoding is supported since HTTP/1.1
        if (res_header->major_version() < 2 && !res_header->before_http_1_1()) {
            res_header->SetHeader("Transfer-Encoding", "chunked");
        }
        if (!cntl->response_attachment().empty()) {
            LOG(ERROR) << "response_attachment(size="
                       << cntl->response_attachment().size() << ") will be"
                " ignored when CreateProgressiveAttachment() was called";
        }
        // gzip压缩处理 
    } else if (cntl->response_compress_type() == COMPRESS_TYPE_GZIP) {
        const size_t response_size = cntl->response_attachment().size();
        if (response_size >= (size_t)FLAGS_http_body_compress_threshold
            && (is_http2 || SupportGzip(cntl))) {
            TRACEPRINTF("Compressing response=%lu", (unsigned long)response_size);
            butil::IOBuf tmpbuf;
            if (GzipCompress(cntl->response_attachment(), &tmpbuf, NULL)) {
                cntl->response_attachment().swap(tmpbuf);
                if (is_grpc) {
                    grpc_compressed = true;
                    res_header->SetHeader(common->GRPC_ENCODING, common->GZIP);
                } else {
                    res_header->SetHeader(common->CONTENT_ENCODING, common->GZIP);
                }
            } else {
                LOG(ERROR) << "Fail to gzip the http response, skip compression.";
            }
        }
    } else {
        // TODO(gejun): Support snappy (grpc)
        LOG_IF(ERROR, cntl->response_compress_type() != COMPRESS_TYPE_NONE)
            << "Unknown compress_type=" << cntl->response_compress_type()
            << ", skip compression.";
    }

    int rc = -1;
    // Have the risk of unlimited pending responses, in which case, tell
    // users to set max_concurrency.
    Socket::WriteOptions wopt;
    wopt.ignore_eovercrowded = true;
    // h2协议
    if (is_http2) {
        if (is_grpc) {
            // Append compressed and length before body
            AddGrpcPrefix(&cntl->response_attachment(), grpc_compressed);
        }
        // grpc协议构建
        SocketMessagePtr<H2UnsentResponse> h2_response(
                H2UnsentResponse::New(cntl, _h2_stream_id, is_grpc));
        if (h2_response == NULL) {
            LOG(ERROR) << "Fail to make http2 response";
            errno = EINVAL;
            rc = -1;
        } else {
            if (FLAGS_http_verbose) {
                LOG(INFO) << '\n' << *h2_response;
            }
            if (span) {
                span->set_response_size(h2_response->EstimatedByteSize());
            }
            // 发送数据
            rc = socket->Write(h2_response, &wopt);
        }
    } else {
        // http协议
        butil::IOBuf* content = NULL;
        if (cntl->Failed() || !cntl->has_progressive_writer()) {
            content = &cntl->response_attachment();
        }
        butil::IOBuf res_buf;
        // 构建http协议回包,包头+body
        MakeRawHttpResponse(&res_buf, res_header, content);
        if (FLAGS_http_verbose) {
            PrintMessage(res_buf, false, !!content);
        }
        if (span) {
            span->set_response_size(res_buf.size());
        }
        // 发送数据
        rc = socket->Write(&res_buf, &wopt);
    }

    if (rc != 0) {
        // EPIPE is common in pooled connections + backup requests.
        const int errcode = errno;
        PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *socket;
        cntl->SetFailed(errcode, "Fail to write into %s", socket->description().c_str());
        return;
    }
    if (span) {
        // TODO: this is not sent
        span->set_sent_us(butil::cpuwide_time_us());
    }
}

根据control和resp message信息,构造http包数据。最终套接字socket->Write;

Socket::Write写数据

int Socket::Write(butil::IOBuf* data, const WriteOptions* options_in) {
    WriteOptions opt;
    if (options_in) {
        opt = *options_in;
    }
    // 判断数据是否为空
    if (data->empty()) {
        return SetError(opt.id_wait, EINVAL);
    }
    if (opt.pipelined_count > MAX_PIPELINED_COUNT) {
        LOG(ERROR) << "pipelined_count=" << opt.pipelined_count
                   << " is too large";
        return SetError(opt.id_wait, EOVERFLOW);
    }
    if (Failed()) {
        const int rc = ConductError(opt.id_wait);
        if (rc <= 0) {
            return rc;
        }
    }

    if (!opt.ignore_eovercrowded && _overcrowded) {
        return SetError(opt.id_wait, EOVERCROWDED);
    }
        // 获取WriteRequest
    WriteRequest* req = butil::get_object<WriteRequest>();
    if (!req) {
        return SetError(opt.id_wait, ENOMEM);
    }
        // 数据存储
    req->data.swap(*data);
    req->next = WriteRequest::UNCONNECTED;
    req->id_wait = opt.id_wait;
    // 设置pipe个数等信息,如redis的pipeline调用
    req->set_pipelined_count_and_user_message(
        opt.pipelined_count, DUMMY_USER_MESSAGE, opt.with_auth);
    return StartWrite(req, opt);
}

构造WriteRequest,调用StartWrite

StartWrite

int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
    // Release fence makes sure the thread getting request sees *req
    WriteRequest* const prev_head =
        _write_head.exchange(req, butil::memory_order_release);
    if (prev_head != NULL) {
        req->next = prev_head;
        return 0;
    }

    int saved_errno = 0;
    bthread_t th;
    SocketUniquePtr ptr_for_keep_write;
    ssize_t nw = 0;

    // We've got the right to write.
    req->next = NULL;
    
    // 判断fd是否断开链接了
    int ret = ConnectIfNot(opt.abstime, req);
    if (ret < 0) {
        saved_errno = errno;
        SetFailed(errno, "Fail to connect %s directly: %m", description().c_str());
        goto FAIL_TO_WRITE;
    } else if (ret == 1) {
        return 0;
    }

    // 记录未发送的数据大小,pipe个数大小,如果发送失败,可以继续发送
    req->Setup(this);
    if (_conn) {
        butil::IOBuf* data_arr[1] = { &req->data };
        nw = _conn->CutMessageIntoFileDescriptor(fd(), data_arr, 1);
    } else {
        // 看是发送数据writev写多个非连续缓冲区
        nw = req->data.cut_into_file_descriptor(fd());
    }
    if (nw < 0) {
        // 判断发送的失败是否正常
        if (errno != EAGAIN && errno != EOVERCROWDED) {
            saved_errno = errno;
            // EPIPE is common in pooled connections + backup requests.
            PLOG_IF(WARNING, errno != EPIPE) << "Fail to write into " << *this;
            SetFailed(saved_errno, "Fail to write into %s: %s", 
                      description().c_str(), berror(saved_errno));
            goto FAIL_TO_WRITE;
        }
    } else {
        // 将已发送的数据进行记录
        AddOutputBytes(nw);
    }
    // 判断有新的请求,没有新请求,则返回。有则继续写入
    if (IsWriteComplete(req, true, NULL)) {
        ReturnSuccessfulWriteRequest(req);
        return 0;
    }

KEEPWRITE_IN_BACKGROUND:
    ReAddress(&ptr_for_keep_write);
    req->socket = ptr_for_keep_write.release();
    // 如果发送失败,或者还有剩余的包,继续发送
    if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
                                 KeepWrite, req) != 0) {
        LOG(FATAL) << "Fail to start KeepWrite";
        KeepWrite(req);
    }
    return 0;

FAIL_TO_WRITE:
    // `SetFailed' before `ReturnFailedWriteRequest' (which will calls
    // `on_reset' callback inside the id object) so that we immediately
    // know this socket has failed inside the `on_reset' callback
    ReleaseAllFailedWriteRequests(req);
    errno = saved_errno;
    return -1;
}

cut_into_file_descriptor()中,_ref_at分好数据,开始调用系统函数writev写多个非连续缓冲区数据。写成功,则清空iobuf中的数据,如果写失败。正常的异常错误。继续keepwrite写。IsWriteComplete中判断是否有没有新的请求。没有新请求,则返回。有则执行,并调用KeepWrite继续写。 而KeepWrite和StartWrite逻辑大致相同。

总结

整个逻辑处理,还是比较清晰。收到回调函数,在处理完业务逻辑后,校验一些包的合法性等。 就直接调用writev函数回包给客户端了。至此,整个服务端的处理逻辑我们都分析完了,下期我们来分析一下brpc 客户端是如何调用的。

大家可以添加我的wx一起探讨

我是dandyhuang_,码字不易,点个小赞,只希望大家能更加明白

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容