Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Fix UNION field nullability tracking #14356

Merged
merged 6 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 5 additions & 23 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{
exec_err, get_target_functional_dependencies, internal_err, not_impl_err,
plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, DataFusionError,
FunctionalDependencies, Result, ScalarValue, TableReference, ToDFSchema,
UnnestOptions,
Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
};
use datafusion_expr_common::type_coercion::binary::type_union_resolution;

Expand Down Expand Up @@ -1518,27 +1517,10 @@ pub fn validate_unique_names<'a>(
/// [`TypeCoercionRewriter::coerce_union`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/struct.TypeCoercionRewriter.html#method.coerce_union
/// [`coerce_union_schema`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/fn.coerce_union_schema.html
pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
if left_plan.schema().fields().len() != right_plan.schema().fields().len() {
return plan_err!(
"UNION queries have different number of columns: \
left has {} columns whereas right has {} columns",
left_plan.schema().fields().len(),
right_plan.schema().fields().len()
);
}

// Temporarily use the schema from the left input and later rely on the analyzer to
// coerce the two schemas into a common one.

// Functional Dependencies doesn't preserve after UNION operation
let schema = (**left_plan.schema()).clone();
let schema =
Arc::new(schema.with_functional_dependencies(FunctionalDependencies::empty())?);

Ok(LogicalPlan::Union(Union {
inputs: vec![Arc::new(left_plan), Arc::new(right_plan)],
schema,
}))
Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
Arc::new(left_plan),
Arc::new(right_plan),
])?))
}

