Remove Legacy Lightning-RPC Client #579

Open · wants to merge 3 commits into main

Changes from 1 commit
51 changes: 23 additions & 28 deletions libs/gl-plugin/src/lib.rs
@@ -1,12 +1,10 @@
use anyhow::Result;
use cln_rpc;
use log::{debug, warn};
use rpc::LightningClient;
use serde_json::json;
use std::future::Future;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::Mutex;

mod awaitables;
pub mod config;
@@ -17,7 +15,6 @@ pub mod node;
pub mod pb;
pub mod requests;
pub mod responses;
pub mod rpc;
pub mod stager;
pub mod storage;
pub mod tlv;
@@ -29,7 +26,6 @@ mod context;

#[derive(Clone)]
pub struct GlPlugin {
rpc: Arc<Mutex<LightningClient>>,
stage: Arc<stager::Stage>,
events: broadcast::Sender<Event>,
}
@@ -95,11 +91,8 @@ pub async fn init(
stage: Arc<stager::Stage>,
events: tokio::sync::broadcast::Sender<Event>,
) -> Result<Builder> {
let rpc = Arc::new(Mutex::new(LightningClient::new("lightning-rpc")));

let state = GlPlugin {
events: events.clone(),
rpc,
stage,
};

@@ -162,12 +155,9 @@ async fn on_peer_connected(plugin: Plugin, v: serde_json::Value) -> Result<serde
async fn on_openchannel(plugin: Plugin, v: serde_json::Value) -> Result<serde_json::Value> {
debug!("Received an openchannel request: {:?}", v);
let mut rpc = cln_rpc::ClnRpc::new(plugin.configuration().rpc_file).await?;

let req = cln_rpc::model::requests::ListdatastoreRequest{
key: Some(vec![
"glconf".to_string(),
"request".to_string(),
])

let req = cln_rpc::model::requests::ListdatastoreRequest {
key: Some(vec!["glconf".to_string(), "request".to_string()]),
};

let res = rpc.call_typed(&req).await;
@@ -178,13 +168,17 @@ async fn on_openchannel(plugin: Plugin, v: serde_json::Value) -> Result<serde_js
if !res.datastore.is_empty() {
match &res.datastore[0].string {
Some(serialized_request) => {
match _parse_gl_config_from_serialized_request(serialized_request.to_string()) {
match _parse_gl_config_from_serialized_request(
serialized_request.to_string(),
) {
Some(gl_config) => {
return Ok(json!({"result": "continue", "close_to": gl_config.close_to_addr}));
return Ok(
json!({"result": "continue", "close_to": gl_config.close_to_addr}),
);
}
None => {
debug!("Failed to parse the GlConfig from the serialized request's payload");
}
}
}
}
None => {
@@ -193,10 +187,13 @@ async fn on_openchannel(plugin: Plugin, v: serde_json::Value) -> Result<serde_js
}
}

return Ok(json!({"result": "continue"}))
return Ok(json!({"result": "continue"}));
}
Err(e) => {
log::debug!("An error occurred while searching for a custom close_to address: {}", e);
log::debug!(
"An error occurred while searching for a custom close_to address: {}",
e
);
Ok(json!({"result": "continue"}))
}
}
@@ -232,20 +229,18 @@ async fn on_invoice_payment(plugin: Plugin, v: serde_json::Value) -> Result<serd
}
};

let rpc = state.rpc.lock().await.clone();
let req = requests::ListInvoices {
let mut rpc = cln_rpc::ClnRpc::new("lightning-rpc").await?;
let req = cln_rpc::model::requests::ListinvoicesRequest {
label: Some(call.payment.label.clone()),
invstring: None,
payment_hash: None,
offer_id: None,
index: None,
start: None,
limit: None,
};

let invoice = match rpc
.call::<_, responses::ListInvoices>("listinvoices", req)
.await
.unwrap()
.invoices
.pop()
{
let invoice = match rpc.call_typed(&req).await.unwrap().invoices.pop() {
Some(i) => i,
None => {
warn!(
@@ -274,7 +269,7 @@ async fn on_invoice_payment(plugin: Plugin, v: serde_json::Value) -> Result<serd
preimage: hex::decode(call.payment.preimage).unwrap(),
amount: Some(amount.try_into().unwrap()),
extratlvs: tlvs,
bolt11: invoice.bolt11,
bolt11: invoice.bolt11.unwrap(),
payment_hash: hex::decode(invoice.payment_hash).unwrap(),
};

65 changes: 27 additions & 38 deletions libs/gl-plugin/src/node/mod.rs
@@ -1,6 +1,5 @@
use crate::config::Config;
use crate::pb::{self, node_server::Node};
use crate::rpc::LightningClient;
use crate::storage::StateStore;
use crate::{messages, Event};
use crate::{stager, tramp};
@@ -13,7 +12,6 @@ use governor::{
};
use lazy_static::lazy_static;
use log::{debug, error, info, trace, warn};
use serde_json::json;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::{
@@ -54,7 +52,6 @@ lazy_static! {
pub struct PluginNodeServer {
pub tls: ServerTlsConfig,
pub stage: Arc<stager::Stage>,
pub rpc: Arc<Mutex<LightningClient>>,
rpc_path: PathBuf,
events: tokio::sync::broadcast::Sender<super::Event>,
signer_state: Arc<Mutex<State>>,
@@ -78,8 +75,6 @@ impl PluginNodeServer {
rpc_path.push("lightning-rpc");
info!("Connecting to lightning-rpc at {:?}", rpc_path);

let rpc = Arc::new(Mutex::new(LightningClient::new(rpc_path.clone())));

// Bridge the RPC_BCAST into the events queue
let tx = events.clone();
tokio::spawn(async move {
@@ -95,15 +90,12 @@ impl PluginNodeServer {

let ctx = crate::context::Context::new();

let rrpc = rpc.clone();

let s = PluginNodeServer {
ctx,
tls,
rpc,
stage,
events,
rpc_path,
rpc_path: rpc_path.clone(),
signer_state: Arc::new(Mutex::new(signer_state)),
signer_state_store: Arc::new(Mutex::new(signer_state_store)),
grpc_binding: config.node_grpc_binding,
@@ -114,10 +106,11 @@ impl PluginNodeServer {
use tokio::time::{sleep, Duration};

// Move the lock into the closure so we can release it later.
let rpc = rrpc.lock().await;
let mut rpc = cln_rpc::ClnRpc::new(rpc_path.clone()).await.unwrap();
Collaborator comment:
We got a bit unlucky here: we use the Mutex to hold off incoming JSON-RPC calls until the JSON-RPC socket is ready. This is done by acquiring the lock during startup and releasing it once the socket file is present. The internal use-cases are fine, since by definition they are invoked only after the node has started and the socket is ready.

The change below, however, breaks this: by not locking in get_rpc, incoming gRPC calls can now be forwarded directly (without waiting behind the lock), so they will fail. We will most likely notice this because the startup commands (i.e., the ones that caused the node to start) will fail in a large number of cases.

Please leave the Mutex in get_rpc
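
A minimal sketch of what keeping that readiness gate might look like while still handing out a fresh cln_rpc::ClnRpc per call; the rpc_ready field and its wiring are illustrative assumptions, not part of this diff:

use anyhow::Result;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;

pub struct PluginNodeServer {
    rpc_path: PathBuf,
    // Hypothetical gate: the startup task holds this lock until the
    // lightning-rpc socket exists and answers `getinfo`, then releases it.
    rpc_ready: Arc<Mutex<()>>,
}

impl PluginNodeServer {
    pub async fn get_rpc(&self) -> Result<cln_rpc::ClnRpc> {
        // Incoming gRPC calls queue here until startup releases the lock,
        // then each caller gets its own freshly connected client.
        let _ready = self.rpc_ready.lock().await;
        cln_rpc::ClnRpc::new(self.rpc_path.clone()).await
    }
}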

loop {
let res: Result<crate::responses::GetInfo, crate::rpc::Error> =
rpc.call("getinfo", json!({})).await;
let res = rpc
.call_typed(&cln_rpc::model::requests::GetinfoRequest {})
.await;
match res {
Ok(_) => break,
Err(e) => {
@@ -137,8 +130,7 @@ impl PluginNodeServer {
key: Some(vec!["glconf".to_string(), "request".to_string()]),
};

let res: Result<cln_rpc::model::responses::ListdatastoreResponse, crate::rpc::Error> =
rpc.call("listdatastore", list_datastore_req).await;
let res = rpc.call_typed(&list_datastore_req).await;

match res {
Ok(list_datastore_res) => {
@@ -177,11 +169,8 @@ impl PluginNodeServer {
limiter.until_ready().await
}

pub async fn get_rpc(&self) -> LightningClient {
let rpc = self.rpc.lock().await;
let r = rpc.clone();
drop(rpc);
r
pub async fn get_rpc(&self) -> Result<cln_rpc::ClnRpc> {
cln_rpc::ClnRpc::new(self.rpc_path.clone()).await
}
}

@@ -229,8 +218,8 @@ impl Node for PluginNodeServer {
// log entries are produced while we're streaming the
// backlog out, but do we care?
use tokio::io::{AsyncBufReadExt, BufReader};
// The nodelet uses its CWD, but CLN creates a network
// subdirectory
// The nodelet uses its CWD, but CLN creates a network
// subdirectory
let file = tokio::fs::File::open("../log").await?;
let mut file = BufReader::new(file).lines();

@@ -475,10 +464,16 @@ impl Node for PluginNodeServer {
) -> Result<Response<pb::Empty>, Status> {
self.limit().await;
let gl_config = req.into_inner();
let rpc = self.get_rpc().await;
let mut rpc = self.get_rpc().await.map_err(|e| {
tonic::Status::new(
tonic::Code::Internal,
Collaborator comment:
And this is the error message all "early calls" will run into. Not the best UX.

format!("could not connect rpc client: {e}"),
)
})?;
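
One way to soften that, sketched under the assumption that get_rpc returns an anyhow::Result as above, is to surface connection failures as a retryable Unavailable status instead of a generic Internal error (hypothetical helper, not part of this diff):

// Hypothetical helper: turn an RPC-connect failure into a retryable status
// so early callers see "try again" rather than an opaque internal error.
fn rpc_unavailable(e: anyhow::Error) -> tonic::Status {
    tonic::Status::new(
        tonic::Code::Unavailable,
        format!("node RPC socket not ready yet, retry shortly: {e}"),
    )
}

// Usage in the handler above:
// let mut rpc = self.get_rpc().await.map_err(rpc_unavailable)?;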

let res: Result<crate::responses::GetInfo, crate::rpc::Error> =
rpc.call("getinfo", json!({})).await;
let res = rpc
.call_typed(&cln_rpc::model::requests::GetinfoRequest {})
.await;

let network = match res {
Ok(get_info_response) => match get_info_response.network.parse() {
@@ -528,20 +523,14 @@ impl Node for PluginNodeServer {
.map(|r| r.into())
.collect();
let serialized_req = serde_json::to_string(&requests[0]).unwrap();
let datastore_res: Result<
crate::cln_rpc::model::responses::DatastoreResponse,
crate::rpc::Error,
> = rpc
.call(
"datastore",
json!({
"key": vec![
"glconf".to_string(),
"request".to_string(),
],
"string": serialized_req,
}),
)
let datastore_res = rpc
.call_typed(&cln_rpc::model::requests::DatastoreRequest {
key: vec!["glconf".to_string(), "request".to_string()],
string: Some(serialized_req.clone()),
hex: None,
mode: None,
generation: None,
})
.await;

match datastore_res {