大家好,我是dandyhuang。上回给大家介绍了brpc从客户端到服务端整个收包的逻辑,详情可见《Brpc 服务端收包源码分析(一)》。本次咱们介绍server端的异步回包逻辑;同步场景下直接往response里写数据即可,这里不再展开。
server异步处理
// Completion callback of the asynchronous downstream call issued by echo().
//
// Takes ownership of `cntl` and `response` (the client-side controller and
// response of the downstream call), moves the downstream result into the
// response saved in `response_`, then runs the closure saved in `done_`.
//
// NOTE(review): `response_` and `done_` are members of the service object;
// if one instance serves overlapping requests they could be overwritten
// before this callback fires — confirm how the instance is used.
void helloServiceImpl::ServiceCb(brpc::Controller* cntl, ::hello::CommonResponse* response) {
    // Own both heap objects from the very beginning so that they are
    // released on every path out of this function.
    std::unique_ptr<brpc::Controller> owned_cntl(cntl);
    std::unique_ptr<::hello::CommonResponse> owned_response(response);

    LOG(INFO) << "response: " << response->ByteSize();

    const int error_code = cntl->ErrorCode();
    if (error_code != 0) {
        LOG(ERROR) << " hello error code:" << error_code << " msg:" << cntl->ErrorText();
    }

    // Hand the downstream payload over to the response of the pending request.
    response_->Swap(response);

    // Reply to the client.
    done_->Run();
}
// Server-side entry point of the `echo` method.
//
// Forwards `request` to a downstream service asynchronously. The reply to the
// client is NOT sent when this function returns: `done` is saved in `done_`
// and run later by ServiceCb() once the downstream call has completed.
//
// @param controller  RPC controller of the incoming request (a brpc::Controller).
// @param request     Request received from the client; forwarded downstream as is.
// @param response    Response to fill for the client; saved in `response_`.
// @param done        Closure that sends the reply; saved in `done_`.
//
// NOTE(review): `start_time_`, `done_` and `response_` are members of the
// service object; overlapping requests on one instance would overwrite them —
// confirm how the instance is used.
void helloServiceImpl::echo(::google::protobuf::RpcController* controller,
                            const ::hello::CommonRequest* request,
                            ::hello::CommonResponse* response,
                            ::google::protobuf::Closure* done) {
    // Guarantees that `done` is run (i.e. the client gets a reply) on every
    // early return below.
    brpc::ClosureGuard done_guard(done);
    brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
    start_time_ = butil::gettimeofday_us();
    done_ = done;
    response_ = response;

    // Initialize the channel to the downstream service.
    brpc::Channel channel;
    brpc::ChannelOptions options;
    options.protocol = FLAGS_protocol;
    options.connection_type = FLAGS_connection_type;
    options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
    options.max_retry = FLAGS_max_retry;
    if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
        LOG(ERROR) << "Fail to initialize channel";
        // This function returns void, so the failure is reported through the
        // controller; done_guard then replies to the client with the error
        // instead of an empty "successful" response.
        cntl->SetFailed(brpc::EINTERNAL, "Fail to initialize channel");
        return;
    }

    // Ownership of both objects is taken over by ServiceCb().
    auto cli_cntl = new brpc::Controller();
    ::hello::CommonResponse* cli_resp = new ::hello::CommonResponse();

    // Call the downstream service asynchronously.
    hello::CommonService_Stub stub(channel);
    auto cli_done = brpc::NewCallback(this, &helloServiceImpl::ServiceCb, cli_cntl, cli_resp);
    stub.echo(cli_cntl, request, cli_resp, cli_done);

    // Detach `done` from the guard so that leaving this function does not
    // reply to the client; ServiceCb() runs `done_` later.
    done_guard.release();
}
// Runs the guarded closure on destruction, unless it was detached with
// release() (in which case _done is NULL and nothing happens).
~ClosureGuard() {
    if (_done == NULL) {
        return;
    }
    _done->Run();
}
// Detaches the guarded closure and returns it.
// After this call _done is NULL, so the destructor no longer runs the closure.
google::protobuf::Closure* release() {
    google::protobuf::Closure* const released = _done;
    _done = NULL;
    return released;
}
⚠️注意:当服务端收到请求时,我们先用brpc::ClosureGuard托管done,并把done保存到done_中;在发起对下游服务的异步调用之后,再调用done_guard.release()让guard放弃对done的管理,此时guard析构不会回包。等下游返回、异步回调ServiceCb处理完业务逻辑后,再调用done_->Run()对客户端进行回包,从而达到异步的效果。
ProcessHttpRequest回包逻辑
// Excerpt of the HTTP request handler: prepares the controller, the response
// message and the `done` closure, then invokes the user's service method.
// Parts of the function are elided in this excerpt; `svc`, `method` and `req`
// are set up in the elided code.
void ProcessHttpRequest(InputMessageBase *msg) {
Controller* cntl = new (std::nothrow) Controller;
// resp_sender holds the controller.
HttpResponseSender resp_sender(cntl);
// Create the response message and let resp_sender own it as well.
google::protobuf::Message* res = svc->GetResponsePrototype(method).New();
resp_sender.own_response(res);
// resp_sender is handed to the HttpResponseSenderAsDone closure, whose
// constructor moves the sender into itself.
google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
···
// Call the service method; the user code fills `res` and eventually runs `done`.
svc->CallMethod(method, cntl, req, res, done);
}
以http为例,cntl和回包用的res都会存储到HttpResponseSender中,而resp_sender最终又会被移动到done(HttpResponseSenderAsDone)里。所以业务逻辑代码中调用done_->Run()就会触发回包,下面我们看它是如何回包的。
HttpResponseSenderAsDone->Run()
class HttpResponseSenderAsDone : public google::protobuf::Closure {
public:
HttpResponseSenderAsDone(HttpResponseSender* s) : _sender(std::move(*s)) {}
void Run() override { delete this; }
private:
// 析构delete
HttpResponseSender _sender;
};
// Sends the HTTP response when the sender is destroyed.
// Steps, in order: serialize the pb response into the body (protobuf bytes or
// JSON), fill in the response headers, handle failure / progressive writer /
// gzip, build an HTTP/2 or raw HTTP/1.x message and write it to the socket.
// Does nothing when the sender holds no controller.
HttpResponseSender::~HttpResponseSender() {
// Controller of this RPC; nothing to send without it.
Controller* cntl = _cntl.get();
if (cntl == NULL) {
return;
}
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
ConcurrencyRemover concurrency_remover(_method_status, cntl, _received_us);
// Socket that the response is written to.
Socket* socket = accessor.get_sending_socket();
// The pb response message held by this sender; presumably the one filled by
// the service method.
const google::protobuf::Message* res = _res.get();
// If the controller is marked close-connection, fail the socket and send nothing.
if (cntl->IsCloseConnection()) {
socket->SetFailed();
return;
}
// Header of the request.
const HttpHeader* req_header = &cntl->http_request();
// Header of the response; it uses the same HTTP version as the request.
HttpHeader* res_header = &cntl->http_response();
res_header->set_version(req_header->major_version(),
req_header->minor_version());
// Everything below fills in headers of the HTTP response.
const std::string* content_type_str = &res_header->content_type();
if (content_type_str->empty()) {
// Use request's content_type if response's is not set.
content_type_str = &req_header->content_type();
res_header->set_content_type(*content_type_str);
}
bool is_grpc_ct = false;
const HttpContentType content_type = ParseContentType(*content_type_str, &is_grpc_ct);
const bool is_http2 = req_header->is_http2();
const bool is_grpc = (is_http2 && is_grpc_ct);
// Serialize the pb response only when the user did not write the body
// (response_attachment) directly, the message type has fields and the RPC
// has not failed.
if (res != NULL &&
cntl->response_attachment().empty() && res->GetDescriptor()->field_count() > 0 &&!cntl->Failed()) {
butil::IOBufAsZeroCopyOutputStream wrapper(&cntl->response_attachment());
// Write the body as protobuf bytes or as JSON, depending on the content type.
if (content_type == HTTP_CONTENT_PROTO) {
if (!res->SerializeToZeroCopyStream(&wrapper)) {
cntl->SetFailed(ERESPONSE, "Fail to serialize %s", res->GetTypeName().c_str());
}
} else {
std::string err;
json2pb::Pb2JsonOptions opt;
opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
opt.jsonify_empty_array = cntl->has_pb_jsonify_empty_array();
opt.always_print_primitive_fields = cntl->has_always_print_primitive_fields();
opt.enum_option = (FLAGS_pb_enum_as_number
? json2pb::OUTPUT_ENUM_BY_NUMBER
: json2pb::OUTPUT_ENUM_BY_NAME);
if (!json2pb::ProtoMessageToJson(*res, &wrapper, opt, &err)) {
cntl->SetFailed(ERESPONSE, "Fail to convert response to json, %s", err.c_str());
}
}
}
// Protocol-specific headers: Connection handling for non-HTTP/2 requests,
// fixed status code for gRPC.
if (!is_http2) {
const std::string* res_conn = res_header->GetHeader(common->CONNECTION);
if (res_conn == NULL || strcasecmp(res_conn->c_str(), "close") != 0) {
const std::string* req_conn =
req_header->GetHeader(common->CONNECTION);
if (req_header->before_http_1_1()) {
if (req_conn != NULL &&
strcasecmp(req_conn->c_str(), "keep-alive") == 0) {
res_header->SetHeader(common->CONNECTION, common->KEEP_ALIVE);
}
} else {
if (req_conn != NULL &&
strcasecmp(req_conn->c_str(), "close") == 0) {
res_header->SetHeader(common->CONNECTION, common->CLOSE);
}
}
} // else user explicitly set Connection:close, clients of
// HTTP 1.1/1.0/0.9 should all close the connection.
} else if (is_grpc) {
// status code is always 200 according to grpc protocol
res_header->set_status_code(HTTP_STATUS_OK);
}
bool grpc_compressed = false;
// RPC failed: drop the body and (for non-gRPC) report the error in the
// status code, an error-code header and a text body.
if (cntl->Failed()) {
cntl->response_attachment().clear();
if (!is_grpc) {
// Set status-code with default value(converted from error code)
// if user did not set it.
if (res_header->status_code() == HTTP_STATUS_OK) {
res_header->set_status_code(ErrorCodeToStatusCode(cntl->ErrorCode()));
}
// Fill ErrorCode into header
res_header->SetHeader(common->ERROR_CODE,
butil::string_printf("%d", cntl->ErrorCode()));
res_header->RemoveHeader(common->CONTENT_ENCODING);
res_header->set_content_type(common->CONTENT_TYPE_TEXT);
cntl->response_attachment().append(cntl->ErrorText());
}
// Progressive (chunked) response.
} else if (cntl->has_progressive_writer()) {
// Transfer-Encoding is supported since HTTP/1.1
if (res_header->major_version() < 2 && !res_header->before_http_1_1()) {
res_header->SetHeader("Transfer-Encoding", "chunked");
}
if (!cntl->response_attachment().empty()) {
LOG(ERROR) << "response_attachment(size="
<< cntl->response_attachment().size() << ") will be"
" ignored when CreateProgressiveAttachment() was called";
}
// Gzip compression of the body, applied only when the body reaches the
// configured threshold and the request is HTTP/2 or SupportGzip(cntl) holds.
} else if (cntl->response_compress_type() == COMPRESS_TYPE_GZIP) {
const size_t response_size = cntl->response_attachment().size();
if (response_size >= (size_t)FLAGS_http_body_compress_threshold
&& (is_http2 || SupportGzip(cntl))) {
TRACEPRINTF("Compressing response=%lu", (unsigned long)response_size);
butil::IOBuf tmpbuf;
if (GzipCompress(cntl->response_attachment(), &tmpbuf, NULL)) {
cntl->response_attachment().swap(tmpbuf);
if (is_grpc) {
grpc_compressed = true;
res_header->SetHeader(common->GRPC_ENCODING, common->GZIP);
} else {
res_header->SetHeader(common->CONTENT_ENCODING, common->GZIP);
}
} else {
LOG(ERROR) << "Fail to gzip the http response, skip compression.";
}
}
} else {
// TODO(gejun): Support snappy (grpc)
LOG_IF(ERROR, cntl->response_compress_type() != COMPRESS_TYPE_NONE)
<< "Unknown compress_type=" << cntl->response_compress_type()
<< ", skip compression.";
}
int rc = -1;
// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
// HTTP/2 (including gRPC).
if (is_http2) {
if (is_grpc) {
// Append compressed and length before body
AddGrpcPrefix(&cntl->response_attachment(), grpc_compressed);
}
// Build the unsent HTTP/2 response message.
SocketMessagePtr<H2UnsentResponse> h2_response(
H2UnsentResponse::New(cntl, _h2_stream_id, is_grpc));
if (h2_response == NULL) {
LOG(ERROR) << "Fail to make http2 response";
errno = EINVAL;
rc = -1;
} else {
if (FLAGS_http_verbose) {
LOG(INFO) << '\n' << *h2_response;
}
if (span) {
span->set_response_size(h2_response->EstimatedByteSize());
}
// Send the data.
rc = socket->Write(h2_response, &wopt);
}
} else {
// Non-HTTP/2 response. The body is passed along unless a progressive
// writer is in use on a successful RPC.
butil::IOBuf* content = NULL;
if (cntl->Failed() || !cntl->has_progressive_writer()) {
content = &cntl->response_attachment();
}
butil::IOBuf res_buf;
// Serialize header + body into the raw HTTP response.
MakeRawHttpResponse(&res_buf, res_header, content);
if (FLAGS_http_verbose) {
PrintMessage(res_buf, false, !!content);
}
if (span) {
span->set_response_size(res_buf.size());
}
// Send the data.
rc = socket->Write(&res_buf, &wopt);
}
if (rc != 0) {
// EPIPE is common in pooled connections + backup requests.
const int errcode = errno;
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *socket;
cntl->SetFailed(errcode, "Fail to write into %s", socket->description().c_str());
return;
}
if (span) {
// TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us());
}
}
根据controller和response message的信息构造http回包数据,最终通过socket->Write把数据写到套接字。
Socket::Write写数据
// Validates the arguments and the socket state, wraps `data` into a
// WriteRequest and hands it to StartWrite().
//
// @param data        Bytes to send; must not be empty. Its content is swapped
//                    into the request, so `*data` no longer holds it afterwards.
// @param options_in  Optional write options; defaults are used when NULL.
// @return the result of StartWrite(), or the value produced by
//         SetError()/ConductError() when a precondition fails.
int Socket::Write(butil::IOBuf* data, const WriteOptions* options_in) {
    // Fall back to default options when the caller passes none.
    const WriteOptions wopt = options_in ? *options_in : WriteOptions();

    // Empty data is rejected.
    if (data->empty()) {
        return SetError(wopt.id_wait, EINVAL);
    }
    if (wopt.pipelined_count > MAX_PIPELINED_COUNT) {
        LOG(ERROR) << "pipelined_count=" << wopt.pipelined_count
                   << " is too large";
        return SetError(wopt.id_wait, EOVERFLOW);
    }
    if (Failed()) {
        const int error_rc = ConductError(wopt.id_wait);
        if (error_rc <= 0) {
            return error_rc;
        }
    }
    if (_overcrowded && !wopt.ignore_eovercrowded) {
        return SetError(wopt.id_wait, EOVERCROWDED);
    }

    // Obtain a WriteRequest object.
    WriteRequest* const wreq = butil::get_object<WriteRequest>();
    if (wreq == NULL) {
        return SetError(wopt.id_wait, ENOMEM);
    }

    // Move the payload into the request.
    wreq->data.swap(*data);
    wreq->next = WriteRequest::UNCONNECTED;
    wreq->id_wait = wopt.id_wait;
    // Record the pipelined count and related info (e.g. for redis pipelining).
    wreq->set_pipelined_count_and_user_message(
        wopt.pipelined_count, DUMMY_USER_MESSAGE, wopt.with_auth);
    return StartWrite(wreq, wopt);
}
构造WriteRequest,然后调用StartWrite。
StartWrite
// Installs `req` as the head of the write queue and, if this caller obtained
// the right to write, writes it to the fd once in place; whatever is left is
// handed to KeepWrite.
// Returns 0 when the request was written, queued, or handed over; returns -1
// with errno set when connecting or writing failed.
int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
// Release fence makes sure the thread getting request sees *req
WriteRequest* const prev_head =
_write_head.exchange(req, butil::memory_order_release);
if (prev_head != NULL) {
// There was already a head: link req to it and return. Per the comment
// below, only the caller that saw a NULL head has the right to write.
req->next = prev_head;
return 0;
}
int saved_errno = 0;
bthread_t th;
SocketUniquePtr ptr_for_keep_write;
ssize_t nw = 0;
// We've got the right to write.
req->next = NULL;
// Connect first if the socket is not connected yet (going by the name
// ConnectIfNot).
int ret = ConnectIfNot(opt.abstime, req);
if (ret < 0) {
saved_errno = errno;
SetFailed(errno, "Fail to connect %s directly: %m", description().c_str());
goto FAIL_TO_WRITE;
} else if (ret == 1) {
// NOTE(review): ret == 1 presumably means the connection is still being
// established and the write is continued elsewhere — confirm against
// ConnectIfNot.
return 0;
}
// Prepare the request for writing; per the original note this records the
// unsent size and pipelined count — TODO confirm against WriteRequest::Setup.
req->Setup(this);
if (_conn) {
butil::IOBuf* data_arr[1] = { &req->data };
nw = _conn->CutMessageIntoFileDescriptor(fd(), data_arr, 1);
} else {
// Write the buffer directly to the fd; presumably implemented with writev
// over multiple non-contiguous blocks.
nw = req->data.cut_into_file_descriptor(fd());
}
if (nw < 0) {
// EAGAIN and EOVERCROWDED are tolerated here; any other error fails the socket.
if (errno != EAGAIN && errno != EOVERCROWDED) {
saved_errno = errno;
// EPIPE is common in pooled connections + backup requests.
PLOG_IF(WARNING, errno != EPIPE) << "Fail to write into " << *this;
SetFailed(saved_errno, "Fail to write into %s: %s",
description().c_str(), berror(saved_errno));
goto FAIL_TO_WRITE;
}
} else {
// Account for the bytes that were written.
AddOutputBytes(nw);
}
// Done if the write is complete (presumably: req fully written and no new
// request queued meanwhile); otherwise keep writing below.
if (IsWriteComplete(req, true, NULL)) {
ReturnSuccessfulWriteRequest(req);
return 0;
}
KEEPWRITE_IN_BACKGROUND:
ReAddress(&ptr_for_keep_write);
req->socket = ptr_for_keep_write.release();
// The remaining work is done by KeepWrite in a background bthread; if the
// bthread cannot be started, KeepWrite is run inline instead.
if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
KeepWrite, req) != 0) {
LOG(FATAL) << "Fail to start KeepWrite";
KeepWrite(req);
}
return 0;
FAIL_TO_WRITE:
// `SetFailed' before `ReturnFailedWriteRequest' (which will calls
// `on_reset' callback inside the id object) so that we immediately
// know this socket has failed inside the `on_reset' callback
ReleaseAllFailedWriteRequests(req);
errno = saved_errno;
return -1;
}
cut_into_file_descriptor()中,先通过_ref_at整理好待发送的数据块,再调用系统函数writev一次写出多个非连续缓冲区的数据。写成功的数据会从iobuf中移除;如果写失败,且错误是EAGAIN、EOVERCROWDED这类可以容忍的错误,则交给KeepWrite继续写。IsWriteComplete用来判断当前请求是否已经写完、以及是否有新的请求:已经写完且没有新请求则直接返回,否则调用KeepWrite继续写。KeepWrite和StartWrite的逻辑大致相同。
总结
整个处理逻辑还是比较清晰的:业务逻辑处理完并触发done回调之后,框架先校验并填充回包的包头等信息,然后直接调用writev回包给客户端。至此,整个服务端的处理逻辑我们都分析完了,下期我们来分析一下brpc客户端是如何调用的。
大家可以添加我的wx一起探讨
我是dandyhuang_,码字不易,点个小赞,只希望大家能更加明白