Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize server latency #2886

Merged
merged 2 commits into from
Mar 21, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Optimize server latency
chenBright committed Mar 21, 2025
commit 0c324f03f98055998399404a7a1669e5f434c95b
15 changes: 14 additions & 1 deletion src/brpc/details/method_status.cpp
Original file line number Diff line number Diff line change
@@ -149,9 +149,22 @@ void MethodStatus::SetConcurrencyLimiter(ConcurrencyLimiter* cl) {
_cl.reset(cl);
}

int HandleResponseWritten(bthread_id_t id, void* data, int error_code,
const std::string& error_text) {
auto args = static_cast<ResponseWriteInfo*>(data);
args->error_code = error_code;
args->error_text = error_text;
args->sent_us = butil::cpuwide_time_us();
CHECK_EQ(0, bthread_id_unlock_and_destroy(id));
return 0;
}

ConcurrencyRemover::~ConcurrencyRemover() {
if (_status) {
_status->OnResponded(_c->ErrorCode(), butil::cpuwide_time_us() - _received_us);
if (_sent_us < _received_us) {
_sent_us = butil::cpuwide_time_us();
}
_status->OnResponded(_c->ErrorCode(), _sent_us - _received_us);
_status = NULL;
}
ServerPrivateAccessor(_c->server()).RemoveConcurrency(_c);
18 changes: 14 additions & 4 deletions src/brpc/details/method_status.h
Original file line number Diff line number Diff line change
@@ -75,18 +75,28 @@ friend class Server;
bvar::PassiveStatus<int32_t> _max_concurrency_bvar;
};

struct ResponseWriteInfo {
int error_code{0};
std::string error_text;
int64_t sent_us{0};
};

int HandleResponseWritten(bthread_id_t id, void* data, int error_code,
const std::string& error_text);

class ConcurrencyRemover {
public:
ConcurrencyRemover(MethodStatus* status, Controller* c, int64_t received_us)
: _status(status)
, _c(c)
, _received_us(received_us) {}
: _status(status) , _c(c) , _received_us(received_us) {}
~ConcurrencyRemover();

void set_sent_us(int64_t sent_us) { _sent_us = sent_us; }
private:
DISALLOW_COPY_AND_ASSIGN(ConcurrencyRemover);
MethodStatus* _status;
Controller* _c;
uint64_t _received_us;
int64_t _received_us;
int64_t _sent_us{0};
};

