[POC][wip] faster DefaultEngine parquet reads #595

Draft · wants to merge 2 commits into main
22 changes: 11 additions & 11 deletions Cargo.toml
@@ -22,17 +22,17 @@ rust-version = "1.80"
version = "0.5.0"

[workspace.dependencies]
-arrow = { version = ">=53, <54" }
-arrow-arith = { version = ">=53, <54" }
-arrow-array = { version = ">=53, <54" }
-arrow-buffer = { version = ">=53, <54" }
-arrow-cast = { version = ">=53, <54" }
-arrow-data = { version = ">=53, <54" }
-arrow-ord = { version = ">=53, <54" }
-arrow-json = { version = ">=53, <54" }
-arrow-select = { version = ">=53, <54" }
-arrow-schema = { version = ">=53, <54" }
-parquet = { version = ">=53, <54", features = ["object_store"] }
+arrow = { version = ">=53.3, <54" }
+arrow-arith = { version = ">=53.3, <54" }
+arrow-array = { version = ">=53.3, <54" }
+arrow-buffer = { version = ">=53.3, <54" }
+arrow-cast = { version = ">=53.3, <54" }
+arrow-data = { version = ">=53.3, <54" }
+arrow-ord = { version = ">=53.3, <54" }
+arrow-json = { version = ">=53.3, <54" }
+arrow-select = { version = ">=53.3, <54" }
+arrow-schema = { version = ">=53.3, <54" }
+parquet = { version = ">=53.3, <54", features = ["object_store"] }
object_store = { version = ">=0.11, <0.12" }
hdfs-native-object-store = "0.12.0"
hdfs-native = "0.10.0"
5 changes: 4 additions & 1 deletion kernel/examples/read-table-single-threaded/src/main.rs
@@ -137,6 +137,9 @@ fn try_main() -> DeltaResult<()> {
}
})
.try_collect()?;
-print_batches(&batches)?;
+// print_batches(&batches)?;
Collaborator: for experimenting, I'd suggest using the multi-threaded reader, although I guess this does help determine how much a single call can read. Regardless, read-table-multi-threaded has a --limit option for this case, so you can see that some data got returned without printing it all, and it still tells you the total row count. Maybe add that as an option here too :)

Collaborator (author): yep, thanks - I ended up playing with both, but yeah, the --limit is nicer :)
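
For reference, a minimal sketch of what such a `--limit` flag could look like in this example's CLI, assuming the clap derive style the kernel examples already use (the field name and doc text here are illustrative, not part of this PR):

```rust
use clap::Parser;

/// Illustrative CLI sketch (not this PR's code): a `--limit` flag that
/// caps how many batches get printed while all rows are still counted.
#[derive(Parser)]
struct Cli {
    /// Print at most this many batches.
    #[arg(long)]
    limit: Option<usize>,
}

fn main() {
    let cli = Cli::parse();
    // e.g. take(cli.limit.unwrap_or(usize::MAX)) over the batches when
    // printing, while the row-count sum still runs over all batches.
    println!("limit: {:?}", cli.limit);
}
```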

// print the total number of rows
let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
println!("Total rows read: {}", total_rows);
Ok(())
}
2 changes: 1 addition & 1 deletion kernel/src/engine/default/file_stream.rs
@@ -24,7 +24,7 @@ pub type FileOpenFuture =
/// stream of [`RecordBatch`]
///
/// [`ObjectStore`]: object_store::ObjectStore
-pub trait FileOpener: Send + Unpin {
+pub trait FileOpener: Send + Unpin + Sync {
/// Asynchronously open the specified file and return a stream
/// of [`RecordBatch`]
fn open(&self, file_meta: FileMeta, range: Option<Range<i64>>) -> DeltaResult<FileOpenFuture>;
192 changes: 178 additions & 14 deletions kernel/src/engine/default/mod.rs
@@ -17,6 +17,7 @@ use self::executor::TaskExecutor;
use self::filesystem::ObjectStoreFileSystemClient;
use self::json::DefaultJsonHandler;
use self::parquet::DefaultParquetHandler;
use self::parquet2::AsyncParquetHandler;
use super::arrow_data::ArrowEngineData;
use super::arrow_expression::ArrowExpressionHandler;
use crate::schema::Schema;
@@ -31,14 +32,15 @@ pub mod file_stream;
pub mod filesystem;
pub mod json;
pub mod parquet;
pub mod parquet2;
pub mod storage;

-#[derive(Debug)]
+// #[derive(Debug)]
pub struct DefaultEngine<E: TaskExecutor> {
store: Arc<DynObjectStore>,
file_system: Arc<ObjectStoreFileSystemClient<E>>,
json: Arc<DefaultJsonHandler<E>>,
-parquet: Arc<DefaultParquetHandler<E>>,
+parquet: Arc<dyn ParquetHandler>,
expression: Arc<ArrowExpressionHandler>,
}

@@ -93,6 +95,12 @@ impl<E: TaskExecutor> DefaultEngine<E> {
// `filesystem.rs`
let store_str = format!("{}", store);
let is_local = store_str.starts_with("LocalFileSystem");
let parquet = Arc::new(DefaultParquetHandler::new(
store.clone(),
task_executor.clone(),
));
let parquet = Arc::new(AsyncParquetHandler::new(store.clone()));

Self {
file_system: Arc::new(ObjectStoreFileSystemClient::new(
store.clone(),
@@ -104,7 +112,7 @@ impl<E: TaskExecutor> DefaultEngine<E> {
store.clone(),
task_executor.clone(),
)),
-parquet: Arc::new(DefaultParquetHandler::new(store.clone(), task_executor)),
+parquet,
store,
expression: Arc::new(ArrowExpressionHandler {}),
}
@@ -118,8 +126,8 @@ impl<E: TaskExecutor> DefaultEngine<E> {
&self,
data: &ArrowEngineData,
write_context: &WriteContext,
-partition_values: HashMap<String, String>,
-data_change: bool,
+_partition_values: HashMap<String, String>,
+_data_change: bool,
) -> DeltaResult<Box<dyn EngineData>> {
let transform = write_context.logical_to_physical();
let input_schema: Schema = data.record_batch().schema().try_into()?;
@@ -129,15 +137,16 @@ impl<E: TaskExecutor> DefaultEngine<E> {
transform.clone(),
output_schema.clone().into(),
);
-let physical_data = logical_to_physical_expr.evaluate(data)?;
-self.parquet
-    .write_parquet_file(
-        write_context.target_dir(),
-        physical_data,
-        partition_values,
-        data_change,
-    )
-    .await
+let _physical_data = logical_to_physical_expr.evaluate(data)?;
+unimplemented!()
+// self.parquet
+//     .write_parquet_file(
+//         write_context.target_dir(),
+//         physical_data,
+//         partition_values,
+//         data_change,
+//     )
+//     .await
}
}

@@ -158,3 +167,158 @@ impl<E: TaskExecutor> Engine for DefaultEngine<E> {
self.parquet.clone()
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use crate::FileMeta;
use crate::Url;

#[test]
fn test_big_parquet_read() {
let files = [
"part-00000-1eed20e0-ffed-42ec-9459-cd7e9920ae86-c000.snappy.parquet",
"part-00001-e7571634-7b4e-4b36-aa8b-1dabd2023b1b-c000.snappy.parquet",
"part-00002-b5002f7e-c190-4a8d-aac5-73699d4708c6-c000.snappy.parquet",
"part-00003-76e19cf6-40c5-4eb1-82aa-7e4eb1a05461-c000.snappy.parquet",
"part-00004-b97e958b-287d-4d2e-a5cc-61ee5313f5cb-c000.snappy.parquet",
"part-00005-3e43af0a-cc9d-4d4a-98da-c64e2f3f1c0c-c000.snappy.parquet",
"part-00006-2db9d154-8105-423a-9789-86db387d5d52-c000.snappy.parquet",
"part-00007-8be8674c-8113-40a0-bea8-14720ebe0334-c000.snappy.parquet",
"part-00008-9dda491d-f90e-4780-ace5-ff7f9a509288-c000.snappy.parquet",
"part-00009-eb0301f3-4e74-47f3-b175-9e5cb13c6b25-c000.snappy.parquet",
"part-00010-e4d63187-a542-4d08-ae48-030a2f16d4b5-c000.snappy.parquet",
"part-00011-78bf0375-5c6e-4716-96ed-f46be99f299b-c000.snappy.parquet",
"part-00012-eb0e01b4-2c14-41ba-a423-ad9c4dd8d59e-c000.snappy.parquet",
"part-00013-509ea10e-ce58-4e72-83d8-5c384cce6f24-c000.snappy.parquet",
"part-00014-dbe072f7-6de5-417a-a0cc-6580b71863f6-c000.snappy.parquet",
"part-00015-d32aaafc-0c0b-4549-bd86-9b00aaa87c7d-c000.snappy.parquet",
"part-00016-6752753d-d6fd-46d0-ad60-93891b7e650e-c000.snappy.parquet",
"part-00017-0a813b31-f101-4446-b035-5716e3856a63-c000.snappy.parquet",
"part-00018-41c0dc65-15de-4340-bc2c-3135094168d8-c000.snappy.parquet",
"part-00019-d242aa3b-30aa-42c7-a68d-7afb39e25e92-c000.snappy.parquet",
"part-00020-5ffce99f-e8af-457a-b948-b7b0848007a3-c000.snappy.parquet",
"part-00021-a6a7e366-88ea-473e-b8c1-3836926a2692-c000.snappy.parquet",
"part-00022-df1f04f7-e310-4e7d-b696-13ea8b96f208-c000.snappy.parquet",
"part-00023-12fdef69-9e84-4c1e-b70a-b0efe351cbed-c000.snappy.parquet",
"part-00024-5ea7aca5-e3bf-4bb4-a151-11700110bc88-c000.snappy.parquet",
"part-00025-939fe0df-9b5b-4686-b9e6-6663868dd0e0-c000.snappy.parquet",
"part-00026-53fbc359-37c5-43c4-8893-0e5de66ebc8a-c000.snappy.parquet",
"part-00027-0e67a127-09cc-4566-b2c6-e68acd09ac86-c000.snappy.parquet",
"part-00028-8a1bd6c4-8793-4f2c-8457-c33498f0d58a-c000.snappy.parquet",
"part-00029-0e81d1fc-51f5-4027-bb1e-fd78322ced9f-c000.snappy.parquet",
"part-00030-5e1e7572-8502-4c40-a9a7-bb6e1ece7d4f-c000.snappy.parquet",
"part-00031-3e1e2862-fffa-4cf1-877a-d71bb7154e1d-c000.snappy.parquet",
"part-00032-69f795b5-3b14-47dc-a838-1beb630d3248-c000.snappy.parquet",
"part-00033-169c0e30-781a-40f9-8d67-a03a11c5686f-c000.snappy.parquet",
"part-00034-2c1543cd-2c96-48d2-8632-368bedc93cf7-c000.snappy.parquet",
"part-00035-2e3bade5-f50c-476f-9ab7-b2e79c54e3a0-c000.snappy.parquet",
"part-00036-e0fde4bb-1a68-4c14-93d4-b4faf6a69523-c000.snappy.parquet",
"part-00037-3ccd9985-7635-404f-886c-5e930cdcf391-c000.snappy.parquet",
"part-00038-24e21c26-b016-4377-b2c4-f9a36c4b54d3-c000.snappy.parquet",
"part-00039-a18c6f27-751e-4e7c-9b46-879be8e218dc-c000.snappy.parquet",
"part-00040-e7a9fad5-72d5-4ea0-81dd-d7d567004e5f-c000.snappy.parquet",
"part-00041-291b7825-a746-44b6-90d4-5e7fe25d9860-c000.snappy.parquet",
"part-00042-13a834db-1ca8-41ba-b506-3c1b49a3433c-c000.snappy.parquet",
"part-00043-d8bc1430-0ecf-417d-aca1-2cf84b228ba2-c000.snappy.parquet",
"part-00044-808a7ef3-5021-40f7-ab60-fb54028e53a7-c000.snappy.parquet",
"part-00045-f3caa90c-9ff5-4c30-888c-f6c048a28b7a-c000.snappy.parquet",
"part-00046-b93fc157-93d0-4d97-bb87-d3c8a7364af3-c000.snappy.parquet",
"part-00047-b6e433ae-02e1-4d40-9f4f-b44a7d00af35-c000.snappy.parquet",
"part-00048-934224c8-03a4-4f6d-8a1d-ac3a33be8936-c000.snappy.parquet",
"part-00049-35fadcf8-dd37-4371-87b9-ecb20b3b10ca-c000.snappy.parquet",
"part-00050-748f1818-814e-4ee3-9381-155d984fed1f-c000.snappy.parquet",
"part-00051-75508184-80bf-4c55-a105-a19f6b4cc325-c000.snappy.parquet",
"part-00052-e5a811c7-1df4-41f0-8e96-88ff10937e5b-c000.snappy.parquet",
"part-00053-25c84512-0b76-45aa-8303-f73da5aa1a10-c000.snappy.parquet",
"part-00054-40b6bd80-f9b1-437f-9a61-db509aef4cff-c000.snappy.parquet",
"part-00055-c03b109b-c0df-4307-949c-63e04444a4bb-c000.snappy.parquet",
"part-00056-54613c5b-f2ed-4ac7-8366-e49989147a2b-c000.snappy.parquet",
"part-00057-0b5851af-dc8b-4dff-b2cb-4fa3e4bce90c-c000.snappy.parquet",
"part-00058-3b289d75-7d06-445f-a282-968e2fadac77-c000.snappy.parquet",
"part-00059-c0baadc0-cf06-455a-9899-6c1662716c55-c000.snappy.parquet",
"part-00060-57baf0ce-7c7e-4b74-a696-cf5cfa95478f-c000.snappy.parquet",
"part-00061-b1b94ff5-a6c8-453d-9a46-89928d00c219-c000.snappy.parquet",
"part-00062-7fcf3eb1-21a9-47bd-b0c4-b8e9ebfb45f5-c000.snappy.parquet",
"part-00063-f8c75bb2-6b67-4880-899f-f7049422f8e4-c000.snappy.parquet",
"part-00064-35f87c88-7ea6-45a1-a8f8-6a969642e8db-c000.snappy.parquet",
"part-00065-64ff3648-b6b4-4da2-afc5-19fd7f9dda86-c000.snappy.parquet",
"part-00066-c8112b58-40fc-4224-80a3-c57d50fe6a5d-c000.snappy.parquet",
"part-00067-86415a13-af6b-46b1-840b-5fe17f99f428-c000.snappy.parquet",
"part-00068-d88eb4b4-fef3-49ee-a613-3ec371638d14-c000.snappy.parquet",
"part-00069-38cf2c6d-9c71-4028-a515-58dbacff00d0-c000.snappy.parquet",
"part-00070-a016a9eb-5493-4522-9072-f6aa6725d071-c000.snappy.parquet",
"part-00071-169a0900-63b3-4da7-b064-6dbefb5c03ac-c000.snappy.parquet",
"part-00072-fdc17060-5aab-47a4-8ae3-be7536c441ff-c000.snappy.parquet",
"part-00073-ff014ad4-b4aa-4627-85c7-5d8309e14716-c000.snappy.parquet",
"part-00074-db8aad8f-3521-4a01-9802-2fd6dada4a3c-c000.snappy.parquet",
"part-00075-d4e9b806-677b-468a-808b-152ccbf39cb6-c000.snappy.parquet",
"part-00076-193b7c62-886d-4d6f-a6b8-f47ffaada74a-c000.snappy.parquet",
"part-00077-89d3b0ed-455c-4790-b773-22b7fbf508d2-c000.snappy.parquet",
"part-00078-f254064d-987e-4464-8dc2-370aff954738-c000.snappy.parquet",
"part-00079-2fd5ce5c-e3fc-4e8b-8753-178f5831f002-c000.snappy.parquet",
"part-00080-5378d594-1048-4e13-afad-a4ca8f1dc42c-c000.snappy.parquet",
"part-00081-9b4f2e4a-ec98-4876-8c0f-3f5944589908-c000.snappy.parquet",
"part-00082-027c187c-1c55-4de2-b0b4-296ec58c1ba7-c000.snappy.parquet",
"part-00083-11f7de5e-9421-4b40-acbb-5c7f98fd200e-c000.snappy.parquet",
"part-00084-f2af393c-0dbd-4d3b-93cf-a92ce2de43dc-c000.snappy.parquet",
"part-00085-61562868-01a3-45fc-be1e-524ff1bbcca9-c000.snappy.parquet",
"part-00086-487df01a-4b4c-4dc9-8c6c-b9dd2b65d269-c000.snappy.parquet",
"part-00087-812f8992-a896-44e5-aba4-f505abb3492a-c000.snappy.parquet",
"part-00088-c8e927e4-2510-4e43-aefe-9bc86e1b2371-c000.snappy.parquet",
"part-00089-2ba2c768-300b-48ee-ab81-dcb5716c8a72-c000.snappy.parquet",
"part-00090-bdeca86e-ce91-45ea-b82a-95a266baeaa4-c000.snappy.parquet",
"part-00091-41d71af6-5e80-4dee-b068-9154d90e120e-c000.snappy.parquet",
"part-00092-53359e14-81de-40ea-85d4-1ac0f8df2e00-c000.snappy.parquet",
"part-00093-454ab78c-ed1a-4a9d-b7b1-064ea5f49747-c000.snappy.parquet",
"part-00094-a02961df-d6dc-49c1-b6af-7490db161a78-c000.snappy.parquet",
"part-00095-a8f95229-df71-4795-9788-6ddeeb4c00c1-c000.snappy.parquet",
"part-00096-a1c24e11-51ef-41b5-ad8b-d219731b37d0-c000.snappy.parquet",
"part-00097-8ca73b6e-8f95-4e47-a5c9-f5a2db6bb406-c000.snappy.parquet",
"part-00098-4b8e9732-a65b-4e33-ab13-ec66e9323300-c000.snappy.parquet",
"part-00099-61972ac8-b84a-4bea-adc1-42e614cb7e47-c000.snappy.parquet",
];

let files: Vec<FileMeta> = files
.iter()
.map(|f| {
let location = Url::parse(&format!(
"file:///Users/zach.schuermann/Desktop/100_file_table/{}",
f
))
.unwrap();
FileMeta {
location,
size: 0,
last_modified: 0,
}
})
.collect();

let store = Arc::new(object_store::local::LocalFileSystem::new());
use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
let engine = super::DefaultEngine::new(
store,
object_store::path::Path::from_filesystem_path(
"/Users/zach.schuermann/Desktop/100_file_table/",
)
.unwrap(),
Arc::new(TokioBackgroundExecutor::new()),
);

use crate::Engine;

// schema is just id int
let schema = Arc::new(crate::schema::StructType::new(vec![
crate::schema::StructField::new("id", crate::schema::DataType::LONG, false),
]));

let res = engine
.get_parquet_handler()
.read_parquet_files(&files, schema, None)
.unwrap();

println!("read {:?} rows", res.map(|r| r.unwrap().len()).sum::<usize>());
}
}
18 changes: 16 additions & 2 deletions kernel/src/engine/default/parquet.rs
@@ -202,6 +202,7 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
physical_schema.clone(),
predicate,
self.store.clone(),
None,
)),
};
FileStream::new_async_read_iterator(
@@ -215,13 +216,14 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
}

/// Implements [`FileOpener`] for a parquet file
-struct ParquetOpener {
+pub(crate) struct ParquetOpener {
// projection: Arc<[usize]>,
batch_size: usize,
table_schema: SchemaRef,
predicate: Option<ExpressionRef>,
limit: Option<usize>,
store: Arc<DynObjectStore>,
runtime: Option<Arc<tokio::runtime::Runtime>>,
}

impl ParquetOpener {
@@ -230,13 +232,15 @@ impl ParquetOpener {
table_schema: SchemaRef,
predicate: Option<ExpressionRef>,
store: Arc<DynObjectStore>,
runtime: Option<Arc<tokio::runtime::Runtime>>,
) -> Self {
Self {
batch_size,
table_schema,
predicate,
limit: None,
store,
runtime,
}
}
}
@@ -251,11 +255,18 @@ impl FileOpener for ParquetOpener {
let table_schema = self.table_schema.clone();
let predicate = self.predicate.clone();
let limit = self.limit;
let handle = match &self.runtime {
Some(runtime) => Some(runtime.handle().clone()),
None => None,
};

Ok(Box::pin(async move {
// TODO avoid IO by converting passed file meta to ObjectMeta
let meta = store.head(&path).await?;
let mut reader = ParquetObjectReader::new(store, meta);
if let Some(handle) = handle {
reader = reader.with_runtime(handle);
Collaborator: what does setting this do?

Collaborator (author): new in arrow 53.3, I think - it lets you push down a runtime for the reader to schedule its IO on. This has gotten me thinking about various ways to enable this sort of 'runtime passthrough' ourselves. From the with_runtime docs:

> Perform IO on the provided tokio runtime.
>
> Tokio is a cooperative scheduler, and relies on tasks yielding in a timely manner to service IO. Therefore, running IO and CPU-bound tasks, such as parquet decoding, on the same tokio runtime can lead to degraded throughput, dropped connections and other issues.

Collaborator: Thanks. Yeah, seems similar to what you're doing.
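
For context, a minimal sketch of that runtime passthrough, assuming parquet/arrow >= 53.3 (the helper function and its signature are illustrative, not this PR's API):

```rust
use std::sync::Arc;

use object_store::{path::Path, ObjectStore};
use parquet::arrow::async_reader::ParquetObjectReader;
use tokio::runtime::Handle;

// Build a reader whose IO is scheduled on a dedicated runtime handle, so
// CPU-bound parquet decode on the caller's runtime cannot starve the IO.
async fn reader_with_io_runtime(
    io_handle: Handle, // handle to a runtime reserved for IO
    store: Arc<dyn ObjectStore>,
    path: Path,
) -> Result<ParquetObjectReader, Box<dyn std::error::Error>> {
    let meta = store.head(&path).await?;
    Ok(ParquetObjectReader::new(store, meta).with_runtime(io_handle))
}
```

The diff above does essentially this, threading the runtime through ParquetOpener as an Option and falling back to the ambient runtime when it is None.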

}
let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default()).await?;
let parquet_schema = metadata.schema();
let (indicies, requested_ordering) =
@@ -281,6 +292,9 @@ impl FileOpener for ParquetOpener {

let stream = builder.with_batch_size(batch_size).build()?;

// println!("read IO");
// tokio::time::sleep(std::time::Duration::from_millis(10000)).await; // simulate IO delay

let stream = stream.map(move |rbr| {
// re-order each batch if needed
rbr.map_err(Error::Parquet).and_then(|rb| {
@@ -293,7 +307,7 @@ impl FileOpener for ParquetOpener {
}

/// Implements [`FileOpener`] for a opening a parquet file from a presigned URL
-struct PresignedUrlOpener {
+pub(crate) struct PresignedUrlOpener {
batch_size: usize,
predicate: Option<ExpressionRef>,
limit: Option<usize>,