Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: new null_row ExpressionHandler API #662

Open
wants to merge 29 commits into
base: main
Choose a base branch
from

Conversation

zachschuermann
Copy link
Collaborator

@zachschuermann zachschuermann commented Jan 24, 2025

What changes are proposed in this pull request?

Adds a new required method: new_null API for creating a new single-row null literal EngineData. Then, we provide the create_one API for creating single-row EngineData by implementing a SchemaTransform (LiteralExpressionTransform) to transform the given schema + leaf values into an Expression which evaluates to literal values at the leaves of the schema. (implemented in a new private ExpressionHandlerExtension trait)

  1. Adds the new required fn new_null to our ExpressionHandler trait (breaking)
  2. Adds the new provided fn create_one to an ExpressionHandlerExtension trait
  3. Implements new_null for ArrowExpressionHandler

additionally, adds a new fields_len() method to StructType.

This PR affects the following public APIs

  1. breaking: new new_null API for ExpressionHandler
  2. breaking: new LiteralExpressionTransformError

How was this change tested?

Bunch of new unit tests. For the nullability tests of our new SchemaTransform we came up with a set of 24 exhaustive test cases:

test cases: x, a, b are nullable (n) or not-null (!). we have 6 interesting nullability
combinations:
1. n { n, n }
5. n { n, ! }
6. n { !, ! }
7. ! { n, n }
8. ! { n, ! }
9. ! { !, ! }

and for each we want to test the four combinations of values ("a" and "b" just chosen as
abitrary scalars):

1. (a, b)
2. (N, b)
4. (a, N)
5. (N, N)

here's the full list of test cases with expected output:

n { n, n }
1. (a, b) -> x (a, b)
2. (N, b) -> x (N, b)
3. (a, N) -> x (a, N)
4. (N, N) -> x (N, N)

n { n, ! }
1. (a, b) -> x (a, b)
2. (N, b) -> x (N, b)
3. (a, N) -> Err
4. (N, N) -> x NULL

n { !, ! }
1. (a, b) -> x (a, b)
2. (N, b) -> Err
3. (a, N) -> Err
4. (N, N) -> x NULL

! { n, n }
1. (a, b) -> x (a, b)
2. (N, b) -> x (N, b)
3. (a, N) -> x (a, N)
4. (N, N) -> x (N, N)

! { n, ! }
1. (a, b) -> x (a, b)
2. (N, b) -> x (N, b)
3. (a, N) -> Err
4. (N, N) -> NULL

! { !, ! }
1. (a, b) -> x (a, b)
2. (N, b) -> Err
3. (a, N) -> Err
4. (N, N) -> NULL

@zachschuermann
Copy link
Collaborator Author

note I'll be cleaning up/adding more tests. wanted to get some eyes on this approach first

@github-actions github-actions bot added the breaking-change Change that will require a version bump label Jan 24, 2025
Copy link
Collaborator

