
fix!: get prefix from offset path #699

Merged
merged 41 commits into from Mar 21, 2025
Changes from 4 commits

Commits (41)
a3a7671
fix: get prefix from offset path
roeap Feb 14, 2025
5d7a754
test: add acceptance tests for list_from
roeap Feb 15, 2025
8340575
docs: update list_from docs
roeap Feb 15, 2025
f08143a
test: fix tests
roeap Feb 15, 2025
c042df6
fix: try using path from url to fix windows failures
roeap Feb 15, 2025
c0b028e
fix: try using path from url to fix windows failures
roeap Feb 15, 2025
7c72265
Update kernel/src/engine/default/filesystem.rs
roeap Mar 19, 2025
24129f7
fix: Handle predicates on non-nullable columns without stats (#700)
adamreeve Feb 19, 2025
8b1dffe
feat!(ffi): new visit_schema FFI and rename old visit_schema to visit…
zachschuermann Feb 19, 2025
6654cca
feat: introduce feature flags to select major arrow versions (#654)
rtyler Feb 20, 2025
b07fc6d
feat: Support writing to not only 3/7 protocol (#693)
nicklan Feb 20, 2025
2e4bdfa
Part 4: read_table.c uses transform in ffi (#614)
nicklan Feb 21, 2025
301094f
feat!(ffi): remove `visit_snapshot_schema`, add `logical_schema` (#709)
zachschuermann Feb 21, 2025
0a77e57
Support --all-features again (#708)
nicklan Feb 24, 2025
e224573
fix: Make having `need_arrow` + no arrow a compiler error (#717)
nicklan Feb 25, 2025
08309bc
Release 0.7.0 (#716)
zachschuermann Feb 25, 2025
5c4d579
pin chrono version to fix arrow compilation failure (#719)
hntd187 Feb 28, 2025
51553f2
fix!(ffi): Visit decimals as signed values (#724)
scovich Mar 3, 2025
bf97a24
feat!(ffi): Make get_partition_column* work on a snapshot. (#697)
nicklan Mar 3, 2025
945ff1c
fix!: bump MSRV to 1.81 (#725)
zachschuermann Mar 3, 2025
9b1a91f
perf!: replace default engine JSON reader's `FileStream` with concurr…
zachschuermann Mar 3, 2025
b4ab4a9
release 0.8.0 (#726)
zachschuermann Mar 4, 2025
2f6c049
feat: extract & insert sidecar batches in `replay`'s action iterator …
sebastiantia Mar 5, 2025
1b7fb11
feat: support the `v2Checkpoint` reader/writer feature (#685)
sebastiantia Mar 6, 2025
9daa09f
Update HDFS dependencies (#689)
rzepinskip Mar 7, 2025
1c2fae7
feat: Add check for whether appendOnly table feature is supported or …
OussamaSaoudi Mar 11, 2025
cb67447
tests: add V2 checkpoint read support integration tests (#690)
sebastiantia Mar 12, 2025
725dc70
feat: Add basic partition pruning support (#713)
scovich Mar 12, 2025
bb795a7
feat: add `DeletionVectors` to supported writer features (#735)
zachschuermann Mar 13, 2025
51095d4
feat: add writer version 2/invariant table feature support (#734)
zachschuermann Mar 17, 2025
43b346c
fix!: change metadata values for column metadata to i64 (#733)
hntd187 Mar 19, 2025
ed34c42
ci: use maintained action to setup rust toolchain (#585)
roeap Mar 19, 2025
6e8e08c
fix: pr feedback
roeap Mar 19, 2025
0455a6d
Merge branch 'main' into feat/engine-fs
roeap Mar 19, 2025
4138dd4
fix: new default engine callsites
roeap Mar 19, 2025
01a9d52
fix: remove unused import
roeap Mar 19, 2025
b81e478
fix: pr feedback
roeap Mar 19, 2025
cdf814d
fix: pr feedback
roeap Mar 20, 2025
5450621
Merge branch 'main' into feat/engine-fs
roeap Mar 20, 2025
12cf723
Merge branch 'main' into feat/engine-fs
roeap Mar 20, 2025
79bd24d
Update kernel/src/engine/default/filesystem.rs
roeap Mar 21, 2025
8 changes: 2 additions & 6 deletions ffi/src/lib.rs
@@ -718,7 +718,7 @@ impl<T> Default for ReferenceSet<T> {
#[cfg(test)]
mod tests {
use delta_kernel::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine};
use object_store::{memory::InMemory, path::Path};
use object_store::memory::InMemory;
use test_utils::{actions_to_string, add_commit, TestAction};

use super::*;
@@ -792,11 +792,7 @@ mod tests {
actions_to_string(vec![TestAction::Metadata]),
)
.await?;
let engine = DefaultEngine::new(
storage.clone(),
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
);
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
let engine = engine_to_handle(Arc::new(engine), allocate_err);
let path = "memory:///";

31 changes: 18 additions & 13 deletions kernel/src/engine/default/filesystem.rs
@@ -14,7 +14,6 @@ use crate::{DeltaResult, Error, FileMeta, FileSlice, FileSystemClient};
pub struct ObjectStoreFileSystemClient<E: TaskExecutor> {
inner: Arc<DynObjectStore>,
has_ordered_listing: bool,
table_root: Path,
task_executor: Arc<E>,
readahead: usize,
}
@@ -23,13 +22,11 @@ impl<E: TaskExecutor> ObjectStoreFileSystemClient<E> {
pub(crate) fn new(
store: Arc<DynObjectStore>,
has_ordered_listing: bool,
table_root: Path,
task_executor: Arc<E>,
) -> Self {
Self {
inner: store,
has_ordered_listing,
table_root,
task_executor,
readahead: 10,
}
@@ -49,8 +46,18 @@ impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>> {
let url = path.clone();
let offset = Path::from(path.path());
// TODO properly handle table prefix
let prefix = self.table_root.child("_delta_log");
let prefix = if url.path().ends_with('/') {
    offset.clone()
} else {
    let parts = offset.parts().collect_vec();
    if parts.is_empty() {
        return Err(Error::generic(format!(
            "Offset path must not be a root directory. Got: '{}'",
            url.as_str()
        )));
    }
    Path::from_iter(parts[..parts.len() - 1].iter().cloned())
};

[Review comment, Collaborator] Nit: this will double format, use Error::Generic

let store = self.inner.clone();

@@ -192,11 +199,9 @@ mod tests {
let mut url = Url::from_directory_path(tmp.path()).unwrap();

let store = Arc::new(LocalFileSystem::new());
let prefix = Path::from(url.path());
let client = ObjectStoreFileSystemClient::new(
store,
false, // don't have ordered listing
prefix,
Arc::new(TokioBackgroundExecutor::new()),
);

@@ -229,11 +234,10 @@
store.put(&name, data.clone().into()).await.unwrap();

let table_root = Url::parse("memory:///").expect("valid url");
let prefix = Path::from_url_path(table_root.path()).expect("Couldn't get path");
let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new()));
let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));
let files: Vec<_> = engine
.get_file_system_client()
.list_from(&table_root)
.list_from(&table_root.join("_delta_log/0").unwrap())
.unwrap()
.try_collect()
.unwrap();
@@ -260,11 +264,12 @@

let url = Url::from_directory_path(tmp.path()).unwrap();
let store = Arc::new(LocalFileSystem::new());
let prefix = Path::from_url_path(url.path()).expect("Couldn't get path");
let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new()));
let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));
let client = engine.get_file_system_client();

let files = client.list_from(&Url::parse("file://").unwrap()).unwrap();
let files = client
.list_from(&url.join("_delta_log/0").unwrap())
.unwrap();
let mut len = 0;
for (file, expected) in files.zip(expected_names.iter()) {
assert!(
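The prefix derivation this PR adds to `list_from` can be sketched with plain string handling. This is a std-only stand-in for the real code, which operates on `url::Url` and `object_store::path::Path`; the function name and error type here are illustrative, not the kernel's API.

```rust
/// Derive the listing prefix from an offset path, mirroring the diff above:
/// a directory-like path (trailing '/') is its own prefix; otherwise the
/// prefix is the parent directory; a bare root offset is rejected.
fn prefix_from_offset(path: &str) -> Result<String, String> {
    if path.ends_with('/') {
        // Directory-like offset: list everything under the path itself.
        return Ok(path.trim_matches('/').to_string());
    }
    let parts: Vec<&str> = path.split('/').filter(|p| !p.is_empty()).collect();
    if parts.is_empty() {
        return Err(format!(
            "Offset path must not be a root directory. Got: '{path}'"
        ));
    }
    // Drop the final segment (the offset file) to get its parent directory.
    Ok(parts[..parts.len() - 1].join("/"))
}

fn main() {
    assert_eq!(
        prefix_from_offset("/table/_delta_log/00000.json").unwrap(),
        "table/_delta_log"
    );
    assert_eq!(
        prefix_from_offset("/table/_delta_log/").unwrap(),
        "table/_delta_log"
    );
    assert!(prefix_from_offset("").is_err());
    println!("ok");
}
```

This is why the engine no longer needs a stored `table_root`: the prefix is recovered from the offset URL at call time.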
26 changes: 21 additions & 5 deletions kernel/src/engine/default/mod.rs
@@ -10,7 +10,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use self::storage::parse_url_opts;
use object_store::{path::Path, DynObjectStore};
use object_store::DynObjectStore;
use url::Url;

use self::executor::TaskExecutor;
@@ -60,8 +60,8 @@ impl<E: TaskExecutor> DefaultEngine<E> {
V: Into<String>,
{
// table root is the path of the table in the ObjectStore
let (store, table_root) = parse_url_opts(table_root, options)?;
Ok(Self::new(Arc::new(store), table_root, task_executor))
let (store, _table_root) = parse_url_opts(table_root, options)?;
Ok(Self::new(Arc::new(store), task_executor))
}

/// Create a new [`DefaultEngine`] instance
@@ -71,7 +71,7 @@ impl<E: TaskExecutor> DefaultEngine<E> {
/// - `store`: The object store to use.
/// - `table_root_path`: The root path of the table within storage.
/// - `task_executor`: Used to spawn async IO tasks. See [executor::TaskExecutor].
pub fn new(store: Arc<DynObjectStore>, table_root: Path, task_executor: Arc<E>) -> Self {
pub fn new(store: Arc<DynObjectStore>, task_executor: Arc<E>) -> Self {
// HACK to check if we're using a LocalFileSystem from ObjectStore. We need this because
// local filesystem doesn't return a sorted list by default. Although the `object_store`
// crate explicitly says it _does not_ return a sorted listing, in practice all the cloud
@@ -97,7 +97,6 @@ impl<E: TaskExecutor> DefaultEngine<E> {
file_system: Arc::new(ObjectStoreFileSystemClient::new(
store.clone(),
!is_local,
table_root,
task_executor.clone(),
)),
json: Arc::new(DefaultJsonHandler::new(
@@ -158,3 +157,20 @@ impl<E: TaskExecutor> Engine for DefaultEngine<E> {
self.parquet.clone()
}
}

#[cfg(test)]
mod tests {
use super::executor::tokio::TokioBackgroundExecutor;
use super::*;
use crate::engine::tests::test_arrow_engine;
use object_store::local::LocalFileSystem;

#[test]
fn test_default_engine() {
let tmp = tempfile::tempdir().unwrap();
let url = Url::from_directory_path(tmp.path()).unwrap();
let store = Arc::new(LocalFileSystem::new());
let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));
test_arrow_engine(&engine, &url);
}
}
77 changes: 77 additions & 0 deletions kernel/src/engine/mod.rs
@@ -27,3 +27,80 @@ pub(crate) mod arrow_get_data;
pub(crate) mod ensure_data_types;
#[cfg(any(feature = "default-engine-base", feature = "sync-engine"))]
pub mod parquet_row_group_skipping;

#[cfg(test)]
mod tests {
use arrow_array::{RecordBatch, StringArray};
use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
use itertools::Itertools;
use object_store::path::Path;
use std::sync::Arc;
use url::Url;

use crate::engine::arrow_data::ArrowEngineData;
use crate::{Engine, EngineData};

use test_utils::delta_path_for_version;

fn test_list_from_should_sort_and_filter(
engine: &dyn Engine,
base_url: &Url,
engine_data: impl Fn() -> Box<dyn EngineData>,
) {
let json = engine.get_json_handler();
let get_data = || Box::new(std::iter::once(Ok(engine_data())));

let expected_names: Vec<Path> = (1..4)
.map(|i| delta_path_for_version(i, "json"))
.collect_vec();

for i in expected_names.iter().rev() {
let path = base_url.join(i.as_ref()).unwrap();
json.write_json_file(&path, get_data(), false).unwrap();
}
let path = base_url.join("other").unwrap();
json.write_json_file(&path, get_data(), false).unwrap();

let fs = engine.get_file_system_client();

// list files after an offset
let test_url = base_url.join(expected_names[0].as_ref()).unwrap();
let files: Vec<_> = fs.list_from(&test_url).unwrap().try_collect().unwrap();
assert_eq!(files.len(), expected_names.len() - 1);
for (file, expected) in files.iter().zip(expected_names.iter().skip(1)) {
assert_eq!(file.location, base_url.join(expected.as_ref()).unwrap());
}

let test_url = base_url
.join(delta_path_for_version(0, "json").as_ref())
.unwrap();
let files: Vec<_> = fs.list_from(&test_url).unwrap().try_collect().unwrap();
assert_eq!(files.len(), expected_names.len());

// list files inside a directory / key prefix
let test_url = base_url.join("_delta_log/").unwrap();
let files: Vec<_> = fs.list_from(&test_url).unwrap().try_collect().unwrap();
assert_eq!(files.len(), expected_names.len());
for (file, expected) in files.iter().zip(expected_names.iter()) {
assert_eq!(file.location, base_url.join(expected.as_ref()).unwrap());
}
}

fn get_arrow_data() -> Box<dyn EngineData> {
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"dog",
ArrowDataType::Utf8,
true,
)]));
let data = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(StringArray::from(vec!["remi", "wilson"]))],
)
.unwrap();
Box::new(ArrowEngineData::new(data))
}

pub(crate) fn test_arrow_engine(engine: &dyn Engine, base_url: &Url) {
test_list_from_should_sort_and_filter(engine, base_url, get_arrow_data);
}
}
8 changes: 4 additions & 4 deletions kernel/src/engine/sync/fs_client.rs
@@ -39,7 +39,7 @@ impl FileSystemClient for SyncFilesystemClient {
let all_ents: Vec<_> = std::fs::read_dir(path_to_read)?
.filter(|ent_res| {
match (ent_res, min_file_name) {
(Ok(ent), Some(min_file_name)) => ent.file_name() >= *min_file_name,
(Ok(ent), Some(min_file_name)) => ent.file_name() > *min_file_name,
[Review comment, Collaborator] This seems like a behavior change? Does it need to be part of this refactor PR?

[Author] Need is a strong word, but IIRC, if we split this out, we would need some more refactoring in the wrapping object store to filter out that one item. Since object_store behaves this way, and this is the behavior documented in the Java kernel, I thought to just go for it.

[Author] Come to think of it, this was required since I introduced tests that run against both default-engine and sync-engine, which were inconsistent in that regard. With this change, both engines behave the same. I'd guess that for most (if not all) current users of either engine, things stay the same.

[Review comment, Collaborator] Sounds good, but then can we include this in the changes listed in the PR description?

_ => true, // Keep unfiltered and/or error entries
}
})
@@ -106,7 +106,7 @@ mod tests {
writeln!(f, "null")?;
f.flush()?;

let url_path = tmp_dir.path().join(get_json_filename(1));
let url_path = tmp_dir.path().join(get_json_filename(0));
let url = Url::from_file_path(url_path).unwrap();
let files: Vec<_> = client.list_from(&url)?.try_collect()?;

@@ -137,11 +137,11 @@
// i+1 in index because we started at 0001 in the listing
assert_eq!(
file?.location.to_file_path().unwrap().to_str().unwrap(),
expected[i + 1].to_str().unwrap()
expected[i + 2].to_str().unwrap()
);
file_count += 1;
}
assert_eq!(file_count, 2);
assert_eq!(file_count, 1);

let url_path = tmp_dir.path().join("");
let url = Url::from_file_path(url_path).unwrap();
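The one-character change above (`>=` to `>`) is what the review thread discusses: the sync client now skips the offset entry itself, matching object_store's listing semantics. A minimal std-only model of that filter (`filter_listing` is an illustrative stand-in for the `read_dir` filter, not the kernel's API):

```rust
/// Keep only names strictly greater than the optional offset, then sort,
/// mirroring the behavior the diff above gives the sync filesystem client.
fn filter_listing<'a>(mut names: Vec<&'a str>, min_file_name: Option<&str>) -> Vec<&'a str> {
    names.retain(|n| match min_file_name {
        // Strictly greater: the offset file is excluded from the result.
        Some(min) => *n > min,
        // No offset given: keep every entry.
        None => true,
    });
    names.sort(); // read_dir order is unspecified; callers expect sorted names
    names
}

fn main() {
    let names = vec!["00000.json", "00002.json", "00001.json"];
    assert_eq!(
        filter_listing(names.clone(), Some("00001.json")),
        vec!["00002.json"]
    );
    assert_eq!(filter_listing(names, None).len(), 3);
    println!("ok");
}
```

Under the old `>=` comparison, `00001.json` would have been included, which is exactly the one-entry difference between the two engines that the shared tests exposed.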
4 changes: 4 additions & 0 deletions kernel/src/engine/sync/json.rs
@@ -65,6 +65,10 @@ impl JsonHandler for SyncJsonHandler {
)));
};

if !parent.exists() {
std::fs::create_dir_all(parent)?;
}

// write data to tmp file
let mut tmp_file = NamedTempFile::new_in(parent)?;
let buf = to_json_bytes(data)?;
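The hunk above makes the sync JSON writer create missing parent directories before writing. A std-only sketch of that write path, assuming the same tempfile-then-rename pattern the handler uses (`NamedTempFile` from the `tempfile` crate is replaced by a plain sibling temp path here; names are illustrative):

```rust
use std::fs;
use std::io::Write;
use std::path::Path;

/// Write `data` to `path`, creating missing parents first, via a temp file
/// so readers never observe a partially written file.
fn write_json_file(path: &Path, data: &[u8]) -> std::io::Result<()> {
    if let Some(parent) = path.parent() {
        // Mirrors the added guard: create_dir_all is a no-op when the
        // directory already exists, so the exists() check above it is
        // only an optimization.
        fs::create_dir_all(parent)?;
    }
    // Write to a sibling temp file, then rename into place.
    let tmp = path.with_extension("tmp");
    let mut f = fs::File::create(&tmp)?;
    f.write_all(data)?;
    f.flush()?;
    fs::rename(&tmp, path)?;
    Ok(())
}

fn main() {
    let dir = std::env::temp_dir().join("sync_json_demo");
    let target = dir.join("nested").join("00000.json");
    // Before this PR's change, the missing `nested/` parent would fail here.
    write_json_file(&target, b"{\"x\":1}\n").unwrap();
    assert!(target.exists());
    fs::remove_dir_all(&dir).ok();
    println!("ok");
}
```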
14 changes: 14 additions & 0 deletions kernel/src/engine/sync/mod.rs
@@ -97,3 +97,17 @@
.map(|data| Ok(Box::new(ArrowEngineData::new(data??.into())) as _));
Ok(Box::new(result))
}

#[cfg(test)]
mod tests {
use super::*;
use crate::engine::tests::test_arrow_engine;

#[test]
fn test_sync_engine() {
let tmp = tempfile::tempdir().unwrap();
let url = url::Url::from_directory_path(tmp.path()).unwrap();
let engine = SyncEngine::new();
test_arrow_engine(&engine, &url);
}
}
7 changes: 5 additions & 2 deletions kernel/src/lib.rs
@@ -43,7 +43,7 @@
//!
//! Delta Kernel needs to perform some basic operations against file systems like listing and
//! reading files. These interactions are encapsulated in the [`FileSystemClient`] trait.
//! Implementors must take care that all assumptions on the behavior if the functions - like sorted
//! Implementers must take care that all assumptions on the behavior if the functions - like sorted
//! results - are respected.
//!
//! ## Reading log and data files
@@ -346,8 +346,11 @@ pub trait ExpressionHandler: AsAny {
/// file system where the Delta table is present. Connector implementation of
/// this trait can hide filesystem specific details from Delta Kernel.
pub trait FileSystemClient: AsAny {
/// List the paths in the same directory that are lexicographically greater or equal to
/// List the paths in the same directory that are lexicographically greater than
/// (UTF-8 sorting) the given `path`. The result should also be sorted by the file name.
///
/// If the path is directory-like (ends with '/'), the result should contain
/// all the files in the directory.
Comment on lines +351 to +355

[Author] Not 100% sure this is the behavior we want to go for, but thought I'd put up the PR for discussion.

[Review comment, Collaborator] IMO it would be better to split this behavior change into a different PR, if at all possible?

[Review comment, Collaborator] If we keep this here, can we add it to the PR description? I think this is a breaking change.
fn list_from(&self, path: &Url)
-> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>>;

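The revised `list_from` contract above (strictly greater than a file-like offset, everything under a directory-like offset, results sorted) can be modeled with a std-only sketch. The in-memory name list and the function signature are hypothetical; the real trait operates on URLs against an object store.

```rust
/// Model of the documented `list_from` contract: return names
/// lexicographically greater than `offset` (UTF-8 ordering), sorted
/// ascending; a directory-like offset (trailing '/') returns all files
/// under that directory.
fn list_from(names: &[&str], offset: &str) -> Vec<String> {
    let mut out: Vec<String> = names
        .iter()
        .copied()
        .filter(|&n| {
            if offset.ends_with('/') {
                // directory-like: everything under the prefix qualifies
                n.starts_with(offset)
            } else {
                // file-like: strictly greater, so the offset file itself
                // is excluded (the breaking change discussed above)
                n > offset
            }
        })
        .map(String::from)
        .collect();
    out.sort(); // contract: results are sorted by file name
    out
}

fn main() {
    let names = [
        "_delta_log/00000.json",
        "_delta_log/00001.json",
        "_delta_log/00002.json",
    ];
    assert_eq!(
        list_from(&names, "_delta_log/00000.json"),
        vec!["_delta_log/00001.json", "_delta_log/00002.json"]
    );
    assert_eq!(list_from(&names, "_delta_log/").len(), 3);
    println!("ok");
}
```

The two branches correspond to the two test cases in the shared `test_list_from_should_sort_and_filter` added by this PR.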
1 change: 0 additions & 1 deletion kernel/src/log_segment/tests.rs
@@ -98,7 +98,6 @@ fn build_log_with_paths_and_checkpoint(
let client = ObjectStoreFileSystemClient::new(
store,
false, // don't have ordered listing
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
);

3 changes: 0 additions & 3 deletions kernel/src/snapshot.rs
@@ -249,11 +249,9 @@ mod tests {
let url = url::Url::from_directory_path(path).unwrap();

let store = Arc::new(LocalFileSystem::new());
let prefix = Path::from(url.path());
let client = ObjectStoreFileSystemClient::new(
store,
false, // don't have ordered listing
prefix,
Arc::new(TokioBackgroundExecutor::new()),
);
let cp = read_last_checkpoint(&client, &url).unwrap();
@@ -291,7 +289,6 @@
let client = ObjectStoreFileSystemClient::new(
store,
false, // don't have ordered listing
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
);
let url = Url::parse("memory:///valid/").expect("valid url");
14 changes: 2 additions & 12 deletions kernel/tests/read.rs
@@ -58,7 +58,6 @@ async fn single_commit_two_add_files() -> Result<(), Box<dyn std::error::Error>>
let location = Url::parse("memory:///")?;
let engine = Arc::new(DefaultEngine::new(
storage.clone(),
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
));

@@ -113,11 +112,7 @@ async fn two_commits() -> Result<(), Box<dyn std::error::Error>> {
.await?;

let location = Url::parse("memory:///").unwrap();
let engine = DefaultEngine::new(
storage.clone(),
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
);
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));

let table = Table::new(location);
let expected_data = vec![batch.clone(), batch];
@@ -171,11 +166,7 @@ async fn remove_action() -> Result<(), Box<dyn std::error::Error>> {
.await?;

let location = Url::parse("memory:///").unwrap();
let engine = DefaultEngine::new(
storage.clone(),
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
);
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));

let table = Table::new(location);
let expected_data = vec![batch];
@@ -249,7 +240,6 @@ async fn stats() -> Result<(), Box<dyn std::error::Error>> {
let location = Url::parse("memory:///").unwrap();
let engine = Arc::new(DefaultEngine::new(
storage.clone(),
Path::from(""),
Arc::new(TokioBackgroundExecutor::new()),
));