inline bool MethodStatus::OnRequested(int* rejected_cc, Controller* cntl) {
111 changes: 76 additions & 35 deletions src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
@@ -184,37 +184,43 @@ struct BaiduProxyPBMessages : public RpcPBMessages {
}

// Used by UT, can't be static.
void SendRpcResponse(int64_t correlation_id,
Controller* cntl,
RpcPBMessages* messages,
const Server* server,
MethodStatus* method_status,
int64_t received_us) {
void SendRpcResponse(int64_t correlation_id, Controller* cntl,
RpcPBMessages* messages, const Server* server,
MethodStatus* method_status, int64_t received_us) {
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
Socket* sock = accessor.get_sending_socket();

std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
const google::protobuf::Message* req = NULL == messages ? NULL : messages->Request();
const google::protobuf::Message* res = NULL == messages ? NULL : messages->Response();

ResponseWriteInfo args;

// Recycle resources at the end of this function.
BRPC_SCOPE_EXIT {
{
// Remove concurrency and record latency at first.
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
concurrency_remover.set_sent_us(args.sent_us);
}

std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);


auto messages_guard = butil::MakeScopeGuard([server, messages] {
if (NULL == messages) {
return;
}
if (NULL != server->options().baidu_master_service) {
BaiduProxyPBMessages::Return(static_cast<BaiduProxyPBMessages*>(messages));
} else {

cntl->CallAfterRpcResp(req, res);
if (NULL == server->options().baidu_master_service) {
server->options().rpc_pb_message_factory->Return(messages);
} else {
BaiduProxyPBMessages::Return(static_cast<BaiduProxyPBMessages*>(messages));
}
});

const google::protobuf::Message* req = NULL == messages ? NULL : messages->Request();
const google::protobuf::Message* res = NULL == messages ? NULL : messages->Response();
ClosureGuard guard(brpc::NewCallback(
cntl, &Controller::CallAfterRpcResp, req, res));
};

StreamIds response_stream_ids = accessor.response_streams();

@@ -302,29 +308,65 @@ void SendRpcResponse(int64_t correlation_id,
if (span) {
span->set_response_size(res_buf.size());
}

bthread_id_t response_id;
CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten));
// Send rpc response over stream even if server side failed to create
// stream for some reason.
if(cntl->has_remote_stream()){
if (cntl->has_remote_stream()) {
// Send the response over stream to notify that this stream connection
// is successfully built.
// Response_stream can be INVALID_STREAM_ID when error occurs.
if (SendStreamData(sock, &res_buf,
accessor.remote_stream_settings()->stream_id(),
response_stream_id) != 0) {
response_stream_id, response_id) != 0) {
error_code = errno;
PLOG_IF(WARNING, error_code != EPIPE)
<< "Fail to write into " << sock->description();
cntl->SetFailed(error_code, "Fail to write into %s",
sock->description().c_str());
Stream::SetFailed(response_stream_ids, error_code,
"Fail to write into %s",
sock->description().c_str());
}
} else{
// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.id_wait = response_id;
wopt.notify_on_success = true;
wopt.ignore_eovercrowded = true;
if (sock->Write(&res_buf, &wopt) != 0) {
const int errcode = errno;
std::string error_text = butil::string_printf(64, "Fail to write into %s",
sock->description().c_str());
PLOG_IF(WARNING, errcode != EPIPE) << error_text;
cntl->SetFailed(errcode, "%s", error_text.c_str());
Stream::SetFailed(response_stream_ids, errcode, "%s",
error_text.c_str());
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
cntl->SetFailed(errcode, "Fail to write into %s",
sock->description().c_str());
return;
}
}

bthread_id_join(response_id);

error_code = args.error_code;
if (cntl->has_remote_stream()) {
if (0 != error_code) {
LOG_IF(WARNING, error_code != EPIPE)
<< "Fail to write into " << *sock
<< ", error text= " << args.error_text
<< ": " << berror(error_code);
cntl->SetFailed(error_code, "Fail to write into %s: %s",
sock->description().c_str(),
args.error_text.c_str());
Stream::SetFailed(response_stream_ids, error_code,
"Fail to write into %s",
args.error_text.c_str());
return;
}

// Now it's ok the mark these server-side streams as connected as all the
// written user data would follower the RPC response.
// Reuse stream_ptr to avoid address first stream id again
if(stream_ptr) {
if (NULL != stream_ptr) {
((Stream*)stream_ptr->conn())->SetConnected();
}
for (size_t i = 1; i < response_stream_ids.size(); ++i) {
@@ -342,20 +384,19 @@ void SendRpcResponse(int64_t correlation_id,
} else{
// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (sock->Write(&res_buf, &wopt) != 0) {
const int errcode = errno;
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
cntl->SetFailed(errcode, "Fail to write into %s",
sock->description().c_str());
if (0 != error_code) {
LOG_IF(WARNING, error_code != EPIPE) << "Fail to write into " << *sock
<< ", error text= " << args.error_text
<< ": " << strerror(error_code);
cntl->SetFailed(error_code, "Fail to write into %s: %s",
sock->description().c_str(), args.error_text.c_str());
return;
}
}

if (span) {
// TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us());
span->set_sent_us(args.sent_us);
}
}

17 changes: 16 additions & 1 deletion src/brpc/policy/http_rpc_protocol.cpp
Original file line number Diff line number Diff line change
@@ -934,7 +934,12 @@ HttpResponseSender::~HttpResponseSender() {
int rc = -1;
// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
ResponseWriteInfo args;
bthread_id_t response_id;
CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten));
Socket::WriteOptions wopt;
wopt.id_wait = response_id;
wopt.notify_on_success = true;
wopt.ignore_eovercrowded = true;
if (is_http2) {
if (is_grpc) {
@@ -980,9 +985,19 @@ HttpResponseSender::~HttpResponseSender() {
cntl->SetFailed(errcode, "Fail to write into %s", socket->description().c_str());
return;
}

bthread_id_join(response_id);
concurrency_remover.set_sent_us(args.sent_us);
const int errcode = args.error_code;
if (0 != errcode) {
LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *socket;
cntl->SetFailed(errcode, "Fail to write into %s", socket->description().c_str());
return;
}

if (span) {
// TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us());
span->set_sent_us(args.sent_us);
}
}

17 changes: 16 additions & 1 deletion src/brpc/policy/hulu_pbrpc_protocol.cpp
Original file line number Diff line number Diff line change
@@ -304,7 +304,12 @@ static void SendHuluResponse(int64_t correlation_id,

// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
ResponseWriteInfo args;
bthread_id_t response_id;
CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten));
Socket::WriteOptions wopt;
wopt.id_wait = response_id;
wopt.notify_on_success = true;
wopt.ignore_eovercrowded = true;
if (sock->Write(&res_buf, &wopt) != 0) {
const int errcode = errno;
@@ -313,9 +318,19 @@ static void SendHuluResponse(int64_t correlation_id,
sock->description().c_str());
return;
}

bthread_id_join(response_id);
concurrency_remover.set_sent_us(args.sent_us);
const int errcode = args.error_code;
if (0 != errcode) {
LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
cntl->SetFailed(errcode, "Fail to write into %s", sock->description().c_str());
return;
}

if (span) {
// TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us());
span->set_sent_us(args.sent_us);
}
}

36 changes: 27 additions & 9 deletions src/brpc/policy/mongo_protocol.cpp
Original file line number Diff line number Diff line change
@@ -95,16 +95,34 @@ void SendMongoResponse::Run() {
res_buf.append(res.message());
}

if (!res_buf.empty()) {
// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (socket->Write(&res_buf, &wopt) != 0) {
PLOG(WARNING) << "Fail to write into " << *socket;
return;
}
if (res_buf.empty()) {
return;
}

// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
ResponseWriteInfo args;
bthread_id_t response_id;
CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten));
Socket::WriteOptions wopt;
wopt.id_wait = response_id;
wopt.notify_on_success = true;
wopt.ignore_eovercrowded = true;
wopt.ignore_eovercrowded = true;
if (socket->Write(&res_buf, &wopt) != 0) {
PLOG(WARNING) << "Fail to write into " << *socket;
return;
}

