Skip to content

Commit 2f76a6a

Browse files
committed
equivalence classes: use normalized mapping for projection
1 parent 09a0844 commit 2f76a6a

File tree

3 files changed

+142
-12
lines changed

3 files changed

+142
-12
lines changed

datafusion/physical-expr/src/equivalence/class.rs

+20
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,19 @@ impl EquivalenceGroup {
535535
.collapse()
536536
}
537537

538+
/// Makes a new [`ProjectionMapping`] where each source expression is normalized.
539+
pub fn normalize_mapping(&self, mapping: &ProjectionMapping) -> ProjectionMapping {
540+
ProjectionMapping {
541+
map: mapping
542+
.map
543+
.iter()
544+
.map(|(src, dst)| {
545+
(self.normalize_expr(Arc::clone(src)), Arc::clone(dst))
546+
})
547+
.collect(),
548+
}
549+
}
550+
538551
/// Projects `expr` according to the given projection mapping.
539552
/// If the resulting expression is invalid after projection, returns `None`.
540553
pub fn project_expr(
@@ -584,6 +597,13 @@ impl EquivalenceGroup {
584597
.collect::<Vec<_>>();
585598
(new_class.len() > 1).then_some(EquivalenceClass::new(new_class))
586599
});
600+
601+
// We need to find equivalent projected expressions.
602+
// e.g. table with columns [a,b,c] and a == b, projection: [a+c, b+c].
603+
// To conclude that a + c == b + c we first normalize all source expressions
604+
// in the mapping, then merge all equivalent expressions into the classes.
605+
let mapping = self.normalize_mapping(mapping);
606+
587607
// the key is the source expression and the value is the EquivalenceClass that contains the target expression of the source expression.
588608
let mut new_classes: IndexMap<Arc<dyn PhysicalExpr>, EquivalenceClass> =
589609
IndexMap::new();

datafusion/physical-expr/src/equivalence/properties.rs

+1-10
Original file line numberDiff line numberDiff line change
@@ -920,16 +920,7 @@ impl EquivalenceProperties {
920920
fn normalized_mapping(&self, mapping: &ProjectionMapping) -> ProjectionMapping {
921921
// Construct the mapping where source expressions are normalized. This way,
922922
// the algorithms below can work on exact equalities.
923-
ProjectionMapping {
924-
map: mapping
925-
.iter()
926-
.map(|(source, target)| {
927-
let normalized_source =
928-
self.eq_group.normalize_expr(Arc::clone(source));
929-
(normalized_source, Arc::clone(target))
930-
})
931-
.collect(),
932-
}
923+
self.eq_group.normalize_mapping(mapping)
933924
}
934925

935926
/// Computes projected orderings based on a given projection mapping.

datafusion/physical-plan/src/projection.rs

+121-2
Original file line numberDiff line numberDiff line change
@@ -1000,13 +1000,19 @@ mod tests {
10001000
use super::*;
10011001
use std::sync::Arc;
10021002

1003-
use crate::common::collect;
1003+
use crate::execution_plan::Boundedness;
10041004
use crate::test;
1005+
use crate::{common::collect, execution_plan::EmissionType};
1006+
use datafusion_expr::Operator;
1007+
use datafusion_physical_expr::{
1008+
expressions::{binary, col},
1009+
Partitioning,
1010+
};
10051011

10061012
use arrow_schema::DataType;
10071013
use datafusion_common::ScalarValue;
1014+
use datafusion_physical_expr::EquivalenceProperties;
10081015

1009-
use datafusion_expr::Operator;
10101016
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
10111017

10121018
#[test]
@@ -1197,4 +1203,117 @@ mod tests {
11971203

11981204
assert_eq!(result, expected);
11991205
}
1206+
1207+
#[derive(Debug)]
1208+
struct PropertiesExec {
1209+
properties: PlanProperties,
1210+
schema: SchemaRef,
1211+
}
1212+
1213+
impl DisplayAs for PropertiesExec {
1214+
fn fmt_as(
1215+
&self,
1216+
_t: DisplayFormatType,
1217+
f: &mut std::fmt::Formatter,
1218+
) -> std::fmt::Result {
1219+
write!(
1220+
f,
1221+
"PropertiesExec, schema={}, properties={:?}",
1222+
self.schema, self.properties
1223+
)
1224+
}
1225+
}
1226+
1227+
impl ExecutionPlan for PropertiesExec {
1228+
fn name(&self) -> &str {
1229+
"PropertiesExec"
1230+
}
1231+
1232+
fn as_any(&self) -> &dyn Any {
1233+
self
1234+
}
1235+
1236+
fn properties(&self) -> &PlanProperties {
1237+
&self.properties
1238+
}
1239+
1240+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1241+
vec![]
1242+
}
1243+
1244+
fn with_new_children(
1245+
self: Arc<Self>,
1246+
_children: Vec<Arc<dyn ExecutionPlan>>,
1247+
) -> Result<Arc<dyn ExecutionPlan>> {
1248+
Ok(self)
1249+
}
1250+
1251+
fn execute(
1252+
&self,
1253+
_partition: usize,
1254+
_context: Arc<TaskContext>,
1255+
) -> Result<SendableRecordBatchStream> {
1256+
unimplemented!("not supposed to be executed")
1257+
}
1258+
}
1259+
1260+
#[tokio::test]
1261+
async fn test_stat_projection_equivalent_classes() -> Result<()> {
1262+
// - columns: [a, b, c].
1263+
// - "a" and "b" in the same equivalence class.
1264+
// - then after a+c, b+c projection col(0) and col(1) must be
1265+
// in the same class too.
1266+
let schema = Arc::new(Schema::new(vec![
1267+
Field::new("a", DataType::Int32, false),
1268+
Field::new("b", DataType::Int32, false),
1269+
Field::new("c", DataType::Int32, false),
1270+
]));
1271+
let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1272+
eq_properties.add_equal_conditions(&col("a", &schema)?, &col("b", &schema)?)?;
1273+
1274+
let properties = PlanProperties::new(
1275+
eq_properties,
1276+
Partitioning::UnknownPartitioning(1),
1277+
EmissionType::Both,
1278+
Boundedness::Bounded,
1279+
);
1280+
1281+
let projection = ProjectionExec::try_new(
1282+
vec![
1283+
(
1284+
binary(
1285+
col("a", &schema)?,
1286+
Operator::Plus,
1287+
col("c", &schema)?,
1288+
&schema,
1289+
)?,
1290+
"a+c".to_owned(),
1291+
),
1292+
(
1293+
binary(
1294+
col("b", &schema)?,
1295+
Operator::Plus,
1296+
col("c", &schema)?,
1297+
&schema,
1298+
)?,
1299+
"b+c".to_owned(),
1300+
),
1301+
],
1302+
Arc::new(PropertiesExec {
1303+
properties,
1304+
schema: Arc::clone(&schema),
1305+
}),
1306+
)?;
1307+
1308+
let actual_properties = projection.properties();
1309+
let eq_group = actual_properties.eq_properties.eq_group();
1310+
1311+
assert!(!eq_group.is_empty());
1312+
let first_normalized = eq_group.normalize_expr(col("a+c", &projection.schema)?);
1313+
let second_normalized = eq_group.normalize_expr(col("b+c", &projection.schema)?);
1314+
1315+
assert!(first_normalized.eq(&second_normalized));
1316+
1317+
Ok(())
1318+
}
12001319
}

0 commit comments

Comments
 (0)