
Commit b98106b ("updates for enterprise")
Committed Mar 18, 2025 · 1 parent: 4af9a5e

File tree: 15 files changed, +446 -117 lines
 

‎src/cli.rs

+44 -18

```diff
@@ -294,6 +294,14 @@ pub struct Options {
     )]
     pub ingestor_endpoint: String,
 
+    #[arg(
+        long,
+        env = "P_INDEXER_ENDPOINT",
+        default_value = "",
+        help = "URL to connect to this specific indexer. Default is the address of the server"
+    )]
+    pub indexer_endpoint: String,
+
     #[command(flatten)]
     pub oidc: Option<OidcConfig>,
 
@@ -395,29 +403,47 @@ impl Options {
     }
 
     /// TODO: refactor and document
-    pub fn get_url(&self) -> Url {
-        if self.ingestor_endpoint.is_empty() {
-            return format!(
-                "{}://{}",
-                self.get_scheme(),
-                self.address
-            )
-            .parse::<Url>() // if the value was improperly set, this will panic before hand
-            .unwrap_or_else(|err| {
-                panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
-            });
-        }
-
-        let ingestor_endpoint = &self.ingestor_endpoint;
+    pub fn get_url(&self, mode: Mode) -> Url {
+        let (endpoint, env_var) = match mode {
+            Mode::Ingest => {
+                if self.ingestor_endpoint.is_empty() {
+                    return format!(
+                        "{}://{}",
+                        self.get_scheme(),
+                        self.address
+                    )
+                    .parse::<Url>() // if the value was improperly set, this will panic before hand
+                    .unwrap_or_else(|err| {
+                        panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
+                    });
+                }
+                (&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT")
+            }
+            Mode::Index => {
+                if self.indexer_endpoint.is_empty() {
+                    return format!(
+                        "{}://{}",
+                        self.get_scheme(),
+                        self.address
+                    )
+                    .parse::<Url>() // if the value was improperly set, this will panic before hand
+                    .unwrap_or_else(|err| {
+                        panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
+                    });
+                }
+                (&self.indexer_endpoint, "P_INDEXER_ENDPOINT")
+            }
+            _ => panic!("Invalid mode"),
+        };
 
-        if ingestor_endpoint.starts_with("http") {
-            panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
+        if endpoint.starts_with("http") {
+            panic!("Invalid value `{}`, please set the environement variable `{env_var}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
         }
 
-        let addr_from_env = ingestor_endpoint.split(':').collect::<Vec<&str>>();
+        let addr_from_env = endpoint.split(':').collect::<Vec<&str>>();
 
         if addr_from_env.len() != 2 {
-            panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
+            panic!("Invalid value `{}`, please set the environement variable `{env_var}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
         }
 
         let mut hostname = addr_from_env[0].to_string();
```
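
The reworked `get_url` resolves the endpoint and its governing environment variable per mode instead of assuming ingest; the validation itself is unchanged: the value must be `<host>:<port>` with no scheme, and the strict two-piece split on `:` also rejects IPv6 literals. A minimal standalone sketch of that check (the `parse_endpoint` helper is hypothetical, not part of the commit):

```rust
// Hypothetical standalone version of the `<host>:<port>` validation that
// get_url() applies to P_INGESTOR_ENDPOINT / P_INDEXER_ENDPOINT.
fn parse_endpoint(endpoint: &str, env_var: &str) -> (String, u16) {
    if endpoint.starts_with("http") {
        panic!("Invalid value `{endpoint}`: `{env_var}` must not include a scheme");
    }
    let parts: Vec<&str> = endpoint.split(':').collect();
    if parts.len() != 2 {
        // note: an IPv6 literal contains extra colons and fails this check
        panic!("Invalid value `{endpoint}`: `{env_var}` must be `<host>:<port>`");
    }
    let port = parts[1].parse::<u16>().expect("port must be numeric");
    (parts[0].to_string(), port)
}

fn main() {
    // e.g. P_INDEXER_ENDPOINT=192.168.1.1:8000
    let (host, port) = parse_endpoint("192.168.1.1:8000", "P_INDEXER_ENDPOINT");
    assert_eq!((host.as_str(), port), ("192.168.1.1", 8000));
}
```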

‎src/enterprise/utils.rs

+28 -6

```diff
@@ -1,5 +1,6 @@
 use std::{collections::HashMap, path::PathBuf, sync::Arc};
 
+use chrono::{TimeZone, Utc};
 use datafusion::{common::Column, prelude::Expr};
 use itertools::Itertools;
 use relative_path::RelativePathBuf;
@@ -119,14 +120,35 @@ pub async fn fetch_parquet_file_paths(
 
     selected_files
         .into_iter()
-        .map(|file| {
+        .filter_map(|file| {
            let date = file.file_path.split("/").collect_vec();
 
-            let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
-
-            let date = RelativePathBuf::from_iter(date);
-
-            parquet_files.entry(date).or_default().push(file);
+            let year = &date[1][5..9];
+            let month = &date[1][10..12];
+            let day = &date[1][13..15];
+            let hour = &date[2][5..7];
+            let min = &date[3][7..9];
+            let file_date = Utc
+                .with_ymd_and_hms(
+                    year.parse::<i32>().unwrap(),
+                    month.parse::<u32>().unwrap(),
+                    day.parse::<u32>().unwrap(),
+                    hour.parse::<u32>().unwrap(),
+                    min.parse::<u32>().unwrap(),
+                    0,
+                )
+                .unwrap();
+
+            if file_date < time_range.start {
+                None
+            } else {
+                let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
+
+                let date = RelativePathBuf::from_iter(date);
+
+                parquet_files.entry(date).or_default().push(file);
+                Some("")
+            }
         })
         .for_each(|_| {});
```
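
The new filter derives each file's timestamp from fixed byte offsets in its partition path, assuming the layout `stream/date=YYYY-MM-DD/hour=HH/minute=MM/…`, and drops files older than `time_range.start`. The `Some("")` arm merely marks a kept file: the results are discarded by `.for_each(|_| {})`, and the real output is the side effect on `parquet_files`. A standalone sketch of the offset arithmetic against a hypothetical hard-coded path:

```rust
use chrono::{TimeZone, Utc};

// Sketch of how the fixed slice offsets line up with the partition layout
// assumed by fetch_parquet_file_paths; the path below is illustrative only.
fn main() {
    let segments: Vec<&str> = "stream/date=2025-03-18/hour=09/minute=30/file.parquet"
        .split('/')
        .collect();

    let year = &segments[1][5..9];    // "2025" from "date=2025-03-18"
    let month = &segments[1][10..12]; // "03"
    let day = &segments[1][13..15];   // "18"
    let hour = &segments[2][5..7];    // "09" from "hour=09"
    let min = &segments[3][7..9];     // "30" from "minute=30"

    let file_date = Utc
        .with_ymd_and_hms(
            year.parse().unwrap(),
            month.parse().unwrap(),
            day.parse().unwrap(),
            hour.parse().unwrap(),
            min.parse().unwrap(),
            0,
        )
        .unwrap();
    println!("{file_date}"); // 2025-03-18 09:30:00 UTC
}
```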

‎src/handlers/http/cluster/mod.rs

+21 -2

```diff
@@ -51,12 +51,14 @@ use crate::HTTP_CLIENT;
 use super::base_path_without_preceding_slash;
 use super::ingest::PostError;
 use super::logstream::error::StreamError;
-use super::modal::IngestorMetadata;
+use super::modal::{IndexerMetadata, IngestorMetadata};
 use super::rbac::RBACError;
 use super::role::RoleError;
 
 type IngestorMetadataArr = Vec<IngestorMetadata>;
 
+type IndexerMetadataArr = Vec<IndexerMetadata>;
+
 pub const INTERNAL_STREAM_NAME: &str = "pmeta";
 
 const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);
@@ -616,7 +618,6 @@ pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
     Ok(actix_web::HttpResponse::Ok().json(dresses))
 }
 
-// update the .query.json file and return the new ingestorMetadataArr
 pub async fn get_ingestor_info() -> anyhow::Result<IngestorMetadataArr> {
     let store = PARSEABLE.storage.get_object_store();
 
@@ -635,6 +636,24 @@ pub async fn get_ingestor_info() -> anyhow::Result<IngestorMetadataArr> {
     Ok(arr)
 }
 
+pub async fn get_indexer_info() -> anyhow::Result<IndexerMetadataArr> {
+    let store = PARSEABLE.storage.get_object_store();
+
+    let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
+    let arr = store
+        .get_objects(
+            Some(&root_path),
+            Box::new(|file_name| file_name.starts_with("indexer")),
+        )
+        .await?
+        .iter()
+        // this unwrap will most definateley shoot me in the foot later
+        .map(|x| serde_json::from_slice::<IndexerMetadata>(x).unwrap_or_default())
+        .collect_vec();
+
+    Ok(arr)
+}
+
 pub async fn remove_ingestor(ingestor: Path<String>) -> Result<impl Responder, PostError> {
     let domain_name = to_url_string(ingestor.into_inner());
```
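
`get_indexer_info` is a near copy of `get_ingestor_info`, differing only in the filename prefix and the deserialized type, and its `unwrap_or_default` (flagged by the in-code comment) means a corrupted metadata object silently becomes a default value rather than surfacing an error. A standalone sketch of that failure mode, using a hypothetical `NodeMeta` stand-in for the real metadata types:

```rust
use serde::Deserialize;

#[derive(Debug, Default, Deserialize, PartialEq)]
struct NodeMeta {
    domain_name: String,
    port: String,
}

// Demonstrates the hazard: a corrupt metadata object deserializes to
// Default instead of erroring, so the caller never sees the corruption.
fn main() {
    let good = br#"{"domain_name":"http://10.0.0.5:8000/","port":"8000"}"#;
    let corrupt = br#"{"domain_name":42}"#;

    let parsed: NodeMeta = serde_json::from_slice(good).unwrap_or_default();
    assert_eq!(parsed.port, "8000");

    let fallback: NodeMeta = serde_json::from_slice(corrupt).unwrap_or_default();
    assert_eq!(fallback, NodeMeta::default()); // corruption is masked
}
```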

‎src/handlers/http/middleware.rs

+5 -15

```diff
@@ -359,22 +359,12 @@
             }
 
             Mode::Index => {
-                let accessable_endpoints = ["create", "delete"];
-                let cond = path.split('/').any(|x| accessable_endpoints.contains(&x));
-                if !cond {
-                    Box::pin(async {
-                        Err(actix_web::error::ErrorUnauthorized(
-                            "Only Index API can be accessed in Index Mode",
-                        ))
-                    })
-                } else {
-                    let fut = self.service.call(req);
+                let fut = self.service.call(req);
 
-                    Box::pin(async move {
-                        let res = fut.await?;
-                        Ok(res)
-                    })
-                }
+                Box::pin(async move {
+                    let res = fut.await?;
+                    Ok(res)
+                })
             }
         }
     }
```

‎src/handlers/http/modal/ingest_server.rs

+3 -2

```diff
@@ -28,6 +28,7 @@ use relative_path::RelativePathBuf;
 use serde_json::Value;
 use tokio::sync::oneshot;
 
+use crate::option::Mode;
 use crate::{
     analytics,
     handlers::{
@@ -108,7 +109,7 @@ impl ParseableServer for IngestServer {
         tokio::spawn(airplane::server());
 
         // write the ingestor metadata to storage
-        PARSEABLE.store_ingestor_metadata().await?;
+        PARSEABLE.store_metadata(Mode::Ingest).await?;
 
         // Ingestors shouldn't have to deal with OpenId auth flow
         let result = self.start(shutdown_rx, prometheus.clone(), None).await;
@@ -251,7 +252,7 @@ impl IngestServer {
 
     // check for querier state. Is it there, or was it there in the past
     // this should happen before the set the ingestor metadata
-    async fn check_querier_state() -> anyhow::Result<Option<Bytes>, ObjectStorageError> {
+    pub async fn check_querier_state() -> anyhow::Result<Option<Bytes>, ObjectStorageError> {
        // how do we check for querier state?
        // based on the work flow of the system, the querier will always need to start first
        // i.e the querier will create the `.parseable.json` file
```

‎src/handlers/http/modal/mod.rs

+178 -2

```diff
@@ -34,9 +34,10 @@ use tracing::{error, info, warn};
 use crate::{
     cli::Options,
     oidc::Claims,
+    option::Mode,
     parseable::PARSEABLE,
     storage::{ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY},
-    utils::get_ingestor_id,
+    utils::{get_indexer_id, get_ingestor_id},
 };
 
 use super::{audit, cross_origin_config, health_check, API_BASE_PATH, API_VERSION};
@@ -199,7 +200,7 @@ impl IngestorMetadata {
             .staging_dir()
             .read_dir()
             .expect("Couldn't read from file");
-        let url = options.get_url();
+        let url = options.get_url(Mode::Ingest);
         let port = url.port().unwrap_or(80).to_string();
         let url = url.to_string();
         let Options {
@@ -333,6 +334,181 @@ impl IngestorMetadata {
     }
 }
 
+#[derive(Debug, Serialize, Deserialize, Default, Clone, Eq, PartialEq)]
+pub struct IndexerMetadata {
+    pub version: String,
+    pub port: String,
+    pub domain_name: String,
+    pub bucket_name: String,
+    pub token: String,
+    pub indexer_id: String,
+    pub flight_port: String,
+}
+
+impl IndexerMetadata {
+    pub fn new(
+        port: String,
+        domain_name: String,
+        bucket_name: String,
+        username: &str,
+        password: &str,
+        indexer_id: String,
+        flight_port: String,
+    ) -> Self {
+        let token = base64::prelude::BASE64_STANDARD.encode(format!("{username}:{password}"));
+
+        Self {
+            port,
+            domain_name,
+            version: DEFAULT_VERSION.to_string(),
+            bucket_name,
+            token: format!("Basic {token}"),
+            indexer_id,
+            flight_port,
+        }
+    }
+
+    /// Capture metadata information by either loading it from staging or starting fresh
+    pub fn load(options: &Options, storage: &dyn ObjectStorageProvider) -> Arc<Self> {
+        // all the files should be in the staging directory root
+        let entries = options
+            .staging_dir()
+            .read_dir()
+            .expect("Couldn't read from file");
+        let url = options.get_url(Mode::Index);
+        let port = url.port().unwrap_or(80).to_string();
+        let url = url.to_string();
+        let Options {
+            username, password, ..
+        } = options;
+        let staging_path = options.staging_dir();
+        let flight_port = options.flight_port.to_string();
+
+        for entry in entries {
+            // cause the staging directory will have only one file with indexer in the name
+            // so the JSON Parse should not error unless the file is corrupted
+            let path = entry.expect("Should be a directory entry").path();
+            if !path
+                .file_name()
+                .and_then(|s| s.to_str())
+                .is_some_and(|s| s.contains("indexer"))
+            {
+                continue;
+            }
+
+            // get the indexer metadata from staging
+            let bytes = std::fs::read(path).expect("File should be present");
+            let mut meta =
+                Self::from_bytes(&bytes, options.flight_port).expect("Extracted indexer metadata");
+
+            // compare url endpoint and port, update
+            if meta.domain_name != url {
+                info!(
+                    "Domain Name was Updated. Old: {} New: {}",
+                    meta.domain_name, url
+                );
+                meta.domain_name = url;
+            }
+
+            if meta.port != port {
+                info!("Port was Updated. Old: {} New: {}", meta.port, port);
+                meta.port = port;
+            }
+
+            let token = format!(
+                "Basic {}",
+                BASE64_STANDARD.encode(format!("{username}:{password}"))
+            );
+            if meta.token != token {
+                // TODO: Update the message to be more informative with username and password
+                warn!(
+                    "Credentials were Updated. Tokens updated; Old: {} New: {}",
+                    meta.token, token
+                );
+                meta.token = token;
+            }
+            meta.put_on_disk(staging_path)
+                .expect("Couldn't write to disk");
+
+            return Arc::new(meta);
+        }
+
+        let storage = storage.get_object_store();
+        let meta = Self::new(
+            port,
+            url,
+            storage.get_bucket_name(),
+            username,
+            password,
+            get_indexer_id(),
+            flight_port,
+        );
+
+        meta.put_on_disk(staging_path)
+            .expect("Should Be valid Json");
+        Arc::new(meta)
+    }
+
+    pub fn get_indexer_id(&self) -> String {
+        self.indexer_id.clone()
+    }
+
+    #[inline(always)]
+    pub fn file_path(&self) -> RelativePathBuf {
+        RelativePathBuf::from_iter([
+            PARSEABLE_ROOT_DIRECTORY,
+            &format!("indexer.{}.json", self.get_indexer_id()),
+        ])
+    }
+
+    /// Updates json with `flight_port` field if not already present
+    fn from_bytes(bytes: &[u8], flight_port: u16) -> anyhow::Result<Self> {
+        let mut json: Map<String, Value> = serde_json::from_slice(bytes)?;
+        json.entry("flight_port")
+            .or_insert_with(|| Value::String(flight_port.to_string()));
+
+        Ok(serde_json::from_value(Value::Object(json))?)
+    }
+
+    pub async fn migrate(&self) -> anyhow::Result<Option<IndexerMetadata>> {
+        let imp = self.file_path();
+        let bytes = match PARSEABLE.storage.get_object_store().get_object(&imp).await {
+            Ok(bytes) => bytes,
+            Err(_) => {
+                return Ok(None);
+            }
+        };
+
+        let resource = Self::from_bytes(&bytes, PARSEABLE.options.flight_port)?;
+        let bytes = Bytes::from(serde_json::to_vec(&resource)?);
+
+        resource.put_on_disk(PARSEABLE.options.staging_dir())?;
+
+        PARSEABLE
+            .storage
+            .get_object_store()
+            .put_object(&imp, bytes)
+            .await?;
+
+        Ok(Some(resource))
+    }
+
+    /// Puts the indexer info into the staging.
+    ///
+    /// This function takes the indexer info as a parameter and stores it in staging.
+    /// # Parameters
+    ///
+    /// * `staging_path`: Staging root directory.
+    pub fn put_on_disk(&self, staging_path: &Path) -> anyhow::Result<()> {
+        let file_name = format!("indexer.{}.json", self.indexer_id);
+        let file_path = staging_path.join(file_name);
+
+        std::fs::write(file_path, serde_json::to_vec(&self)?)?;
+
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 mod test {
     use actix_web::body::MessageBody;
```
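
`IndexerMetadata::from_bytes` upgrades metadata written before the `flight_port` field existed by inserting the configured port into the raw JSON before deserializing, so older staging files keep loading. A standalone sketch of the backfill (hypothetical function name):

```rust
use serde_json::{Map, Value};

// Standalone version of the backfill performed by IndexerMetadata::from_bytes:
// if the stored JSON lacks `flight_port`, insert the configured value.
fn backfill_flight_port(bytes: &[u8], flight_port: u16) -> serde_json::Result<Value> {
    let mut json: Map<String, Value> = serde_json::from_slice(bytes)?;
    json.entry("flight_port")
        .or_insert_with(|| Value::String(flight_port.to_string()));
    Ok(Value::Object(json))
}

fn main() {
    let old = br#"{"indexer_id":"abc","port":"8000"}"#; // pre-flight_port metadata
    let patched = backfill_flight_port(old, 8002).unwrap();
    assert_eq!(patched["flight_port"], "8002");
}
```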

‎src/lib.rs

+2 -2

```diff
@@ -72,10 +72,10 @@ pub const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as
 pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30);
 
 // A single HTTP client for all outgoing HTTP requests from the parseable server
-static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
+pub static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
     ClientBuilder::new()
         .connect_timeout(Duration::from_secs(3)) // set a timeout of 3s for each connection setup
-        .timeout(Duration::from_secs(10)) // set a timeout of 10s for each request
+        .timeout(Duration::from_secs(30)) // set a timeout of 30s for each request
         .pool_idle_timeout(Duration::from_secs(90)) // set a timeout of 90s for each idle connection
         .pool_max_idle_per_host(32) // max 32 idle connections per host
         .gzip(true) // gzip compress for all requests
```

‎src/metrics/prom_utils.rs

+3 -1

```diff
@@ -19,6 +19,7 @@
 use crate::handlers::http::base_path_without_preceding_slash;
 use crate::handlers::http::ingest::PostError;
 use crate::handlers::http::modal::IngestorMetadata;
+use crate::option::Mode;
 use crate::parseable::PARSEABLE;
 use crate::HTTP_CLIENT;
 use actix_web::http::header;
@@ -61,7 +62,8 @@ struct StorageMetrics {
 
 impl Default for Metrics {
     fn default() -> Self {
-        let url = PARSEABLE.options.get_url();
+        // for now it is only for ingestor
+        let url = PARSEABLE.options.get_url(Mode::Ingest);
         let address = format!(
             "http://{}:{}",
             url.domain()
```

‎src/parseable/mod.rs

+66 -26

```diff
@@ -41,7 +41,7 @@ use crate::{
             cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME},
             ingest::PostError,
             logstream::error::{CreateStreamError, StreamError},
-            modal::{utils::logstream_utils::PutStreamHeaders, IngestorMetadata},
+            modal::{utils::logstream_utils::PutStreamHeaders, IndexerMetadata, IngestorMetadata},
         },
         STREAM_TYPE_KEY,
     },
@@ -120,6 +120,8 @@ pub struct Parseable {
     pub streams: Streams,
     /// Metadata associated only with an ingestor
    pub ingestor_metadata: Option<Arc<IngestorMetadata>>,
+    /// Metadata associated only with an indexer
+    pub indexer_metadata: Option<Arc<IndexerMetadata>>,
     /// Used to configure the kafka connector
     #[cfg(feature = "kafka")]
     pub kafka_config: KafkaConfig,
@@ -135,11 +137,16 @@ impl Parseable {
             Mode::Ingest => Some(IngestorMetadata::load(&options, storage.as_ref())),
             _ => None,
         };
+        let indexer_metadata = match &options.mode {
+            Mode::Index => Some(IndexerMetadata::load(&options, storage.as_ref())),
+            _ => None,
+        };
         Parseable {
            options: Arc::new(options),
            storage,
            streams: Streams::default(),
            ingestor_metadata,
+           indexer_metadata,
            #[cfg(feature = "kafka")]
            kafka_config,
        }
@@ -249,35 +256,68 @@ impl Parseable {
     }
 
     // create the ingestor metadata and put the .ingestor.json file in the object store
-    pub async fn store_ingestor_metadata(&self) -> anyhow::Result<()> {
-        let Some(meta) = self.ingestor_metadata.as_ref() else {
-            return Ok(());
-        };
-        let storage_ingestor_metadata = meta.migrate().await?;
-        let store = self.storage.get_object_store();
-
-        // use the id that was generated/found in the staging and
-        // generate the path for the object store
-        let path = meta.file_path();
-
-        // we are considering that we can always get from object store
-        if let Some(mut store_data) = storage_ingestor_metadata {
-            if store_data.domain_name != meta.domain_name {
-                store_data.domain_name.clone_from(&meta.domain_name);
-                store_data.port.clone_from(&meta.port);
-
-                let resource = Bytes::from(serde_json::to_vec(&store_data)?);
-
-                // if pushing to object store fails propagate the error
-                store.put_object(&path, resource).await?;
-            }
-        } else {
-            let resource = serde_json::to_vec(&meta)?.into();
-
-            store.put_object(&path, resource).await?;
-        }
-
-        Ok(())
+    pub async fn store_metadata(&self, mode: Mode) -> anyhow::Result<()> {
+        match mode {
+            Mode::Ingest => {
+                let Some(meta) = self.ingestor_metadata.as_ref() else {
+                    return Ok(());
+                };
+                let storage_ingestor_metadata = meta.migrate().await?;
+                let store = self.storage.get_object_store();
+
+                // use the id that was generated/found in the staging and
+                // generate the path for the object store
+                let path = meta.file_path();
+
+                // we are considering that we can always get from object store
+                if let Some(mut store_data) = storage_ingestor_metadata {
+                    if store_data.domain_name != meta.domain_name {
+                        store_data.domain_name.clone_from(&meta.domain_name);
+                        store_data.port.clone_from(&meta.port);
+
+                        let resource = Bytes::from(serde_json::to_vec(&store_data)?);
+
+                        // if pushing to object store fails propagate the error
+                        store.put_object(&path, resource).await?;
+                    }
+                } else {
+                    let resource = serde_json::to_vec(&meta)?.into();
+
+                    store.put_object(&path, resource).await?;
+                }
+                Ok(())
+            }
+            Mode::Index => {
+                let Some(meta) = self.indexer_metadata.as_ref() else {
+                    return Ok(());
+                };
+                let storage_indexer_metadata = meta.migrate().await?;
+                let store = self.storage.get_object_store();
+
+                // use the id that was generated/found in the staging and
+                // generate the path for the object store
+                let path = meta.file_path();
+
+                // we are considering that we can always get from object store
+                if let Some(mut store_data) = storage_indexer_metadata {
+                    if store_data.domain_name != meta.domain_name {
+                        store_data.domain_name.clone_from(&meta.domain_name);
+                        store_data.port.clone_from(&meta.port);
+
+                        let resource = Bytes::from(serde_json::to_vec(&store_data)?);
+
+                        // if pushing to object store fails propagate the error
+                        store.put_object(&path, resource).await?;
+                    }
+                } else {
+                    let resource = serde_json::to_vec(&meta)?.into();
+
+                    store.put_object(&path, resource).await?;
+                }
+                Ok(())
+            }
+            _ => Err(anyhow::anyhow!("Invalid mode")),
+        }
     }
 
     /// list all streams from storage
```

‎src/storage/azure_blob.rs

+7 -0

```diff
@@ -424,6 +424,13 @@ impl BlobStore {
 
 #[async_trait]
 impl ObjectStorage for BlobStore {
+    async fn upload_multipart(
+        &self,
+        _key: &RelativePath,
+        _path: &Path,
+    ) -> Result<(), ObjectStorageError> {
+        unimplemented!()
+    }
     async fn get_buffered_reader(
         &self,
         _path: &RelativePath,
```

‎src/storage/localfs.rs

+7 -0

```diff
@@ -104,6 +104,13 @@ impl LocalFS {
 
 #[async_trait]
 impl ObjectStorage for LocalFS {
+    async fn upload_multipart(
+        &self,
+        _key: &RelativePath,
+        _path: &Path,
+    ) -> Result<(), ObjectStorageError> {
+        unimplemented!()
+    }
     async fn get_buffered_reader(
         &self,
         _path: &RelativePath,
```

‎src/storage/object_storage.rs

+5 -0

```diff
@@ -88,6 +88,11 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         base_path: Option<&RelativePath>,
         filter_fun: Box<dyn Fn(String) -> bool + Send>,
     ) -> Result<Vec<Bytes>, ObjectStorageError>;
+    async fn upload_multipart(
+        &self,
+        key: &RelativePath,
+        path: &Path,
+    ) -> Result<(), ObjectStorageError>;
     async fn put_object(
         &self,
         path: &RelativePath,
```

‎src/storage/s3.rs

+69 -42

```diff
@@ -43,6 +43,7 @@ use object_store::{
     BackoffConfig, ClientOptions, ObjectMeta, ObjectStore, PutPayload, RetryConfig,
 };
 use relative_path::{RelativePath, RelativePathBuf};
+use tokio::{fs::OpenOptions, io::AsyncReadExt};
 use tracing::{error, info};
 
 use crate::{
@@ -61,6 +62,7 @@ use super::{
 // in bytes
 // const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100;
 const AWS_CONTAINER_CREDENTIALS_RELATIVE_URI: &str = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI";
+const MIN_MULTIPART_UPLOAD_SIZE: usize = 5 * 1024 * 1024;
 
 #[derive(Debug, Clone, clap::Args)]
 #[command(
@@ -509,48 +511,66 @@ impl S3 {
         res
     }
 
-    // TODO: introduce parallel, multipart-uploads if required
-    // async fn _upload_multipart(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
-    //     let mut buf = vec![0u8; MULTIPART_UPLOAD_SIZE / 2];
-    //     let mut file = OpenOptions::new().read(true).open(path).await?;
-
-    //     // let (multipart_id, mut async_writer) = self.client.put_multipart(&key.into()).await?;
-    //     let mut async_writer = self.client.put_multipart(&key.into()).await?;
-
-    //     /* `abort_multipart()` has been removed */
-    //     // let close_multipart = |err| async move {
-    //     //     error!("multipart upload failed. {:?}", err);
-    //     //     self.client
-    //     //         .abort_multipart(&key.into(), &multipart_id)
-    //     //         .await
-    //     // };
-
-    //     loop {
-    //         match file.read(&mut buf).await {
-    //             Ok(len) => {
-    //                 if len == 0 {
-    //                     break;
-    //                 }
-    //                 if let Err(err) = async_writer.write_all(&buf[0..len]).await {
-    //                     // close_multipart(err).await?;
-    //                     break;
-    //                 }
-    //                 if let Err(err) = async_writer.flush().await {
-    //                     // close_multipart(err).await?;
-    //                     break;
-    //                 }
-    //             }
-    //             Err(err) => {
-    //                 // close_multipart(err).await?;
-    //                 break;
-    //             }
-    //         }
-    //     }
-
-    //     async_writer.shutdown().await?;
-
-    //     Ok(())
-    // }
+    async fn _upload_multipart(
+        &self,
+        key: &RelativePath,
+        path: &Path,
+    ) -> Result<(), ObjectStorageError> {
+        let mut file = OpenOptions::new().read(true).open(path).await?;
+        let location = &to_object_store_path(key);
+
+        let mut async_writer = self.client.put_multipart(location).await?;
+
+        // /* `abort_multipart()` has been removed */
+        // let close_multipart = |err| async move {
+        //     error!("multipart upload failed. {:?}", err);
+        //     self.client
+        //         .abort_multipart(&key.into(), &multipart_id)
+        //         .await
+        // };
+
+        let meta = file.metadata().await?;
+        let total_size = meta.len() as usize;
+        if total_size < MIN_MULTIPART_UPLOAD_SIZE {
+            let mut data = Vec::new();
+            file.read_to_end(&mut data).await?;
+            self.client.put(location, data.into()).await?;
+            // async_writer.put_part(data.into()).await?;
+            // async_writer.complete().await?;
+            return Ok(());
+        } else {
+            let mut data = Vec::new();
+            file.read_to_end(&mut data).await?;
+
+            // let mut upload_parts = Vec::new();
+
+            let has_final_partial_part = total_size % MIN_MULTIPART_UPLOAD_SIZE > 0;
+            let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE;
+            let total_parts = num_full_parts + if has_final_partial_part { 1 } else { 0 };
+
+            // Upload each part
+            for part_number in 0..(total_parts) {
+                let start_pos = part_number * MIN_MULTIPART_UPLOAD_SIZE;
+                let end_pos = if part_number == num_full_parts && has_final_partial_part {
+                    // Last part might be smaller than 5MB (which is allowed)
+                    total_size
+                } else {
+                    // All other parts must be at least 5MB
+                    start_pos + MIN_MULTIPART_UPLOAD_SIZE
+                };
+
+                // Extract this part's data
+                let part_data = data[start_pos..end_pos].to_vec();
+
+                // Upload the part
+                async_writer.put_part(part_data.into()).await?;
+
+                // upload_parts.push(part_number as u64 + 1);
+            }
+            async_writer.complete().await?;
+        }
+        Ok(())
+    }
 }
 
 #[async_trait]
@@ -566,6 +586,13 @@ impl ObjectStorage for S3 {
         let buf = object_store::buffered::BufReader::new(store, &meta);
         Ok(buf)
     }
+    async fn upload_multipart(
+        &self,
+        key: &RelativePath,
+        path: &Path,
+    ) -> Result<(), ObjectStorageError> {
+        self._upload_multipart(key, path).await
+    }
     async fn head(&self, path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
         Ok(self.client.head(&to_object_store_path(path)).await?)
     }
```
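
The new `_upload_multipart` buffers the entire file in memory (`read_to_end`) rather than streaming it, and chunks on a 5 MiB floor because S3 rejects multipart parts smaller than 5 MiB except the final one. A standalone sketch of the part-boundary arithmetic, showing that a 12 MiB file splits into parts of 5, 5, and 2 MiB:

```rust
// Same 5 MiB minimum part size as the commit's MIN_MULTIPART_UPLOAD_SIZE.
const MIN_MULTIPART_UPLOAD_SIZE: usize = 5 * 1024 * 1024;

// Reproduces the loop bounds in _upload_multipart as (start, end) byte ranges.
fn part_ranges(total_size: usize) -> Vec<(usize, usize)> {
    let has_final_partial_part = total_size % MIN_MULTIPART_UPLOAD_SIZE > 0;
    let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE;
    let total_parts = num_full_parts + usize::from(has_final_partial_part);

    (0..total_parts)
        .map(|part_number| {
            let start = part_number * MIN_MULTIPART_UPLOAD_SIZE;
            let end = if part_number == num_full_parts && has_final_partial_part {
                total_size // trailing partial part may be under 5 MiB
            } else {
                start + MIN_MULTIPART_UPLOAD_SIZE
            };
            (start, end)
        })
        .collect()
}

fn main() {
    let ranges = part_ranges(12 * 1024 * 1024);
    assert_eq!(ranges.len(), 3);
    assert_eq!(ranges[2].1 - ranges[2].0, 2 * 1024 * 1024); // final 2 MiB part
}
```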

‎src/utils/mod.rs

+7 -0

```diff
@@ -42,6 +42,13 @@ pub fn get_ingestor_id() -> String {
     id
 }
 
+pub fn get_indexer_id() -> String {
+    let now = Utc::now().to_rfc3339();
+    let id = get_hash(&now).to_string().split_at(15).0.to_string();
+    debug!("Indexer ID: {id}");
+    id
+}
+
 pub fn extract_datetime(path: &str) -> Option<NaiveDateTime> {
     let re = Regex::new(r"date=(\d{4}-\d{2}-\d{2})/hour=(\d{2})/minute=(\d{2})").unwrap();
     if let Some(caps) = re.captures(path) {
```

‎src/utils/time.rs

+1 -1

```diff
@@ -49,7 +49,7 @@ impl TimeBounds {
 }
 
 /// Represents a range of time with a start and end point.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct TimeRange {
     pub start: DateTime<Utc>,
     pub end: DateTime<Utc>,
```
