Skip to content

Commit

Permalink
fix: assign worker id to avoid task duplication (#241)
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez authored Feb 7, 2025
1 parent bf325eb commit 04585db
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 13 deletions.
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions api/src/logic/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ impl RequestExt for CreateRequest {
Some(Task {
id: Id::now(IdPrefix::Task),
start_time: Utc::now().timestamp_millis(),
worker_id: 0,
end_time: None,
payload: self.payload.clone(),
endpoint: self.endpoint.clone(),
status: None,
r#await: self.r#await,
log_trail: vec![],
metadata: RecordMetadata::default(),
})
}
Expand Down
3 changes: 1 addition & 2 deletions api/tests/http/crud.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::context::TestServer;
use api::logic::{common_model, tasks, ReadResponse};
use api::logic::{common_model, ReadResponse};
use api::logic::{connection_definition, connection_model_definition, connection_model_schema};
use entities::task::Task;
use entities::{
common_model::CommonModel, connection_definition::ConnectionDefinition,
connection_model_definition::ConnectionModelDefinition,
Expand Down
1 change: 1 addition & 0 deletions entities/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ axum.workspace = true
base64.workspace = true
base64ct.workspace = true
bson.workspace = true
bytes = { version = "1.10.0", features = ["serde"] }
chrono.workspace = true
ctr = "0.9.2"
derive_builder.workspace = true
Expand Down
7 changes: 4 additions & 3 deletions entities/src/domain/event/task.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{record_metadata::RecordMetadata, Id};
use http::StatusCode;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use serde_json::Value;

Expand All @@ -8,13 +8,14 @@ use serde_json::Value;
pub struct Task {
#[serde(rename = "_id")]
pub id: Id,
pub worker_id: i64,
pub start_time: i64,
pub end_time: Option<i64>,
pub payload: Value,
pub endpoint: String,
#[serde(with = "http_serde_ext_ios::status_code::option")]
pub status: Option<StatusCode>,
pub status: Option<String>,
pub r#await: bool,
pub log_trail: Vec<Bytes>,
#[serde(flatten)]
pub metadata: RecordMetadata,
}
40 changes: 34 additions & 6 deletions watchdog/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::config::{self, WatchdogConfig};
use crate::config::WatchdogConfig;
use bson::doc;
use cache::remote::RedisCache;
use chrono::Utc;
Expand Down Expand Up @@ -91,8 +91,9 @@ impl WatchdogClient {
.get_many(
Some(doc! {
"active": true,
"workerId": 0,
"startTime": {
"$lte": Utc::now().timestamp_millis()
"$lte": Utc::now().timestamp_millis(),
}}),
None,
None,
Expand All @@ -101,6 +102,24 @@ impl WatchdogClient {
)
.await?;

tracing::info!("Executing {} tasks", tasks.len());

self.tasks
.update_many(
doc! {
"_id": {
"$in": tasks.iter().map(|t| t.id.to_string()).collect::<Vec<_>>()
}
},
doc! {
"$set": {
"workerId": 1,
"active": false
}
},
)
.await?;

let client = self.client.clone();
let tasks_store = self.tasks.clone();
let timeout = self.watchdog.http_client_timeout_secs;
Expand All @@ -121,8 +140,6 @@ impl WatchdogClient {
}
});

tracing::info!("Executing next batch of tasks");

tokio::time::sleep(Duration::from_secs(
self.watchdog.rate_limiter_refresh_interval,
))
Expand Down Expand Up @@ -152,12 +169,23 @@ async fn execute(

let status = response.status();
let mut stream = response.bytes_stream();
let mut log_trail = vec![];

while let Some(item) = stream.next().await {
tracing::debug!("Response from API {:?}", item);
tracing::info!("Response length from API {:?}", item.map(|b| b.len()));
log_trail.push(item);
}

let log_trail = log_trail
.into_iter()
.filter_map(|x| x.ok())
.collect::<Vec<_>>();

let bson_log_trail = bson::to_bson(&log_trail).map_err(|e| {
error!("Could not convert log trail to BSON: {e}");
InternalError::io_err(e.to_string().as_str(), None)
})?;

tasks_store
.collection
.find_one_and_update(
Expand All @@ -168,7 +196,7 @@ async fn execute(
"$set": {
"status": status.to_string(),
"endTime": Utc::now().timestamp_millis(),
"active": false
"logTrail": bson_log_trail,
}
},
)
Expand Down

0 comments on commit 04585db

Please sign in to comment.