Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: new null_row ExpressionHandler API #662

Merged
merged 34 commits into from
Mar 28, 2025
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
3606dc2
create_one
zachschuermann Jan 24, 2025
b6130f2
do schema transform instead
zachschuermann Jan 24, 2025
6fb4f7f
datatypes, tests, cleanup - still need to do nullability
zachschuermann Jan 28, 2025
f5471b1
Merge remote-tracking branch 'upstream/main' into create-engine-data
zachschuermann Jan 28, 2025
975dd01
cleanup
zachschuermann Jan 31, 2025
ad643d7
more cleanup
zachschuermann Jan 31, 2025
e032799
cleanup added test
zachschuermann Jan 31, 2025
ae0f03a
Merge remote-tracking branch 'upstream/main' into create-engine-data
zachschuermann Jan 31, 2025
09548b1
do nullability craziness
zachschuermann Feb 1, 2025
2aa3a52
checkpoint - all working
zachschuermann Feb 4, 2025
a4237b2
all null children = null struct
zachschuermann Feb 4, 2025
1bf432e
new mod
zachschuermann Feb 5, 2025
f15f955
test cleanup
zachschuermann Feb 5, 2025
321f592
clippy
zachschuermann Feb 5, 2025
54d7395
test cleanup
zachschuermann Feb 6, 2025
bb27429
Merge branch 'main' into create-engine-data
zachschuermann Feb 6, 2025
d78c989
new null_row API
zachschuermann Mar 7, 2025
166dbcf
Merge remote-tracking branch 'upstream/main' into create-engine-data
zachschuermann Mar 7, 2025
058650d
single_row_transform cleanup
zachschuermann Mar 10, 2025
bc953b6
fix single_row_transform, re-add tests
zachschuermann Mar 10, 2025
66e30eb
Merge remote-tracking branch 'upstream/main' into create-engine-data
zachschuermann Mar 10, 2025
3a09224
fix
zachschuermann Mar 10, 2025
acd0645
error fix
zachschuermann Mar 10, 2025
e18abeb
Merge remote-tracking branch 'upstream/main' into create-engine-data
zachschuermann Mar 12, 2025
53e3d9d
rename to LiteralExpressionTransform
zachschuermann Mar 12, 2025
3fab250
fix null_row to take output_type
zachschuermann Mar 12, 2025
31b10f6
private create_one
zachschuermann Mar 13, 2025
c2cbd9a
Merge branch 'main' into create-engine-data
zachschuermann Mar 13, 2025
1275207
comments
zachschuermann Mar 14, 2025
9f24358
Merge remote-tracking branch 'upstream/main' into create-engine-data
zachschuermann Mar 26, 2025
1ac4ae0
address feedback
zachschuermann Mar 26, 2025
8ee3ac0
Merge remote-tracking branch 'upstream/main' into create-engine-data
zachschuermann Mar 26, 2025
51ae3df
Merge branch 'main' into create-engine-data
zachschuermann Mar 27, 2025
d6426fe
fix nits
zachschuermann Mar 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ffi/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub enum KernelError {
ChangeDataFeedUnsupported,
ChangeDataFeedIncompatibleSchema,
InvalidCheckpoint,
SingleRowTransformError,
}

impl From<Error> for KernelError {
Expand Down Expand Up @@ -110,6 +111,7 @@ impl From<Error> for KernelError {
KernelError::ChangeDataFeedIncompatibleSchema
}
Error::InvalidCheckpoint(_) => KernelError::InvalidCheckpoint,
Error::SingleRowTransformError(_) => KernelError::SingleRowTransformError,
}
}
}
Expand Down
184 changes: 181 additions & 3 deletions kernel/src/engine/arrow_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,14 @@ use crate::expressions::{
BinaryExpression, BinaryOperator, Expression, Scalar, UnaryExpression, UnaryOperator,
VariadicExpression, VariadicOperator,
};
use crate::schema::{ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, StructField};
use crate::schema::{
ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, SchemaTransform, StructField,
};
use crate::{EngineData, ExpressionEvaluator, ExpressionHandler};

pub(crate) mod single_row_transform;
use single_row_transform::SingleRowTransform;

// TODO leverage scalars / Datum

fn downcast_to_bool(arr: &dyn Array) -> DeltaResult<&BooleanArray> {
Expand Down Expand Up @@ -526,6 +531,25 @@ impl ExpressionHandler for ArrowExpressionHandler {
output_type,
})
}

