diff --git a/Cargo.lock b/Cargo.lock index 3301be4d..54909907 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1642,6 +1642,7 @@ dependencies = [ "reqwest-tracing", "serde", "serde_json", + "strum", "tempfile", "tokio", "tokio-util", diff --git a/integrationos-archiver/Cargo.toml b/integrationos-archiver/Cargo.toml index c132d952..30a93f93 100644 --- a/integrationos-archiver/Cargo.toml +++ b/integrationos-archiver/Cargo.toml @@ -20,6 +20,7 @@ reqwest-retry = "0.6.1" reqwest-tracing = "0.5.3" serde.workspace = true serde_json.workspace = true +strum = { workspace = true, features = ["derive"] } tempfile = "3.12.0" tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } tokio-util = "0.7.11" diff --git a/integrationos-archiver/src/config.rs b/integrationos-archiver/src/config.rs index f69d24d6..463b9611 100644 --- a/integrationos-archiver/src/config.rs +++ b/integrationos-archiver/src/config.rs @@ -1,6 +1,15 @@ use envconfig::Envconfig; use integrationos_domain::database::DatabaseConfig; use std::fmt::{Display, Formatter}; +use strum::{AsRefStr, EnumString}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumString, AsRefStr)] +#[strum(serialize_all = "kebab-case")] +pub enum Mode { + Dump, + DumpDelete, + Restore, +} #[derive(Envconfig, Clone)] pub struct ArchiverConfig { @@ -19,6 +28,8 @@ pub struct ArchiverConfig { pub read_buffer_size: usize, #[envconfig(from = "PROCESSING_CHUNK_TIMEOUT_SECS", default = "30")] pub processing_chunk_timeout_secs: u64, + #[envconfig(from = "MODE", default = "dump")] + pub mode: Mode, } impl Display for ArchiverConfig { @@ -28,12 +39,13 @@ impl Display for ArchiverConfig { writeln!(f, "GS_STORAGE_BUCKET: {}", self.gs_storage_bucket)?; writeln!(f, "GS_STORAGE_URI: {}", self.gs_storage_uri)?; writeln!(f, "MAX_RETRIES: {}", self.max_retries)?; - write!( + writeln!( f, "PROCESSING_CHUNK_TIMEOUT_SECS: {}", self.processing_chunk_timeout_secs )?; writeln!(f, "READ_BUFFER_SIZE_BYTES: {}", self.read_buffer_size)?; + writeln!(f, "MODE: {}", self.mode.as_ref())?; write!(f, "{}", self.db_config) } } diff --git a/integrationos-archiver/src/event/completed.rs b/integrationos-archiver/src/event/completed.rs index 32767726..9ae3e1e3 100644 --- a/integrationos-archiver/src/event/completed.rs +++ b/integrationos-archiver/src/event/completed.rs @@ -1,5 +1,5 @@ use super::EventMetadata; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, NaiveDate, Utc}; use integrationos_domain::Id; use serde::{Deserialize, Serialize}; @@ -19,6 +19,16 @@ impl Completed { completed_at: Utc::now(), } } + + pub fn date(&self) -> NaiveDate { + self.completed_at.date_naive() + } + + #[cfg(test)] + pub fn with_date(mut self, date: DateTime) -> Self { + self.completed_at = date; + self + } } impl EventMetadata for Completed { diff --git a/integrationos-archiver/src/event/dumped.rs b/integrationos-archiver/src/event/dumped.rs index feb3abb9..9a95d28d 100644 --- a/integrationos-archiver/src/event/dumped.rs +++ b/integrationos-archiver/src/event/dumped.rs @@ -1,5 +1,5 @@ use super::EventMetadata; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, NaiveDate, Utc}; use integrationos_domain::Id; use serde::{Deserialize, Serialize}; @@ -17,6 +17,10 @@ impl Dumped { dumped_at: Utc::now(), } } + + pub fn date(&self) -> NaiveDate { + self.dumped_at.date_naive() + } } impl EventMetadata for Dumped { diff --git a/integrationos-archiver/src/event/failed.rs b/integrationos-archiver/src/event/failed.rs index 4c74b424..41cd2ce1 100644 --- a/integrationos-archiver/src/event/failed.rs +++ 
b/integrationos-archiver/src/event/failed.rs @@ -1,5 +1,5 @@ use super::EventMetadata; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, NaiveDate, Utc}; use integrationos_domain::Id; use serde::{Deserialize, Serialize}; @@ -19,6 +19,10 @@ impl Failed { failed_at: Utc::now(), } } + + pub fn date(&self) -> NaiveDate { + self.failed_at.date_naive() + } } impl EventMetadata for Failed { diff --git a/integrationos-archiver/src/event/mod.rs b/integrationos-archiver/src/event/mod.rs index 6e490dce..aecc0ba5 100644 --- a/integrationos-archiver/src/event/mod.rs +++ b/integrationos-archiver/src/event/mod.rs @@ -2,13 +2,16 @@ pub mod completed; pub mod dumped; pub mod failed; pub mod started; +pub mod uploaded; +use chrono::NaiveDate; use completed::Completed; use dumped::Dumped; use failed::Failed; use integrationos_domain::Id; use serde::{Deserialize, Serialize}; use started::Started; +use uploaded::Uploaded; pub trait EventMetadata { fn reference(&self) -> Id; @@ -20,5 +23,18 @@ pub enum Event { Started(Started), Dumped(Dumped), Failed(Failed), + Uploaded(Uploaded), Completed(Completed), } + +impl Event { + pub fn date(&self) -> NaiveDate { + match self { + Event::Started(e) => e.date(), + Event::Dumped(e) => e.date(), + Event::Failed(e) => e.date(), + Event::Uploaded(e) => e.date(), + Event::Completed(e) => e.date(), + } + } +} diff --git a/integrationos-archiver/src/event/started.rs b/integrationos-archiver/src/event/started.rs index a30b04d2..f3a9cf02 100644 --- a/integrationos-archiver/src/event/started.rs +++ b/integrationos-archiver/src/event/started.rs @@ -1,6 +1,9 @@ +use std::str::FromStr; + use super::EventMetadata; -use chrono::{DateTime, Utc}; -use integrationos_domain::{prefix::IdPrefix, Id}; +use anyhow::Result; +use chrono::{DateTime, NaiveDate, Utc}; +use integrationos_domain::{prefix::IdPrefix, Id, Store}; use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize, Clone)] @@ -9,14 +12,25 @@ pub struct Started { #[serde(rename = "_id")] id: Id, started_at: DateTime, + collection: Store, } impl Started { - pub fn new() -> Self { - Self { + pub fn new(collection: String) -> Result { + let store = Store::from_str(&collection).map_err(|e| anyhow::anyhow!(e))?; + Ok(Self { id: Id::now(IdPrefix::Archive), started_at: Utc::now(), - } + collection: store, + }) + } + + pub fn collection(&self) -> &Store { + &self.collection + } + + pub fn date(&self) -> NaiveDate { + self.started_at.date_naive() } } diff --git a/integrationos-archiver/src/event/uploaded.rs b/integrationos-archiver/src/event/uploaded.rs new file mode 100644 index 00000000..5cf98729 --- /dev/null +++ b/integrationos-archiver/src/event/uploaded.rs @@ -0,0 +1,30 @@ +use super::EventMetadata; +use chrono::{DateTime, NaiveDate, Utc}; +use integrationos_domain::Id; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct Uploaded { + id: Id, + uploaded_at: DateTime, +} + +impl Uploaded { + pub fn new(id: Id) -> Self { + Self { + id, + uploaded_at: Utc::now(), + } + } + + pub fn date(&self) -> NaiveDate { + self.uploaded_at.date_naive() + } +} + +impl EventMetadata for Uploaded { + fn reference(&self) -> Id { + self.id + } +} diff --git a/integrationos-archiver/src/main.rs b/integrationos-archiver/src/main.rs index e1d6ce06..d8fe17fc 100644 --- a/integrationos-archiver/src/main.rs +++ b/integrationos-archiver/src/main.rs @@ -1,67 +1,129 @@ mod config; mod event; +mod storage; -use anyhow::{anyhow, Context, Result}; -use bson::doc; 
-use chrono::{Duration as CDuration, Utc}; -use config::ArchiverConfig; +use anyhow::{anyhow, Result}; +use bson::{doc, Document}; +use chrono::{DateTime, Duration as CDuration, Utc}; +use config::{ArchiverConfig, Mode}; use envconfig::Envconfig; use event::completed::Completed; use event::dumped::Dumped; use event::failed::Failed; use event::started::Started; +use event::uploaded::Uploaded; use event::{Event, EventMetadata}; -use google_cloud_storage::client::{Client as GClient, ClientConfig}; -use google_cloud_storage::http::objects::upload::{UploadObjectRequest, UploadType}; -use google_cloud_storage::http::objects::Object; -use google_cloud_storage::http::resumable_upload_client::ChunkSize; use integrationos_domain::telemetry::{get_subscriber, init_subscriber}; -use integrationos_domain::{MongoStore, Store}; -use mongodb::Client; -use reqwest_middleware::ClientBuilder; -use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; -use reqwest_tracing::TracingMiddleware; -use std::future::Future; -use std::path::{Path, PathBuf}; +use integrationos_domain::{MongoStore, Store, Unit}; +use mongodb::options::FindOneOptions; +use mongodb::{Client, Database}; use std::process::Command; -use std::time::Duration; +use storage::google_cloud::GoogleCloudStorage; +use storage::{Extension, Storage}; use tempfile::TempDir; -use tokio::fs::File; -use tokio::io::{AsyncBufReadExt, BufReader}; #[tokio::main] -async fn main() -> Result<()> { +async fn main() -> Result { let config = ArchiverConfig::init_from_env()?; - let retry_policy = ExponentialBackoff::builder().build_with_max_retries(config.max_retries); - let client = reqwest::Client::default(); - let middleware = ClientBuilder::new(client) - .with(RetryTransientMiddleware::new_with_policy(retry_policy)) - .with(TracingMiddleware::default()) - .build(); - let storage = GClient::new( - ClientConfig { - http: Some(middleware), - ..Default::default() - } - .with_auth() - .await?, - ); + let storage = GoogleCloudStorage::new(&config).await?; let subscriber = get_subscriber("archiver".into(), "info".into(), std::io::stdout); init_subscriber(subscriber); tracing::info!("Starting archiver with config:\n{config}"); - let client = Client::with_uri_str(&config.db_config.control_db_url).await?; - let database = client.database(&config.db_config.control_db_name); + let client = Client::with_uri_str(&config.db_config.event_db_url).await?; + let database = client.database(&config.db_config.event_db_name); let archives: MongoStore = MongoStore::new(&database, &Store::Archives).await?; - let started = Started::new(); + let started = Started::new(config.event_collection_name.clone())?; archives .create_one(&Event::Started(started.clone())) .await?; - let saved = save(config, &archives, storage, &started).await; + match config.mode { + Mode::Restore => restore(config, &archives, &started, storage).await, + Mode::Dump => dump(config, &archives, &started, storage, database, false).await, + Mode::DumpDelete => dump(config, &archives, &started, storage, database, true).await, + } +} + +async fn restore( + config: ArchiverConfig, + archives: &MongoStore, + started: &Started, + storage: impl Storage, +) -> Result { + tracing::info!( + "Starting archiver in restore mode for the {} collection. Events will not be restored to the original collection, rather a new collection will be created", + started.collection() + ); + + let filter = doc! { + "completedAt": { + "$exists": true, + } + }; + let options = FindOneOptions::builder() + .sort(doc! 
{ "completedAt": -1 }) + .build(); + let archive = archives.collection.find_one(filter, options).await?; + + match archive { + None => Err(anyhow!( + "No archive found for the collection {}", + started.collection() + )), + Some(event) => { + let archive_bson_file_path = storage + .download_file(&config, &event, &Extension::Bson) + .await?; + + // * Restore: mongorestore --gzip --nsInclude=events-service.clients events-service/clients.bson.gz --verbose (nsInclude=${DB_NAME}.${COLLECTION_NAME}) + tracing::info!("Restoring collection {}", config.event_collection_name); + let output = Command::new("mongorestore") + .arg("--gzip") + .arg("--nsInclude") + .arg(format!( + "{}.{}", + config.db_config.event_db_name, config.event_collection_name + )) + .arg(archive_bson_file_path) + .arg("--verbose") + .output()?; + + if !output.status.success() { + anyhow::bail!( + "Archive restore failed with status {}", + String::from_utf8_lossy(&output.stderr) + ); + } + + tracing::info!( + "Collection {} restored successfully", + config.event_collection_name + ); + + Ok(()) + } + } +} + +async fn dump( + config: ArchiverConfig, + archives: &MongoStore, + started: &Started, + storage: impl Storage, + database: Database, + destructive: bool, +) -> Result { + tracing::info!( + "Starting archiver in dump mode for the {} collection", + started.collection() + ); + + let date = Utc::now() - CDuration::days(30); + let saved = save(config, archives, storage, started, &date).await; if let Err(e) = saved { archives @@ -78,24 +140,36 @@ async fn main() -> Result<()> { tracing::info!("Archive saved successfully"); + if destructive { + tracing::warn!("Deleting old events as destructive mode is enabled"); + let store: MongoStore = MongoStore::new(&database, started.collection()).await?; + + let filter = doc! { + "createdAt": { "$lt": date.timestamp_millis() } + }; + + store.collection.delete_many(filter, None).await?; + tracing::warn!("Old events deleted successfully"); + } + Ok(()) } async fn save( config: ArchiverConfig, archive: &MongoStore, - storage: GClient, + storage: impl Storage, started: &Started, -) -> Result<()> { + date: &DateTime, +) -> Result { let tmp_dir = TempDir::new()?; - let filter = - doc! { "createdAt": { "$lt": (Utc::now() - CDuration::days(30)).timestamp_millis() } }; + let filter = doc! 
{ "createdAt": { "$lt": date.timestamp_millis() } }; let command = Command::new("mongodump") .arg("--uri") - .arg(&config.db_config.control_db_url) + .arg(&config.db_config.event_db_url) .arg("--db") - .arg(&config.db_config.control_db_name) + .arg(&config.db_config.event_db_name) .arg("--collection") .arg(&config.event_collection_name) .arg("--query") @@ -118,11 +192,21 @@ async fn save( .join(&config.db_config.event_db_name) .join(&config.event_collection_name); - if let Err(e) = upload_file(&base_path, "bson.gz", &config, &storage).await { + if let Err(e) = storage + .upload_file(&base_path, &Extension::Bson, &config) + .await + { return Err(anyhow!("Failed to upload bson file: {e}")); } - if let Err(e) = upload_file(&base_path, "metadata.json.gz", &config, &storage).await { + archive + .create_one(&Event::Uploaded(Uploaded::new(started.reference()))) + .await?; + + if let Err(e) = storage + .upload_file(&base_path, &Extension::Metadata, &config) + .await + { return Err(anyhow!("Failed to upload json file: {e}")); } @@ -144,186 +228,3 @@ async fn save( Ok(()) } - -#[derive(Debug)] -struct Chunk { - data: Vec, - first_byte: u64, - last_byte: u64, -} - -impl Chunk { - fn first_byte(&self) -> u64 { - self.first_byte - } - - fn last_byte(&self) -> u64 { - self.last_byte - } -} - -async fn upload_file( - base_path: &Path, - extension: &str, - config: &ArchiverConfig, - storage: &GClient, -) -> Result<()> { - let path = base_path.with_extension(extension); - let total = path.metadata()?.len(); - - let upload_type = UploadType::Multipart(Box::new(Object { - name: get_file_name(&path)?, - ..Default::default() - })); - - let uploader = storage - .prepare_resumable_upload( - &UploadObjectRequest { - bucket: config.gs_storage_bucket.clone(), - ..Default::default() - }, - &upload_type, - ) - .await?; - - process_file_in_chunks( - &path, - config.read_buffer_size, - Duration::from_secs(config.processing_chunk_timeout_secs), - |chunk| async { - let size = ChunkSize::new(chunk.first_byte(), chunk.last_byte(), Some(total)); - uploader.upload_multiple_chunk(chunk.data, &size).await?; - Ok(()) - }, - ) - .await?; - - Ok(()) -} - -async fn process_file_in_chunks( - file_path: &PathBuf, - chunk_size: usize, - timeout: Duration, - process_chunk: F, -) -> Result<()> -where - F: Fn(Chunk) -> Fut + Send, - Fut: Future> + Send, -{ - let file = File::open(file_path).await?; - let mut buffered_reader = BufReader::with_capacity(chunk_size, file); - - let mut current_position: u64 = 0; - - loop { - let chunk = buffered_reader.fill_buf().await?; - let chunk_length = chunk.len(); - - if chunk_length == 0 { - break; - } - - let first_byte = current_position; - let last_byte = current_position + chunk_length as u64 - 1; - - let chunk = Chunk { - data: chunk.to_vec(), - first_byte, - last_byte, - }; - - current_position = last_byte + 1; - - tokio::time::timeout(timeout, async { process_chunk(chunk).await }).await??; - - tracing::debug!("Processed chunk of size {}", chunk_length); - - buffered_reader.consume(chunk_length); - } - - Ok(()) -} - -fn get_file_name(path: &Path) -> Result { - let file_name = path - .file_name() - .context("Missing file name")? 
- .to_str() - .context("Invalid file name: {path:?}")?; - - let timestamp = Utc::now().format("%Y-%m-%d"); - let file_name = format!("{}-{}", timestamp, file_name); - - Ok(file_name) -} - -// TODO: -// * Restore: mongorestore --gzip --nsInclude=events-service.clients events-service/clients.bson.gz --verbose (nsInclude=${DB_NAME}.${COLLECTION_NAME}) -// * Delete: Remove the events that are older than 30 days because they are guaranteed to be already in the snapshot - -#[cfg(test)] -mod tests { - use super::*; - use fake::{Fake, Faker}; - use std::{ - io::Write, - sync::{Arc, Mutex}, - }; - use tempfile::NamedTempFile; - - #[test] - fn test_get_file_name() { - let string: String = Faker.fake(); - let file_name = get_file_name(&PathBuf::from(string)).expect("Failed to get file name"); - let now = Utc::now().format("%Y-%m-%d").to_string(); - assert!(file_name.contains('-')); - assert!(file_name.contains(now.as_str())); - } - - #[tokio::test] - async fn test_process_file_in_chunks() { - let mut temp_file = NamedTempFile::new().expect("Failed to create temp file"); - let content = b"abcdefghijklmnopqrstuvwxyz0123456789"; // 36 bytes - temp_file - .write_all(content) - .expect("Failed to write to temp file"); - - let path = temp_file.path().to_path_buf(); // Keep the temp file open - - let chunk_size = 10; - - let chunks = Arc::new(Mutex::new(Vec::new())); - let chunks_ref = Arc::clone(&chunks); - - process_file_in_chunks(&path, chunk_size, Duration::from_secs(30), |chunk| { - let chunks = Arc::clone(&chunks_ref); - async move { - let mut chunks = chunks.lock().expect("Failed to lock chunks"); - chunks.push((chunk.first_byte(), chunk.last_byte(), chunk.data.clone())); - Ok(()) - } - }) - .await - .expect("Failed to process file"); - - let chunks = chunks.lock().expect("Failed to lock chunks"); - assert_eq!(chunks.len(), 4); - - assert_eq!(chunks[0].0, 0); - assert_eq!(chunks[0].1, 9); - assert_eq!(chunks[0].2, b"abcdefghij".to_vec()); - - assert_eq!(chunks[1].0, 10); - assert_eq!(chunks[1].1, 19); - assert_eq!(chunks[1].2, b"klmnopqrst".to_vec()); - - assert_eq!(chunks[2].0, 20); - assert_eq!(chunks[2].1, 29); - assert_eq!(chunks[2].2, b"uvwxyz0123".to_vec()); - - assert_eq!(chunks[3].0, 30); - assert_eq!(chunks[3].1, 35); - assert_eq!(chunks[3].2, b"456789".to_vec()); - } -} diff --git a/integrationos-archiver/src/storage/google_cloud.rs b/integrationos-archiver/src/storage/google_cloud.rs new file mode 100644 index 00000000..f3a68e02 --- /dev/null +++ b/integrationos-archiver/src/storage/google_cloud.rs @@ -0,0 +1,401 @@ +use anyhow::{anyhow, Context, Result}; +use chrono::{NaiveDate, Utc}; +use futures::StreamExt; +use google_cloud_storage::client::{Client as GClient, ClientConfig}; +use google_cloud_storage::http::objects::download::Range; +use google_cloud_storage::http::objects::get::GetObjectRequest; +use google_cloud_storage::http::objects::list::ListObjectsRequest; +use google_cloud_storage::http::objects::upload::{UploadObjectRequest, UploadType}; +use google_cloud_storage::http::objects::Object; +use google_cloud_storage::http::resumable_upload_client::ChunkSize; +use integrationos_domain::Unit; +use reqwest_middleware::ClientBuilder; +use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; +use reqwest_tracing::TracingMiddleware; +use std::future::Future; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::time::Duration; +use tempfile::Builder; +use tokio::fs::File; +use tokio::io::{AsyncBufReadExt, BufReader}; + +use crate::config::ArchiverConfig; 
+use crate::event::Event; +use crate::storage::Chunk; +use crate::Extension; + +use super::{ArchiveName, Storage}; + +pub struct GoogleCloudStorage { + client: GClient, +} + +impl GoogleCloudStorage { + pub async fn new(config: &ArchiverConfig) -> Result { + let retry_policy = ExponentialBackoff::builder().build_with_max_retries(config.max_retries); + let client = reqwest::Client::default(); + let middleware = ClientBuilder::new(client) + .with(RetryTransientMiddleware::new_with_policy(retry_policy)) + .with(TracingMiddleware::default()) + .build(); + let storage = GClient::new( + ClientConfig { + http: Some(middleware), + ..Default::default() + } + .with_auth() + .await?, + ); + + Ok(GoogleCloudStorage { client: storage }) + } +} + +impl Storage for GoogleCloudStorage { + async fn upload_file( + &self, + base_path: &Path, + extension: &Extension, + config: &ArchiverConfig, + ) -> Result { + upload_file_google(base_path, extension, config, &self.client).await + } + + async fn download_file( + &self, + config: &ArchiverConfig, + event: &Event, + extension: &Extension, + ) -> Result { + download_file_google(config, &self.client, event, extension).await + } +} + +async fn download_file_google( + config: &ArchiverConfig, + storage: &GClient, + event: &Event, + extension: &Extension, +) -> Result { + let objects = storage + .list_objects(&ListObjectsRequest { + bucket: config.gs_storage_bucket.clone(), + ..Default::default() + }) + .await?; + + let names = objects + .items + .into_iter() + .flat_map(|object| object.into_iter().map(|o| o.name)) + .collect::>(); + + tracing::info!("Found {:?} objects in the bucket", names); + + let archive_name = find_latest_archive(&names, config, event, extension)?; + + let mut download = storage + .download_streamed_object( + &GetObjectRequest { + bucket: config.gs_storage_bucket.clone(), + object: archive_name.name(), + ..Default::default() + }, + &Range::default(), + ) + .await?; + + let archive_file = Builder::new() + .suffix(&archive_name.extension.with_leading_dot()) + .prefix(&format!( + "{}.{}-", + config.event_collection_name, + archive_name.date.format("%Y-%m-%d") + )) + .tempfile()? + .keep()?; // If the tempfile is dropped, the file will be deleted + + let path = archive_file.1; + let mut archive_file = archive_file.0; + + // TODO: Eventually we should decide if there's value in also restoring the collection metadata(e.g. 
indexes) + while let Some(result) = download.next().await { + match result { + Ok(bytes) => { + archive_file + .write_all(&bytes) + .context("Failed to write archive")?; + + archive_file + .flush() + .context("Failed to flush archive metadata")?; + } + Err(e) => anyhow::bail!("Error downloading archive: {}", e), + } + } + + Ok(path.to_path_buf()) +} + +async fn upload_file_google( + base_path: &Path, + extension: &Extension, + config: &ArchiverConfig, + storage: &GClient, +) -> Result { + let path = base_path.with_extension(extension.as_ref()); + let total = path.metadata()?.len(); + + let upload_type = UploadType::Multipart(Box::new(Object { + name: get_file_name(&path)?, + ..Default::default() + })); + + let uploader = storage + .prepare_resumable_upload( + &UploadObjectRequest { + bucket: config.gs_storage_bucket.clone(), + ..Default::default() + }, + &upload_type, + ) + .await?; + + process_file_in_chunks( + &path, + config.read_buffer_size, + Duration::from_secs(config.processing_chunk_timeout_secs), + |chunk| async { + let size = ChunkSize::new(chunk.first_byte(), chunk.last_byte(), Some(total)); + uploader.upload_multiple_chunk(chunk.data, &size).await?; + Ok(()) + }, + ) + .await?; + + Ok(()) +} + +async fn process_file_in_chunks( + file_path: &PathBuf, + chunk_size: usize, + timeout: Duration, + process_chunk: F, +) -> Result +where + F: Fn(Chunk) -> Fut + Send, + Fut: Future> + Send, +{ + let file = File::open(file_path).await?; + let mut buffered_reader = BufReader::with_capacity(chunk_size, file); + + let mut current_position: u64 = 0; + + loop { + let chunk = buffered_reader.fill_buf().await?; + let chunk_length = chunk.len(); + + if chunk_length == 0 { + break; + } + + let first_byte = current_position; + let last_byte = current_position + chunk_length as u64 - 1; + + let chunk = Chunk { + data: chunk.to_vec(), + first_byte, + last_byte, + }; + + current_position = last_byte + 1; + + tokio::time::timeout(timeout, async { process_chunk(chunk).await }).await??; + + tracing::debug!("Processed chunk of size {}", chunk_length); + + buffered_reader.consume(chunk_length); + } + + Ok(()) +} + +fn get_file_name(path: &Path) -> Result { + let file_name = path + .file_name() + .context("Missing file name")? 
+ .to_str() + .context("Invalid file name: {path:?}")?; + + let timestamp = Utc::now().format("%Y-%m-%d"); + let file_name = format!("{}-{}", timestamp, file_name); + + Ok(file_name) +} + +fn parse_archive_name( + name: &str, + config: &ArchiverConfig, + extension: &Extension, +) -> Option { + let expected_suffix = format!( + "-{}{}", + config.event_collection_name, + extension.with_leading_dot() + ); + + if let Some(point) = name.rfind(&expected_suffix) { + if point > 0 && point + expected_suffix.len() <= name.len() { + let (date_str, file_name) = name.split_at(point); + match NaiveDate::parse_from_str(date_str, "%Y-%m-%d") { + Ok(date) => { + let file_name = file_name[1..].to_string(); // Skip the leading hyphen + if file_name + == format!( + "{}{}", + config.event_collection_name, + extension.with_leading_dot() + ) + { + return Some(ArchiveName { + date, + name: config.event_collection_name.clone(), + extension: *extension, + }); + } + } + Err(e) => tracing::warn!("Invalid date: {}", e), + } + } else { + tracing::warn!("Invalid archive name: {}", name); + } + } else { + tracing::warn!("Expected suffix not found in archive name: {}", name); + } + + None +} + +fn find_latest_archive( + names: &[String], + config: &ArchiverConfig, + event: &Event, + extension: &Extension, +) -> Result { + names + .iter() + .flat_map(|name| parse_archive_name(name, config, extension)) + .filter(|archive| archive.date == event.date()) + .max_by_key(|archive| archive.date) + .ok_or_else(|| anyhow!("No valid archive found. Please check the date you are restoring is the same as the event date. Or that there are any archived events for this collection.")) +} + +#[cfg(test)] +mod tests { + use crate::event::completed::Completed; + + use super::*; + use envconfig::Envconfig; + use fake::{Fake, Faker}; + use integrationos_domain::{prefix::IdPrefix, Id}; + use std::{ + collections::HashMap, + io::Write, + sync::{Arc, Mutex}, + }; + use tempfile::NamedTempFile; + + #[test] + fn test_get_file_name() { + let string: String = Faker.fake(); + let file_name = get_file_name(&PathBuf::from(string)).expect("Failed to get file name"); + let now = Utc::now().format("%Y-%m-%d").to_string(); + assert!(file_name.contains('-')); + assert!(file_name.contains(now.as_str())); + } + + #[test] + fn test_find_latest_archive() { + let config = ArchiverConfig::init_from_hashmap(&HashMap::from_iter(vec![ + ( + "EVENT_DATABASE_URL".to_string(), + "mongodb://localhost:27017".to_string(), + ), + ("EVENT_COLLECTION_NAME".to_string(), "clients".to_string()), + ])) + .expect("Failed to initialize archiver config"); + + let date = Utc::now(); + + let completed = Completed::new( + "events-service/events.bson.gz".to_string(), + Id::new(IdPrefix::Archive, date), + ) + .with_date(date); + + let event = Event::Completed(completed); + + let extension = Extension::Bson; + + let archive_name = find_latest_archive( + &[ + "2024-08-27-clients.bson.gz".to_string(), + "2024-08-27-clients.metadata.json.gz".to_string(), + ], + &config, + &event, + &extension, + ) + .expect("Failed to find latest archive"); + + assert_eq!(archive_name.name, "clients".to_string()); + assert_eq!(archive_name.extension, Extension::Bson); + } + + #[tokio::test] + async fn test_process_file_in_chunks() { + let mut temp_file = NamedTempFile::new().expect("Failed to create temp file"); + let content = b"abcdefghijklmnopqrstuvwxyz0123456789"; // 36 bytes + temp_file + .write_all(content) + .expect("Failed to write to temp file"); + + let path = temp_file.path().to_path_buf(); // Keep 
the temp file open + + let chunk_size = 10; + + let chunks = Arc::new(Mutex::new(Vec::new())); + let chunks_ref = Arc::clone(&chunks); + + process_file_in_chunks(&path, chunk_size, Duration::from_secs(30), |chunk| { + let chunks = Arc::clone(&chunks_ref); + async move { + let mut chunks = chunks.lock().expect("Failed to lock chunks"); + chunks.push((chunk.first_byte(), chunk.last_byte(), chunk.data.clone())); + Ok(()) + } + }) + .await + .expect("Failed to process file"); + + let chunks = chunks.lock().expect("Failed to lock chunks"); + assert_eq!(chunks.len(), 4); + + assert_eq!(chunks[0].0, 0); + assert_eq!(chunks[0].1, 9); + assert_eq!(chunks[0].2, b"abcdefghij".to_vec()); + + assert_eq!(chunks[1].0, 10); + assert_eq!(chunks[1].1, 19); + assert_eq!(chunks[1].2, b"klmnopqrst".to_vec()); + + assert_eq!(chunks[2].0, 20); + assert_eq!(chunks[2].1, 29); + assert_eq!(chunks[2].2, b"uvwxyz0123".to_vec()); + + assert_eq!(chunks[3].0, 30); + assert_eq!(chunks[3].1, 35); + assert_eq!(chunks[3].2, b"456789".to_vec()); + } +} diff --git a/integrationos-archiver/src/storage/mod.rs b/integrationos-archiver/src/storage/mod.rs new file mode 100644 index 00000000..1f0cc641 --- /dev/null +++ b/integrationos-archiver/src/storage/mod.rs @@ -0,0 +1,92 @@ +pub mod google_cloud; + +use crate::{config::ArchiverConfig, event::Event}; +use anyhow::Result; +use chrono::NaiveDate; +use integrationos_domain::Unit; +use std::{ + future::Future, + ops::Deref, + path::{Path, PathBuf}, +}; + +#[derive(Debug)] +pub struct Chunk { + data: Vec, + first_byte: u64, + last_byte: u64, +} + +impl Chunk { + fn first_byte(&self) -> u64 { + self.first_byte + } + + fn last_byte(&self) -> u64 { + self.last_byte + } +} + +#[derive(Debug)] +struct ArchiveName { + date: NaiveDate, + name: String, + extension: Extension, +} + +impl ArchiveName { + fn name(&self) -> String { + format!( + "{}-{}.{}", + self.date.format("%Y-%m-%d"), + self.name, + self.extension.as_ref() + ) + } +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum Extension { + Bson, + Metadata, +} + +impl Extension { + /// Returns the file extension for the given extension with the leading dot + fn with_leading_dot(self) -> String { + ".".to_owned() + self.as_ref() + } +} + +impl AsRef for Extension { + fn as_ref(&self) -> &str { + match self { + Extension::Bson => "bson.gz", + Extension::Metadata => "metadata.json.gz", + } + } +} + +impl Deref for Extension { + type Target = str; + + fn deref(&self) -> &Self::Target { + self.as_ref() + } +} + +pub trait Storage { + fn upload_file( + &self, + base_path: &Path, + extension: &Extension, + config: &ArchiverConfig, + ) -> impl Future>; + + fn download_file( + &self, + config: &ArchiverConfig, + event: &Event, + extension: &Extension, + ) -> impl Future>; +} diff --git a/integrationos-domain/src/domain/store/mod.rs b/integrationos-domain/src/domain/store/mod.rs index a2605c83..75526666 100644 --- a/integrationos-domain/src/domain/store/mod.rs +++ b/integrationos-domain/src/domain/store/mod.rs @@ -3,6 +3,7 @@ pub mod cursor; use bson::doc; use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; +use std::str::FromStr; macro_rules! generate_stores { ($($name:tt, $str:expr),+) => { @@ -20,6 +21,20 @@ macro_rules! generate_stores { write!(f, "{store}") } } + + impl FromStr for Store { + type Err = String; + + fn from_str(s: &str) -> Result { + $( + if s == $str { + return Ok(Store::$name); + } + )+ + + Err(format!("Invalid store name: {}", s)) + } + } }; }
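// Reviewer note (not part of the patch): a minimal, standalone sketch of how the new
// MODE setting introduced in config.rs is expected to parse, assuming envconfig resolves
// the field through the FromStr impl generated by strum's EnumString derive. With
// serialize_all = "kebab-case", the three variants map to MODE=dump, MODE=dump-delete and
// MODE=restore, and `as_ref()` (used by `Display for ArchiverConfig`) yields the same
// kebab-case strings. Illustration only; the enum is re-declared here so the snippet
// compiles on its own with only the `strum` crate (derive feature) as a dependency.
use std::str::FromStr;
use strum::{AsRefStr, EnumString};

#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumString, AsRefStr)]
#[strum(serialize_all = "kebab-case")]
enum Mode {
    Dump,
    DumpDelete,
    Restore,
}

fn main() {
    // Values accepted for the MODE environment variable:
    assert_eq!(Mode::from_str("dump").unwrap(), Mode::Dump);
    assert_eq!(Mode::from_str("dump-delete").unwrap(), Mode::DumpDelete);
    assert_eq!(Mode::from_str("restore").unwrap(), Mode::Restore);

    // Value printed by the config's Display impl via `self.mode.as_ref()`:
    assert_eq!(Mode::DumpDelete.as_ref(), "dump-delete");
}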