/// Create Projection
Expand Down
114 changes: 106 additions & 8 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,15 +699,13 @@ impl LogicalPlan {
}))
}
LogicalPlan::Union(Union { inputs, schema }) => {
let input_schema = inputs[0].schema();
// If inputs are not pruned do not change schema
// TODO this seems wrong (shouldn't we always use the schema of the input?)
let schema = if schema.fields().len() == input_schema.fields().len() {
Arc::clone(&schema)
let first_input_schema = inputs[0].schema();
if schema.fields().len() == first_input_schema.fields().len() {
// If inputs are not pruned do not change schema
Ok(LogicalPlan::Union(Union { inputs, schema }))
} else {
Arc::clone(input_schema)
};
Ok(LogicalPlan::Union(Union { inputs, schema }))
Ok(LogicalPlan::Union(Union::try_new(inputs)?))
}
}
LogicalPlan::Distinct(distinct) => {
let distinct = match distinct {
Expand Down Expand Up @@ -2645,6 +2643,106 @@ pub struct Union {
pub schema: DFSchemaRef,
}

impl Union {
/// Constructs new Union instance deriving schema from inputs.
fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
let schema = Self::derive_schema_from_inputs(&inputs, false)?;
Ok(Union { inputs, schema })
}

/// Constructs new Union instance deriving schema from inputs.
/// Inputs do not have to have matching types and produced schema will
/// take type from the first input.
pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another name for this one is try_new_with_coerce_types to emphasize it is coercing the types

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this one should be coercing types, but it's not doing this. It just takes a type from first UNION branch.
This is not new logic though. It's just moved from plan builder over here, to avoid duplicating code.

I didn't know why this logic existed in this shape, but it felt intentional enough not to simply replace it in this PR. I would prefer it to be fixed later... Hence this function name to make the caller wonder what "loose types" mean.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this was noted once before. Thank you for making it more explicit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-> #14380

let schema = Self::derive_schema_from_inputs(&inputs, true)?;
Ok(Union { inputs, schema })
}

/// Constructs new Union instance deriving schema from inputs.
///
/// `loose_types` if true, inputs do not have to have matching types and produced schema will
/// take type from the first input. TODO this is not necessarily reasonable behavior.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible alternative: coerce type to common type? https://www.postgresql.org/docs/17/typeconv-union-case.html is what PG does

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this is what should be happening.
There are two places where this code is run:

  1. plan building
  2. recompute_schema

For now in the (1) case I retained pre-existing logic, see current main

// Temporarily use the schema from the left input and later rely on the analyzer to
// coerce the two schemas into a common one.

For (2) we don't want any coercions at all.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-> #14380

fn derive_schema_from_inputs(
inputs: &[Arc<LogicalPlan>],
loose_types: bool,
) -> Result<DFSchemaRef> {
if inputs.len() < 2 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is already code that computes the coerced schema in the analyzer:

pub fn coerce_union_schema(inputs: &[Arc<LogicalPlan>]) -> Result<DFSchema> {

Can we reuse the same logic? Maybe we can move the coercion code here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the optimizer case, no coercion logic should be invoked, the types must match.
So we need a version of this code which does that. (#14296 (comment))

For the variant which constructs new schema from some uncoerced inputs -- currently called "loose types" and currently not doing coercion to maintain behavior as it was -- yes, i agree this one could use the coerce_union_schema from the analyzer (perhaps moving the logic into here)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can file a ticket describing what is desired and leave a comment in the code referring to that ticket

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making sure I understand the use case. If I want to construct a UNION logical plan with different types that are coercible (be it by current builtin rules or future user-defined rules), then I would use the Union::try_new_with_loose_types and have the analyzer pass handle coercion. Is this right?

Then what exactly is the use case for the Union::try_new? Since it's used in the schema recompute which can occur after the analyzer type coercion. Do we therefore need it to always perform proper coercion, including any future user-defined coercion rules?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

f I want to construct a UNION logical plan with different types that are coercible (be it by current builtin rules or future user-defined rules), then I would use the Union::try_new_with_loose_types and have the analyzer pass handle coercion. Is this right?

Correct.
Note: this is not my design. It was exactly the same before the PR. Just the code moved around.

Then what exactly is the use case for the Union::try_new? Since it's used in the schema recompute which can occur after the analyzer type coercion.

"schema recompute" is an overloaded term
if it runs after analyzer, it doesn't have to do any type coercion. In fact, it MUST NOT do any type coercion (#14296 (comment)). And in fact the try_new does not do any coercions. It's still needed to do column pruning.
In fact, IMO we should remove "schema recompute" from optimizer: #14357. For column pruning we should explicitly prune inputs of union and the unin itself using the same set of "required columns/indices". No need for a generic "recompute schema".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can file a ticket describing what is desired and leave a comment in the code referring to that ticket

added link to #14380 in the code.
while #14357 is also very relevant, i don't see a place where a link would be suitable, so not adding it for now

return plan_err!("UNION requires at least two inputs");
}
let first_schema = inputs[0].schema();
let fields_count = first_schema.fields().len();
for input in inputs {
if fields_count != input.schema().fields().len() {
return plan_err!(
"UNION queries have different number of columns: \
left has {} columns whereas right has {} columns",
fields_count,
input.schema().fields().len()
);
}
}

let union_fields = (0..fields_count)
.map(|i| {
let fields = inputs
.iter()
.map(|input| input.schema().field(i))
.collect::<Vec<_>>();
let first_field = fields[0];
let name = first_field.name();
let data_type = if loose_types {
// TODO apply type coercion here, or document why it's better to defer
// temporarily use the data type from the left input and later rely on the analyzer to
// coerce the two schemas into a common one.
first_field.data_type()
} else {
fields.iter().skip(1).try_fold(
first_field.data_type(),
|acc, field| {
if acc != field.data_type() {
return plan_err!(
"UNION field {i} have different type in inputs: \
left has {} whereas right has {}",
first_field.data_type(),
field.data_type()
);
}
Ok(acc)
},
)?
};
let nullable = fields.iter().any(|field| field.is_nullable());
let mut field = Field::new(name, data_type.clone(), nullable);
let field_metadata =
intersect_maps(fields.iter().map(|field| field.metadata()));
field.set_metadata(field_metadata);
// TODO reusing table reference from the first schema is probably wrong
let table_reference = first_schema.qualified_field(i).0.cloned();
Ok((table_reference, Arc::new(field)))
})
.collect::<Result<_>>()?;
let union_schema_metadata =
intersect_maps(inputs.iter().map(|input| input.schema().metadata()));

// Functional Dependencies doesn't preserve after UNION operation
let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
let schema = Arc::new(schema);

Ok(schema)
}
}

fn intersect_maps<'a>(
inputs: impl IntoIterator<Item = &'a HashMap<String, String>>,
) -> HashMap<String, String> {
let mut inputs = inputs.into_iter();
let mut merged: HashMap<String, String> = inputs.next().cloned().unwrap_or_default();
for input in inputs {
merged.retain(|k, v| input.get(k) == Some(v));
}
merged
}

// Manual implementation needed because of `schema` field. Comparison excludes this field.
impl PartialOrd for Union {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Expand Down
15 changes: 15 additions & 0 deletions datafusion/sqllogictest/test_files/union.slt
Original file line number Diff line number Diff line change
Expand Up @@ -836,3 +836,18 @@ physical_plan
# Clean up after the test
statement ok
drop table aggregate_test_100;

# test for https://github.com/apache/datafusion/issues/14352
query TB rowsort
SELECT
a,
a IS NOT NULL
FROM (
-- second column, even though it's not selected, was necessary to reproduce the bug linked above
SELECT 'foo' AS a, 3 AS b
UNION ALL
SELECT NULL AS a, 4 AS b
)
----
NULL false
foo true