/// TODO
fn create_one(&self, schema: SchemaRef, values: &[Scalar]) -> DeltaResult<Box<dyn EngineData>> {
let mut array_transform = SingleRowTransform::new(values);
let datatype = schema.into();
// we build up the array within the `SingleRowArrayTransform` - we don't actually care
// about the 'transformed' type
let _transformed = array_transform.transform(&datatype);
let array = array_transform.into_struct_array()?;
let struct_array = array.as_struct_opt().unwrap(); // FIXME

// detect top-level null
if struct_array.is_null(0) {
return Err(Error::generic("Top-level null in single-row array"));
}

let record_batch: RecordBatch = struct_array.into(); // FIXME
Ok(Box::new(ArrowEngineData::new(record_batch)))
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -568,13 +592,13 @@ impl ExpressionEvaluator for DefaultExpressionEvaluator {
mod tests {
use std::ops::{Add, Div, Mul, Sub};

use arrow_array::{GenericStringArray, Int32Array};
use arrow_array::{create_array, GenericStringArray, Int32Array};
use arrow_buffer::ScalarBuffer;
use arrow_schema::{DataType, Field, Fields, Schema};

use super::*;
use crate::expressions::*;
use crate::schema::ArrayType;
use crate::schema::{ArrayType, StructType};
use crate::DataType as DeltaDataTypes;

#[test]
Expand Down Expand Up @@ -867,4 +891,158 @@ mod tests {
let expected = Arc::new(BooleanArray::from(vec![true, false]));
assert_eq!(results.as_ref(), expected.as_ref());
}

// helper to take values/schema to pass to `create_one` and assert the result = expected
fn assert_create_one(values: &[Scalar], schema: SchemaRef, expected: RecordBatch) {
let handler = ArrowExpressionHandler;
let actual = handler.create_one(schema, values).unwrap();
let actual_rb: RecordBatch = actual
.into_any()
.downcast::<ArrowEngineData>()
.unwrap()
.into();
assert_eq!(actual_rb, expected);
}

#[test]
fn test_create_one() {
let values: &[Scalar] = &[
1.into(),
"B".into(),
3.into(),
Scalar::Null(DeltaDataTypes::INTEGER),
];
let schema = Arc::new(StructType::new([
StructField::nullable("a", DeltaDataTypes::INTEGER),
StructField::nullable("b", DeltaDataTypes::STRING),
StructField::not_null("c", DeltaDataTypes::INTEGER),
StructField::nullable("d", DeltaDataTypes::INTEGER),
]));

let expected_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Utf8, true),
Field::new("c", DataType::Int32, false),
Field::new("d", DataType::Int32, true),
]));
let expected = RecordBatch::try_new(
expected_schema,
vec![
create_array!(Int32, [1]),
create_array!(Utf8, ["B"]),
create_array!(Int32, [3]),
create_array!(Int32, [None]),
],
)
.unwrap();
assert_create_one(values, schema, expected);
}

#[test]
fn test_create_one_nested() {
let values: &[Scalar] = &[1.into(), 2.into()];
let schema = Arc::new(StructType::new([StructField::not_null(
"a",
DeltaDataTypes::struct_type([
StructField::nullable("b", DeltaDataTypes::INTEGER),
StructField::not_null("c", DeltaDataTypes::INTEGER),
]),
)]));
let expected_schema = Arc::new(Schema::new(vec![Field::new(
"a",
DataType::Struct(
vec![
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, false),
]
.into(),
),
false,
)]));
let expected = RecordBatch::try_new(
expected_schema,
vec![Arc::new(StructArray::from(vec![
(
Arc::new(Field::new("b", DataType::Int32, true)),
create_array!(Int32, [1]) as ArrayRef,
),
(
Arc::new(Field::new("c", DataType::Int32, false)),
create_array!(Int32, [2]) as ArrayRef,
),
]))],
)
.unwrap();
assert_create_one(values, schema, expected);
}

#[test]
fn test_create_one_nested_null() {
let values: &[Scalar] = &[Scalar::Null(DeltaDataTypes::INTEGER), 1.into()];
let schema = Arc::new(StructType::new([StructField::not_null(
"a",
DeltaDataTypes::struct_type([
StructField::nullable("b", DeltaDataTypes::INTEGER),
StructField::not_null("c", DeltaDataTypes::INTEGER),
]),
)]));
let expected_schema = Arc::new(Schema::new(vec![Field::new(
"a",
DataType::Struct(
vec![
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, false),
]
.into(),
),
false,
)]));
let expected = RecordBatch::try_new(
expected_schema,
vec![Arc::new(StructArray::from(vec![
(
Arc::new(Field::new("b", DataType::Int32, true)),
create_array!(Int32, [None]) as ArrayRef,
),
(
Arc::new(Field::new("c", DataType::Int32, false)),
create_array!(Int32, [1]) as ArrayRef,
),
]))],
)
.unwrap();
assert_create_one(values, schema, expected);
}

#[test]
fn test_create_one_not_null_struct() {
let values: &[Scalar] = &[
Scalar::Null(DeltaDataTypes::INTEGER),
Scalar::Null(DeltaDataTypes::INTEGER),
];
let schema = Arc::new(StructType::new([StructField::not_null(
"a",
DeltaDataTypes::struct_type([
StructField::not_null("b", DeltaDataTypes::INTEGER),
StructField::nullable("c", DeltaDataTypes::INTEGER),
]),
)]));
let handler = ArrowExpressionHandler;
assert!(handler.create_one(schema, values).is_err());
}

#[test]
fn test_create_one_top_level_null() {
let values = &[Scalar::Null(DeltaDataTypes::INTEGER)];
let handler = ArrowExpressionHandler;

let schema = Arc::new(StructType::new([StructField::not_null(
"col_1",
DeltaDataTypes::INTEGER,
)]));
assert!(matches!(
handler.create_one(schema, values),
Err(Error::Generic(_))
));
}
}
Loading
Loading