Skip to content

Commit 2ffe5d8

Browse files
committed
Don't wait for threads upon shutting down
Threads may be performing blocking system calls such as reading from STDIN. Waiting for such threads upon shutdown can result in the program hanging. To fix this, we start OS threads _without_ using a thread scope and _don't_ wait for them to join when shutting down. This fixes #805. Changelog: fixed
1 parent 4298b31 commit 2ffe5d8

File tree

1 file changed

+56
-43
lines changed

1 file changed

+56
-43
lines changed

rt/src/scheduler/process.rs

+56-43
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@ use crate::context;
44
use crate::process::{Process, ProcessPointer, Task};
55
use crate::scheduler::pin_thread_to_core;
66
use crate::stack::StackPool;
7-
use crate::state::State;
7+
use crate::state::{RcState, State};
88
use crossbeam_queue::ArrayQueue;
99
use crossbeam_utils::atomic::AtomicCell;
10-
use crossbeam_utils::thread::scope;
1110
use rand::rngs::ThreadRng;
1211
use rand::thread_rng;
1312
use std::cell::Cell;
@@ -18,7 +17,7 @@ use std::ops::Drop;
1817
use std::ptr::null_mut;
1918
use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering};
2019
use std::sync::{Condvar, Mutex};
21-
use std::thread::sleep;
20+
use std::thread::{sleep, Builder as ThreadBuilder};
2221
use std::time::{Duration, Instant};
2322

2423
/// The ID of the main thread/queue.
@@ -1007,54 +1006,68 @@ impl Scheduler {
10071006
self.pool.terminate();
10081007
}
10091008

1010-
pub(crate) fn run(&self, state: &State, process: ProcessPointer) {
1009+
pub(crate) fn run(&self, state: &RcState, process: ProcessPointer) {
10111010
let pollers = state.network_pollers.len();
10121011
let cores = state.cores as usize;
1013-
let _ = scope(move |s| {
1014-
s.builder()
1012+
1013+
// We deliberately don't join threads as this may result in the program
1014+
// hanging during shutdown if one or more threads are performing a
1015+
// blocking system call (e.g. reading from STDIN).
1016+
{
1017+
let pool = self.pool.clone();
1018+
1019+
ThreadBuilder::new()
10151020
.name("proc monitor".to_string())
1016-
.spawn(move |_| Monitor::new(&self.pool).run())
1017-
.unwrap();
1021+
.spawn(move || Monitor::new(&pool).run())
1022+
.expect("failed to start the process monitor thread");
1023+
}
10181024

1019-
s.builder()
1025+
{
1026+
let state = state.clone();
1027+
1028+
ThreadBuilder::new()
10201029
.name("epoch".to_string())
1021-
.spawn(move |_| {
1022-
epoch_loop(state);
1030+
.spawn(move || {
1031+
epoch_loop(&state);
10231032
})
1024-
.unwrap();
1025-
1026-
for id in 1..self.primary {
1027-
let poll_id = id % pollers;
1028-
1029-
s.builder()
1030-
.name(format!("proc {}", id))
1031-
.spawn(move |_| {
1032-
pin_thread_to_core(id % cores);
1033-
Thread::new(id, poll_id, self.pool.clone()).run(state)
1034-
})
1035-
.expect("failed to start a process thread");
1036-
}
1033+
.expect("failed to start the epoch thread");
1034+
}
10371035

1038-
for id in 0..self.backup {
1039-
let poll_id = id % pollers;
1036+
for id in 1..self.primary {
1037+
let poll_id = id % pollers;
1038+
let state = state.clone();
1039+
let pool = self.pool.clone();
10401040

1041-
s.builder()
1042-
.name(format!("backup {}", id))
1043-
.spawn(move |_| {
1044-
pin_thread_to_core(id % cores);
1045-
Thread::backup(poll_id, self.pool.clone()).run(state)
1046-
})
1047-
.expect("failed to start a backup thread");
1048-
}
1041+
ThreadBuilder::new()
1042+
.name(format!("proc {}", id))
1043+
.spawn(move || {
1044+
pin_thread_to_core(id % cores);
1045+
Thread::new(id, poll_id, pool).run(&state)
1046+
})
1047+
.expect("failed to start a process thread");
1048+
}
10491049

1050-
self.pool.schedule(process);
1050+
for id in 0..self.backup {
1051+
let poll_id = id % pollers;
1052+
let state = state.clone();
1053+
let pool = self.pool.clone();
10511054

1052-
// The current thread is used for running the main process. This
1053-
// makes it possible for this process to interface with libraries
1054-
// that require the same thread to be used for all operations (e.g.
1055-
// most GUI libraries).
1056-
Thread::new(0, 0, self.pool.clone()).run_main(state);
1057-
});
1055+
ThreadBuilder::new()
1056+
.name(format!("backup {}", id))
1057+
.spawn(move || {
1058+
pin_thread_to_core(id % cores);
1059+
Thread::backup(poll_id, pool).run(&state)
1060+
})
1061+
.expect("failed to start a backup thread");
1062+
}
1063+
1064+
self.pool.schedule(process);
1065+
1066+
// The current thread is used for running the main process. This
1067+
// makes it possible for this process to interface with libraries
1068+
// that require the same thread to be used for all operations (e.g.
1069+
// most GUI libraries).
1070+
Thread::new(0, 0, self.pool.clone()).run_main(state);
10581071
}
10591072
}
10601073

@@ -1064,7 +1077,7 @@ mod tests {
10641077
use crate::test::{
10651078
empty_process_type, new_process, new_process_with_message, setup,
10661079
};
1067-
use std::thread::sleep;
1080+
use std::thread::{scope, sleep};
10681081

10691082
unsafe extern "system" fn method(data: *mut u8) {
10701083
let mut proc = ProcessPointer::new(data as _);
@@ -1408,7 +1421,7 @@ mod tests {
14081421
let scheduler = Scheduler::new(1, 1, 32);
14091422
let monitor = Monitor::new(&scheduler.pool);
14101423
let _ = scope(|s| {
1411-
s.spawn(|_| monitor.deep_sleep());
1424+
s.spawn(|| monitor.deep_sleep());
14121425

14131426
while scheduler.pool.monitor.status.load()
14141427
!= MonitorStatus::Sleeping

0 commit comments

Comments
 (0)