Skip to content

Commit 9ef56cd

Browse files
committed
Switch to a separate executor for RPC calls to avoid tokio hangs
See the comment in the commit for more info on why we have to do this.
1 parent 04aaa24 commit 9ef56cd

File tree

1 file changed

+82
-35
lines changed

1 file changed

+82
-35
lines changed

src/bitcoind_client.rs

+82-35
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@ use lightning_block_sync::rpc::RpcClient;
2525
use lightning_block_sync::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource};
2626
use serde_json;
2727
use std::collections::HashMap;
28+
use std::future::Future;
2829
use std::str::FromStr;
2930
use std::sync::atomic::{AtomicU32, Ordering};
3031
use std::sync::Arc;
3132
use std::time::Duration;
3233

34+
use tokio::runtime::{self, Runtime};
35+
3336
pub struct BitcoindClient {
3437
pub(crate) bitcoind_rpc_client: Arc<RpcClient>,
3538
network: Network,
@@ -38,7 +41,8 @@ pub struct BitcoindClient {
3841
rpc_user: String,
3942
rpc_password: String,
4043
fees: Arc<HashMap<ConfirmationTarget, AtomicU32>>,
41-
handle: tokio::runtime::Handle,
44+
main_runtime_handle: runtime::Handle,
45+
inner_runtime: Runtime,
4246
logger: Arc<FilesystemLogger>,
4347
}
4448

@@ -66,7 +70,7 @@ const MIN_FEERATE: u32 = 253;
6670
impl BitcoindClient {
6771
pub(crate) async fn new(
6872
host: String, port: u16, rpc_user: String, rpc_password: String, network: Network,
69-
handle: tokio::runtime::Handle, logger: Arc<FilesystemLogger>,
73+
handle: runtime::Handle, logger: Arc<FilesystemLogger>,
7074
) -> std::io::Result<Self> {
7175
let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port);
7276
let rpc_credentials =
@@ -95,6 +99,10 @@ impl BitcoindClient {
9599
fees.insert(ConfirmationTarget::ChannelCloseMinimum, AtomicU32::new(MIN_FEERATE));
96100
fees.insert(ConfirmationTarget::OutputSpendingFee, AtomicU32::new(MIN_FEERATE));
97101

102+
let mut builder = runtime::Builder::new_multi_thread();
103+
let inner_runtime =
104+
builder.enable_all().worker_threads(2).thread_name("rpc-worker").build().unwrap();
105+
98106
let client = Self {
99107
bitcoind_rpc_client: Arc::new(bitcoind_rpc_client),
100108
host,
@@ -103,7 +111,8 @@ impl BitcoindClient {
103111
rpc_password,
104112
network,
105113
fees: Arc::new(fees),
106-
handle: handle.clone(),
114+
main_runtime_handle: handle.clone(),
115+
inner_runtime,
107116
logger,
108117
};
109118
BitcoindClient::poll_for_fee_estimates(
@@ -226,10 +235,43 @@ impl BitcoindClient {
226235
});
227236
}
228237

238+
fn run_future_in_blocking_context<F: Future + Send + 'static>(&self, future: F) -> F::Output
239+
where F::Output: Send + 'static
240+
{
241+
// Tokio deliberately makes it nigh impossible to block on a future in a sync context that
242+
// is running in an async task (which makes it really hard to interact with sync code that
243+
// has callbacks in an async project).
244+
//
245+
// Reading the docs, it *seems* like
246+
// `tokio::task::block_in_place(tokio::runtime::Handle::spawn(future))` should do the
247+
// trick, and 99.999% of the time it does! But tokio has a "non-stealable I/O driver" - if
248+
// the task we're running happens to, by sheer luck, be holding the "I/O driver" when we go
249+
// into a `block_in_place` call, and the inner future requires I/O (which of course it
250+
// does, its a future!), the whole thing will come to a grinding halt as no other thread is
251+
// allowed to poll I/O until the blocked one finishes.
252+
//
253+
// This is, of course, nuts, and an almost trivial performance penalty of occasional
254+
// additional wakeups would solve this, but tokio refuses to do so because any performance
255+
// penalty at all would be too much (tokio issue #4730).
256+
//
257+
// Instead, we have to do a rather insane dance - we have to spawn the `future` we want to
258+
// run on a *different* (threaded) tokio runtime (doing the `block_in_place` dance to avoid
259+
// blocking too many threads on the main runtime). We want to block on that `future` being
260+
// run on the other runtime's threads, but tokio only provides `block_on` to do so, which
261+
// runs the `future` itself on the current thread, panicing if this thread is already a
262+
// part of a tokio runtime (which in this case it is - the main tokio runtime). Thus, we
263+
// have to `spawn` the `future` on the secondary runtime and then `block_on` the resulting
264+
// `JoinHandle` on the main runtime.
265+
tokio::task::block_in_place(move || {
266+
self.main_runtime_handle.block_on(
267+
self.inner_runtime.spawn(future)
268+
).unwrap()
269+
})
270+
}
271+
229272
pub fn get_new_rpc_client(&self) -> RpcClient {
230273
let http_endpoint = HttpEndpoint::for_host(self.host.clone()).with_port(self.port);
231-
let rpc_credentials =
232-
base64::encode(format!("{}:{}", self.rpc_user.clone(), self.rpc_password.clone()));
274+
let rpc_credentials = base64::encode(format!("{}:{}", self.rpc_user, self.rpc_password));
233275
RpcClient::new(&rpc_credentials, http_endpoint)
234276
}
235277

@@ -273,22 +315,28 @@ impl BitcoindClient {
273315
.unwrap();
274316
}
275317

276-
pub async fn sign_raw_transaction_with_wallet(&self, tx_hex: String) -> SignedTx {
318+
pub fn sign_raw_transaction_with_wallet(&self, tx_hex: String) -> impl Future<Output=SignedTx> {
277319
let tx_hex_json = serde_json::json!(tx_hex);
278-
self.bitcoind_rpc_client
279-
.call_method("signrawtransactionwithwallet", &vec![tx_hex_json])
280-
.await
281-
.unwrap()
320+
let rpc_client = self.get_new_rpc_client();
321+
async move {
322+
rpc_client
323+
.call_method("signrawtransactionwithwallet", &vec![tx_hex_json])
324+
.await
325+
.unwrap()
326+
}
282327
}
283328

284-
pub async fn get_new_address(&self) -> Address {
329+
pub fn get_new_address(&self) -> impl Future<Output=Address> {
285330
let addr_args = vec![serde_json::json!("LDK output address")];
286-
let addr = self
287-
.bitcoind_rpc_client
288-
.call_method::<NewAddress>("getnewaddress", &addr_args)
289-
.await
290-
.unwrap();
291-
Address::from_str(addr.0.as_str()).unwrap().require_network(self.network).unwrap()
331+
let network = self.network;
332+
let rpc_client = self.get_new_rpc_client();
333+
async move {
334+
let addr = rpc_client
335+
.call_method::<NewAddress>("getnewaddress", &addr_args)
336+
.await
337+
.unwrap();
338+
Address::from_str(addr.0.as_str()).unwrap().require_network(network).unwrap()
339+
}
292340
}
293341

294342
pub async fn get_blockchain_info(&self) -> BlockchainInfo {
@@ -298,11 +346,14 @@ impl BitcoindClient {
298346
.unwrap()
299347
}
300348

301-
pub async fn list_unspent(&self) -> ListUnspentResponse {
302-
self.bitcoind_rpc_client
303-
.call_method::<ListUnspentResponse>("listunspent", &vec![])
304-
.await
305-
.unwrap()
349+
pub fn list_unspent(&self) -> impl Future<Output=ListUnspentResponse> {
350+
let rpc_client = self.get_new_rpc_client();
351+
async move {
352+
rpc_client
353+
.call_method::<ListUnspentResponse>("listunspent", &vec![])
354+
.await
355+
.unwrap()
356+
}
306357
}
307358
}
308359

@@ -324,7 +375,7 @@ impl BroadcasterInterface for BitcoindClient {
324375
let txn = txs.iter().map(|tx| encode::serialize_hex(tx)).collect::<Vec<_>>();
325376
let bitcoind_rpc_client = Arc::clone(&self.bitcoind_rpc_client);
326377
let logger = Arc::clone(&self.logger);
327-
self.handle.spawn(async move {
378+
self.main_runtime_handle.spawn(async move {
328379
let res = if txn.len() == 1 {
329380
let tx_json = serde_json::json!(txn[0]);
330381
bitcoind_rpc_client
@@ -355,17 +406,15 @@ impl BroadcasterInterface for BitcoindClient {
355406

356407
impl ChangeDestinationSource for BitcoindClient {
357408
fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
358-
tokio::task::block_in_place(move || {
359-
Ok(self.handle.block_on(async move { self.get_new_address().await.script_pubkey() }))
360-
})
409+
let future = self.get_new_address();
410+
Ok(self.run_future_in_blocking_context(async move { future.await.script_pubkey() }))
361411
}
362412
}
363413

364414
impl WalletSource for BitcoindClient {
365415
fn list_confirmed_utxos(&self) -> Result<Vec<Utxo>, ()> {
366-
let utxos = tokio::task::block_in_place(move || {
367-
self.handle.block_on(async move { self.list_unspent().await }).0
368-
});
416+
let future = self.list_unspent();
417+
let utxos = self.run_future_in_blocking_context(async move { future.await.0 });
369418
Ok(utxos
370419
.into_iter()
371420
.filter_map(|utxo| {
@@ -398,18 +447,16 @@ impl WalletSource for BitcoindClient {
398447
}
399448

400449
fn get_change_script(&self) -> Result<ScriptBuf, ()> {
401-
tokio::task::block_in_place(move || {
402-
Ok(self.handle.block_on(async move { self.get_new_address().await.script_pubkey() }))
403-
})
450+
let future = self.get_new_address();
451+
Ok(self.run_future_in_blocking_context(async move { future.await.script_pubkey() }))
404452
}
405453

406454
fn sign_psbt(&self, tx: Psbt) -> Result<Transaction, ()> {
407455
let mut tx_bytes = Vec::new();
408456
let _ = tx.unsigned_tx.consensus_encode(&mut tx_bytes).map_err(|_| ());
409457
let tx_hex = hex_utils::hex_str(&tx_bytes);
410-
let signed_tx = tokio::task::block_in_place(move || {
411-
self.handle.block_on(async move { self.sign_raw_transaction_with_wallet(tx_hex).await })
412-
});
458+
let future = self.sign_raw_transaction_with_wallet(tx_hex);
459+
let signed_tx = self.run_future_in_blocking_context(async move { future.await });
413460
let signed_tx_bytes = hex_utils::to_vec(&signed_tx.hex).ok_or(())?;
414461
Transaction::consensus_decode(&mut signed_tx_bytes.as_slice()).map_err(|_| ())
415462
}

0 commit comments

Comments
 (0)