Skip to content

Commit 86f6795

Browse files
committed
Only update real sent time to span
1 parent 55b73bf commit 86f6795

11 files changed

+71
-146
lines changed

.github/actions/init-ut-make-config/action.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ runs:
99
shell: bash
1010
- run: sudo apt-get update && sudo apt-get install -y libgtest-dev cmake gdb libstdc++6-9-dbg && cd /usr/src/gtest && sudo cmake . && sudo make -j ${{env.proc_num}} && sudo mv lib/libgtest* /usr/lib/
1111
shell: bash
12-
- run: sh config_brpc.sh --headers="/libunwind/include /usr/include" --libs="/libunwind/lib /usr/lib /usr/lib64" --nodebugsymbols ${{inputs.options}}
12+
- run: sh config_brpc.sh --headers="/libunwind/include /usr/include" --libs="/libunwind/lib /usr/lib /usr/lib64" ${{inputs.options}}
1313
shell: bash

src/brpc/details/method_status.cpp

+2-8
Original file line numberDiff line numberDiff line change
@@ -149,22 +149,16 @@ void MethodStatus::SetConcurrencyLimiter(ConcurrencyLimiter* cl) {
149149
_cl.reset(cl);
150150
}
151151

152-
int HandleResponseWritten(bthread_id_t id, void* data, int error_code,
153-
const std::string& error_text) {
152+
int HandleResponseWritten(bthread_id_t id, void* data, int /*error_code*/) {
154153
auto args = static_cast<ResponseWriteInfo*>(data);
155-
args->error_code = error_code;
156-
args->error_text = error_text;
157154
args->sent_us = butil::cpuwide_time_us();
158155
CHECK_EQ(0, bthread_id_unlock_and_destroy(id));
159156
return 0;
160157
}
161158

162159
ConcurrencyRemover::~ConcurrencyRemover() {
163160
if (_status) {
164-
if (_sent_us < _received_us) {
165-
_sent_us = butil::cpuwide_time_us();
166-
}
167-
_status->OnResponded(_c->ErrorCode(), _sent_us - _received_us);
161+
_status->OnResponded(_c->ErrorCode(), butil::cpuwide_time_us() - _received_us);
168162
_status = NULL;
169163
}
170164
ServerPrivateAccessor(_c->server()).RemoveConcurrency(_c);

src/brpc/details/method_status.h

+1-6
Original file line numberDiff line numberDiff line change
@@ -76,27 +76,22 @@ friend class Server;
7676
};
7777

7878
struct ResponseWriteInfo {
79-
int error_code{0};
80-
std::string error_text;
8179
int64_t sent_us{0};
8280
};
8381

84-
int HandleResponseWritten(bthread_id_t id, void* data, int error_code,
85-
const std::string& error_text);
82+
int HandleResponseWritten(bthread_id_t id, void* data, int error_code);
8683

8784
class ConcurrencyRemover {
8885
public:
8986
ConcurrencyRemover(MethodStatus* status, Controller* c, int64_t received_us)
9087
: _status(status) , _c(c) , _received_us(received_us) {}
9188
~ConcurrencyRemover();
9289

93-
void set_sent_us(int64_t sent_us) { _sent_us = sent_us; }
9490
private:
9591
DISALLOW_COPY_AND_ASSIGN(ConcurrencyRemover);
9692
MethodStatus* _status;
9793
Controller* _c;
9894
int64_t _received_us;
99-
int64_t _sent_us{0};
10095
};
10196

10297
inline bool MethodStatus::OnRequested(int* rejected_cc, Controller* cntl) {

src/brpc/policy/baidu_rpc_protocol.cpp

+17-45
Original file line numberDiff line numberDiff line change
@@ -197,19 +197,15 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
197197
const google::protobuf::Message* req = NULL == messages ? NULL : messages->Request();
198198
const google::protobuf::Message* res = NULL == messages ? NULL : messages->Response();
199199

200-
ResponseWriteInfo args;
201-
202200
// Recycle resources at the end of this function.
203201
BRPC_SCOPE_EXIT {
204202
{
205203
// Remove concurrency and record latency at first.
206204
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
207-
concurrency_remover.set_sent_us(args.sent_us);
208205
}
209206

210207
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
211208

212-
213209
if (NULL == messages) {
214210
return;
215211
}
@@ -305,12 +301,13 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
305301
}
306302
}
307303

304+
ResponseWriteInfo args;
305+
bthread_id_t response_id = INVALID_BTHREAD_ID;
308306
if (span) {
309307
span->set_response_size(res_buf.size());
308+
CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten));
310309
}
311310

