diff --git a/src/cli.rs b/src/cli.rs
index 9a0e1bca2..6363482d7 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -303,6 +303,14 @@ pub struct Options {
     )]
     pub ingestor_endpoint: String,

+    #[arg(
+        long,
+        env = "P_INDEXER_ENDPOINT",
+        default_value = "",
+        help = "URL to connect to this specific indexer. Default is the address of the server"
+    )]
+    pub indexer_endpoint: String,
+
     #[command(flatten)]
     pub oidc: Option,

@@ -404,29 +412,47 @@ impl Options {
     }

     /// TODO: refactor and document
-    pub fn get_url(&self) -> Url {
-        if self.ingestor_endpoint.is_empty() {
-            return format!(
-                "{}://{}",
-                self.get_scheme(),
-                self.address
-            )
-            .parse::<Url>() // if the value was improperly set, this will panic before hand
-            .unwrap_or_else(|err| {
-                panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
-            });
-        }
-
-        let ingestor_endpoint = &self.ingestor_endpoint;
+    pub fn get_url(&self, mode: Mode) -> Url {
+        let (endpoint, env_var) = match mode {
+            Mode::Ingest => {
+                if self.ingestor_endpoint.is_empty() {
+                    return format!(
+                        "{}://{}",
+                        self.get_scheme(),
+                        self.address
+                    )
+                    .parse::<Url>() // if the value was improperly set, this will panic beforehand
+                    .unwrap_or_else(|err| {
+                        panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
+                    });
+                }
+                (&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT")
+            }
+            Mode::Index => {
+                if self.indexer_endpoint.is_empty() {
+                    return format!(
+                        "{}://{}",
+                        self.get_scheme(),
+                        self.address
+                    )
+                    .parse::<Url>() // if the value was improperly set, this will panic beforehand
+                    .unwrap_or_else(|err| {
+                        panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
+                    });
+                }
+                (&self.indexer_endpoint, "P_INDEXER_ENDPOINT")
+            }
+            _ => panic!("Invalid mode"),
+        };

-        if ingestor_endpoint.starts_with("http") {
-            panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<domain>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
+        if endpoint.starts_with("http") {
+            panic!("Invalid value `{}`, please set the environment variable `{env_var}` to `<domain>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
         }

-        let addr_from_env = ingestor_endpoint.split(':').collect::<Vec<&str>>();
+        let addr_from_env = endpoint.split(':').collect::<Vec<&str>>();

         if addr_from_env.len() != 2 {
-            panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<domain>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
+            panic!("Invalid value `{}`, please set the environment variable `{env_var}` to `<domain>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
         }

         let mut hostname = addr_from_env[0].to_string();
diff --git a/src/enterprise/utils.rs b/src/enterprise/utils.rs
index 130a1f6e6..ba345c10a 100644
--- a/src/enterprise/utils.rs
+++ b/src/enterprise/utils.rs
@@ -1,5 +1,6 @@
 use std::{collections::HashMap, path::PathBuf, sync::Arc};

+use chrono::{TimeZone, Utc};
 use datafusion::{common::Column, prelude::Expr};
 use itertools::Itertools;
 use relative_path::RelativePathBuf;
@@ -119,14 +120,35 @@ pub async fn fetch_parquet_file_paths(
     selected_files
         .into_iter()
-        .map(|file| {
+        .filter_map(|file| {
             let date = file.file_path.split("/").collect_vec();

-            let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
-
-            let date = RelativePathBuf::from_iter(date);
-
-            parquet_files.entry(date).or_default().push(file);
+            let year = &date[1][5..9];
+            let month = &date[1][10..12];
+            let day = &date[1][13..15];
+            let hour = &date[2][5..7];
+            let min = &date[3][7..9];
+            let file_date = Utc
+                .with_ymd_and_hms(
+                    year.parse::<i32>().unwrap(),
+                    month.parse::<u32>().unwrap(),
+                    day.parse::<u32>().unwrap(),
+                    hour.parse::<u32>().unwrap(),
+                    min.parse::<u32>().unwrap(),
+                    0,
+                )
+                .unwrap();
+
+            if file_date < time_range.start {
+                None
+            } else {
+                let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
+
+                let date = RelativePathBuf::from_iter(date);
+
+                parquet_files.entry(date).or_default().push(file);
+                Some("")
+            }
         })
         .for_each(|_| {});
diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs
index 3232d54cb..05d56e028 100644
--- a/src/handlers/http/cluster/mod.rs
+++ b/src/handlers/http/cluster/mod.rs
@@ -51,12 +51,14 @@ use crate::HTTP_CLIENT;
 use super::base_path_without_preceding_slash;
 use super::ingest::PostError;
 use super::logstream::error::StreamError;
-use super::modal::IngestorMetadata;
+use super::modal::{IndexerMetadata, IngestorMetadata};
 use super::rbac::RBACError;
 use super::role::RoleError;

 type IngestorMetadataArr = Vec<IngestorMetadata>;

+type IndexerMetadataArr = Vec<IndexerMetadata>;
+
 pub const INTERNAL_STREAM_NAME: &str = "pmeta";

 const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);
@@ -616,7 +618,6 @@ pub async fn get_cluster_metrics() -> Result {
     Ok(actix_web::HttpResponse::Ok().json(dresses))
 }

-// update the .query.json file and return the new ingestorMetadataArr
 pub async fn get_ingestor_info() -> anyhow::Result<IngestorMetadataArr> {
     let store = PARSEABLE.storage.get_object_store();

@@ -635,6 +636,24 @@ pub async fn get_ingestor_info() -> anyhow::Result<IngestorMetadataArr> {
     Ok(arr)
 }

+pub async fn get_indexer_info() -> anyhow::Result<IndexerMetadataArr> {
+    let store = PARSEABLE.storage.get_object_store();
+
+    let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
+    let arr = store
+        .get_objects(
+            Some(&root_path),
+            Box::new(|file_name| file_name.starts_with("indexer")),
+        )
+        .await?
+        .iter()
+        // this unwrap will most definitely shoot me in the foot later
+        .map(|x| serde_json::from_slice::<IndexerMetadata>(x).unwrap_or_default())
+        .collect_vec();
+
+    Ok(arr)
+}
+
 pub async fn remove_ingestor(ingestor: Path) -> Result {
     let domain_name = to_url_string(ingestor.into_inner());

diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs
index 01162ddca..d329174bd 100644
--- a/src/handlers/http/middleware.rs
+++ b/src/handlers/http/middleware.rs
@@ -359,22 +359,12 @@ where
             }
             Mode::Index => {
-                let accessable_endpoints = ["create", "delete"];
-                let cond = path.split('/').any(|x| accessable_endpoints.contains(&x));
-                if !cond {
-                    Box::pin(async {
-                        Err(actix_web::error::ErrorUnauthorized(
-                            "Only Index API can be accessed in Index Mode",
-                        ))
-                    })
-                } else {
-                    let fut = self.service.call(req);
+                let fut = self.service.call(req);

-                    Box::pin(async move {
-                        let res = fut.await?;
-                        Ok(res)
-                    })
-                }
+                Box::pin(async move {
+                    let res = fut.await?;
+                    Ok(res)
+                })
             }
         }
     }
diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs
index 46f9ebd83..d1b271134 100644
--- a/src/handlers/http/modal/ingest_server.rs
+++ b/src/handlers/http/modal/ingest_server.rs
@@ -28,6 +28,7 @@ use relative_path::RelativePathBuf;
 use serde_json::Value;
 use tokio::sync::oneshot;

+use crate::option::Mode;
 use crate::{
     analytics,
     handlers::{
@@ -108,7 +109,7 @@ impl ParseableServer for IngestServer {
         tokio::spawn(airplane::server());

         // write the ingestor metadata to storage
-        PARSEABLE.store_ingestor_metadata().await?;
+        PARSEABLE.store_metadata(Mode::Ingest).await?;

         // Ingestors shouldn't have to deal with OpenId auth flow
         let result = self.start(shutdown_rx, prometheus.clone(), None).await;
@@ -251,7 +252,7 @@ impl IngestServer {

 // check for querier state. Is it there, or was it there in the past
 // this should happen before the set the ingestor metadata
-async fn check_querier_state() -> anyhow::Result<Option<Bytes>, ObjectStorageError> {
+pub async fn check_querier_state() -> anyhow::Result<Option<Bytes>, ObjectStorageError> {
     // how do we check for querier state?
// based on the work flow of the system, the querier will always need to start first // i.e the querier will create the `.parseable.json` file diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index e9aae3e6a..2f1cbded7 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -34,9 +34,10 @@ use tracing::{error, info, warn}; use crate::{ cli::Options, oidc::Claims, + option::Mode, parseable::PARSEABLE, storage::{ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY}, - utils::get_ingestor_id, + utils::{get_indexer_id, get_ingestor_id}, }; use super::{audit, cross_origin_config, health_check, API_BASE_PATH, API_VERSION}; @@ -199,7 +200,7 @@ impl IngestorMetadata { .staging_dir() .read_dir() .expect("Couldn't read from file"); - let url = options.get_url(); + let url = options.get_url(Mode::Ingest); let port = url.port().unwrap_or(80).to_string(); let url = url.to_string(); let Options { @@ -333,6 +334,181 @@ impl IngestorMetadata { } } +#[derive(Debug, Serialize, Deserialize, Default, Clone, Eq, PartialEq)] +pub struct IndexerMetadata { + pub version: String, + pub port: String, + pub domain_name: String, + pub bucket_name: String, + pub token: String, + pub indexer_id: String, + pub flight_port: String, +} + +impl IndexerMetadata { + pub fn new( + port: String, + domain_name: String, + bucket_name: String, + username: &str, + password: &str, + indexer_id: String, + flight_port: String, + ) -> Self { + let token = base64::prelude::BASE64_STANDARD.encode(format!("{username}:{password}")); + + Self { + port, + domain_name, + version: DEFAULT_VERSION.to_string(), + bucket_name, + token: format!("Basic {token}"), + indexer_id, + flight_port, + } + } + + /// Capture metadata information by either loading it from staging or starting fresh + pub fn load(options: &Options, storage: &dyn ObjectStorageProvider) -> Arc { + // all the files should be in the staging directory root + let entries = options + .staging_dir() + .read_dir() + .expect("Couldn't read from file"); + let url = options.get_url(Mode::Index); + let port = url.port().unwrap_or(80).to_string(); + let url = url.to_string(); + let Options { + username, password, .. + } = options; + let staging_path = options.staging_dir(); + let flight_port = options.flight_port.to_string(); + + for entry in entries { + // cause the staging directory will have only one file with indexer in the name + // so the JSON Parse should not error unless the file is corrupted + let path = entry.expect("Should be a directory entry").path(); + if !path + .file_name() + .and_then(|s| s.to_str()) + .is_some_and(|s| s.contains("indexer")) + { + continue; + } + + // get the indexer metadata from staging + let bytes = std::fs::read(path).expect("File should be present"); + let mut meta = + Self::from_bytes(&bytes, options.flight_port).expect("Extracted indexer metadata"); + + // compare url endpoint and port, update + if meta.domain_name != url { + info!( + "Domain Name was Updated. Old: {} New: {}", + meta.domain_name, url + ); + meta.domain_name = url; + } + + if meta.port != port { + info!("Port was Updated. Old: {} New: {}", meta.port, port); + meta.port = port; + } + + let token = format!( + "Basic {}", + BASE64_STANDARD.encode(format!("{username}:{password}")) + ); + if meta.token != token { + // TODO: Update the message to be more informative with username and password + warn!( + "Credentials were Updated. 
Tokens updated; Old: {} New: {}", + meta.token, token + ); + meta.token = token; + } + meta.put_on_disk(staging_path) + .expect("Couldn't write to disk"); + + return Arc::new(meta); + } + + let storage = storage.get_object_store(); + let meta = Self::new( + port, + url, + storage.get_bucket_name(), + username, + password, + get_indexer_id(), + flight_port, + ); + + meta.put_on_disk(staging_path) + .expect("Should Be valid Json"); + Arc::new(meta) + } + + pub fn get_indexer_id(&self) -> String { + self.indexer_id.clone() + } + + #[inline(always)] + pub fn file_path(&self) -> RelativePathBuf { + RelativePathBuf::from_iter([ + PARSEABLE_ROOT_DIRECTORY, + &format!("indexer.{}.json", self.get_indexer_id()), + ]) + } + + /// Updates json with `flight_port` field if not already present + fn from_bytes(bytes: &[u8], flight_port: u16) -> anyhow::Result { + let mut json: Map = serde_json::from_slice(bytes)?; + json.entry("flight_port") + .or_insert_with(|| Value::String(flight_port.to_string())); + + Ok(serde_json::from_value(Value::Object(json))?) + } + + pub async fn migrate(&self) -> anyhow::Result> { + let imp = self.file_path(); + let bytes = match PARSEABLE.storage.get_object_store().get_object(&imp).await { + Ok(bytes) => bytes, + Err(_) => { + return Ok(None); + } + }; + + let resource = Self::from_bytes(&bytes, PARSEABLE.options.flight_port)?; + let bytes = Bytes::from(serde_json::to_vec(&resource)?); + + resource.put_on_disk(PARSEABLE.options.staging_dir())?; + + PARSEABLE + .storage + .get_object_store() + .put_object(&imp, bytes) + .await?; + + Ok(Some(resource)) + } + + /// Puts the indexer info into the staging. + /// + /// This function takes the indexer info as a parameter and stores it in staging. + /// # Parameters + /// + /// * `staging_path`: Staging root directory. 
+ pub fn put_on_disk(&self, staging_path: &Path) -> anyhow::Result<()> { + let file_name = format!("indexer.{}.json", self.indexer_id); + let file_path = staging_path.join(file_name); + + std::fs::write(file_path, serde_json::to_vec(&self)?)?; + + Ok(()) + } +} + #[cfg(test)] mod test { use actix_web::body::MessageBody; diff --git a/src/lib.rs b/src/lib.rs index 94b81639d..27809aab0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,7 +27,7 @@ mod cli; pub mod connectors; pub mod correlation; pub mod enterprise; -mod event; +pub mod event; pub mod handlers; pub mod hottier; mod livetail; @@ -72,10 +72,10 @@ pub const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30); // A single HTTP client for all outgoing HTTP requests from the parseable server -static HTTP_CLIENT: Lazy = Lazy::new(|| { +pub static HTTP_CLIENT: Lazy = Lazy::new(|| { ClientBuilder::new() .connect_timeout(Duration::from_secs(3)) // set a timeout of 3s for each connection setup - .timeout(Duration::from_secs(10)) // set a timeout of 10s for each request + .timeout(Duration::from_secs(30)) // set a timeout of 30s for each request .pool_idle_timeout(Duration::from_secs(90)) // set a timeout of 90s for each idle connection .pool_max_idle_per_host(32) // max 32 idle connections per host .gzip(true) // gzip compress for all requests diff --git a/src/metrics/prom_utils.rs b/src/metrics/prom_utils.rs index 1a04c5ba5..244d43dbb 100644 --- a/src/metrics/prom_utils.rs +++ b/src/metrics/prom_utils.rs @@ -19,6 +19,7 @@ use crate::handlers::http::base_path_without_preceding_slash; use crate::handlers::http::ingest::PostError; use crate::handlers::http::modal::IngestorMetadata; +use crate::option::Mode; use crate::parseable::PARSEABLE; use crate::HTTP_CLIENT; use actix_web::http::header; @@ -61,7 +62,8 @@ struct StorageMetrics { impl Default for Metrics { fn default() -> Self { - let url = PARSEABLE.options.get_url(); + // for now it is only for ingestor + let url = PARSEABLE.options.get_url(Mode::Ingest); let address = format!( "http://{}:{}", url.domain() diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index f4b220db3..b10a34bdd 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -47,7 +47,7 @@ use crate::{ cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME}, ingest::PostError, logstream::error::{CreateStreamError, StreamError}, - modal::{utils::logstream_utils::PutStreamHeaders, IngestorMetadata}, + modal::{utils::logstream_utils::PutStreamHeaders, IndexerMetadata, IngestorMetadata}, }, STREAM_TYPE_KEY, }, @@ -129,6 +129,8 @@ pub struct Parseable { pub streams: Streams, /// Metadata associated only with an ingestor pub ingestor_metadata: Option>, + /// Metadata associated only with an indexer + pub indexer_metadata: Option>, /// Used to configure the kafka connector #[cfg(feature = "kafka")] pub kafka_config: KafkaConfig, @@ -144,11 +146,16 @@ impl Parseable { Mode::Ingest => Some(IngestorMetadata::load(&options, storage.as_ref())), _ => None, }; + let indexer_metadata = match &options.mode { + Mode::Index => Some(IndexerMetadata::load(&options, storage.as_ref())), + _ => None, + }; Parseable { options: Arc::new(options), storage, streams: Streams::default(), ingestor_metadata, + indexer_metadata, #[cfg(feature = "kafka")] kafka_config, } @@ -258,35 +265,68 @@ impl Parseable { } // create the ingestor metadata and put the .ingestor.json file in the object store - pub async fn store_ingestor_metadata(&self) -> 
anyhow::Result<()> { - let Some(meta) = self.ingestor_metadata.as_ref() else { - return Ok(()); - }; - let storage_ingestor_metadata = meta.migrate().await?; - let store = self.storage.get_object_store(); - - // use the id that was generated/found in the staging and - // generate the path for the object store - let path = meta.file_path(); - - // we are considering that we can always get from object store - if let Some(mut store_data) = storage_ingestor_metadata { - if store_data.domain_name != meta.domain_name { - store_data.domain_name.clone_from(&meta.domain_name); - store_data.port.clone_from(&meta.port); - - let resource = Bytes::from(serde_json::to_vec(&store_data)?); + pub async fn store_metadata(&self, mode: Mode) -> anyhow::Result<()> { + match mode { + Mode::Ingest => { + let Some(meta) = self.ingestor_metadata.as_ref() else { + return Ok(()); + }; + let storage_ingestor_metadata = meta.migrate().await?; + let store = self.storage.get_object_store(); + + // use the id that was generated/found in the staging and + // generate the path for the object store + let path = meta.file_path(); + + // we are considering that we can always get from object store + if let Some(mut store_data) = storage_ingestor_metadata { + if store_data.domain_name != meta.domain_name { + store_data.domain_name.clone_from(&meta.domain_name); + store_data.port.clone_from(&meta.port); + + let resource = Bytes::from(serde_json::to_vec(&store_data)?); + + // if pushing to object store fails propagate the error + store.put_object(&path, resource).await?; + } + } else { + let resource = serde_json::to_vec(&meta)?.into(); - // if pushing to object store fails propagate the error - store.put_object(&path, resource).await?; + store.put_object(&path, resource).await?; + } + Ok(()) } - } else { - let resource = serde_json::to_vec(&meta)?.into(); + Mode::Index => { + let Some(meta) = self.indexer_metadata.as_ref() else { + return Ok(()); + }; + let storage_indexer_metadata = meta.migrate().await?; + let store = self.storage.get_object_store(); + + // use the id that was generated/found in the staging and + // generate the path for the object store + let path = meta.file_path(); + + // we are considering that we can always get from object store + if let Some(mut store_data) = storage_indexer_metadata { + if store_data.domain_name != meta.domain_name { + store_data.domain_name.clone_from(&meta.domain_name); + store_data.port.clone_from(&meta.port); + + let resource = Bytes::from(serde_json::to_vec(&store_data)?); + + // if pushing to object store fails propagate the error + store.put_object(&path, resource).await?; + } + } else { + let resource = serde_json::to_vec(&meta)?.into(); - store.put_object(&path, resource).await?; + store.put_object(&path, resource).await?; + } + Ok(()) + } + _ => Err(anyhow::anyhow!("Invalid mode")), } - - Ok(()) } /// list all streams from storage diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index 70d5f3cf7..a7c6709ce 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -41,6 +41,7 @@ use object_store::{ BackoffConfig, ClientOptions, ObjectMeta, ObjectStore, PutPayload, RetryConfig, }; use relative_path::{RelativePath, RelativePathBuf}; +use tokio::{fs::OpenOptions, io::AsyncReadExt}; use tracing::{error, info}; use url::Url; @@ -53,8 +54,8 @@ use crate::{ use super::{ metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path, ObjectStorage, ObjectStorageError, ObjectStorageProvider, CONNECT_TIMEOUT_SECS, - 
PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME,
-    STREAM_ROOT_DIRECTORY,
+    MIN_MULTIPART_UPLOAD_SIZE, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME,
+    STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
 };

 #[derive(Debug, Clone, clap::Args)]
@@ -378,6 +379,62 @@ impl BlobStore {
         res
     }

+    async fn _upload_multipart(
+        &self,
+        key: &RelativePath,
+        path: &Path,
+    ) -> Result<(), ObjectStorageError> {
+        let mut file = OpenOptions::new().read(true).open(path).await?;
+        let location = &to_object_store_path(key);
+
+        let meta = file.metadata().await?;
+        let total_size = meta.len() as usize;
+        if total_size < MIN_MULTIPART_UPLOAD_SIZE {
+            // Small files don't need a multipart upload; a single PUT is enough
+            let mut data = Vec::new();
+            file.read_to_end(&mut data).await?;
+            self.client.put(location, data.into()).await?;
+            return Ok(());
+        } else {
+            // Only start a multipart upload once we know the file is large enough
+            let mut async_writer = self.client.put_multipart(location).await?;
+
+            let mut data = Vec::new();
+            file.read_to_end(&mut data).await?;
+
+            let has_final_partial_part = total_size % MIN_MULTIPART_UPLOAD_SIZE > 0;
+            let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE;
+            let total_parts = num_full_parts + if has_final_partial_part { 1 } else { 0 };
+
+            // Upload each part
+            for part_number in 0..(total_parts) {
+                let start_pos = part_number * MIN_MULTIPART_UPLOAD_SIZE;
+                let end_pos = if part_number == num_full_parts && has_final_partial_part {
+                    // The final part may be smaller than MIN_MULTIPART_UPLOAD_SIZE (which is allowed)
+                    total_size
+                } else {
+                    // All other parts are exactly MIN_MULTIPART_UPLOAD_SIZE bytes
+                    start_pos + MIN_MULTIPART_UPLOAD_SIZE
+                };
+
+                // Extract this part's data
+                let part_data = data[start_pos..end_pos].to_vec();
+
+                // Upload the part
+                async_writer.put_part(part_data.into()).await?;
+            }
+            if let Err(err) = async_writer.complete().await {
+                error!("Failed to complete multipart upload. 
{:?}", err); + async_writer.abort().await?; + }; + } + Ok(()) + } + // TODO: introduce parallel, multipart-uploads if required // async fn _upload_multipart(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> { // let mut buf = vec![0u8; MULTIPART_UPLOAD_SIZE / 2]; @@ -424,6 +481,13 @@ impl BlobStore { #[async_trait] impl ObjectStorage for BlobStore { + async fn upload_multipart( + &self, + key: &RelativePath, + path: &Path, + ) -> Result<(), ObjectStorageError> { + self._upload_multipart(key, path).await + } async fn get_buffered_reader( &self, _path: &RelativePath, diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index 5d6485689..57d2f5cdd 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -30,7 +30,10 @@ use fs_extra::file::CopyOptions; use futures::{stream::FuturesUnordered, TryStreamExt}; use object_store::{buffered::BufReader, ObjectMeta}; use relative_path::{RelativePath, RelativePathBuf}; -use tokio::fs::{self, DirEntry}; +use tokio::{ + fs::{self, DirEntry, OpenOptions}, + io::AsyncReadExt, +}; use tokio_stream::wrappers::ReadDirStream; use crate::{ @@ -104,6 +107,16 @@ impl LocalFS { #[async_trait] impl ObjectStorage for LocalFS { + async fn upload_multipart( + &self, + key: &RelativePath, + path: &Path, + ) -> Result<(), ObjectStorageError> { + let mut file = OpenOptions::new().read(true).open(path).await?; + let mut data = Vec::new(); + file.read_to_end(&mut data).await?; + self.put_object(key, data.into()).await + } async fn get_buffered_reader( &self, _path: &RelativePath, diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 6a272c8e4..b6bc9bb25 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -77,6 +77,7 @@ pub const CURRENT_SCHEMA_VERSION: &str = "v6"; const CONNECT_TIMEOUT_SECS: u64 = 5; const REQUEST_TIMEOUT_SECS: u64 = 300; +pub const MIN_MULTIPART_UPLOAD_SIZE: usize = 25 * 1024 * 1024; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ObjectStoreFormat { /// Version of schema registry diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 680d20f3e..dea669975 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -90,6 +90,11 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { base_path: Option<&RelativePath>, filter_fun: Box bool + Send>, ) -> Result, ObjectStorageError>; + async fn upload_multipart( + &self, + key: &RelativePath, + path: &Path, + ) -> Result<(), ObjectStorageError>; async fn put_object( &self, path: &RelativePath, @@ -839,7 +844,11 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { let stream_relative_path = format!("{stream_name}/{file_suffix}"); // Try uploading the file, handle potential errors without breaking the loop - if let Err(e) = self.upload_file(&stream_relative_path, &path).await { + // if let Err(e) = self.upload_multipart(key, path) + if let Err(e) = self + .upload_multipart(&RelativePathBuf::from(&stream_relative_path), &path) + .await + { error!("Failed to upload file {filename:?}: {e}"); continue; // Skip to the next file } diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 76f00f167..69b0acdfe 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -43,6 +43,7 @@ use object_store::{ BackoffConfig, ClientOptions, ObjectMeta, ObjectStore, PutPayload, RetryConfig, }; use relative_path::{RelativePath, RelativePathBuf}; +use tokio::{fs::OpenOptions, io::AsyncReadExt}; use tracing::{error, info}; use crate::{ @@ -54,8 +55,8 @@ use crate::{ use super::{ metrics_layer::MetricLayer, 
object_storage::parseable_json_path, to_object_store_path, ObjectStorage,
     ObjectStorageError, ObjectStorageProvider, CONNECT_TIMEOUT_SECS,
-    PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME,
-    STREAM_ROOT_DIRECTORY,
+    MIN_MULTIPART_UPLOAD_SIZE, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME,
+    STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
 };

 // in bytes
@@ -509,48 +510,61 @@ impl S3 {
         res
     }

-    // TODO: introduce parallel, multipart-uploads if required
-    // async fn _upload_multipart(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
-    //     let mut buf = vec![0u8; MULTIPART_UPLOAD_SIZE / 2];
-    //     let mut file = OpenOptions::new().read(true).open(path).await?;
-
-    //     // let (multipart_id, mut async_writer) = self.client.put_multipart(&key.into()).await?;
-    //     let mut async_writer = self.client.put_multipart(&key.into()).await?;
-
-    //     /* `abort_multipart()` has been removed */
-    //     // let close_multipart = |err| async move {
-    //     //     error!("multipart upload failed. {:?}", err);
-    //     //     self.client
-    //     //         .abort_multipart(&key.into(), &multipart_id)
-    //     //         .await
-    //     // };
-
-    //     loop {
-    //         match file.read(&mut buf).await {
-    //             Ok(len) => {
-    //                 if len == 0 {
-    //                     break;
-    //                 }
-    //                 if let Err(err) = async_writer.write_all(&buf[0..len]).await {
-    //                     // close_multipart(err).await?;
-    //                     break;
-    //                 }
-    //                 if let Err(err) = async_writer.flush().await {
-    //                     // close_multipart(err).await?;
-    //                     break;
-    //                 }
-    //             }
-    //             Err(err) => {
-    //                 // close_multipart(err).await?;
-    //                 break;
-    //             }
-    //         }
-    //     }
-
-    //     async_writer.shutdown().await?;
-
-    //     Ok(())
-    // }
+    async fn _upload_multipart(
+        &self,
+        key: &RelativePath,
+        path: &Path,
+    ) -> Result<(), ObjectStorageError> {
+        let mut file = OpenOptions::new().read(true).open(path).await?;
+        let location = &to_object_store_path(key);
+
+        let meta = file.metadata().await?;
+        let total_size = meta.len() as usize;
+        if total_size < MIN_MULTIPART_UPLOAD_SIZE {
+            // Small files don't need a multipart upload; a single PUT is enough
+            let mut data = Vec::new();
+            file.read_to_end(&mut data).await?;
+            self.client.put(location, data.into()).await?;
+            return Ok(());
+        } else {
+            // Only start a multipart upload once we know the file is large enough
+            let mut async_writer = self.client.put_multipart(location).await?;
+
+            let mut data = Vec::new();
+            file.read_to_end(&mut data).await?;
+
+            let has_final_partial_part = total_size % MIN_MULTIPART_UPLOAD_SIZE > 0;
+            let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE;
+            let total_parts = num_full_parts + if has_final_partial_part { 1 } else { 0 };
+
+            // Upload each part
+            for part_number in 0..(total_parts) {
+                let start_pos = part_number * MIN_MULTIPART_UPLOAD_SIZE;
+                let end_pos = if part_number == num_full_parts && has_final_partial_part {
+                    // The final part may be smaller than MIN_MULTIPART_UPLOAD_SIZE (which is allowed)
+                    total_size
+                } else {
+                    // All other parts are exactly MIN_MULTIPART_UPLOAD_SIZE bytes
+                    start_pos + MIN_MULTIPART_UPLOAD_SIZE
+                };
+
+                // Extract this part's data
+                let part_data = data[start_pos..end_pos].to_vec();
+
+                // Upload the part
+                async_writer.put_part(part_data.into()).await?;
+            }
+            if let Err(err) = async_writer.complete().await {
+                error!("Failed to complete multipart upload. {:?}", err);
+                async_writer.abort().await?;
+            };
+        }
+        Ok(())
+    }
 }

 #[async_trait]
@@ -566,6 +580,13 @@ impl ObjectStorage for S3 {
         let buf = object_store::buffered::BufReader::new(store, &meta);
         Ok(buf)
     }
+    async fn upload_multipart(
+        &self,
+        key: &RelativePath,
+        path: &Path,
+    ) -> Result<(), ObjectStorageError> {
+        self._upload_multipart(key, path).await
+    }
     async fn head(&self, path: &RelativePath) -> Result {
         Ok(self.client.head(&to_object_store_path(path)).await?)
     }
diff --git a/src/utils/mod.rs b/src/utils/mod.rs
index 021d1bf77..7a130ba91 100644
--- a/src/utils/mod.rs
+++ b/src/utils/mod.rs
@@ -42,6 +42,13 @@ pub fn get_ingestor_id() -> String {
     id
 }

+pub fn get_indexer_id() -> String {
+    let now = Utc::now().to_rfc3339();
+    let id = get_hash(&now).to_string().split_at(15).0.to_string();
+    debug!("Indexer ID: {id}");
+    id
+}
+
 pub fn extract_datetime(path: &str) -> Option {
     let re = Regex::new(r"date=(\d{4}-\d{2}-\d{2})/hour=(\d{2})/minute=(\d{2})").unwrap();
     if let Some(caps) = re.captures(path) {
diff --git a/src/utils/time.rs b/src/utils/time.rs
index 622f28dc1..ec39a423b 100644
--- a/src/utils/time.rs
+++ b/src/utils/time.rs
@@ -49,7 +49,7 @@ impl TimeBounds {
 }

 /// Represents a range of time with a start and end point.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct TimeRange {
     pub start: DateTime<Utc>,
     pub end: DateTime<Utc>,