perf!: replace default engine JSON reader's FileStream with concurrent futures #711

Merged Mar 3, 2025 (39 commits)

Commits
ea417b5
let's see
nicklan Feb 21, 2025
504b20c
support --all-features again
nicklan Feb 21, 2025
c217abc
revert
nicklan Feb 21, 2025
8926729
workflows use --all-features again
nicklan Feb 21, 2025
04cdd68
Merge branch 'main' into fix-semvar-check
nicklan Feb 21, 2025
4aa86aa
wip: simple buffered streams
zachschuermann Feb 21, 2025
b0869fc
into_iter
zachschuermann Feb 21, 2025
ffed827
Merge remote-tracking branch 'upstream/main' into concurrent-json
zachschuermann Feb 21, 2025
0cd0cf3
cleaner selection + readme
nicklan Feb 21, 2025
85ebcb8
also for parquet.rs
nicklan Feb 21, 2025
ae8c559
add a `need_arrow` flag
nicklan Feb 22, 2025
22394c8
Merge branch 'main' into fix-semvar-check
nicklan Feb 22, 2025
ce2667f
Merge remote-tracking branch 'nick/fix-semvar-check' into concurrent-…
zachschuermann Feb 24, 2025
1f2f79c
Merge branch 'main' into concurrent-json
zachschuermann Feb 24, 2025
9862a3a
Merge remote-tracking branch 'refs/remotes/origin/concurrent-json' in…
zachschuermann Feb 24, 2025
bc16927
Merge remote-tracking branch 'upstream/main' into concurrent-json
zachschuermann Feb 25, 2025
971ed43
make Json opener async fn and add test
zachschuermann Feb 25, 2025
1a14f90
fmt
zachschuermann Feb 25, 2025
811cc2e
fix comments
zachschuermann Feb 25, 2025
494a470
cleanup
zachschuermann Feb 25, 2025
ba09853
comments and add warn for error
zachschuermann Feb 25, 2025
5ab75c4
address feedback
zachschuermann Feb 25, 2025
f6f5729
add with_buffer_size and deprecate the readahead one
zachschuermann Feb 25, 2025
df7a819
add deterministic test via OrderedGetStore
zachschuermann Feb 27, 2025
f0270c7
clean up imports
zachschuermann Feb 27, 2025
14e1288
combine keys and wakers under one lock
zachschuermann Feb 27, 2025
968926a
address feedback
zachschuermann Feb 27, 2025
baf04d0
better test_read_json_files_ordering
zachschuermann Feb 27, 2025
e05fdfa
fix docs
zachschuermann Feb 28, 2025
5063f8c
revert small changes
zachschuermann Feb 28, 2025
3316d8d
address feedback
zachschuermann Feb 28, 2025
2cff468
comment
zachschuermann Feb 28, 2025
751e582
add small buffer test
zachschuermann Feb 28, 2025
69284d2
flatmap
zachschuermann Feb 28, 2025
8551bca
fix
zachschuermann Feb 28, 2025
1e6f746
Merge branch 'main' into concurrent-json
zachschuermann Mar 1, 2025
e11dc01
remove 'with_readahead'
zachschuermann Mar 3, 2025
d00503f
Merge remote-tracking branch 'refs/remotes/origin/concurrent-json' in…
zachschuermann Mar 3, 2025
f390be1
Merge branch 'main' into concurrent-json
zachschuermann Mar 3, 2025
address feedback
zachschuermann committed Feb 28, 2025
commit 3316d8d89caad0e9190284875088e793485dec36
2 changes: 1 addition & 1 deletion kernel/Cargo.toml
@@ -150,6 +150,7 @@ rustc_version = "0.4.1"
 [dev-dependencies]
 delta_kernel = { path = ".", features = ["arrow", "default-engine", "sync-engine"] }
 test_utils = { path = "../test-utils" }
+async-trait = "0.1" # only used for our custom SlowGetStore ObjectStore implementation
 paste = "1.0"
 test-log = { version = "0.2", default-features = false, features = ["trace"] }
 tempfile = "3"
@@ -159,4 +160,3 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [
   "env-filter",
   "fmt",
 ] }
-async-trait = "0.1" # only used for our custom SlowGetStore ObjectStore implementation
21 changes: 8 additions & 13 deletions kernel/src/engine/default/json.rs
@@ -129,8 +129,8 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
         let mut stream = stream::iter(file_futures)
             .buffered(buffer_size)
             .try_flatten()
Collaborator:

What does try_flatten do? And where is it defined/documented?
(my google-fu is apparently weak today)

Collaborator:

Ah, found it -- TryStreamExt::try_flatten (not to be confused with TryFutureExt::try_flatten).

So open returns a future (whose Ok result is a stream) and try_flatten effectively concatenates all those streams into a single stream, but preserving any Err results?

And this is the key to preserving order, because each stream is ordered within its file, and the flattened stream guarantees that

each individual stream will get exhausted before moving on to the next

Collaborator (author):

yes exactly! (and I'll document this more inline)

the key to ordering is that both `buffered` and `try_flatten` are stream combinators, and each of them retains ordering

-            .map_ok(|record_batch| {
-                Box::new(ArrowEngineData::new(record_batch)) as Box<dyn EngineData>
+            .map_ok(|record_batch| -> Box<dyn EngineData> {
+                Box::new(ArrowEngineData::new(record_batch))
             });

// send each record batch over the channel
@@ -303,7 +303,7 @@ mod tests {
         state: Mutex<KeysAndWakers>,
     }
 
-    #[derive(Debug, Default)]
+    #[derive(Debug)]
     struct KeysAndWakers {
         // Queue of paths in order which they will resolve
         ordered_keys: VecDeque<Path>,
@@ -556,11 +556,7 @@ mod tests {
         let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
         let physical_schema = Arc::new(ArrowSchema::try_from(get_log_schema().as_ref()).unwrap());
         let data: Vec<RecordBatch> = handler
-            .read_json_files(
-                files,
-                Arc::new(physical_schema.clone().try_into().unwrap()),
-                None,
-            )
+            .read_json_files(files, get_log_schema().clone(), None)
             .unwrap()
             .map_ok(into_record_batch)
             .try_collect()
@@ -629,8 +625,9 @@ mod tests {
         future::join_all(handles).await;
         drop(tx);
 
-        // NB (from mpsc::IntoIter): This iterator will block whenever next is called, waiting for
-        // a new message, and None will be returned if the corresponding channel has hung up.
+        // NB (from mpsc::Receiver::recv): This function will always block the current thread if
+        // there is no data available and it's possible for more data to be sent (at least one
+        // sender still exists).
        let mut completed = Vec::new();
        while let Ok(path) = rx.recv() {
            completed.push(path);
@@ -717,9 +714,7 @@ mod tests {
             .iter()
             .flat_map(|batch| {
                 let val_col: &Int32Array = batch.column(0).as_primitive();
-                (0..val_col.len())
-                    .map(|i| val_col.value(i))
-                    .collect::<Vec<_>>()
+                (0..val_col.len()).map(|i| val_col.value(i)).collect_vec()
             })
             .collect();
         assert_eq!(all_values, (0..1000).collect_vec());