Skip to content

Commit 7ca337d

Browse files
author
jiazheng.jia
committed
add global priority queue
1 parent cce48a0 commit 7ca337d

File tree

5 files changed

+23
-36
lines changed

5 files changed

+23
-36
lines changed

src/bthread/parking_lot.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class BAIDU_CACHELINE_ALIGNMENT ParkingLot {
4141
int val;
4242
};
4343

44-
ParkingLot() : _pending_signal(0){}
44+
ParkingLot() : _pending_signal(0) {}
4545

4646
// Wake up at most `num_task' workers.
4747
// Returns #workers woken up.

src/bthread/task_control.cpp

+15-26
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,8 @@ TaskControl::TaskControl()
183183
, _signal_per_second(&_cumulated_signal_count)
184184
, _status(print_rq_sizes_in_the_tc, this)
185185
, _nbthreads("bthread_count")
186+
, _priority_qs(FLAGS_task_group_ntags)
186187
, _pl(FLAGS_task_group_ntags)
187-
, _epoll_tid_states(FLAGS_task_group_ntags)
188188
{}
189189

190190
int TaskControl::init(int concurrency) {
@@ -208,6 +208,10 @@ int TaskControl::init(int concurrency) {
208208
_tagged_worker_usage_second.push_back(new bvar::PerSecond<bvar::PassiveStatus<double>>(
209209
"bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 1));
210210
_tagged_nbthreads.push_back(new bvar::Adder<int64_t>("bthread_count", tag_str));
211+
if (_priority_qs[i].init(BTHREAD_MAX_CONCURRENCY) != 0) {
212+
LOG(FATAL) << "Fail to init _priority_q";
213+
return -1;
214+
}
211215
}
212216

213217
// Make sure TimerThread is ready.
@@ -431,15 +435,9 @@ int TaskControl::_destroy_group(TaskGroup* g) {
431435

432436
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
433437
auto tag = tls_task_group->tag();
434-
// epoll tid should be stolen first.
435-
for (auto &epoll_state : _epoll_tid_states[tag]) {
436-
bool expected_state = true;
437-
if (epoll_state.second.compare_exchange_strong(
438-
expected_state, false, butil::memory_order_seq_cst,
439-
butil::memory_order_relaxed)) {
440-
*tid = epoll_state.first;
441-
return true;
442-
}
438+
439+
if (_priority_qs[tag].steal(tid)) {
440+
return true;
443441
}
444442

445443
// 1: Acquiring fence is paired with releasing fence in _add_group to
@@ -482,30 +480,22 @@ void TaskControl::signal_task(int num_task, bthread_tag_t tag) {
482480
if (num_task > 2) {
483481
num_task = 2;
484482
}
485-
if (ParkingLot::_waiting_count.load(std::memory_order_acquire) == 0) {
486-
if (FLAGS_bthread_min_concurrency > 0 &&
487-
_concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) {
488-
// TODO: Reduce this lock
489-
BAIDU_SCOPED_LOCK(g_task_control_mutex);
490-
if (_concurrency.load(butil::memory_order_acquire) < FLAGS_bthread_concurrency) {
491-
add_workers(1, tag);
492-
}
493-
} else {
494-
return;
495-
}
496-
}
497483
auto& pl = tag_pl(tag);
498484
int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
499-
num_task -= pl[start_index].signal(1);
500-
if (num_task > 0) {
485+
// WARNING: This allow some bad case happen when wait_count is not accurente.
486+
auto wait_count = ParkingLot::_waiting_count.load(butil::memory_order_relaxed);
487+
if (wait_count > 0) {
488+
num_task -= pl[start_index].signal(1);
489+
}
490+
if (num_task > 0 && wait_count > 0) {
501491
for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
502492
if (++start_index >= PARKING_LOT_NUM) {
503493
start_index = 0;
504494
}
505495
num_task -= pl[start_index].signal(1);
506496
}
507497
}
508-
if (num_task > 0 &&
498+
if (num_task > 0 && wait_count >0 &&
509499
FLAGS_bthread_min_concurrency > 0 && // test min_concurrency for performance
510500
_concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) {
511501
// TODO: Reduce this lock
@@ -600,7 +590,6 @@ bvar::LatencyRecorder* TaskControl::create_exposed_pending_time() {
600590
}
601591

602592
void TaskControl::set_group_epoll_tid(bthread_tag_t tag, bthread_t tid) {
603-
_epoll_tid_states[tag][tid] = false;
604593
auto groups = tag_group(tag);
605594
const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_acquire);
606595
for (size_t i = 0; i < ngroup; i++) {

src/bthread/task_control.h

+3-5
Original file line numberDiff line numberDiff line change
@@ -100,15 +100,14 @@ friend bthread_t init_for_pthread_stack_trace();
100100
// Only deal once when init epoll bthread.
101101
void set_group_epoll_tid(bthread_tag_t tag, bthread_t tid);
102102

103-
void epoll_waiting(bthread_tag_t tag, bthread_t tid) {
104-
_epoll_tid_states[tag][tid].store(true, butil::memory_order_release);
103+
void push_priority_q(bthread_tag_t tag, bthread_t tid) {
104+
_priority_qs[tag].push(tid);
105105
}
106106

107107
private:
108108
typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
109109
static const int PARKING_LOT_NUM = 4;
110110
typedef std::array<ParkingLot, PARKING_LOT_NUM> TaggedParkingLot;
111-
typedef std::unordered_map<bthread_t, butil::atomic<bool>> EpollTidState;
112111
// Add/Remove a TaskGroup.
113112
// Returns 0 on success, -1 otherwise.
114113
int _add_group(TaskGroup*, bthread_tag_t tag);
@@ -161,14 +160,13 @@ friend bthread_t init_for_pthread_stack_trace();
161160
std::vector<bvar::PassiveStatus<double>*> _tagged_cumulated_worker_time;
162161
std::vector<bvar::PerSecond<bvar::PassiveStatus<double>>*> _tagged_worker_usage_second;
163162
std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;
163+
std::vector<WorkStealingQueue<bthread_t>> _priority_qs;
164164

165165
std::vector<TaggedParkingLot> _pl;
166166

167167
#ifdef BRPC_BTHREAD_TRACER
168168
TaskTracer _task_tracer;
169169
#endif // BRPC_BTHREAD_TRACER
170-
171-
std::vector<EpollTidState> _epoll_tid_states;
172170
};
173171

174172
inline bvar::LatencyRecorder& TaskControl::exposed_pending_time() {

src/bthread/task_group.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,7 @@ int TaskGroup::start_foreground(TaskGroup** pg,
436436
// NOSIGNAL affects current task, not the new task.
437437
RemainedFn fn = NULL;
438438
if (g->cur_epoll_tid()) {
439-
fn = ready_to_run_epoll;
439+
fn = priority_to_run;
440440
} else if (g->current_task()->about_to_quit) {
441441
fn = ready_to_run_in_worker_ignoresignal;
442442
} else {
@@ -809,9 +809,9 @@ void TaskGroup::ready_to_run_in_worker_ignoresignal(void* args_in) {
809809
return tls_task_group->push_rq(args->meta->tid);
810810
}
811811

812-
void TaskGroup::ready_to_run_epoll(void* args_in) {
812+
void TaskGroup::priority_to_run(void* args_in) {
813813
ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
814-
return tls_task_group->control()->epoll_waiting(args->tag, args->meta->tid);
814+
return tls_task_group->control()->push_priority_q(args->tag, args->meta->tid);
815815
}
816816

817817
struct SleepArgs {

src/bthread/task_group.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ friend class TaskControl;
229229
};
230230
static void ready_to_run_in_worker(void*);
231231
static void ready_to_run_in_worker_ignoresignal(void*);
232-
static void ready_to_run_epoll(void*);
232+
static void priority_to_run(void*);
233233

234234
// Wait for a task to run.
235235
// Returns true on success, false is treated as permanent error and the

0 commit comments

Comments
 (0)