From 04585dbbdba73ee2d1475e5e0cd759bb3373e648 Mon Sep 17 00:00:00 2001 From: Samuel <39674930+sagojez@users.noreply.github.com> Date: Fri, 7 Feb 2025 14:48:43 +0000 Subject: [PATCH] fix: assign worker id to avoid task duplication (#241) --- Cargo.lock | 8 +++++-- api/src/logic/tasks.rs | 2 ++ api/tests/http/crud.rs | 3 +-- entities/Cargo.toml | 1 + entities/src/domain/event/task.rs | 7 +++--- watchdog/src/client.rs | 40 ++++++++++++++++++++++++++----- 6 files changed, 48 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a305f8c9..e474db49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -610,9 +610,12 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.8.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" +checksum = "f61dac84819c6588b558454b194026eb1f09c293b9036ae9b159e74e73ab6cf9" +dependencies = [ + "serde", +] [[package]] name = "cache" @@ -1335,6 +1338,7 @@ dependencies = [ "base64 0.21.7", "base64ct", "bson", + "bytes", "chacha20poly1305", "chrono", "ctr", diff --git a/api/src/logic/tasks.rs b/api/src/logic/tasks.rs index 514c89ed..977b93aa 100644 --- a/api/src/logic/tasks.rs +++ b/api/src/logic/tasks.rs @@ -42,11 +42,13 @@ impl RequestExt for CreateRequest { Some(Task { id: Id::now(IdPrefix::Task), start_time: Utc::now().timestamp_millis(), + worker_id: 0, end_time: None, payload: self.payload.clone(), endpoint: self.endpoint.clone(), status: None, r#await: self.r#await, + log_trail: vec![], metadata: RecordMetadata::default(), }) } diff --git a/api/tests/http/crud.rs b/api/tests/http/crud.rs index 0ff2cd75..d9a21d91 100644 --- a/api/tests/http/crud.rs +++ b/api/tests/http/crud.rs @@ -1,7 +1,6 @@ use crate::context::TestServer; -use api::logic::{common_model, tasks, ReadResponse}; +use api::logic::{common_model, ReadResponse}; use api::logic::{connection_definition, connection_model_definition, connection_model_schema}; -use entities::task::Task; use entities::{ common_model::CommonModel, connection_definition::ConnectionDefinition, connection_model_definition::ConnectionModelDefinition, diff --git a/entities/Cargo.toml b/entities/Cargo.toml index deab5160..24bf0415 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -19,6 +19,7 @@ axum.workspace = true base64.workspace = true base64ct.workspace = true bson.workspace = true +bytes = { version = "1.10.0", features = ["serde"] } chrono.workspace = true ctr = "0.9.2" derive_builder.workspace = true diff --git a/entities/src/domain/event/task.rs b/entities/src/domain/event/task.rs index d7289297..dc6cbd1b 100644 --- a/entities/src/domain/event/task.rs +++ b/entities/src/domain/event/task.rs @@ -1,5 +1,5 @@ use crate::{record_metadata::RecordMetadata, Id}; -use http::StatusCode; +use bytes::Bytes; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -8,13 +8,14 @@ use serde_json::Value; pub struct Task { #[serde(rename = "_id")] pub id: Id, + pub worker_id: i64, pub start_time: i64, pub end_time: Option, pub payload: Value, pub endpoint: String, - #[serde(with = "http_serde_ext_ios::status_code::option")] - pub status: Option, + pub status: Option, pub r#await: bool, + pub log_trail: Vec, #[serde(flatten)] pub metadata: RecordMetadata, } diff --git a/watchdog/src/client.rs b/watchdog/src/client.rs index 98b10cec..59efab99 100644 --- a/watchdog/src/client.rs +++ b/watchdog/src/client.rs @@ -1,4 +1,4 @@ -use crate::config::{self, WatchdogConfig}; +use crate::config::WatchdogConfig; use bson::doc; use cache::remote::RedisCache; use chrono::Utc; @@ -91,8 +91,9 @@ impl WatchdogClient { .get_many( Some(doc! { "active": true, + "workerId": 0, "startTime": { - "$lte": Utc::now().timestamp_millis() + "$lte": Utc::now().timestamp_millis(), }}), None, None, @@ -101,6 +102,24 @@ impl WatchdogClient { ) .await?; + tracing::info!("Executing {} tasks", tasks.len()); + + self.tasks + .update_many( + doc! { + "_id": { + "$in": tasks.iter().map(|t| t.id.to_string()).collect::>() + } + }, + doc! { + "$set": { + "workerId": 1, + "active": false + } + }, + ) + .await?; + let client = self.client.clone(); let tasks_store = self.tasks.clone(); let timeout = self.watchdog.http_client_timeout_secs; @@ -121,8 +140,6 @@ impl WatchdogClient { } }); - tracing::info!("Executing next batch of tasks"); - tokio::time::sleep(Duration::from_secs( self.watchdog.rate_limiter_refresh_interval, )) @@ -152,12 +169,23 @@ async fn execute( let status = response.status(); let mut stream = response.bytes_stream(); + let mut log_trail = vec![]; while let Some(item) = stream.next().await { tracing::debug!("Response from API {:?}", item); - tracing::info!("Response length from API {:?}", item.map(|b| b.len())); + log_trail.push(item); } + let log_trail = log_trail + .into_iter() + .filter_map(|x| x.ok()) + .collect::>(); + + let bson_log_trail = bson::to_bson(&log_trail).map_err(|e| { + error!("Could not convert log trail to BSON: {e}"); + InternalError::io_err(e.to_string().as_str(), None) + })?; + tasks_store .collection .find_one_and_update( @@ -168,7 +196,7 @@ async fn execute( "$set": { "status": status.to_string(), "endTime": Utc::now().timestamp_millis(), - "active": false + "logTrail": bson_log_trail, } }, )