312-
bthread_id_t response_id;
313-
CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten));
314311
// Send rpc response over stream even if server side failed to create
315312
// stream for some reason.
316313
if (cntl->has_remote_stream()) {
@@ -328,45 +325,13 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
328325
Stream::SetFailed(response_stream_ids, error_code,
329326
"Fail to write into %s",
330327
sock->description().c_str());
331-
}
332-
} else{
333-
// Have the risk of unlimited pending responses, in which case, tell
334-
// users to set max_concurrency.
335-
Socket::WriteOptions wopt;
336-
wopt.id_wait = response_id;
337-
wopt.notify_on_success = true;
338-
wopt.ignore_eovercrowded = true;
339-
if (sock->Write(&res_buf, &wopt) != 0) {
340-
const int errcode = errno;
341-
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
342-
cntl->SetFailed(errcode, "Fail to write into %s",
343-
sock->description().c_str());
344-
return;
345-
}
346-
}
347-
348-
bthread_id_join(response_id);
349-
350-
error_code = args.error_code;
351-
if (cntl->has_remote_stream()) {
352-
if (0 != error_code) {
353-
LOG_IF(WARNING, error_code != EPIPE)
354-
<< "Fail to write into " << *sock
355-
<< ", error text= " << args.error_text
356-
<< ": " << berror(error_code);
357-
cntl->SetFailed(error_code, "Fail to write into %s: %s",
358-
sock->description().c_str(),
359-
args.error_text.c_str());
360-
Stream::SetFailed(response_stream_ids, error_code,
361-
"Fail to write into %s",
362-
args.error_text.c_str());
363328
return;
364329
}
365330

366331
// Now it's ok the mark these server-side streams as connected as all the
367332
// written user data would follower the RPC response.
368333
// Reuse stream_ptr to avoid address first stream id again
369-
if (NULL != stream_ptr) {
334+
if (stream_ptr) {
370335
((Stream*)stream_ptr->conn())->SetConnected();
371336
}
372337
for (size_t i = 1; i < response_stream_ids.size(); ++i) {
@@ -384,17 +349,24 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
384349
} else{
385350
// Have the risk of unlimited pending responses, in which case, tell
386351
// users to set max_concurrency.
387-
if (0 != error_code) {
388-
LOG_IF(WARNING, error_code != EPIPE) << "Fail to write into " << *sock
389-
<< ", error text= " << args.error_text
390-
<< ": " << strerror(error_code);
391-
cntl->SetFailed(error_code, "Fail to write into %s: %s",
392-
sock->description().c_str(), args.error_text.c_str());
352+
Socket::WriteOptions wopt;
353+
wopt.ignore_eovercrowded = true;
354+
if (INVALID_BTHREAD_ID != response_id) {
355+
wopt.id_wait = response_id;
356+
wopt.notify_on_success = true;
357+
}
358+
if (sock->Write(&res_buf, &wopt) != 0) {
359+
const int errcode = errno;
360+
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
361+
cntl->SetFailed(errcode, "Fail to write into %s",
362+
sock->description().c_str());
393363
return;
394364
}
395365
}
396366

397367
if (span) {
368+
bthread_id_join(response_id);
369+
// Do not care about the result of background writing.
398370
// TODO: this is not sent
399371
span->set_sent_us(args.sent_us);
400372
}

src/brpc/policy/http_rpc_protocol.cpp

+8-13
Original file line numberDiff line numberDiff line change
@@ -935,12 +935,14 @@ HttpResponseSender::~HttpResponseSender() {
935935
// Have the risk of unlimited pending responses, in which case, tell
936936
// users to set max_concurrency.
937937
ResponseWriteInfo args;
938-
bthread_id_t response_id;
939-
CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten));
940938
Socket::WriteOptions wopt;
941-
wopt.id_wait = response_id;
942-
wopt.notify_on_success = true;
943939
wopt.ignore_eovercrowded = true;
940+
bthread_id_t response_id = INVALID_BTHREAD_ID;
941+
if (span) {
942+
CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten));
943+
wopt.id_wait = response_id;
944+
wopt.notify_on_success = true;
945+
}
944946
if (is_http2) {
945947
if (is_grpc) {
946948
// Append compressed and length before body
@@ -986,16 +988,9 @@ HttpResponseSender::~HttpResponseSender() {
986988
return;
987989
}
988990

989-
bthread_id_join(response_id);
990-
concurrency_remover.set_sent_us(args.sent_us);
991-
const int errcode = args.error_code;
992-
if (0 != errcode) {
993-
LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *socket;
994-
cntl->SetFailed(errcode, "Fail to write into %s", socket->description().c_str());
995-
return;
996-
}
997-
998991
if (span) {
992+
bthread_id_join(response_id);
993+
// Do not care about the result of background writing.
999994
// TODO: this is not sent
1000995
span->set_sent_us(args.sent_us);
1001996
}

src/brpc/policy/hulu_pbrpc_protocol.cpp

+8-13
Original file line numberDiff line numberDiff line change
@@ -305,12 +305,14 @@ static void SendHuluResponse(int64_t correlation_id,
305305
// Have the risk of unlimited pending responses, in which case, tell
306306
// users to set max_concurrency.
307307
ResponseWriteInfo args;
308-
bthread_id_t response_id;
309-
CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten));
310308
Socket::WriteOptions wopt;
311-
wopt.id_wait = response_id;
312-
wopt.notify_on_success = true;
313309
wopt.ignore_eovercrowded = true;
310+
bthread_id_t response_id = INVALID_BTHREAD_ID;
311+
if (span) {
312+
CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten));
313+
wopt.id_wait = response_id;
314+
wopt.notify_on_success = true;
315+
}
314316
if (sock->Write(&res_buf, &wopt) != 0) {
315317
const int errcode = errno;
316318
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
@@ -319,16 +321,9 @@ static void SendHuluResponse(int64_t correlation_id,
319321
return;
320322
}
321323

322-
bthread_id_join(response_id);
323-
concurrency_remover.set_sent_us(args.sent_us);
324-
const int errcode = args.error_code;
325-
if (0 != errcode) {
326-
LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
327-
cntl->SetFailed(errcode, "Fail to write into %s", sock->description().c_str());
328-
return;
329-
}
330-
331324
if (span) {
325+
bthread_id_join(response_id);
326+
// Do not care about the result of background writing.
332327
// TODO: this is not sent
333328
span->set_sent_us(args.sent_us);
334329
}

src/brpc/policy/mongo_protocol.cpp

+9-27
Original file line numberDiff line numberDiff line change
@@ -95,34 +95,16 @@ void SendMongoResponse::Run() {
9595
res_buf.append(res.message());
9696
}
9797

98-
if (res_buf.empty()) {
99-
return;
100-
}
101-
102-
// Have the risk of unlimited pending responses, in which case, tell
103-
// users to set max_concurrency.
104-
ResponseWriteInfo args;
105-
bthread_id_t response_id;
106-
CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten));
107-
Socket::WriteOptions wopt;
108-
wopt.id_wait = response_id;
109-
wopt.notify_on_success = true;
110-
wopt.ignore_eovercrowded = true;
111-
wopt.ignore_eovercrowded = true;
112-
if (socket->Write(&res_buf, &wopt) != 0) {
113-
PLOG(WARNING) << "Fail to write into " << *socket;
114-
return;
115-
}
116-
117-
bthread_id_join(response_id);
118-
concurrency_remover.set_sent_us(args.sent_us);
119-
const int errcode = args.error_code;
120-
if (0 != errcode) {
121-
LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *socket;
122-
cntl.SetFailed(errcode, "Fail to write into %s", socket->description().c_str());
123-
return;
98+
if (!res_buf.empty()) {
99+
// Have the risk of unlimited pending responses, in which case, tell
100+
// users to set max_concurrency.
101+
Socket::WriteOptions wopt;
102+
wopt.ignore_eovercrowded = true;
103+
if (socket->Write(&res_buf, &wopt) != 0) {
104+
PLOG(WARNING) << "Fail to write into " << *socket;
105+
return;
106+
}
124107
}
125-
126108
}
127109

