Skip to content

Commit

Permalink
feat: allow restoring and removing archive via env var (#130)
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez authored Aug 28, 2024
1 parent 9508379 commit 17ed8c8
Show file tree
Hide file tree
Showing 13 changed files with 738 additions and 237 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions integrationos-archiver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ reqwest-retry = "0.6.1"
reqwest-tracing = "0.5.3"
serde.workspace = true
serde_json.workspace = true
strum = { workspace = true, features = ["derive"] }
tempfile = "3.12.0"
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio-util = "0.7.11"
Expand Down
14 changes: 13 additions & 1 deletion integrationos-archiver/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
use envconfig::Envconfig;
use integrationos_domain::database::DatabaseConfig;
use std::fmt::{Display, Formatter};
use strum::{AsRefStr, EnumString};

#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumString, AsRefStr)]
#[strum(serialize_all = "kebab-case")]
pub enum Mode {
Dump,
DumpDelete,
Restore,
}

#[derive(Envconfig, Clone)]
pub struct ArchiverConfig {
Expand All @@ -19,6 +28,8 @@ pub struct ArchiverConfig {
pub read_buffer_size: usize,
#[envconfig(from = "PROCESSING_CHUNK_TIMEOUT_SECS", default = "30")]
pub processing_chunk_timeout_secs: u64,
#[envconfig(from = "MODE", default = "dump")]
pub mode: Mode,
}

impl Display for ArchiverConfig {
Expand All @@ -28,12 +39,13 @@ impl Display for ArchiverConfig {
writeln!(f, "GS_STORAGE_BUCKET: {}", self.gs_storage_bucket)?;
writeln!(f, "GS_STORAGE_URI: {}", self.gs_storage_uri)?;
writeln!(f, "MAX_RETRIES: {}", self.max_retries)?;
write!(
writeln!(
f,
"PROCESSING_CHUNK_TIMEOUT_SECS: {}",
self.processing_chunk_timeout_secs
)?;
writeln!(f, "READ_BUFFER_SIZE_BYTES: {}", self.read_buffer_size)?;
writeln!(f, "MODE: {}", self.mode.as_ref())?;
write!(f, "{}", self.db_config)
}
}
12 changes: 11 additions & 1 deletion integrationos-archiver/src/event/completed.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::EventMetadata;
use chrono::{DateTime, Utc};
use chrono::{DateTime, NaiveDate, Utc};
use integrationos_domain::Id;
use serde::{Deserialize, Serialize};

Expand All @@ -19,6 +19,16 @@ impl Completed {
completed_at: Utc::now(),
}
}

pub fn date(&self) -> NaiveDate {
self.completed_at.date_naive()
}

#[cfg(test)]
pub fn with_date(mut self, date: DateTime<Utc>) -> Self {
self.completed_at = date;
self
}
}

impl EventMetadata for Completed {
Expand Down
6 changes: 5 additions & 1 deletion integrationos-archiver/src/event/dumped.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::EventMetadata;
use chrono::{DateTime, Utc};
use chrono::{DateTime, NaiveDate, Utc};
use integrationos_domain::Id;
use serde::{Deserialize, Serialize};

Expand All @@ -17,6 +17,10 @@ impl Dumped {
dumped_at: Utc::now(),
}
}

pub fn date(&self) -> NaiveDate {
self.dumped_at.date_naive()
}
}

impl EventMetadata for Dumped {
Expand Down
6 changes: 5 additions & 1 deletion integrationos-archiver/src/event/failed.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::EventMetadata;
use chrono::{DateTime, Utc};
use chrono::{DateTime, NaiveDate, Utc};
use integrationos_domain::Id;
use serde::{Deserialize, Serialize};

Expand All @@ -19,6 +19,10 @@ impl Failed {
failed_at: Utc::now(),
}
}

pub fn date(&self) -> NaiveDate {
self.failed_at.date_naive()
}
}

impl EventMetadata for Failed {
Expand Down
16 changes: 16 additions & 0 deletions integrationos-archiver/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ pub mod completed;
pub mod dumped;
pub mod failed;
pub mod started;
pub mod uploaded;

use chrono::NaiveDate;
use completed::Completed;
use dumped::Dumped;
use failed::Failed;
use integrationos_domain::Id;
use serde::{Deserialize, Serialize};
use started::Started;
use uploaded::Uploaded;

pub trait EventMetadata {
fn reference(&self) -> Id;
Expand All @@ -20,5 +23,18 @@ pub enum Event {
Started(Started),
Dumped(Dumped),
Failed(Failed),
Uploaded(Uploaded),
Completed(Completed),
}

impl Event {
pub fn date(&self) -> NaiveDate {
match self {
Event::Started(e) => e.date(),
Event::Dumped(e) => e.date(),
Event::Failed(e) => e.date(),
Event::Uploaded(e) => e.date(),
Event::Completed(e) => e.date(),
}
}
}
24 changes: 19 additions & 5 deletions integrationos-archiver/src/event/started.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::str::FromStr;

use super::EventMetadata;
use chrono::{DateTime, Utc};
use integrationos_domain::{prefix::IdPrefix, Id};
use anyhow::Result;
use chrono::{DateTime, NaiveDate, Utc};
use integrationos_domain::{prefix::IdPrefix, Id, Store};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
Expand All @@ -9,14 +12,25 @@ pub struct Started {
#[serde(rename = "_id")]
id: Id,
started_at: DateTime<Utc>,
collection: Store,
}

impl Started {
pub fn new() -> Self {
Self {
pub fn new(collection: String) -> Result<Self> {
let store = Store::from_str(&collection).map_err(|e| anyhow::anyhow!(e))?;
Ok(Self {
id: Id::now(IdPrefix::Archive),
started_at: Utc::now(),
}
collection: store,
})
}

pub fn collection(&self) -> &Store {
&self.collection
}

pub fn date(&self) -> NaiveDate {
self.started_at.date_naive()
}
}

Expand Down
30 changes: 30 additions & 0 deletions integrationos-archiver/src/event/uploaded.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use super::EventMetadata;
use chrono::{DateTime, NaiveDate, Utc};
use integrationos_domain::Id;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Uploaded {
id: Id,
uploaded_at: DateTime<Utc>,
}

impl Uploaded {
pub fn new(id: Id) -> Self {
Self {
id,
uploaded_at: Utc::now(),
}
}

pub fn date(&self) -> NaiveDate {
self.uploaded_at.date_naive()
}
}

impl EventMetadata for Uploaded {
fn reference(&self) -> Id {
self.id
}
}
Loading

0 comments on commit 17ed8c8

Please sign in to comment.