Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scx1332/fix yagna performance #3364

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/activity/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ mod common {
query: web::Query<QueryTimeout>,
id: Identity,
) -> impl Responder {
log::debug!("get_activity_state_web");
log::info!("get_activity_state_web");

// check if caller is the Provider
if authorize_activity_executor(&db, id.identity, &path.activity_id, Role::Provider)
Expand Down
8 changes: 8 additions & 0 deletions core/activity/src/requestor/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ async fn get_batch_results(
id: Identity,
request: HttpRequest,
) -> Result<impl Responder> {
log::info!(
"Requestor get_batch_results: {:?}",
request.headers().get(header::ACCEPT)
);
authorize_activity_initiator(&db, id.identity, &path.activity_id, Role::Requestor).await?;
let agreement = get_activity_agreement(&db, &path.activity_id, Role::Requestor).await?;

Expand Down Expand Up @@ -238,6 +242,10 @@ async fn await_results(
.timeout(timeout_margin(query.timeout))
.await???;

log::info!(
"Requestor get_batch_results: {:?}",
serde_json::to_string(&results)
);
Ok::<_, Error>(web::Json(results))
}

Expand Down
1 change: 1 addition & 0 deletions core/payment/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ metrics = "0.12"
num-bigint = "0.3"
open = "5.1.2"
problem_details = "0.6.0"
rand.workspace = true
r2d2 = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- It is not possible to revert this migration.

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

CREATE INDEX pay_allocation_released_idx ON pay_allocation (released);

Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- It is not possible to revert this migration.

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

CREATE INDEX pay_invoice_event_idx ON pay_invoice_event (timestamp);

Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- It is not possible to revert this migration.

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

CREATE INDEX pay_debit_note_event_idx ON pay_debit_note_event (timestamp);

8 changes: 8 additions & 0 deletions core/payment/src/api/debit_notes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ async fn get_debit_note_events(
let max_events = query.max_events;
let app_session_id = &query.app_session_id;

log::info!(
"Requested DebitNote events for Node [{}] after [{}] with timeout [{}]",
node_id,
after_timestamp
.map(|d| d.to_string())
.unwrap_or_else(|| "None".to_owned()),
timeout_secs
);
let dao: DebitNoteEventDao = db.as_dao();
let getter = || async {
dao.get_for_node_id(
Expand Down
68 changes: 62 additions & 6 deletions core/payment/src/dao/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use diesel::sql_types::{Text, Timestamp};
use std::collections::{hash_map, HashMap};
use std::iter::zip;
use std::str::FromStr;
use tokio::time::Instant;
use uuid::Uuid;
use ya_core_model::NodeId;
use ya_persistence::executor::{
Expand Down Expand Up @@ -393,7 +394,7 @@ fn use_expenditures_on_payments(
})
.collect::<Vec<&mut AllocationExpenditureObj>>(),
};
log::info!(
log::debug!(
"Found {} matching expenditures for obligation {:?}",
matching_expenditures.len(),
obligation
Expand Down Expand Up @@ -471,15 +472,15 @@ fn use_expenditures_on_payments(
amount,
agreement_id,
} => {
log::info!("Covered invoice obligation {} with {} of {} from allocation {} - agreement id: {}", id, cover_amount, amount, expenditure.allocation_id, agreement_id);
log::debug!("Covered invoice obligation {} with {} of {} from allocation {} - agreement id: {}", id, cover_amount, amount, expenditure.allocation_id, agreement_id);
}
BatchPaymentObligation::DebitNote {
debit_note_id,
amount,
agreement_id,
activity_id,
} => {
log::info!("Covered debit note obligation {:?} with {} of {} from allocation {} - agreement id: {} - activity id: {}", debit_note_id, cover_amount, amount, expenditure.allocation_id, agreement_id, activity_id);
log::debug!("Covered debit note obligation {:?} with {} of {} from allocation {} - agreement id: {} - activity id: {}", debit_note_id, cover_amount, amount, expenditure.allocation_id, agreement_id, activity_id);
}
}
amount_covered += cover_amount;
Expand All @@ -490,7 +491,7 @@ fn use_expenditures_on_payments(
amount,
agreement_id,
} => {
log::info!(
log::debug!(
"Total covered invoice obligation {} with {} of {} from allocations",
id,
amount_covered,
Expand All @@ -503,7 +504,7 @@ fn use_expenditures_on_payments(
agreement_id,
activity_id,
} => {
log::info!("Total covered debit note obligation {:?} with {} of {} from allocations", debit_note_id, amount_covered, amount);
log::debug!("Total covered debit note obligation {:?} with {} of {} from allocations", debit_note_id, amount_covered, amount);
}
}
}
Expand All @@ -520,23 +521,44 @@ pub fn resolve_invoices(args: &ResolveInvoiceArgs) -> DbResult<Option<String>> {
let since = args.since;
let zero = BigDecimal::from(0u32);

let start_all = Instant::now();

let total_amount = BigDecimal::default();
let payments = HashMap::<String, BatchPayment>::new();

let total_amount = BigDecimal::from(0u32);

let start = Instant::now();
log::debug!("Resolving invoices for {} - {}", owner_id, platform);
let (payments, total_amount) = resolve_invoices_activity_part(args, total_amount, payments)?;

log::trace!(
"Resolve invoices (activity) finished in {:.2}ms",
start.elapsed().as_secs_f64() * 1000.0
);

let start = Instant::now();
log::debug!("Resolving agreements for {}", owner_id);
let (payments, total_amount) = resolve_invoices_agreement_part(args, total_amount, payments)?;

log::trace!(
"Resolve invoices (agreement) finished in {:.2}ms",
start.elapsed().as_secs_f64() * 1000.0
);
if total_amount == zero {
log::debug!("No invoices to resolve for {} and {}", owner_id, platform);
return Ok(None);
} else {
log::info!(
"Found payments that need to be resolved! platform: {}, owner: {}",
platform,
owner_id
);
}

let start = Instant::now();
//get allocation expenditures

log::debug!("Resolving expenditures for {}", owner_id);
use crate::schema::pay_allocation::dsl as pa_dsl;
use crate::schema::pay_allocation_expenditure::dsl as pae_dsl;
let expenditures_orig: Vec<AllocationExpenditureObj> = pae_dsl::pay_allocation_expenditure
Expand All @@ -553,16 +575,30 @@ pub fn resolve_invoices(args: &ResolveInvoiceArgs) -> DbResult<Option<String>> {
.load(conn)?;
let mut expenditures = expenditures_orig.clone();

log::trace!(
"Resolve expenditures finished in {:.2}ms",
start.elapsed().as_secs_f64() * 1000.0
);

if expenditures.len() > 0 {
log::debug!("Found total of {} expenditures", expenditures.len());
}

let start = Instant::now();
log::debug!("Using expenditures on payments");
let payments_allocations =
use_expenditures_on_payments(&mut expenditures, payments).map_err(|e| {
log::error!("Error using expenditures on payments: {:?}", e);
e
})?;

log::trace!(
"Use expenditures on payments finished in {:.2}ms",
start.elapsed().as_secs_f64() * 1000.0
);

let start = Instant::now();
log::debug!("Updating expenditures");
// upload the updated expenditures to database (if changed)
for (expenditure_new, expenditure_old) in zip(expenditures.iter(), expenditures_orig.iter()) {
if expenditure_new.scheduled_amount != expenditure_old.scheduled_amount {
Expand All @@ -585,7 +621,13 @@ pub fn resolve_invoices(args: &ResolveInvoiceArgs) -> DbResult<Option<String>> {
}
}

log::trace!(
"Update expenditures finished in {:.2}ms",
start.elapsed().as_secs_f64() * 1000.0
);

let order_id = Uuid::new_v4().to_string();
log::debug!("Inserting new order: {}", order_id);
{
use crate::schema::pay_batch_order::dsl as odsl;

Expand All @@ -600,6 +642,9 @@ pub fn resolve_invoices(args: &ResolveInvoiceArgs) -> DbResult<Option<String>> {
))
.execute(conn)?;
}

let start = Instant::now();
log::debug!("Inserting new order items");
{
for (key, payment) in payments_allocations {
let payee_addr = key.payee_addr;
Expand Down Expand Up @@ -665,6 +710,17 @@ pub fn resolve_invoices(args: &ResolveInvoiceArgs) -> DbResult<Option<String>> {
}
}
}
log::trace!(
"Insert order items finished in {:.2}ms",
start.elapsed().as_secs_f64() * 1000.0
);

log::info!(
"Resolved invoices for {}, platform: {}, took {:.2}ms",
owner_id,
platform,
start_all.elapsed().as_secs_f64() * 1000.0
);
Ok(Some(order_id))
}

Expand Down
71 changes: 57 additions & 14 deletions core/payment/src/dao/debit_note_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::models::debit_note_event::{ReadObj, WriteObj};
use crate::schema::pay_debit_note_event::dsl as write_dsl;
use crate::schema::pay_debit_note_event_read::dsl as read_dsl;
use chrono::NaiveDateTime;
use diesel::sql_types::{Text, Timestamp};
use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl};
use std::borrow::Cow;
use std::collections::HashSet;
Expand Down Expand Up @@ -37,6 +38,20 @@ impl<'c> AsDao<'c> for DebitNoteEventDao<'c> {
}
}

#[derive(Debug, QueryableByName)]
struct DebitNoteEventRow {
#[sql_type = "Text"]
role: String,
#[sql_type = "Text"]
debit_note_id: String,
#[sql_type = "Text"]
owner_id: String,
#[sql_type = "Text"]
event_type: String,
#[sql_type = "Text"]
timestamp: String,
}

impl<'c> DebitNoteEventDao<'c> {
pub async fn create(
&self,
Expand Down Expand Up @@ -99,20 +114,48 @@ impl<'c> DebitNoteEventDao<'c> {
provider_events: Vec<Cow<'static, str>>,
) -> DbResult<Vec<DebitNoteEvent>> {
readonly_transaction(self.pool, "debit_note_get_for_node_id", move |conn| {
let mut query = read_dsl::pay_debit_note_event_read
.filter(read_dsl::owner_id.eq(node_id))
.order_by(read_dsl::timestamp.asc())
.into_boxed();
if let Some(timestamp) = after_timestamp {
query = query.filter(read_dsl::timestamp.gt(timestamp.adapt()));
}
if let Some(app_session_id) = app_session_id {
query = query.filter(read_dsl::app_session_id.eq(app_session_id));
}
if let Some(limit) = max_events {
query = query.limit(limit.into());
}
let events: Vec<ReadObj> = query.load(conn)?;

//get random bool
let use_sqlite = rand::random::<u64>() % 2 == 0;

let events: Vec<ReadObj> =
if let (true, Some(after_timestamp), Some(app_session_id), Some(limit))
= (use_sqlite, after_timestamp, app_session_id.clone(), max_events) {
log::info!("Start sqlite Query: {:?} {:?} {:?} {:?}", after_timestamp, node_id, app_session_id, max_events);

diesel::sql_query(format!(r#"
SELECT pdn.role, pdne.debit_note_id, pdne.owner_id, pdne.event_type, pdne.timestamp, pdne.details, pag.app_session_id FROM pay_debit_note_event AS pdne
JOIN pay_debit_note AS pdn ON pdne.debit_note_id = pdn.id AND pdne.owner_id = pdn.owner_id
JOIN pay_activity AS pac ON pdn.activity_id = pac.id AND pac.owner_id = pdn.owner_id
JOIN pay_agreement AS pag ON pac.agreement_id = pag.id AND pac.owner_id = pag.owner_id
WHERE pdne.owner_id = ? AND pdne.timestamp > ? AND pag.app_session_id = ?
ORDER BY pdne.timestamp ASC
LIMIT {limit};
"#))
.bind::<Text, _>(node_id)
.bind::<Timestamp, _>(after_timestamp)
.bind::<Text, _>(app_session_id)
.load::<ReadObj>(conn)?
} else {
log::info!("Start diesel Query: {:?} {:?} {:?} {:?}", after_timestamp, node_id, app_session_id, max_events);

let mut query = read_dsl::pay_debit_note_event_read
.filter(read_dsl::owner_id.eq(node_id))
.order_by(read_dsl::timestamp.asc())
.into_boxed();
if let Some(timestamp) = after_timestamp {
query = query.filter(read_dsl::timestamp.gt(timestamp.adapt()));
}
if let Some(app_session_id) = app_session_id.clone() {
query = query.filter(read_dsl::app_session_id.eq(app_session_id));
}
if let Some(limit) = max_events {
query = query.limit(limit.into());
}
query.load(conn)?
};

log::info!("End Query: {:?}", events);
let requestor_events: HashSet<Cow<'static, str>> =
requestor_events.into_iter().collect();
let provider_events: HashSet<Cow<'static, str>> = provider_events.into_iter().collect();
Expand Down
2 changes: 1 addition & 1 deletion core/payment/src/models/debit_note_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl WriteObj {
}
}

#[derive(Queryable, Debug, Identifiable)]
#[derive(QueryableByName, Queryable, Debug, Identifiable)]
#[table_name = "pay_debit_note_event_read"]
#[primary_key(debit_note_id, event_type)]
pub struct ReadObj {
Expand Down
16 changes: 11 additions & 5 deletions core/payment/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ impl PaymentProcessor {
.await
{
Ok(res) => {
resolve_time_ms = operation_start.elapsed().as_secs_f64() / 1000.0;
resolve_time_ms = operation_start.elapsed().as_secs_f64() * 1000.0;
order_id = res;
}
Err(err) => {
Expand Down Expand Up @@ -735,7 +735,7 @@ impl PaymentProcessor {
.await
{
Ok(()) => {
send_time_ms = send_time_now.elapsed().as_secs_f64() / 1000.0;
send_time_ms = send_time_now.elapsed().as_secs_f64() * 1000.0;
}
Err(err) => {
log::error!("Error when closing deposits {}", err);
Expand Down Expand Up @@ -1273,9 +1273,15 @@ impl PaymentProcessor {
let active = dao
.get_for_address(platform.clone(), address.clone(), Some(false))
.await?;
let past = dao
.get_for_address(platform.clone(), address.clone(), Some(true))
.await?;
let past = if deposit.is_some() {
//todo this is huge performance problem, which should be addressed when dealing with deposits
dao.get_for_address(platform.clone(), address.clone(), Some(true))
.await?
} else {
//no need to get past allocations if no deposit is provided
//fix performance issue when running yagna for a long time
vec![]
};

(active, past)
};
Expand Down
Loading
Loading