bthread_id_join(response_id);
concurrency_remover.set_sent_us(args.sent_us);
const int errcode = args.error_code;
if (0 != errcode) {
LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *socket;
cntl.SetFailed(errcode, "Fail to write into %s", socket->description().c_str());
return;
}

}

ParseResult ParseMongoMessage(butil::IOBuf* source,
19 changes: 18 additions & 1 deletion src/brpc/policy/nshead_protocol.cpp
Original file line number Diff line number Diff line change
@@ -95,6 +95,7 @@ void NsheadClosure::Run() {
return;
}

int64_t sent_us = 0;
if (_do_respond) {
// response uses request's head as default.
// Notice that the response use request.head.log_id directly rather
@@ -112,7 +113,12 @@ void NsheadClosure::Run() {
write_buf.append(_response.body.movable());
// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
ResponseWriteInfo args;
bthread_id_t response_id;
CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten));
Socket::WriteOptions wopt;
wopt.id_wait = response_id;
wopt.notify_on_success = true;
wopt.ignore_eovercrowded = true;
if (sock->Write(&write_buf, &wopt) != 0) {
const int errcode = errno;
@@ -121,10 +127,21 @@ void NsheadClosure::Run() {
sock->description().c_str());
return;
}

bthread_id_join(response_id);
concurrency_remover.set_sent_us(args.sent_us);
const int errcode = args.error_code;
if (0 != errcode) {
LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
_controller.SetFailed(errcode, "Fail to write into %s",
sock->description().c_str());
return;
}
sent_us = args.sent_us;
}
if (span) {
// TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us());
span->set_sent_us(0 == sent_us ? butil::cpuwide_time_us() : sent_us);
}
}

18 changes: 17 additions & 1 deletion src/brpc/policy/sofa_pbrpc_protocol.cpp
Original file line number Diff line number Diff line change
@@ -281,7 +281,12 @@ static void SendSofaResponse(int64_t correlation_id,
}
// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
ResponseWriteInfo args;
bthread_id_t response_id;
CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten));
Socket::WriteOptions wopt;
wopt.id_wait = response_id;
wopt.notify_on_success = true;
wopt.ignore_eovercrowded = true;
if (sock->Write(&res_buf, &wopt) != 0) {
const int errcode = errno;
@@ -290,9 +295,20 @@ static void SendSofaResponse(int64_t correlation_id,
sock->description().c_str());
return;
}

bthread_id_join(response_id);
concurrency_remover.set_sent_us(args.sent_us);
const int errcode = args.error_code;
if (0 != errcode) {
LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
cntl->SetFailed(errcode, "Fail to write into %s",
sock->description().c_str());
return;
}

if (span) {
// TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us());
span->set_sent_us(args.sent_us);
}
}

6 changes: 5 additions & 1 deletion src/brpc/policy/streaming_rpc_protocol.cpp
Original file line number Diff line number Diff line change
@@ -154,7 +154,9 @@ void SendStreamClose(Socket *sock, int64_t remote_stream_id,
}

int SendStreamData(Socket* sock, const butil::IOBuf* data,
int64_t remote_stream_id, int64_t source_stream_id) {
int64_t remote_stream_id,
int64_t source_stream_id,
bthread_id_t response_id) {
CHECK(sock != NULL);
StreamFrameMeta fm;
fm.set_stream_id(remote_stream_id);
@@ -164,6 +166,8 @@ int SendStreamData(Socket* sock, const butil::IOBuf* data,
butil::IOBuf out;
PackStreamMessage(&out, fm, data);
Socket::WriteOptions wopt;
wopt.id_wait = response_id;
wopt.notify_on_success = true;
wopt.ignore_eovercrowded = true;
return sock->Write(&out, &wopt);
}
5 changes: 3 additions & 2 deletions src/brpc/policy/streaming_rpc_protocol.h
Original file line number Diff line number Diff line change
@@ -19,10 +19,10 @@
#ifndef BRPC_STREAMING_RPC_PROTOCOL_H
#define BRPC_STREAMING_RPC_PROTOCOL_H

#include "bthread/types.h"
#include "brpc/protocol.h"
#include "brpc/streaming_rpc_meta.pb.h"


namespace brpc {
namespace policy {

@@ -41,7 +41,8 @@ void SendStreamClose(Socket *sock, int64_t remote_stream_id,
int64_t source_stream_id);

int SendStreamData(Socket* sock, const butil::IOBuf* data,
int64_t remote_stream_id, int64_t source_stream_id);
int64_t remote_stream_id,
int64_t source_stream_id, bthread_id_t);

} // namespace policy
} // namespace brpc