128110
ParseResult ParseMongoMessage(butil::IOBuf* source,

src/brpc/policy/nshead_protocol.cpp

+10-13
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,14 @@ void NsheadClosure::Run() {
114114
// Have the risk of unlimited pending responses, in which case, tell
115115
// users to set max_concurrency.
116116
ResponseWriteInfo args;
117-
bthread_id_t response_id;
118-
CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten));
119117
Socket::WriteOptions wopt;
120-
wopt.id_wait = response_id;
121-
wopt.notify_on_success = true;
122118
wopt.ignore_eovercrowded = true;
119+
bthread_id_t response_id = INVALID_BTHREAD_ID;
120+
if (span) {
121+
CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten));
122+
wopt.id_wait = response_id;
123+
wopt.notify_on_success = true;
124+
}
123125
if (sock->Write(&write_buf, &wopt) != 0) {
124126
const int errcode = errno;
125127
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
@@ -128,16 +130,11 @@ void NsheadClosure::Run() {
128130
return;
129131
}
130132

131-
bthread_id_join(response_id);
132-
concurrency_remover.set_sent_us(args.sent_us);
133-
const int errcode = args.error_code;
134-
if (0 != errcode) {
135-
LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
136-
_controller.SetFailed(errcode, "Fail to write into %s",
137-
sock->description().c_str());
138-
return;
133+
if (span) {
134+
bthread_id_join(response_id);
135+
// Do not care about the result of background writing.
136+
sent_us = args.sent_us;
139137
}
140-
sent_us = args.sent_us;
141138
}
142139
if (span) {
143140
// TODO: this is not sent

src/brpc/policy/sofa_pbrpc_protocol.cpp

+8-14
Original file line numberDiff line numberDiff line change
@@ -282,12 +282,14 @@ static void SendSofaResponse(int64_t correlation_id,
282282
// Have the risk of unlimited pending responses, in which case, tell
283283
// users to set max_concurrency.
284284
ResponseWriteInfo args;
285-
bthread_id_t response_id;
286-
CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten));
287285
Socket::WriteOptions wopt;
288-
wopt.id_wait = response_id;
289-
wopt.notify_on_success = true;
290286
wopt.ignore_eovercrowded = true;
287+
bthread_id_t response_id = INVALID_BTHREAD_ID;
288+
if (span) {
289+
CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten));
290+
wopt.id_wait = response_id;
291+
wopt.notify_on_success = true;
292+
}
291293
if (sock->Write(&res_buf, &wopt) != 0) {
292294
const int errcode = errno;
293295
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
@@ -296,17 +298,9 @@ static void SendSofaResponse(int64_t correlation_id,
296298
return;
297299
}
298300

299-
bthread_id_join(response_id);
300-
concurrency_remover.set_sent_us(args.sent_us);
301-
const int errcode = args.error_code;
302-
if (0 != errcode) {
303-
LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
304-
cntl->SetFailed(errcode, "Fail to write into %s",
305-
sock->description().c_str());
306-
return;
307-
}
308-
309301
if (span) {
302+
bthread_id_join(response_id);
303+
// Do not care about the result of background writing.
310304
// TODO: this is not sent
311305
span->set_sent_us(args.sent_us);
312306
}

src/brpc/policy/streaming_rpc_protocol.cpp

+5-4
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,7 @@ void SendStreamClose(Socket *sock, int64_t remote_stream_id,
154154
}
155155

156156
int SendStreamData(Socket* sock, const butil::IOBuf* data,
157-
int64_t remote_stream_id,
158-
int64_t source_stream_id,
157+
int64_t remote_stream_id, int64_t source_stream_id,
159158
bthread_id_t response_id) {
160159
CHECK(sock != NULL);
161160
StreamFrameMeta fm;
@@ -166,8 +165,10 @@ int SendStreamData(Socket* sock, const butil::IOBuf* data,
166165
butil::IOBuf out;
167166
PackStreamMessage(&out, fm, data);
168167
Socket::WriteOptions wopt;
169-
wopt.id_wait = response_id;
170-
wopt.notify_on_success = true;
168+
if (INVALID_BTHREAD_ID != response_id) {
169+
wopt.id_wait = response_id;
170+
wopt.notify_on_success = true;
171+
}
171172
wopt.ignore_eovercrowded = true;
172173
return sock->Write(&out, &wopt);
173174
}

0 commit comments

Comments
 (0)