@OussamaSaoudi OussamaSaoudi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I want to consider/discuss about this approach is that we require that the expression struct heirarchy matches the schema one. So a schema Struct(Struct(Scalar(Int)) requires an expression Struct(Struct(Literal(int))). This code wouldn't allow a Literal(int) expression.

Idk if we want to enforce that requirement in the long run? It's very common for kernel to flatten out the fields of a schema (ex: in a visitor), so I don't see why we shouldn't allow flattened expressions.

Perhaps this acts as a safety thing. Kernel is the only one calling create_one, and it ensures that things are nested as we expected.

Copy link
Collaborator

@scovich scovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

quick pass, couple reactions
(overall approach looks good)

Copy link

codecov bot commented Jan 28, 2025

Codecov Report

Attention: Patch coverage is 92.59259% with 38 lines in your changes missing coverage. Please review.

Project coverage is 84.57%. Comparing base (ce31e97) to head (1275207).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
...el/src/expressions/literal_expression_transform.rs 88.84% 28 Missing and 2 partials ⚠️
kernel/src/engine/arrow_expression.rs 97.59% 1 Missing and 4 partials ⚠️
kernel/src/lib.rs 84.61% 0 Missing and 2 partials ⚠️
ffi/src/error.rs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #662      +/-   ##
==========================================
+ Coverage   84.30%   84.57%   +0.27%     
==========================================
  Files          77       78       +1     
  Lines       19102    19614     +512     
  Branches    19102    19614     +512     
==========================================
+ Hits        16103    16588     +485     
- Misses       2201     2220      +19     
- Partials      798      806       +8     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@zachschuermann zachschuermann changed the title feat: new create_one ExpressionHandler API feat!: new create_one ExpressionHandler API Jan 31, 2025
Copy link
Collaborator

@scovich scovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flushing comments from an interrupted-and-forgotten review...

Comment on lines 578 to 581
match self.stack.pop() {
Some(array) => Ok(array),
None => Err(Error::generic("didn't build array")),
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Relating to the other FIXME about panicking:

Suggested change
match self.stack.pop() {
Some(array) => Ok(array),
None => Err(Error::generic("didn't build array")),
}
let Some(array) = self.stack.pop() else {
return Err(Error::generic("didn't build array"));
}
let Some(array) = array.as_struct_opt() else {
return Err(Error::generic("not a struct"));
}
Ok(array)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think as_struct_opt will return an &StructArray - and I want to avoid having to clone that

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just ended up checking array.data_type() though I wonder if it would be better to actually return an Arc<StructArray> instead of the trait object ArrayRef?

Comment on lines 736 to 753
for (child, field) in child_arrays.iter().zip(struct_type.fields()) {
if !field.is_nullable() && child.is_null(0) {
// if we have a null child array for a not-nullable field, either all other
// children must be null (and we make a null struct) or error
if child_arrays.iter().all(|c| c.is_null(0))
&& self.nullability_stack.iter().any(|n| *n)
{
self.stack.push(Arc::new(StructArray::new_null(fields, 1)));
return Some(Cow::Borrowed(struct_type));
} else {
self.set_error(Error::Generic(format!(
"Non-nullable field {} is null in single-row struct",
field.name()
)));
return None;
}
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm i'm not convinced by this. Pls correct me if I'm missing something! We're keeping track of the parent nullability with the nullability stack. Seems that we allow a nullability violation if any ancestor node is nullable and all the children are null. But I may have found a counter example:
Consider this schema

{
  x(nullable): {
    a (non-nullable),
    b (non-nullable) {
      c (non-nullable)
    }
  }
}

suppose we get the scalar: [1, NULL]

When we're processing struct b, we'll iterate over all of its fields. We'll find that c is null when it's non-nullable. At b I think the nullability stack would be [true, false] from x and b respectively.

Given all these, we don't return an error. We allowed c to be null because we thought its ancestor x is null. That's this check

if child_arrays.iter().all(|c| c.is_null(0)) && self.nullability_stack.iter().any(|n| *n)

But if x is null, then a should also be null, which it isn't.

Copy link
Collaborator

@OussamaSaoudi OussamaSaoudi Feb 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming I'm not missing something, I thought up an alternate solution.

Definitions

We should fail if there is a nullability violation. Nullability violations can happen in 2 cases:

  • Base case: a leaf field is non-nullable, but the value is null.
  • Struct case: A struct has a nullability violation if both hold:
    1. at least one of its children has a nullability violation
    2. The struct does not resolve the nullability violation.

A nullability violation for a struct node is resolved when both hold:
1) all of its children are null
2) the node is nullable.

This is the case where the entire struct is null. All of its children may be null, and violations can be safely ignored.

Solution

We keep track of 2 variables for each node:

  • Null_subtree: This is true if all of the node and all its descendants are null.
  • null_violation: This is true if the node has a nullability violation (as defined above).

And an additional variable for struct nodes:

  • is_resolved: This is true if the node is nullable and the node is null_subtree is True

Base case:

  • null_subtree = True if the leaf is null
  • null_violation = True if the field is non-nullable, but the value is null

Inductive case:

  • null_subtree = True if all the children are null
  • is_resolved = True if null_subtree and current node is nullable
  • null_violation = True if (any child has null_violation) and !(is_resolved)

Return an error if at the top level (null_violation == true).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may have found a counter example:

Ignoring all code for a moment, and tweaking slightly to add d as a sibling to c:

x(nullable): { 
  a (non-nullable), 
  b (non-nullable) { 
    c (non-nullable) 
    d (nullable) 
  }
}
Analysis

At the time we encounter NULL for c, there are only two possible outcomes:

  1. b is non-NULL => definitely an error
  2. b is NULL => possibly allowed (depending on whether b is allowed to be NULL, which in turn depends on whether x is NULL)

However, we are doing a depth-first traversal. So at the time we process e.g. c we have not even seen d yet, let alone processed parent parent b and grandparent x. The stack is [a:<whatever>, c:NULL].

Since we cannot yet know the correct handling of c, we just push its NULL value on the stack and move on to d (which we also just push onto the stack). Once the recursion unwinds to b, we have two possibilities:

  1. [a:<whatever>, c:NULL, d:NULL] -- because all children of b are NULL (c and d), and at least one of those children is "immediately" non-nullable, we assume the intent was to express (by transitivity) the fact that b itself is NULL (recall that b is not a leaf so we can't represent its nullness directly). Result: [a:<whatever>, b:NULL]. Whether that's good or bad is still to be determined transitively as the recursion unwinds.
  2. [a:<whatever>, c:NULL, d:<something>] -- because d is non-NULL, we know b cannot be NULL and therefore it is an error for "immediately" non-nullable c to be NULL. Result: **ERROR**.

Assuming we did not already error out, we again have two possibilities:

  1. [a:NULL, b:NULL] -- as before, all children of x are NULL (a and b), and at least one of those children is "immediately" non-nullable, so we assume the intent was to express x is NULL. Since x is immediately nullable, this is totally legitimate and the recursion completes successfully.
  2. [a:<something>, b: NULL] -- again as before, x cannot be NULL because it has a non-NULL child a. So NULL value for "immediately" non-nullable b is illegal and the recursion errors out.

Coming back to code:

The recursive algorithm would seem to be:

  • For all leaf values, accept NULL values unconditionally, deferring correctness checks to the parent.
  • Whenever the recursion unwinds to reach a (now complete) struct node, examine the children. We have several possible child statuses:
    • All children non-NULL -- No problem, nothing to see here, move on.
    • All children NULL.
      • If all children are nullable, this is fine, and we interpret the parent as non-NULL with all-null children.
      • Otherwise, we interpret this as an indirect way of expression that the parent itself is NULL. As with a leaf value, we accept that NULL value unconditionally, deferring correctness checks to the parent.
    • Otherwise, we have a mix of NULL and non-NULL children. The parent thus cannot be NULL.
      • If any of the NULL children are immediately non-nullable => ERROR
      • Otherwise, no problem, nothing to see here, move on.

If we consider all combos of the above schema, that involve least one NULL:

  • [a:<something>, c:<something>, d:NULL] - OK (x.b.d is nullable)
  • [a:<something>, c:NULL, d:<something>] - ERROR (x.b.c is non-nullable, detected by b)
  • [a:NULL, c:<something>, d:<something>] - ERROR (x.a is non-nullable, detected by x)
  • [a:<something>, c:NULL, d:NULL] - ERROR (x.b is non-nullable, detected by x)
  • [a:NULL, c:<something>, d:NULL] - ERROR (x.a is non-nullable, detected by x)
  • [a:NULL, c:NULL, d:<something>] - ERROR (x.b.c is non-nullable, detected by b)
  • [a:NULL, c:NULL, d:NULL] - OK (x is nullable)

Notably, I dont' think we need a stack to track nullability -- each parent just verifies its direct children for correct match-up of their nullability (and NULL values) vs. its own nullability. If there is no obvious local conflict, it makes itself either NULL or non-null as appropriate and then trusts its parent to do the same checking as needed.

Code
fn transform_struct(&mut self, struct_type: &'a StructType) -> Option<Cow<'a, StructType>> {
    // NOTE: This is an optimization; the other early-return suffices to produce correct behavior.
    if self.error.is_some() {
        return None;
    }
    
    // Only consume newly-added entries (if any). There could be fewer than expected if
    // the recursion encountered an error.
    let mark = self.stack.len();
    let _ = self.recurse_into_struct(struct_type);
    let field_values = self.stack.split_off(mark);
    if self.error.is_some() {
        return None;
    }
    
    require!(field_values.len() == struct_type.len(), ...);
    let mut found_non_nullable_null = false;
    let mut all_null = true;
    for (f, v) in struct_type.fields().zip(&field_values) {
        if v.is_valid(0) {
            all_null = false;
        } else if !f.is_nullable() {
            found_non_nullable_null = true;
        }
    }
    
    let null_buffer = found_non_nullable_null.then(|| {
        // The struct had a non-nullable NULL. This is only legal if all fields were NULL, which we
        // interpret as the struct itself being NULL.
        require!(all_null, ...);
        
        // We already have the all-null columns we need, just need a null buffer
        NullBuffer::new_null(1)
    });
    
    // Assemble the struct normally but mark it NULL? Or make a NULL struct directly?
    let sa = match StructArray::try_new(..., null_buffer) { ... };
    self.stack.push(sa);
    None
}  

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For completeness of testing, we probably need a schema that exercises every possible combo of fields, along with one set of leaf scalars for every possible combo of NULL and non-NULL.

There are six "interesting" combos (n = nullable, ! = non-null):

n { n, n }
n { n, ! }
n { !, ! }
! { n, n }
! { n, ! }
! { !, ! }

Each one can have 4 distinct input value combinations, for a total of 6x4 = 24 cases to test.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aside: What happens when a struct is non-nullable, but all its children are nullable? Does this mean that we enforce that at least one of the children is non-null?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One last thing I wanted to flag: my original 'nullability stack' started off with 'false' for the root (the root struct array must not be null in order to create a RecordBatch out of it). In the new approach, it's slightly more general and could produce a NULL top-level StructArray which is unable to become a RecordBatch so I've introduced just a simple one-off check that will cause create_one to fail if the transform hands back a NULL StructArray.

aside: I'm not sure why there isn't just an easy API for StructArray to RecordBatch that doesn't panic..? Am I missing it?

Copy link
Collaborator Author

@zachschuermann zachschuermann Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the expected in this case? do we need to treat the top-level NULLs differently? I would expect the following to fail but it seems that arrow disagrees...

x: (not_null) {
  a: (nullable) LONG,
  b: (not_null) LONG,
}

if values = [Null, Null], we get the "all null" struct collapsing at level a,b.
this gives x: (not_null) { NULL }

if we consider all-null children to always be safe, this will also simplify to just a single top-level NULL (feels incorrect)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for some additional context it seems arrow will happily create a StructArray with a not-null field if the null buffer passed in to try_new contains all of the of the corresponding child array's nulls.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my original 'nullability stack' started off with 'false' for the root (the root struct array must not be null in order to create a RecordBatch out of it). In the new approach, it's slightly more general and could produce a NULL top-level StructArray which is unable to become a RecordBatch

That's definitely annoying, and possibly a good reason to keep old behavior that all-null only translates to null struct if some fields are non-nullable...

it seems arrow will happily create a StructArray with a not-null field if the null buffer passed in to try_new contains all of the of the corresponding child array's nulls.

Right, this is similar to our recursive algo -- whether that null top-level value is bad depends on the parent. For example, record batch as a parent does not like top-level NULL, but a nullable field as a parent is totally fine.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i went ahead and reverted to the "only make struct null if required" semantics - I've also documented the exhaustive list of test cases in the description (and implemented)

Comment on lines 736 to 753
for (child, field) in child_arrays.iter().zip(struct_type.fields()) {
if !field.is_nullable() && child.is_null(0) {
// if we have a null child array for a not-nullable field, either all other
// children must be null (and we make a null struct) or error
if child_arrays.iter().all(|c| c.is_null(0))
&& self.nullability_stack.iter().any(|n| *n)
{
self.stack.push(Arc::new(StructArray::new_null(fields, 1)));
return Some(Cow::Borrowed(struct_type));
} else {
self.set_error(Error::Generic(format!(
"Non-nullable field {} is null in single-row struct",
field.name()
)));
return None;
}
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may have found a counter example:

Ignoring all code for a moment, and tweaking slightly to add d as a sibling to c:

x(nullable): { 
  a (non-nullable), 
  b (non-nullable) { 
    c (non-nullable) 
    d (nullable) 
  }
}
Analysis

At the time we encounter NULL for c, there are only two possible outcomes:

  1. b is non-NULL => definitely an error
  2. b is NULL => possibly allowed (depending on whether b is allowed to be NULL, which in turn depends on whether x is NULL)

However, we are doing a depth-first traversal. So at the time we process e.g. c we have not even seen d yet, let alone processed parent parent b and grandparent x. The stack is [a:<whatever>, c:NULL].

Since we cannot yet know the correct handling of c, we just push its NULL value on the stack and move on to d (which we also just push onto the stack). Once the recursion unwinds to b, we have two possibilities:

  1. [a:<whatever>, c:NULL, d:NULL] -- because all children of b are NULL (c and d), and at least one of those children is "immediately" non-nullable, we assume the intent was to express (by transitivity) the fact that b itself is NULL (recall that b is not a leaf so we can't represent its nullness directly). Result: [a:<whatever>, b:NULL]. Whether that's good or bad is still to be determined transitively as the recursion unwinds.
  2. [a:<whatever>, c:NULL, d:<something>] -- because d is non-NULL, we know b cannot be NULL and therefore it is an error for "immediately" non-nullable c to be NULL. Result: **ERROR**.

Assuming we did not already error out, we again have two possibilities:

  1. [a:NULL, b:NULL] -- as before, all children of x are NULL (a and b), and at least one of those children is "immediately" non-nullable, so we assume the intent was to express x is NULL. Since x is immediately nullable, this is totally legitimate and the recursion completes successfully.
  2. [a:<something>, b: NULL] -- again as before, x cannot be NULL because it has a non-NULL child a. So NULL value for "immediately" non-nullable b is illegal and the recursion errors out.

Coming back to code:

The recursive algorithm would seem to be:

  • For all leaf values, accept NULL values unconditionally, deferring correctness checks to the parent.
  • Whenever the recursion unwinds to reach a (now complete) struct node, examine the children. We have several possible child statuses:
    • All children non-NULL -- No problem, nothing to see here, move on.
    • All children NULL.
      • If all children are nullable, this is fine, and we interpret the parent as non-NULL with all-null children.
      • Otherwise, we interpret this as an indirect way of expression that the parent itself is NULL. As with a leaf value, we accept that NULL value unconditionally, deferring correctness checks to the parent.
    • Otherwise, we have a mix of NULL and non-NULL children. The parent thus cannot be NULL.
      • If any of the NULL children are immediately non-nullable => ERROR
      • Otherwise, no problem, nothing to see here, move on.

If we consider all combos of the above schema, that involve least one NULL:

  • [a:<something>, c:<something>, d:NULL] - OK (x.b.d is nullable)
  • [a:<something>, c:NULL, d:<something>] - ERROR (x.b.c is non-nullable, detected by b)
  • [a:NULL, c:<something>, d:<something>] - ERROR (x.a is non-nullable, detected by x)
  • [a:<something>, c:NULL, d:NULL] - ERROR (x.b is non-nullable, detected by x)
  • [a:NULL, c:<something>, d:NULL] - ERROR (x.a is non-nullable, detected by x)
  • [a:NULL, c:NULL, d:<something>] - ERROR (x.b.c is non-nullable, detected by b)
  • [a:NULL, c:NULL, d:NULL] - OK (x is nullable)

Notably, I dont' think we need a stack to track nullability -- each parent just verifies its direct children for correct match-up of their nullability (and NULL values) vs. its own nullability. If there is no obvious local conflict, it makes itself either NULL or non-null as appropriate and then trusts its parent to do the same checking as needed.

Code
fn transform_struct(&mut self, struct_type: &'a StructType) -> Option<Cow<'a, StructType>> {
    // NOTE: This is an optimization; the other early-return suffices to produce correct behavior.
    if self.error.is_some() {
        return None;
    }
    
    // Only consume newly-added entries (if any). There could be fewer than expected if
    // the recursion encountered an error.
    let mark = self.stack.len();
    let _ = self.recurse_into_struct(struct_type);
    let field_values = self.stack.split_off(mark);
    if self.error.is_some() {
        return None;
    }
    
    require!(field_values.len() == struct_type.len(), ...);
    let mut found_non_nullable_null = false;
    let mut all_null = true;
    for (f, v) in struct_type.fields().zip(&field_values) {
        if v.is_valid(0) {
            all_null = false;
        } else if !f.is_nullable() {
            found_non_nullable_null = true;
        }
    }
    
    let null_buffer = found_non_nullable_null.then(|| {
        // The struct had a non-nullable NULL. This is only legal if all fields were NULL, which we
        // interpret as the struct itself being NULL.
        require!(all_null, ...);
        
        // We already have the all-null columns we need, just need a null buffer
        NullBuffer::new_null(1)
    });
    
    // Assemble the struct normally but mark it NULL? Or make a NULL struct directly?
    let sa = match StructArray::try_new(..., null_buffer) { ... };
    self.stack.push(sa);
    None
}  

Comment on lines 736 to 753
for (child, field) in child_arrays.iter().zip(struct_type.fields()) {
if !field.is_nullable() && child.is_null(0) {
// if we have a null child array for a not-nullable field, either all other
// children must be null (and we make a null struct) or error
if child_arrays.iter().all(|c| c.is_null(0))
&& self.nullability_stack.iter().any(|n| *n)
{
self.stack.push(Arc::new(StructArray::new_null(fields, 1)));
return Some(Cow::Borrowed(struct_type));
} else {
self.set_error(Error::Generic(format!(
"Non-nullable field {} is null in single-row struct",
field.name()
)));
return None;
}
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For completeness of testing, we probably need a schema that exercises every possible combo of fields, along with one set of leaf scalars for every possible combo of NULL and non-NULL.

There are six "interesting" combos (n = nullable, ! = non-null):

n { n, n }
n { n, ! }
n { !, ! }
! { n, n }
! { n, ! }
! { !, ! }

Each one can have 4 distinct input value combinations, for a total of 6x4 = 24 cases to test.

@roeap
Copy link
Collaborator

roeap commented Feb 3, 2025

Just thinking out loud here...

I do think we can already do a lot of data generation using the existing expression API. The main thing that is missing is the ability to communicate the desired number of rows in evaluate.

The code below produces data much like we want it to.

let add_expr = Expression::struct_from([
    Expression::literal("file:///path"),
    Expression::literal(100),
    Expression::literal(Scalar::Null(DeltaDataTypes::INTEGER)),
]);
let schema = StructType::new(vec![
    StructField::new("path", DeltaDataTypes::STRING, false),
    StructField::new("size", DeltaDataTypes::INTEGER, false),
    StructField::new("size_null", DeltaDataTypes::INTEGER, true),
]);

let dummy_schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
let dummy_batch = RecordBatch::try_new(
    Arc::new(dummy_schema),
    vec![Arc::new(BooleanArray::from(vec![true]))],
)
.unwrap();

let handler = ArrowExpressionHandler {};
let evaluator = handler.get_evaluator(schema.clone().into(), add_expr, schema.into());

let data = Box::new(ArrowEngineData::new(dummy_batch));

let result = evaluator.evaluate(data.as_ref()).unwrap();
let result = result
    .any_ref()
    .downcast_ref::<ArrowEngineData>()
    .unwrap()
    .record_batch()
    .clone();

print_batches(&[result]).unwrap();

As the implementation we expect engines for to provide for expression evaluation, I wonder if it is simpler for the engine if we use the expression mechanics and maybe add a method evaluate_one(&self) ... which tells the engine to evaluate an expression over an empty batch with one row?

The current approach here feels more explicit, but would also incur more work for engines wanting to adopt?

@scovich
Copy link
Collaborator

scovich commented Feb 4, 2025

Interesting. If I try to distill/refine the idea, is it basically this?

  1. Define a new API whose only job is to produce a "dummy" engine data (***) with the requested number of rows
  2. Kernel uses the result of that API call as the input to an otherwise unremarkable expression evaluation

(***) The ideal "dummy" engine data would have no columns, but arrow probably doesn't allow that. So the next best would wrap a NullArray in a RecordBatch with an unpredictable field name. A uuid would work nicely for example.

@zachschuermann zachschuermann changed the title feat!: new create_one ExpressionHandler API feat!: new null_row and create_one ExpressionHandler API Mar 7, 2025
Copy link
Collaborator

@nicklan nicklan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some comments

Some(s) => s,
None => {
self.set_error(Error::generic(
"Not enough scalars to create a single-row array",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think the error is actually: Number of scalars didn't match number of leaf elements in schema (or something like that)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was old code? or an old comment?

/// `values`.
// Note: we will stick with a Schema instead of DataType (more constrained can expand in
// future)
fn create_one(&self, schema: SchemaRef, values: &[Scalar]) -> DeltaResult<Box<dyn EngineData>> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we ever expect engines to override this with their own implementation?

If not, we should remove it from the trait and just make it a pub(crate) function somewhere.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm that's a good point - I could conceive of an engine that would prefer to 'build' their own arrays like in my old implementation instead of doing the new_null and expression evaluation, but perhaps we should just make this pub(crate) for now and let that optimization occur if engines ask for it? I guess the downside of the current approach is just 'leaking' extra code in pub interface?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'd keep this pub(crate) for now if we don't know we need it. We can always move it to the trait if someone asks but it's much harder to the the other way.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure sounds good. i moved to a separate 'extension' trait - lmk if you think of a better way to implement it. (and unfortunately rustc keeps warning that it isn't used even though it is - if i remove it doesn't compile lol - so i added allow(dead_code) to get it to shut up and can make an issue to try to sort that out later?)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrmm, odd. Yeah let's make an issue just so we don't completely forget it

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea will do - was getting annoyed with it so decided to not go further down that rabbit hole hahaha

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -0,0 +1,540 @@
use std::borrow::Cow;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an impl specifically for arrow. I think we should then move it into kernel/src/engine and call it arrow_single_row_transform.rs.

We don't generally let arrow leak out of there :) Open to other places that make sense too.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry I think I may have left some incorrect comments, but this shouldn't be dependent on arrow at all (just schema-to-expression transform)

@@ -15,6 +15,8 @@ use crate::DataType;
mod column_names;
mod scalars;

pub(crate) mod single_row_transform;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to predicate including this on having an arrow dep

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commented above but just adding here: shouldn't be dependent on arrow since this is just Schema to Expression

@zachschuermann zachschuermann requested a review from nicklan March 10, 2025 20:45
Comment on lines +367 to +378
let (fields, columns, nulls) = applied.into_parts();
if let Some(nulls) = nulls {
if nulls.null_count() != 0 {
return Err(Error::invalid_struct_data(
"Top-level nulls in struct are not supported",
));
}
}
Ok(RecordBatch::try_new(
Arc::new(ArrowSchema::new(fields)),
columns,
)?)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I included this change so that we could leverage the existing arrow_expression infra with the new changes. without this we will panic within arrow on some of the tests I have for top-level nulls (instead of just returning an error)

/// `values`.
// Note: we will stick with a Schema instead of DataType (more constrained can expand in
// future)
fn create_one(&self, schema: SchemaRef, values: &[Scalar]) -> DeltaResult<Box<dyn EngineData>> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'd keep this pub(crate) for now if we don't know we need it. We can always move it to the trait if someone asks but it's much harder to the the other way.

handler.create_one(schema, values),
Err(Error::InvalidStructData(_))
));
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oof on the file size. We should probably split the tests out, and even split this all up. Maybe just make an issue for it (would be a good first issue)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zachschuermann zachschuermann requested a review from nicklan March 13, 2025 23:11
Copy link
Collaborator

@nicklan nicklan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm, thanks!

/// `values`.
// Note: we will stick with a Schema instead of DataType (more constrained can expand in
// future)
fn create_one(&self, schema: SchemaRef, values: &[Scalar]) -> DeltaResult<Box<dyn EngineData>> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrmm, odd. Yeah let's make an issue just so we don't completely forget it

@zachschuermann zachschuermann changed the title feat!: new null_row and create_one ExpressionHandler API feat!: new null_row ExpressionHandler API Mar 14, 2025
@@ -35,6 +35,8 @@ use crate::expressions::{
VariadicExpression, VariadicOperator,
};
use crate::schema::{ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, StructField};
#[allow(unused_imports)]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zachschuermann zachschuermann requested a review from roeap March 18, 2025 15:08
Copy link
Collaborator

@roeap roeap left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Just a nit to maybe think about (sometime :)).


/// Any error for [`LiteralExpressionTransform`]
#[derive(thiserror::Error, Debug)]
pub enum Error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I do very much like the pattern to have dedicated and very specific errors in sub-modules, I also learned (the hard way :)), that this sometimes ends up in a nested mess ... One thing that worked in the past, is to use such errors, but not expose them in the top level error and make the struct non-pub.

This is likely for a thing for a follow-up though, if other feel the same.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly, I suspect we need to do what rust has always done, and what spark is now doing after many years of arbitrary expression hierarchies: Have a single error class that encapsulates a "soft" hierarchy of error classification codes (which are traditionally strings satisfying the regexp [0-9A-Z]+ (all-caps alpnanumeric). Easier to extend, easier to document, etc.

NOTE: This approach does not prevent us from internally using and defining exception hierarchies, enums, etc. It just makes the crate a lot easier to deal with because adding new private exception types is no longer a breaking change.

)?;
self.stack.push(arr);

Some(Cow::Borrowed(prim_type))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we always ignore the return value, should we just return None here?
Or are we consistently using None to mean error? (I didn't see any code that checks tho?)

Comment on lines 131 to 135
let arr = self.check_error(
scalar
.to_array(1)
.map_err(|delta_error| Error::Schema(delta_error.to_string())),
)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can save a line hoisting out an intermediate result:

Suggested change
let arr = self.check_error(
scalar
.to_array(1)
.map_err(|delta_error| Error::Schema(delta_error.to_string())),
)?;
let arr = scalar
.to_array(1)
.map_err(|delta_error| Error::Schema(delta_error.to_string()));
let arr = self.check_error(arr)?;

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could save even more by adding a constructor for Error::schema that takes impl Into<String>:

Suggested change
let arr = self.check_error(
scalar
.to_array(1)
.map_err(|delta_error| Error::Schema(delta_error.to_string())),
)?;
let arr = self.check_error(scalar.to_array(1).map_err(Error::schema))?;

(several call sites would benefit from that)

Comment on lines 192 to 198
// first always check error to terminate early if possible
if self.error.is_some() {
return None;
}
self.set_error(Error::Unsupported(
"ArrayType not yet supported for creating single-row array".to_string(),
));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Is the first check even needed, given that we unconditionally error out?
I guess checking first avoids the warning log?

Might it be reasonable to directly set the error instead?

Suggested change
// first always check error to terminate early if possible
if self.error.is_some() {
return None;
}
self.set_error(Error::Unsupported(
"ArrayType not yet supported for creating single-row array".to_string(),
));
self.error = Error::Unsupported(
"ArrayType not yet supported for creating single-row array".to_string(),
);

Comment on lines +551 to +554
.map(|field| {
let data_type = field.data_type();
Scalar::Null(data_type.clone()).to_array(1)
})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why not just

Suggested change
.map(|field| {
let data_type = field.data_type();
Scalar::Null(data_type.clone()).to_array(1)
})
.map(|field| Scalar::Null(field.data_type().clone()).to_array(1))

Comment on lines +1229 to +1232
StructField::nullable("b", DataType::LONG),
StructField::not_null("b", DataType::LONG),
StructField::nullable("c", DataType::LONG),
StructField::nullable("c", DataType::LONG),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a pretty sketchy scenario... should we at least document the expected behavior? Do we keep the first or the last version for each field name?

Comment on lines +153 to +155
let struct_expr = if all_null && found_non_nullable_null {
Expression::null_literal(struct_type.clone().into())
} else if found_non_nullable_null {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be a little easier to read with nesting?

let struct_expr = if found_non_nullable_null {
    if !all_null {
          ...
        return None;
    }
    Expression::null_literal(struct_type.clone().into())
} else {
    Expression::struct_from(field_exprs)
}

}
}

fn set_error(&mut self, e: Error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we make this return None, then all the call sites simplify. For example, check_error below turns to:

        match result {
            Ok(val) => Some(val),
            Err(err) => self.set_error(err.into()),
        }

Is the "cleverness" worth it?

Comment on lines +98 to +100
if self.error.is_some() {
return None;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: If we stored Result<(), Error> instead of Option<Error> then we could do:

Suggested change
if self.error.is_some() {
return None;
}
let _ = self.error.ok()?;

Is the "cleverness" worth it?

Comment on lines +181 to +188
// first always check error to terminate early if possible
if self.error.is_some() {
return None;
}
self.set_error(Error::Unsupported(
"ArrayType not yet supported in TODO".to_string(),
));
None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: If we applied the other two suggestions, this code simplifies to just:

Suggested change
// first always check error to terminate early if possible
if self.error.is_some() {
return None;
}
self.set_error(Error::Unsupported(
"ArrayType not yet supported in TODO".to_string(),
));
None
// first always check error to terminate early if possible
let _ = self.error.ok()?;
self.set_error(Error::Unsupported(
"ArrayType not yet supported in TODO".to_string(),
))

Comment on lines +61 to +63
if let Some(e) = self.error {
return Err(e);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we took the other suggestion,

Suggested change
if let Some(e) = self.error {
return Err(e);
}
let _ = self.error?;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
breaking-change Change that will require a version bump
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants