Skip to content

Commit

Permalink
Merge pull request #3362 from golemfactory/release/v0.17
Browse files Browse the repository at this point in the history
Release 0.17 Branch
  • Loading branch information
scx1332 authored Feb 24, 2025
2 parents cba662b + cb4c085 commit ec18cbd
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 5 deletions.
19 changes: 19 additions & 0 deletions core/payment/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,12 +782,18 @@ impl PaymentProcessor {
let payments: Vec<Payment> = {
let db_executor = self.db_executor.timeout_lock(DB_LOCK_TIMEOUT).await?;

log::trace!("get_batch_order_items_by_payment_id {}...", msg.payment_id);
let order_items = db_executor
.as_dao::<BatchDao>()
.get_batch_order_items_by_payment_id(msg.payment_id, payer_id)
.await?;
log::trace!(
"get_batch_order_items_by_payment_id finished and received {} items",
order_items.len()
);

for order_item in order_items.iter() {
log::trace!("Getting documents for order item: {:?}", order_item);
let order_documents = match db_executor
.as_dao::<BatchDao>()
.get_batch_items(
Expand All @@ -808,6 +814,7 @@ impl PaymentProcessor {
)));
}
};
log::trace!("Set orders paid: {:?}", order_item);

db_executor
.as_dao::<BatchDao>()
Expand All @@ -819,6 +826,8 @@ impl PaymentProcessor {
)
.await?;
for order in order_documents.iter() {
log::trace!("Find activity for order: {:?}", order);

//get agreement by id
let agreement = db_executor
.as_dao::<AgreementDao>()
Expand Down Expand Up @@ -863,6 +872,11 @@ impl PaymentProcessor {
}
}

log::trace!(
"Summing up notifications for peers: {:?}",
peers_to_sent_to.values()
);

// Sum up the amounts for each peer
for (key, value) in peers_to_sent_to.iter_mut() {
for val in &value.activity_payments {
Expand Down Expand Up @@ -907,10 +921,13 @@ impl PaymentProcessor {
value.agreement_payments = agreement_map.into_values().collect();
}

log::trace!("Signing {} payments...", peers_to_sent_to.len());

let mut payloads = Vec::new();
for (key, value) in peers_to_sent_to.iter() {
let payment_dao: PaymentDao = db_executor.as_dao();

log::trace!("Creating new payment for peer: {}", value.payee_id);
payment_dao
.create_new(
value.payment_id.clone(),
Expand All @@ -926,12 +943,14 @@ impl PaymentProcessor {
)
.await?;

log::trace!("Get signed payment for peer: {}", value.payee_id);
let signed_payment = payment_dao
.get(payment_id.clone(), payer_id)
.await?
.unwrap();
payloads.push(signed_payment.payload);
}
log::trace!("Part of processing blocking DB done, releasing DB");
payloads
};

Expand Down
50 changes: 45 additions & 5 deletions core/payment/src/timeout_lock.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,58 @@
use std::time::Duration;

use futures::Future;
use futures::{Future, FutureExt};
use tokio::time::error::Elapsed;
use tokio::time::Instant;
use tokio::{
sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard},
time::Timeout,
};

pub trait MutexTimeoutExt<T: ?Sized + 'static> {
fn timeout_lock(&self, duration: Duration) -> Timeout<impl Future<Output = MutexGuard<'_, T>>>;
fn timeout_lock(
&self,
duration: Duration,
) -> impl Future<Output = Result<MutexGuard<'_, T>, Elapsed>>;
}

static ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
impl<T: ?Sized + 'static> MutexTimeoutExt<T> for Mutex<T> {
fn timeout_lock(&self, duration: Duration) -> Timeout<impl Future<Output = MutexGuard<'_, T>>> {
tokio::time::timeout(duration, self.lock())
#[track_caller]
fn timeout_lock(
&self,
duration: Duration,
) -> impl Future<Output = Result<MutexGuard<'_, T>, Elapsed>> {
let caller_location = std::panic::Location::caller();
let caller_line_number = caller_location.line();
let caller_file = caller_location.file();

let next_id = ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst);

let caller = format!("{}:{}", caller_file, caller_line_number);
log::trace!("Timeout lock {next_id} requested from {caller}");
let curr = Instant::now();
tokio::time::timeout(duration, self.lock()).then(move |result| {
let elapsed_ms = curr.elapsed().as_secs_f64() * 1000.0;
let duration_ms = duration.as_secs_f64() * 1000.0;
match &result {
Ok(guard) => {
if elapsed_ms > duration_ms / 2.0 {
log::warn!(
"Timeout lock {next_id} acquired after {elapsed_ms:.0}ms by {caller}"
);
} else {
log::trace!(
"Timeout lock {next_id} acquired after {elapsed_ms:.2}ms by {caller}"
);
}
}
Err(_) => {
log::error!(
"Timeout lock {next_id} timed out after {elapsed_ms:.0}ms for {caller}"
);
}
};
futures::future::ready(result)
})
}
}

Expand Down
7 changes: 7 additions & 0 deletions core/serv/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,13 @@ impl ServiceCommand {
"info,actix_web::middleware::logger=warn,sqlx=warn".to_string()
}),
);
if env::var("YAGNA_TRACE_DB_LOCKS") == Ok("1".to_string()) {
env::set_var(
"RUST_LOG",
env::var("RUST_LOG").unwrap_or("info".to_string())
+ ",ya_payment::timeout_lock=trace,ya_payment::processor=trace",
);
}

//this force_debug flag sets default log level to debug
//if the --debug option is set
Expand Down

0 comments on commit ec18cbd

Please sign in to comment.