partition skipping filter #624

Open · wants to merge 21 commits into main
2 changes: 0 additions & 2 deletions kernel/src/predicates/mod.rs
@@ -534,8 +534,6 @@ impl ResolveColumnAsScalar for EmptyColumnResolver {
     }
 }

-// In testing, it is convenient to just build a hashmap of scalar values.
-#[cfg(test)]
 impl ResolveColumnAsScalar for std::collections::HashMap<ColumnName, Scalar> {
     fn resolve_column(&self, col: &ColumnName) -> Option<Scalar> {
         self.get(col).cloned()
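The `#[cfg(test)]` gate appears to come off because this impl is no longer test-only: the new partition filter (in `partition_skipping.rs` below) resolves predicate columns against exactly such a hashmap of parsed partition values, via `DefaultPredicateEvaluator::from(resolver)`. A minimal sketch of that pattern, assuming the crate-internal API as it appears elsewhere in this diff (the column names and values here are made up):

    use std::collections::HashMap;

    use crate::expressions::Scalar;
    use crate::predicates::{DefaultPredicateEvaluator, PredicateEvaluator};
    use crate::schema::ColumnName;
    use crate::Expression;

    /// Decide whether a file survives pruning, given its parsed partition values.
    fn survives_pruning() -> bool {
        // Pretend this map was parsed out of add.partitionValues.
        let resolver: HashMap<ColumnName, Scalar> =
            HashMap::from([(ColumnName::new(["c1"]), Scalar::Integer(5))]);
        let evaluator = DefaultPredicateEvaluator::from(resolver);
        let predicate = Expression::ge(Expression::column(["c1"]), Expression::literal(4));
        // eval_expr returns Option<bool>; an inconclusive result keeps the file.
        evaluator.eval_expr(&predicate, false).unwrap_or(true)
    }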
49 changes: 43 additions & 6 deletions kernel/src/scan/log_replay.rs
@@ -6,6 +6,7 @@ use itertools::Itertools;
 use tracing::debug;

 use super::data_skipping::DataSkippingFilter;
+use super::partition_skipping::PartitionSkippingFilter;
 use super::{ScanData, Transform};
 use crate::actions::get_log_add_schema;
 use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
@@ -30,7 +31,11 @@ impl FileActionKey {
 }

 struct LogReplayScanner {
-    filter: Option<DataSkippingFilter>,
+    /// Filter based on partition values
+    partition_filter: Option<PartitionSkippingFilter>,
+
+    /// Filter based on min/max values in the statistics
+    data_filter: Option<DataSkippingFilter>,

     /// A set of (data file path, dv_unique_id) pairs that have been seen thus
     /// far in the log. This is used to filter out files with Remove actions as
@@ -248,9 +253,18 @@ fn get_add_transform_expr() -> Expression {

 impl LogReplayScanner {
     /// Create a new [`LogReplayScanner`] instance
-    fn new(engine: &dyn Engine, physical_predicate: Option<(ExpressionRef, SchemaRef)>) -> Self {
+    fn new(
+        engine: &dyn Engine,
+        physical_predicate: Option<(ExpressionRef, SchemaRef)>,
+        partition_columns: &[String],
+    ) -> Self {
         Self {
-            filter: DataSkippingFilter::new(engine, physical_predicate),
+            partition_filter: PartitionSkippingFilter::new(
+                engine,
+                physical_predicate.clone(),
+                partition_columns,
+            ),
+            data_filter: DataSkippingFilter::new(engine, physical_predicate),
             seen: Default::default(),
         }
     }
@@ -265,11 +279,31 @@ impl LogReplayScanner {
     ) -> DeltaResult<ScanData> {
         // Apply data skipping to get back a selection vector for actions that passed skipping. We
         // will update the vector below as log replay identifies duplicates that should be ignored.
-        let selection_vector = match &self.filter {
+        let data_selection_vector = match &self.data_filter {
             Some(filter) => filter.apply(actions)?,
             None => vec![true; actions.len()],
         };
-        assert_eq!(selection_vector.len(), actions.len());
+        if data_selection_vector.len() != actions.len() {
+            return Err(crate::Error::internal_error(
+                "Data skipping filter returned incorrect number of rows",
+            ));
+        }
+
+        let partition_selection_vector = match &self.partition_filter {
+            Some(filter) => filter.apply(actions)?,
+            None => vec![true; actions.len()],
+        };
+        if partition_selection_vector.len() != actions.len() {
+            return Err(crate::Error::internal_error(
+                "Partition skipping filter returned incorrect number of rows",
+            ));
+        }
+
+        let selection_vector = data_selection_vector
Collaborator: why not just:

Suggested change:
-        let selection_vector = data_selection_vector
+        data_selection_vector.append(&mut partition_selection_vector);
+        let selection_vector = data_selection_vector;

Author: I don't think we want to do this - we want to zip one iter with the other, not append it.
+            .iter()
+            .zip(partition_selection_vector.iter())
+            .map(|(a, b)| *a && *b)
+            .collect();

         let mut visitor = AddRemoveDedupVisitor {
             seen: &mut self.seen,
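To make the zip-vs-append point above concrete: appending would produce a vector of 2 × `actions.len()` booleans, whereas the row-wise AND keeps the result aligned with the action batch. A standalone sketch (plain `Vec<bool>`, nothing kernel-specific):

    /// A row survives only if both the data skipping filter and the
    /// partition skipping filter voted to keep it.
    fn combine(data: &[bool], partition: &[bool]) -> Vec<bool> {
        data.iter()
            .zip(partition.iter())
            .map(|(a, b)| *a && *b)
            .collect()
    }

    fn main() {
        assert_eq!(
            combine(&[true, false, true], &[true, true, false]),
            vec![true, false, false]
        );
    }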
@@ -298,8 +332,9 @@ pub(crate) fn scan_action_iter(
     logical_schema: SchemaRef,
     transform: Option<Arc<Transform>>,
     physical_predicate: Option<(ExpressionRef, SchemaRef)>,
+    partition_columns: &[String],
 ) -> impl Iterator<Item = DeltaResult<ScanData>> {
-    let mut log_scanner = LogReplayScanner::new(engine, physical_predicate);
+    let mut log_scanner = LogReplayScanner::new(engine, physical_predicate, partition_columns);
     let add_transform = engine.get_expression_handler().get_evaluator(
         get_log_add_schema().clone(),
         get_add_transform_expr(),
Expand Down Expand Up @@ -394,6 +429,7 @@ mod tests {
logical_schema,
None,
None,
&[],
);
for res in iter {
let (_batch, _sel, transforms) = res.unwrap();
@@ -417,6 +453,7 @@ mod tests {
             schema,
             static_transform,
             None,
+            &[],
         );

         fn validate_transform(transform: Option<&ExpressionRef>, expected_date_offset: i32) {
3 changes: 3 additions & 0 deletions kernel/src/scan/mod.rs
@@ -28,6 +28,7 @@ use self::state::GlobalScanState;

 pub(crate) mod data_skipping;
 pub mod log_replay;
+pub(crate) mod partition_skipping;
 pub mod state;

 /// Builder to scan a snapshot of a table.
@@ -408,6 +409,7 @@ impl Scan {
             self.logical_schema.clone(),
             static_transform,
             physical_predicate,
+            &self.snapshot.metadata().partition_columns,
         );
         Ok(Some(it).into_iter().flatten())
     }
@@ -813,6 +815,7 @@ pub(crate) mod test_utils {
             logical_schema,
             transform,
             None,
+            &[],
         );
         let mut batch_count = 0;
         for res in iter {
224 changes: 224 additions & 0 deletions kernel/src/scan/partition_skipping.rs
@@ -0,0 +1,224 @@
use std::{
    collections::HashMap,
    sync::{Arc, LazyLock},
};

use tracing::debug;

use crate::schema::column_name;
use crate::{
    engine_data::GetData,
    expressions::Scalar,
    predicates::{DefaultPredicateEvaluator, PredicateEvaluator},
    scan::get_log_add_schema,
    schema::{ColumnName, DataType, MapType, SchemaRef},
    DeltaResult, Engine, EngineData, Expression, ExpressionEvaluator, ExpressionRef, RowVisitor,
};
use crate::{expressions::column_expr, schema::StructType};

pub(crate) struct PartitionSkippingFilter {
    evaluator: Arc<dyn ExpressionEvaluator>,
    predicate: Arc<Expression>,
    schema: SchemaRef,
}

impl PartitionSkippingFilter {
    pub(crate) fn new(
        engine: &dyn Engine,
        physical_predicate: Option<(ExpressionRef, SchemaRef)>,
        partition_columns: &[String],
    ) -> Option<Self> {
        static PARTITIONS_EXPR: LazyLock<Expression> =
            LazyLock::new(|| column_expr!("add.partitionValues"));

        let (predicate, schema) = physical_predicate?;
        debug!("Creating a partition skipping filter for {:#?}", predicate);

        // Limit the schema passed to the row visitor to only the fields that are included
        // in the predicate and are also partition columns. The data skipping columns will
        // be handled elsewhere.
        let mut partition_fields = schema
            .fields()
            .filter(|f| partition_columns.contains(f.name()))
            .cloned()
            .peekable();
        partition_fields.peek()?;
Collaborator: Maybe helpful to make it clear we're intentionally discarding a result?

Suggested change:
-        partition_fields.peek()?;
+        let _ = partition_fields.peek()?;
        let schema = Arc::new(StructType::new(partition_fields));

        let partitions_map_type = MapType::new(DataType::STRING, DataType::STRING, true);

        let evaluator = engine.get_expression_handler().get_evaluator(
            get_log_add_schema().clone(),
            PARTITIONS_EXPR.clone(),
            partitions_map_type.into(),
        );

        Some(Self {
            evaluator,
            predicate,
            schema,
        })
    }

    pub(crate) fn apply(&self, actions: &dyn EngineData) -> DeltaResult<Vec<bool>> {
        let partitions = self.evaluator.evaluate(actions)?;
        assert_eq!(partitions.len(), actions.len());

        let mut visitor = PartitionVisitor::new(&self.predicate, &self.schema);
        visitor.visit_rows_of(partitions.as_ref())?;
        Ok(visitor.selection_vector)
    }
}

struct PartitionVisitor {
    pub(crate) selection_vector: Vec<bool>,
    predicate: Arc<Expression>,
    schema: SchemaRef,
}

impl PartitionVisitor {
    pub(crate) fn new(predicate: &Arc<Expression>, schema: &SchemaRef) -> Self {
        Self {
            selection_vector: Vec::default(),
            predicate: Arc::clone(predicate),
            schema: Arc::clone(schema),
        }
    }
}

impl RowVisitor for PartitionVisitor {
    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
        static NAMES_AND_TYPES: LazyLock<crate::schema::ColumnNamesAndTypes> =
            LazyLock::new(|| {
                (
                    vec![column_name!("output")],
                    vec![DataType::Map(Box::new(MapType::new(
                        DataType::STRING,
                        DataType::STRING,
                        true,
                    )))],
                )
                    .into()
            });
        NAMES_AND_TYPES.as_ref()
    }

    fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
        let getter = getters[0];
        for i in 0..row_count {
Collaborator: We should consider passing (and updating) the existing selection vector instead of creating a new one, because not all rows of the data we visit are even valid add actions. That way, we only try to extract partition values for rows that have survived this far, and we clear the selection bit if the filter prunes out the file.

Collaborator: Yes. I think we should have an additive vector to minimize what work we need to do, and to keep things simple.

Collaborator: Update: It's not just about minimizing work. Right now, we're attempting to apply partition filters to every action, even non-file actions. That forces us to treat the partition values column as nullable even though the Delta spec says it's required. That means extra complexity to compensate for possible nullness, and a risk of missing a corrupt table whose add actions illegally lack the add.partitionValues column.

Collaborator: Perhaps it would be simpler to inject partition pruning into the existing AddRemoveDedupVisitor::is_valid_add method, instead of defining a whole new visitor? It already accepts a selection vector, and it already skips non-file actions. We would just have to add the partition values to the schema it works with. Because add and remove actions both always provide partition columns, we could partition-prune both before calling self.check_and_record_seen, to avoid bloating the hash table with pruned entries.

The one complication would be how to handle non-partitioned tables cleanly, if we try to avoid fetching the partition values columns for them. My guess is, that's over-optimizing. We should go ahead and unconditionally fetch the empty column, and just let the visitor conditionally ignore it. Something like this:

    fn is_valid_add<'a>(&mut self, i: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<bool> {
        // Add will have a path at index 0 if it is valid; otherwise, if it is a log batch, we may
        // have a remove with a path at index 5. In either case, extract the getters for partition
        // values and dv columns at indexes that immediately follow a valid path index.
        let (path, pv_getter, dv_getters, is_add) =
            if let Some(path) = getters[0].get_str(i, "add.path")? {
                (path, &getters[1], &getters[2..5], true)
            } else if !self.is_log_batch {
                return Ok(false);
            } else if let Some(path) = getters[5].get_opt(i, "remove.path")? {
                (path, &getters[6], &getters[7..10], false)
            } else {
                return Ok(false);
            };

        // Only consider adds and removes that survive partition pruning
        if !self.apply_partition_filters(i, pv_getter)? {
            return Ok(false);
        }

        let dv_unique_id = match dv_getters[0].get_opt(i, "deletionVector.storageType")? {
            Some(storage_type) => Some(DeletionVectorDescriptor::unique_id_from_parts(
                storage_type,
                dv_getters[1].get(i, "deletionVector.pathOrInlineDv")?,
                dv_getters[2].get_opt(i, "deletionVector.offset")?,
            )),
            None => None,
        };

        // Process both adds and removes, but only return not-already-seen adds
        let file_key = FileActionKey::new(path, dv_unique_id);
        Ok(!self.check_and_record_seen(file_key) && is_add)
    }

    fn apply_partition_filters<'a>(&mut self, i: usize, pv_getter: &'a dyn GetData<'a>) -> DeltaResult<bool> {
        let Some(partition_filter) = self.partition_filter else {
            return Ok(true); // no filter => keep it
        };
        // extract and parse partition values, and apply the filter to them
        let partition_values: HashMap<String, String> = pv_getter.get(i, "partitionValues")?;
        todo!()
    }

Also: I think today's approach only looks at adds, and removes would still bloat up the hash table. If so, we should fix that regardless of where the partition pruning code lives.

Collaborator: I plan to do this when I merge this with #607. I think I'd be okay to merge this first with this code and then I can move it over when I update my PR. Thoughts?

Collaborator: Update: Thinking more about this, I realized it's actually UNSAFE to partition-prune removes, for the same reason we can't apply data skipping over removes: those removes need to suppress earlier incompatible adds we might encounter, if the table's schema was replaced after the most recent checkpoint.
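A standalone sketch of the "pass and update the existing selection vector" idea from this thread (the helper and its names are hypothetical, not kernel API): rows already deselected by earlier filters are skipped entirely, and a pruned row clears its bit.

    /// Re-check only still-selected rows, clearing the bit for pruned ones.
    fn update_selection(selection: &mut [bool], survives: impl Fn(usize) -> bool) {
        for (i, keep) in selection.iter_mut().enumerate() {
            if *keep {
                *keep = survives(i);
            }
        }
    }

    fn main() {
        let mut selection = vec![true, false, true];
        update_selection(&mut selection, |i| i != 2); // the filter prunes row 2
        assert_eq!(selection, vec![true, false, false]);
    }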

            let val: Option<DeltaResult<bool>> = getter.get_map(i, "output")?.map(|m| {
                let partition_values = m.materialize();
                let resolver = self.schema.fields()
                    .map(|field| {
                        let data_type = field.data_type();

                        let DataType::Primitive(primitive_type) = data_type else {
                            return Err(crate::Error::unsupported(
                                format!("Partition filtering only supported for primitive types. Found type {} for field {}", data_type, field.name())
                            ));
                        };

                        // Parse the raw partition value if present. A `match` (rather than
                        // `Option::unwrap_or`) is load-bearing here: `unwrap_or` evaluates its
                        // argument eagerly, so the `Err` for a non-nullable field would be built
                        // and propagated by `?` even when a value is present.
                        let scalar = match partition_values.get(field.name()) {
                            Some(v) => primitive_type.parse_scalar(v)?,
                            None if field.nullable => Scalar::Null(data_type.clone()),
                            None => return Err(crate::Error::missing_data(format!(
                                "Missing partition values on a non-nullable field is not supported. Field {}",
                                field.name()
                            ))),
                        };

                        Ok((ColumnName::new([field.name()]), scalar))
                    })
                    .collect::<DeltaResult<HashMap<ColumnName, Scalar>>>()?;

                let filter = DefaultPredicateEvaluator::from(resolver);
                Ok(filter.eval_expr(&self.predicate, false).unwrap_or(true))
            });

            self.selection_vector.push(val.transpose()?.unwrap_or(true));
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use arrow_array::{RecordBatch, StringArray};
    use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};
    use std::sync::Arc;

    use crate::engine::{arrow_data::ArrowEngineData, sync::SyncEngine};
    use crate::expressions::UnaryOperator;
    use crate::scan::get_log_schema;
    use crate::schema::{DataType, Schema, StructField};
    use crate::{DeltaResult, Engine, EngineData, Expression};

    use super::PartitionSkippingFilter;

    // TODO(nick): Merge all copies of this into one "test utils" thing
    fn string_array_to_engine_data(string_array: StringArray) -> Box<dyn EngineData> {
        let string_field = Arc::new(ArrowField::new("a", ArrowDataType::Utf8, true));
        let schema = Arc::new(ArrowSchema::new(vec![string_field]));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(string_array)])
            .expect("Can't convert to record batch");
        Box::new(ArrowEngineData::new(batch))
    }

    #[test]
    fn test_partition_skipping() -> DeltaResult<()> {
        let engine = SyncEngine::new();
        let json_handler = engine.get_json_handler();
        let json_strings: StringArray = vec![
            // All three of these rows should be filtered out by the c1 predicate (c1 < 4)
            r#"{"add":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","partitionValues":{"c1":"1","c2":""},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
            r#"{"add":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","partitionValues":{"c1":"2","c2":null},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
            r#"{"add":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","partitionValues":{"c1":"3","c2":"a"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,

            // Test that both null and "" produce valid nulls
            r#"{"add":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","partitionValues":{"c1":"4","c2":""},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
            r#"{"add":{"path":"c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet","partitionValues":{"c1":"5","c2":null},"size":452,"modificationTime":1670892998136,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":6},\"maxValues\":{\"c3\":6},\"nullCount\":{\"c3\":0}}"}}"#,
            r#"{"add":{"path":"c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet","partitionValues":{"c1":"6","c2":"b"},"size":452,"modificationTime":1670892998137,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}"}}"#,

            // Gracefully handle missing partition values as null
            r#"{"add":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","partitionValues":{"c1":"1"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
            r#"{"add":{"path":"c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet","partitionValues":{"c1":"5"},"size":452,"modificationTime":1670892998136,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":6},\"maxValues\":{\"c3\":6},\"nullCount\":{\"c3\":0}}"}}"#,
        ]
        .into();

        let log_schema = get_log_schema().clone();
        let batch = json_handler
            .parse_json(
                string_array_to_engine_data(json_strings),
                log_schema.clone(),
            )
            .unwrap();

        let expr = Arc::new(Expression::and(
            Expression::unary(UnaryOperator::IsNull, Expression::column(["c2"])),
            Expression::ge(Expression::column(["c1"]), Expression::literal(4)),
        ));

        let schema = Arc::new(Schema::new(vec![
            StructField::new("c1", DataType::INTEGER, true),
            StructField::new("c2", DataType::STRING, true),
            StructField::new("c3", DataType::INTEGER, true),
        ]));

        let physical_predicate = Some((expr, schema));
        let filter = PartitionSkippingFilter::new(
            &engine,
            physical_predicate,
            &["c1".to_string(), "c2".to_string()],
        )
        .expect("Unable to create Partition Skipping Filter");

        let actual = filter.apply(batch.as_ref())?;

        // Predicate `c2 IS NULL AND c1 >= 4` keeps rows 4, 5, and 8: their c1 values
        // are >= 4, and their c2 values ("", null, and missing) all resolve to null.
        let expected = vec![false, false, false, true, true, false, false, true];

        assert_eq!(actual, expected);
        Ok(())
    }
}