
Commit 70ca2f0

refactor: associate conversion with STAGING, not storage (#1176)
1 parent 6346928 commit 70ca2f0

File tree: 7 files changed, +86 −94 lines
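Taken together, the change moves arrow-to-parquet conversion off the object-store trait and onto the global `STAGING` handle, so callers no longer need an object store handle to run conversion. A minimal before/after sketch of the call-site change (both `if let` lines are taken verbatim from the src/handlers/http/modal/mod.rs diff below; this is a fragment, not a complete program):

```rust
// Before: conversion was an async method on the object store trait.
if let Err(e) = CONFIG.storage().get_object_store().conversion(true).await {
    warn!("Failed to convert arrow files to parquet. {:?}", e);
}

// After: conversion is a synchronous call on the staging singleton.
if let Err(e) = STAGING.prepare_parquet(true) {
    warn!("Failed to convert arrow files to parquet. {:?}", e);
}
```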

src/alerts/mod.rs (+2 −1)

@@ -742,7 +742,8 @@ impl Alerts {
                 outbox_tx,
             )?;

-            self.update_task(alert.id, handle, outbox_rx, inbox_tx).await;
+            self.update_task(alert.id, handle, outbox_rx, inbox_tx)
+                .await;

             map.insert(alert.id, alert);
         }

src/handlers/http/modal/ingest_server.rs (+1 −1)

@@ -213,7 +213,7 @@ impl ParseableServer for IngestServer {
         let result = self.start(shutdown_rx, prometheus.clone(), None).await;
         // Cancel sync jobs
         cancel_tx.send(()).expect("Cancellation should not fail");
-
+
         result
     }
 }

src/handlers/http/modal/mod.rs (+2 −1)

@@ -49,6 +49,7 @@ use super::API_VERSION;
 use crate::handlers::http::health_check;
 use crate::oidc;
 use crate::option::CONFIG;
+use crate::staging::STAGING;

 pub type OpenIdClient = Arc<openid::Client<Discovered, oidc::Claims>>;

@@ -140,7 +141,7 @@ pub trait ParseableServer {
         // Perform S3 sync and wait for completion
         info!("Starting data sync to S3...");

-        if let Err(e) = CONFIG.storage().get_object_store().conversion(true).await {
+        if let Err(e) = STAGING.prepare_parquet(true) {
             warn!("Failed to convert arrow files to parquet. {:?}", e);
         } else {
             info!("Successfully converted arrow files to parquet.");

src/staging/mod.rs (+4)

@@ -20,6 +20,8 @@
 use once_cell::sync::Lazy;
 pub use streams::{Stream, Streams};

+use crate::metadata::error::stream_info::MetadataError;
+
 mod reader;
 mod streams;
 mod writer;
@@ -34,6 +36,8 @@ pub enum StagingError {
     ObjectStorage(#[from] std::io::Error),
     #[error("Could not generate parquet file")]
     Create,
+    #[error("Metadata Error: {0}")]
+    Metadata(#[from] MetadataError),
 }

 /// Staging is made up of multiple streams, each stream's context is housed in a single `Stream` object.
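The new `Metadata` variant is what lets the conversion code, now living in staging, use `?` directly on `STREAM_INFO` lookups; the removed object-storage version had to wrap each of those errors in `ObjectStorageError::UnhandledError` by hand. A small sketch of the pattern `#[from]` enables (the function below is illustrative, assumed to sit inside src/staging, and assumes the lookups return `Result<Option<String>, MetadataError>`, which matches how the src/staging/streams.rs diff uses them):

```rust
use crate::metadata::{error::stream_info::MetadataError, STREAM_INFO};

// thiserror's `#[from]` derives `From<MetadataError> for StagingError`,
// so `?` converts a metadata error into StagingError automatically.
fn partitions_for(stream_name: &str) -> Result<(Option<String>, Option<String>), StagingError> {
    let time_partition = STREAM_INFO.get_time_partition(stream_name)?;
    let custom_partition = STREAM_INFO.get_custom_partition(stream_name)?;
    Ok((time_partition, custom_partition))
}
```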

src/staging/streams.rs (+76 −3)

@@ -19,7 +19,7 @@

 use std::{
     collections::HashMap,
-    fs::{remove_file, File, OpenOptions},
+    fs::{self, remove_file, File, OpenOptions},
     path::{Path, PathBuf},
     process,
     sync::{Arc, Mutex, RwLock},
@@ -42,15 +42,17 @@ use parquet::{
     schema::types::ColumnPath,
 };
 use rand::distributions::DistString;
-use tracing::{error, trace};
+use relative_path::RelativePathBuf;
+use tracing::{error, info, trace, warn};

 use crate::{
     cli::Options,
     event::DEFAULT_TIMESTAMP_KEY,
     handlers::http::modal::ingest_server::INGESTOR_META,
+    metadata::{LOCK_EXPECT, STREAM_INFO},
     metrics,
     option::{Mode, CONFIG},
-    storage::{StreamType, OBJECT_STORE_DATA_GRANULARITY},
+    storage::{object_storage::to_bytes, StreamType, OBJECT_STORE_DATA_GRANULARITY},
     utils::minute_to_slot,
 };

@@ -278,6 +280,62 @@ impl<'a> Stream<'a> {
         parquet_path
     }

+    /// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal`
+    fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> {
+        info!(
+            "Starting arrow_conversion job for stream- {}",
+            self.stream_name
+        );
+
+        let time_partition = STREAM_INFO.get_time_partition(&self.stream_name)?;
+        let custom_partition = STREAM_INFO.get_custom_partition(&self.stream_name)?;
+
+        // read arrow files on disk
+        // convert them to parquet
+        let schema = self
+            .convert_disk_files_to_parquet(
+                time_partition.as_ref(),
+                custom_partition.as_ref(),
+                shutdown_signal,
+            )
+            .inspect_err(|err| warn!("Error while converting arrow to parquet- {err:?}"))?;
+
+        // check if there is already a schema file in staging pertaining to this stream
+        // if yes, then merge them and save
+
+        if let Some(schema) = schema {
+            let static_schema_flag = STREAM_INFO.get_static_schema_flag(&self.stream_name)?;
+            if !static_schema_flag {
+                // schema is dynamic, read from staging and merge if present
+
+                // need to add something before .schema to make the file have an extension of type `schema`
+                let path = RelativePathBuf::from_iter([format!("{}.schema", self.stream_name)])
+                    .to_path(&self.data_path);
+
+                let staging_schemas = self.get_schemas_if_present();
+                if let Some(mut staging_schemas) = staging_schemas {
+                    warn!(
+                        "Found {} schemas in staging for stream- {}",
+                        staging_schemas.len(),
+                        self.stream_name
+                    );
+                    staging_schemas.push(schema);
+                    let merged_schema = Schema::try_merge(staging_schemas)?;
+
+                    warn!("writing merged schema to path- {path:?}");
+                    // save the merged schema on staging disk
+                    // the path should be stream/.ingestor.id.schema
+                    fs::write(path, to_bytes(&merged_schema))?;
+                } else {
+                    info!("writing single schema to path- {path:?}");
+                    fs::write(path, to_bytes(&schema))?;
+                }
+            }
+        }
+
+        Ok(())
+    }
+
     pub fn recordbatches_cloned(&self, schema: &Arc<Schema>) -> Vec<RecordBatch> {
         self.writer.lock().unwrap().mem.recordbatch_cloned(schema)
     }
@@ -506,6 +564,21 @@ impl Streams {
             staging.flush()
         }
     }
+
+    /// Convert arrow files into parquet, preparing it for upload
+    pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> {
+        if !Path::new(&CONFIG.staging_dir()).exists() {
+            return Ok(());
+        }
+
+        for stream in self.read().expect(LOCK_EXPECT).values() {
+            stream
+                .prepare_parquet(shutdown_signal)
+                .inspect_err(|err| error!("Failed to run conversion task {err:?}"))?;
+        }
+
+        Ok(())
+    }
 }

 #[cfg(test)]
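One behavioral difference worth noting: the removed `ObjectStorage::conversion` (next file) pushed per-stream conversions into a `FuturesUnordered` and polled them concurrently, whereas `Streams::prepare_parquet` above walks the stream map and converts one stream at a time, returning on the first error. A simplified sketch of the new, sequential shape (stand-in types only; the real loop is in the diff above):

```rust
// Stand-in types: the real code iterates `Stream` values behind a RwLock
// and returns `StagingError` rather than `String`.
fn prepare_all(stream_names: &[&str]) -> Result<(), String> {
    for name in stream_names {
        // each stream is converted fully before the next one starts;
        // the first failure is logged and propagated, skipping the rest
        convert_stream(name)
            .inspect_err(|err| eprintln!("Failed to run conversion task {err:?}"))?;
    }
    Ok(())
}

fn convert_stream(name: &str) -> Result<(), String> {
    // stand-in for Stream::prepare_parquet
    println!("Starting arrow_conversion job for stream- {name}");
    Ok(())
}
```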

src/storage/object_storage.rs (−87)

@@ -46,8 +46,6 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::{DateTime, Local, Utc};
 use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
-use futures::stream::FuturesUnordered;
-use futures::StreamExt;
 use once_cell::sync::OnceCell;
 use relative_path::RelativePath;
 use relative_path::RelativePathBuf;
@@ -748,91 +746,6 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {

         Ok(())
     }
-
-    async fn conversion(&self, shutdown_signal: bool) -> Result<(), ObjectStorageError> {
-        if !Path::new(&CONFIG.staging_dir()).exists() {
-            return Ok(());
-        }
-
-        let mut conversion_tasks = FuturesUnordered::new();
-        for stream in STREAM_INFO.list_streams() {
-            conversion_tasks.push(conversion_for_stream(stream, shutdown_signal));
-        }
-
-        while let Some(res) = conversion_tasks.next().await {
-            if let Err(err) = res {
-                error!("Failed to run conversion task {err:?}");
-                return Err(err);
-            }
-        }
-
-        Ok(())
-    }
-}
-
-async fn conversion_for_stream(
-    stream: String,
-    shutdown_signal: bool,
-) -> Result<(), ObjectStorageError> {
-    info!("Starting arrow_conversion job for stream- {stream}");
-
-    let time_partition = STREAM_INFO
-        .get_time_partition(&stream)
-        .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
-    let custom_partition = STREAM_INFO
-        .get_custom_partition(&stream)
-        .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
-    let stage = STAGING.get_or_create_stream(&stream);
-
-    // read arrow files on disk
-    // convert them to parquet
-    let schema = stage
-        .convert_disk_files_to_parquet(
-            time_partition.as_ref(),
-            custom_partition.as_ref(),
-            shutdown_signal,
-        )
-        .map_err(|err| {
-            warn!("Error while converting arrow to parquet- {err:?}");
-            ObjectStorageError::UnhandledError(Box::new(err))
-        })?;
-
-    // check if there is already a schema file in staging pertaining to this stream
-    // if yes, then merge them and save
-
-    if let Some(schema) = schema {
-        let static_schema_flag = STREAM_INFO
-            .get_static_schema_flag(&stream)
-            .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
-        if !static_schema_flag {
-            // schema is dynamic, read from staging and merge if present
-
-            // need to add something before .schema to make the file have an extension of type `schema`
-            let path =
-                RelativePathBuf::from_iter([format!("{stream}.schema")]).to_path(&stage.data_path);
-
-            let staging_schemas = stage.get_schemas_if_present();
-            if let Some(mut staging_schemas) = staging_schemas {
-                warn!(
-                    "Found {} schemas in staging for stream- {stream}",
-                    staging_schemas.len()
-                );
-                staging_schemas.push(schema);
-                let merged_schema = Schema::try_merge(staging_schemas)
-                    .map_err(|e| ObjectStorageError::Custom(e.to_string()))?;
-
-                warn!("writing merged schema to path- {path:?}");
-                // save the merged schema on staging disk
-                // the path should be stream/.ingestor.id.schema
-                fs::write(path, to_bytes(&merged_schema))?;
-            } else {
-                info!("writing single schema to path- {path:?}");
-                fs::write(path, to_bytes(&schema))?;
-            }
-        }
-    }
-
-    Ok(())
 }

 pub async fn commit_schema_to_storage(

src/sync.rs (+1 −1)

@@ -209,7 +209,7 @@ pub fn arrow_conversion() -> (
             if let Err(e) = monitor_task_duration(
                 "arrow_conversion",
                 Duration::from_secs(30),
-                || async { CONFIG.storage().get_object_store().conversion(false).await },
+                || async { STAGING.prepare_parquet(false) },
             ).await
             {
                 warn!("failed to convert local arrow data to parquet. {e:?}");
