Skip to content

Commit 47803fb

Browse files
zhang2014BohuTANG
andauthored
refactor(query): disallowed tokio spawn and block_on (databendlabs#14639)
* refactor(query): disallowed thread::spawn * refactor(query): disallowed tokio::Handle::spawn * refactor(query): disallowed tokio::Handle::spawn_blocking * refactor(query): disallowed tokio block_on * refactor(query): disallowed tokio block_on * refactor(query): disallowed tokio block_on * refactor(query): location futrue * refactor(query): location futrue --------- Co-authored-by: Bohu <[email protected]>
1 parent 7ab90cc commit 47803fb

File tree

18 files changed

+180
-185
lines changed

18 files changed

+180
-185
lines changed

Cargo.lock

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

clippy.toml

+18
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,28 @@ disallowed-methods = [
66
{ path = "num_traits::sign::Signed::signum", reason = "This returns 1.0 for 0.0 but 0 for 0." },
77
{ path = "tokio::spawn", reason = "Please use `databend_common_base::runtime::spawn` instead." },
88
{ path = "tokio::task::spawn", reason = "Please use `databend_common_base::runtime::spawn` instead." },
9+
{ path = "tokio::runtime::Handle::spawn", reason = "Please use `databend_common_base::runtime::spawn` instead." },
10+
{ path = "tokio::runtime::Runtime::spawn", reason = "Please use `databend_common_base::runtime::Runtime::spawn` instead." },
11+
12+
{ path = "tokio::task::spawn_local", reason = "Please use `databend_common_base::runtime::spawn_local` instead." },
13+
14+
{ path = "tokio::runtime::Handle::block_on", reason = "Please use `databend_common_base::runtime::block_on` instead." },
15+
# { path = "tokio::runtime::Runtime::block_on", reason = "Please use `databend_common_base::runtime::Runtime::block_on` instead." },
16+
17+
{ path = "tokio::task::spawn_blocking", reason = "Please use `databend_common_base::runtime::spawn_blocking` instead." },
18+
{ path = "tokio::runtime::Handle::spawn_blocking", reason = "Please use `databend_common_base::runtime::spawn_blocking` instead." },
19+
{ path = "tokio::runtime::Runtime::spawn_blocking", reason = "Please use `databend_common_base::runtime::Runtime::spawn_blocking` instead." },
20+
21+
{ path = "std::thread::spawn", reason = "Please use `databend_common_base::runtime::Thread::spawn` instead." },
922
]
1023

1124
disallowed-types = [
1225
{ path = "once_cell::sync::Lazy", reason = "Please use `std::sync::LazyLock` instead." },
26+
27+
{ path = "tokio::task::Builder", reason = "Disallowed types, please remove it." },
28+
{ path = "tokio::task::LocalSet", reason = "Disallowed types, please remove it." },
29+
{ path = "tokio::task::join_set::JoinSet", reason = "Disallowed types, please remove it." },
30+
{ path = "tokio::task::join_set::Builder", reason = "Disallowed types, please remove it." },
1331
]
1432

1533
disallowed-macros = [

src/binaries/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ databend-storages-common-table-meta = { path = "../query/storages/common/table_m
5858
# Crates.io dependencies
5959
anyerror = { workspace = true }
6060
anyhow = { workspace = true }
61-
async-backtrace = { workspace = true }
6261
chrono = { workspace = true }
6362
clap = { workspace = true }
6463
futures = { workspace = true }

src/binaries/query/ee_main.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ fn main() {
3737
std::process::exit(cause.code() as i32);
3838
}
3939
Ok(rt) => {
40-
if let Err(cause) = rt.block_on(async_backtrace::location!().frame(main_entrypoint())) {
40+
if let Err(cause) = rt.block_on(main_entrypoint()) {
4141
eprintln!("Databend Query start failure, cause: {:?}", cause);
4242
std::process::exit(cause.code() as i32);
4343
}

src/binaries/query/oss_main.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ fn main() {
3838
std::process::exit(cause.code() as i32);
3939
}
4040
Ok(rt) => {
41-
if let Err(cause) = rt.block_on(async_backtrace::location!().frame(main_entrypoint())) {
41+
if let Err(cause) = rt.block_on(main_entrypoint()) {
4242
eprintln!("Databend Query start failure, cause: {:?}", cause);
4343
std::process::exit(cause.code() as i32);
4444
}

src/common/base/src/runtime/mod.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ mod global_runtime;
1919
mod runtime;
2020
mod runtime_tracker;
2121
mod thread;
22-
mod thread_pool;
2322

2423
pub use backtrace::dump_backtrace;
2524
pub use backtrace::get_all_tasks;
@@ -28,9 +27,14 @@ pub use catch_unwind::catch_unwind;
2827
pub use catch_unwind::CatchUnwindFuture;
2928
pub use global_runtime::GlobalIORuntime;
3029
pub use global_runtime::GlobalQueryRuntime;
30+
pub use runtime::block_on;
3131
pub use runtime::execute_futures_in_parallel;
3232
pub use runtime::match_join_handle;
3333
pub use runtime::spawn;
34+
pub use runtime::spawn_blocking;
35+
pub use runtime::spawn_local;
36+
pub use runtime::try_block_on;
37+
pub use runtime::try_spawn_blocking;
3438
pub use runtime::Dropper;
3539
pub use runtime::Runtime;
3640
pub use runtime::TrySpawn;
@@ -44,5 +48,3 @@ pub use runtime_tracker::UnlimitedFuture;
4448
pub use runtime_tracker::GLOBAL_MEM_STAT;
4549
pub use thread::Thread;
4650
pub use thread::ThreadJoinHandle;
47-
pub use thread_pool::TaskJoinHandler;
48-
pub use thread_pool::ThreadPool;

src/common/base/src/runtime/runtime.rs

+85-14
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414

1515
use std::backtrace::Backtrace;
1616
use std::future::Future;
17+
use std::panic::Location;
1718
use std::sync::Arc;
18-
use std::thread;
1919
use std::time::Duration;
2020
use std::time::Instant;
2121

@@ -32,6 +32,8 @@ use tokio::task::JoinHandle;
3232

3333
use crate::runtime::catch_unwind::CatchUnwindFuture;
3434
use crate::runtime::MemStat;
35+
use crate::runtime::Thread;
36+
use crate::runtime::ThreadJoinHandle;
3537

3638
/// Methods to spawn tasks.
3739
pub trait TrySpawn {
@@ -101,7 +103,7 @@ impl Runtime {
101103
let handle = runtime.handle().clone();
102104

103105
// Block the runtime to shutdown.
104-
let join_handler = thread::spawn(move || {
106+
let join_handler = Thread::spawn(move || {
105107
// We ignore channel is closed.
106108
let _ = runtime.block_on(recv_stop);
107109

@@ -201,10 +203,16 @@ impl Runtime {
201203
self.handle.clone()
202204
}
203205

206+
#[track_caller]
204207
pub fn block_on<T, F>(&self, future: F) -> F::Output
205208
where F: Future<Output = Result<T>> + Send + 'static {
206209
let future = CatchUnwindFuture::create(future);
207-
self.handle.block_on(future).flatten()
210+
#[allow(clippy::disallowed_methods)]
211+
tokio::task::block_in_place(|| {
212+
self.handle
213+
.block_on(location_future(future, std::panic::Location::caller()))
214+
.flatten()
215+
})
208216
}
209217

210218
// For each future of `futures`, before being executed
@@ -260,6 +268,7 @@ impl Runtime {
260268
let permit = semaphore.acquire_owned().await.map_err(|e| {
261269
ErrorCode::Internal(format!("semaphore closed, acquire permit failure. {}", e))
262270
})?;
271+
#[expect(clippy::disallowed_methods)]
263272
let handler = self
264273
.handle
265274
.spawn(async_backtrace::location!().frame(async move {
@@ -280,6 +289,7 @@ impl Runtime {
280289
F: FnOnce() -> Result<R> + Send + 'static,
281290
R: Send + 'static,
282291
{
292+
#[allow(clippy::disallowed_methods)]
283293
match_join_handle(self.handle.spawn_blocking(f)).await
284294
}
285295
}
@@ -298,6 +308,7 @@ impl TrySpawn for Runtime {
298308
async_backtrace::location!(format!("Running query {} spawn task", id)).frame(task)
299309
}
300310
};
311+
#[expect(clippy::disallowed_methods)]
301312
Ok(self.handle.spawn(task))
302313
}
303314
}
@@ -306,7 +317,7 @@ impl TrySpawn for Runtime {
306317
pub struct Dropper {
307318
name: Option<String>,
308319
close: Option<oneshot::Sender<()>>,
309-
join_handler: Option<thread::JoinHandle<bool>>,
320+
join_handler: Option<ThreadJoinHandle<bool>>,
310321
}
311322

312323
impl Drop for Dropper {
@@ -389,6 +400,70 @@ pub fn spawn<F>(future: F) -> tokio::task::JoinHandle<F::Output>
389400
where
390401
F: Future + Send + 'static,
391402
F::Output: Send + 'static,
403+
{
404+
#[expect(clippy::disallowed_methods)]
405+
tokio::spawn(location_future(future, std::panic::Location::caller()))
406+
}
407+
408+
#[track_caller]
409+
pub fn spawn_local<F>(future: F) -> tokio::task::JoinHandle<F::Output>
410+
where
411+
F: Future + Send + 'static,
412+
F::Output: Send + 'static,
413+
{
414+
#[expect(clippy::disallowed_methods)]
415+
tokio::task::spawn_local(location_future(future, std::panic::Location::caller()))
416+
}
417+
418+
#[track_caller]
419+
pub fn spawn_blocking<F, R>(f: F) -> tokio::task::JoinHandle<R>
420+
where
421+
F: FnOnce() -> R + Send + 'static,
422+
R: Send + 'static,
423+
{
424+
#[expect(clippy::disallowed_methods)]
425+
tokio::runtime::Handle::current().spawn_blocking(f)
426+
}
427+
428+
#[track_caller]
429+
pub fn try_spawn_blocking<F, R>(f: F) -> Result<tokio::task::JoinHandle<R>, F>
430+
where
431+
F: FnOnce() -> R + Send + 'static,
432+
R: Send + 'static,
433+
{
434+
match tokio::runtime::Handle::try_current() {
435+
Err(_) => Err(f),
436+
#[expect(clippy::disallowed_methods)]
437+
Ok(handler) => Ok(handler.spawn_blocking(f)),
438+
}
439+
}
440+
441+
#[track_caller]
442+
pub fn block_on<F: Future>(future: F) -> F::Output {
443+
#[expect(clippy::disallowed_methods)]
444+
tokio::task::block_in_place(|| {
445+
tokio::runtime::Handle::current()
446+
.block_on(location_future(future, std::panic::Location::caller()))
447+
})
448+
}
449+
450+
#[track_caller]
451+
pub fn try_block_on<F: Future>(future: F) -> Result<F::Output, F> {
452+
match tokio::runtime::Handle::try_current() {
453+
Err(_) => Err(future),
454+
#[expect(clippy::disallowed_methods)]
455+
Ok(handler) => Ok(tokio::task::block_in_place(|| {
456+
handler.block_on(location_future(future, std::panic::Location::caller()))
457+
})),
458+
}
459+
}
460+
461+
fn location_future<F: Future>(
462+
future: F,
463+
frame_location: &'static Location,
464+
) -> impl Future<Output = F::Output>
465+
where
466+
F: Future,
392467
{
393468
// NOTE:
394469
// Frame name: https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=689fbc84ab4be894c0cdd285bea24845
@@ -397,16 +472,12 @@ where
397472
let frame_name = std::any::type_name::<F>()
398473
.trim_end_matches("::{{closure}}")
399474
.to_string();
400-
let frame_location = std::panic::Location::caller();
401475

402-
#[expect(clippy::disallowed_methods)]
403-
tokio::spawn(
404-
async_backtrace::location!(
405-
frame_name,
406-
frame_location.file(),
407-
frame_location.line(),
408-
frame_location.column()
409-
)
410-
.frame(future),
476+
async_backtrace::location!(
477+
frame_name,
478+
frame_location.file(),
479+
frame_location.line(),
480+
frame_location.column()
411481
)
482+
.frame(future)
412483
}

src/common/base/src/runtime/thread_pool.rs

-69
This file was deleted.

src/common/base/tests/it/main.rs

-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ mod runtime;
2525
mod runtime_tracker;
2626
mod stoppable;
2727
mod string;
28-
mod thread_pool;
2928

3029
// runtime tests depends on the memory stat collector.
3130
#[global_allocator]

src/common/base/tests/it/thread_pool.rs

-46
This file was deleted.

src/common/storage/src/operator.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ pub fn build_operator<B: Builder>(builder: B) -> Result<Operator> {
109109
// Magic happens here. We will add a layer upon original
110110
// storage operator so that all underlying storage operations
111111
// will send to storage runtime.
112-
.layer(RuntimeLayer::new(GlobalIORuntime::instance().inner()))
112+
.layer(RuntimeLayer::new(GlobalIORuntime::instance()))
113113
.layer({
114114
let retry_timeout = env::var("_DATABEND_INTERNAL_RETRY_TIMEOUT")
115115
.ok()

0 commit comments

Comments
 (0)