Support default values for columns in SchemaAdapter #15220

Open
adriangb opened this issue Mar 14, 2025 · 1 comment
Labels
enhancement New feature or request

Comments

@adriangb
Contributor

adriangb commented Mar 14, 2025

Is your feature request related to a problem or challenge?

In a conversation a couple of days ago, Andrew mentioned that this was an open feature request, but I could not find an issue for it. @alamb, do you remember who else was asking for this?

We have an implementation of this internally. It is actually more generic, because we use it to generate columns from other columns, but it covers the default-values use case and it would be easy to expose a simpler API for that.

Essentially we declare:

pub trait MissingColumnGeneratorFactory: Debug + Send + Sync {
    /// Create a [`MissingColumnGenerator`] for the given `field` and `file_schema`.
    /// Returns None if the column cannot be generated by this generator.
    /// Otherwise, returns a [`MissingColumnGenerator`] that can generate the missing column.
    fn create(
        &self,
        field: &Field,
        file_schema: &Schema,
    ) -> Option<Arc<dyn MissingColumnGenerator + Send + Sync>>;
}

pub trait MissingColumnGenerator: Debug + Send + Sync {
    /// Generate a missing column for the given `field` from the provided `batch`.
    /// When this method is called `batch` will contain all of the columns declared as dependencies in `dependencies`.
    /// If the column cannot be generated, this method should return an error.
    /// Otherwise, it should return the generated column as an `ArrayRef`.
    /// No casting or post-processing is done by this method, so the generated column should match the data type
    /// of the `field` it is being generated for; otherwise an Err will be returned upstream.
    /// There is no guarantee about the order of the columns in the provided RecordBatch.
    fn generate(&self, batch: RecordBatch) -> datafusion_common::Result<ArrayRef>;

    /// Returns a list of column names that this generator depends on to generate the missing column.
    /// This is used when creating the `RecordBatch` to ensure that all dependencies are present before calling `generate`.
    /// The dependencies do not need to be declared in any particular order.
    fn dependencies(&self) -> Vec<String>;
}

You then pass one or more MissingColumnGeneratorFactory implementations into the SchemaAdapterFactory.
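For the plain default-value case, a generator needs no dependencies and just emits a constant column. The following is a minimal sketch of that idea, not our internal implementation. To keep it free of external crates, `Field`, `Column`, `Batch` and `Scalar` are simplified stand-ins for the Arrow and DataFusion types, the trait signatures are reduced accordingly, and `DefaultValueGenerator` and `DefaultValueFactory` are hypothetical names.

```rust
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;

// Stand-ins (assumptions) for arrow's Field, ArrayRef and RecordBatch.
#[derive(Debug, Clone, PartialEq)]
pub enum Column {
    Int64(Vec<i64>),
    Utf8(Vec<String>),
}

#[derive(Debug, Clone)]
pub struct Field {
    pub name: String,
}

#[derive(Debug, Clone)]
pub struct Batch {
    pub num_rows: usize,
    pub columns: HashMap<String, Column>,
}

/// Stand-in for a scalar default value.
#[derive(Debug, Clone)]
pub enum Scalar {
    Int64(i64),
    Utf8(String),
}

// Simplified versions of the two traits from the issue.
pub trait MissingColumnGenerator: Debug + Send + Sync {
    fn generate(&self, batch: &Batch) -> Result<Column, String>;
    fn dependencies(&self) -> Vec<String>;
}

pub trait MissingColumnGeneratorFactory: Debug + Send + Sync {
    fn create(
        &self,
        field: &Field,
        file_columns: &[String],
    ) -> Option<Arc<dyn MissingColumnGenerator>>;
}

/// Hypothetical generator: repeats one scalar for every row of the batch.
#[derive(Debug)]
pub struct DefaultValueGenerator {
    pub value: Scalar,
}

impl MissingColumnGenerator for DefaultValueGenerator {
    fn generate(&self, batch: &Batch) -> Result<Column, String> {
        Ok(match &self.value {
            Scalar::Int64(v) => Column::Int64(vec![*v; batch.num_rows]),
            Scalar::Utf8(v) => Column::Utf8(vec![v.clone(); batch.num_rows]),
        })
    }

    // A constant default reads nothing from the file.
    fn dependencies(&self) -> Vec<String> {
        Vec::new()
    }
}

/// Hypothetical factory: maps a column name to its default value.
#[derive(Debug, Default)]
pub struct DefaultValueFactory {
    pub defaults: HashMap<String, Scalar>,
}

impl MissingColumnGeneratorFactory for DefaultValueFactory {
    fn create(
        &self,
        field: &Field,
        file_columns: &[String],
    ) -> Option<Arc<dyn MissingColumnGenerator>> {
        // Only generate columns that are actually missing from the file.
        if file_columns.contains(&field.name) {
            return None;
        }
        let value = self.defaults.get(&field.name)?.clone();
        let generator: Arc<dyn MissingColumnGenerator> =
            Arc::new(DefaultValueGenerator { value });
        Some(generator)
    }
}
```

A simpler public API for defaults could be a thin wrapper like `DefaultValueFactory` on top of the generic traits.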

There was a lot of pain figuring out how to properly adjust projections to take into account the injected dependency columns, but we've done that work already on our end.
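To illustrate the projection problem, here is a rough sketch of the core step: working out which file columns have to be read once generator dependencies are taken into account. The function name `file_columns_to_read` and the use of plain string slices in place of schemas are assumptions made for illustration; the real adjustment also has to drop the injected columns again after generation, which is not shown.

```rust
use std::collections::HashMap;

/// Append `name` to `columns` unless it is already present.
fn push_unique(columns: &mut Vec<String>, name: &str) {
    if !columns.iter().any(|c| c.as_str() == name) {
        columns.push(name.to_string());
    }
}

/// Compute the file columns to read for a table-level projection.
///
/// `generator_deps` maps a generated column name to the file columns it
/// depends on (what `dependencies()` would return for its generator).
pub fn file_columns_to_read(
    projected: &[&str],
    file_columns: &[&str],
    generator_deps: &HashMap<&str, Vec<&str>>,
) -> Result<Vec<String>, String> {
    let mut to_read: Vec<String> = Vec::new();
    for &col in projected {
        if file_columns.contains(&col) {
            // Present in the file: read it directly.
            push_unique(&mut to_read, col);
        } else if let Some(deps) = generator_deps.get(col) {
            // Missing but generated: read its dependencies instead.
            for &dep in deps {
                if !file_columns.contains(&dep) {
                    return Err(format!(
                        "dependency `{dep}` of `{col}` is not in the file"
                    ));
                }
                push_unique(&mut to_read, dep);
            }
        }
        // Missing with no generator: nothing to read from the file.
    }
    Ok(to_read)
}
```

With a file containing `id`, `first` and `last`, projecting `id` and a generated `full_name` that depends on `first` and `last` reads all three file columns, even though only two table columns were requested.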

The other thing to note is that adjustments are needed in filter pushdown, specifically here:

/// After visiting all children, rewrite column references to nulls if
/// they are not in the file schema.
/// We do this because they won't be relevant if they're not in the file schema, since that's
/// the only thing we're dealing with here as this is only used for the parquet pushdown during
/// scanning
fn f_up(
    &mut self,
    expr: Arc<dyn PhysicalExpr>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
    if let Some(column) = expr.as_any().downcast_ref::<Column>() {
        // if the expression is a column, is it in the file schema?
        if self.file_schema.field_with_name(column.name()).is_err() {
            return self
                .table_schema
                .field_with_name(column.name())
                .and_then(|field| {
                    // Replace the column reference with a NULL (using the type from the table schema)
                    // e.g. `column = 'foo'` is rewritten to `NULL = 'foo'`
                    //
                    // See comments on `FilterCandidateBuilder` for more information
                    let null_value = ScalarValue::try_from(field.data_type())?;
                    Ok(Transformed::yes(Arc::new(Literal::new(null_value)) as _))
                })
                // If the column is not in the table schema, should throw the error
                .map_err(|e| arrow_datafusion_err!(e));
        }
    }
    Ok(Transformed::no(expr))
}

This last point applies whether the generated columns are simple defaults or more complex derived columns.
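One possible shape for that adjustment, for the simple-defaults case only, is to substitute the default literal instead of NULL when a column is missing from the file. The sketch below uses a toy `Expr` enum rather than DataFusion's `PhysicalExpr`, and `rewrite_missing` is a hypothetical name; derived columns would need the generating expression substituted instead, which is not covered here.

```rust
use std::collections::HashMap;

/// Toy expression tree (an assumption, not DataFusion's `PhysicalExpr`).
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Column(String),
    /// `None` models SQL NULL.
    Literal(Option<String>),
    Eq(Box<Expr>, Box<Expr>),
}

/// Rewrite references to columns that are not in the file.
/// A column with a configured default becomes that literal; any other
/// missing column becomes NULL, as in the current pushdown code.
pub fn rewrite_missing(
    expr: Expr,
    file_columns: &[&str],
    defaults: &HashMap<&str, &str>,
) -> Expr {
    match expr {
        Expr::Column(name) if !file_columns.contains(&name.as_str()) => {
            Expr::Literal(defaults.get(name.as_str()).map(|d| d.to_string()))
        }
        Expr::Eq(left, right) => Expr::Eq(
            Box::new(rewrite_missing(*left, file_columns, defaults)),
            Box::new(rewrite_missing(*right, file_columns, defaults)),
        ),
        // Literals and columns present in the file are left untouched.
        other => other,
    }
}
```

With a default of `'active'` for a missing `status` column, `status = 'active'` becomes `'active' = 'active'` rather than `NULL = 'active'`, so the pushed-down predicate agrees with what the schema mapper later produces.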

@adriangb adriangb added the enhancement New feature or request label Mar 14, 2025
@adriangb
Contributor Author

> The other thing to note is that adjustments are needed in filter pushdown

I’ll note that this is already broken today, without any new features. I can give an example later, but basically you write a custom SchemaMapper that, e.g., fills in default values instead of nulls. That’s doable with the current public APIs. You then get different results depending on whether filter pushdown is enabled.
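Until a real reproduction is posted, the discrepancy can be modelled with a toy example. It assumes a file with no `status` column, a mapper that fills in the default `'active'`, and the filter `status = 'active'`. The function names are hypothetical and nothing here calls DataFusion; it only mimics SQL's three-valued equality.

```rust
/// SQL three-valued equality: any NULL operand yields NULL (`None`).
pub fn sql_eq(left: Option<&str>, right: Option<&str>) -> Option<bool> {
    Some(left? == right?)
}

/// With pushdown: the missing column is rewritten to NULL before the
/// mapper runs, so the predicate is `NULL = literal` for every row.
pub fn rows_with_pushdown(num_rows: usize, literal: &str) -> usize {
    (0..num_rows)
        .filter(|_| sql_eq(None, Some(literal)) == Some(true))
        .count()
}

/// Without pushdown: the mapper fills in `default` first, and the filter
/// is evaluated on the mapped batch afterwards.
pub fn rows_without_pushdown(num_rows: usize, default: &str, literal: &str) -> usize {
    (0..num_rows)
        .filter(|_| sql_eq(Some(default), Some(literal)) == Some(true))
        .count()
}
```

For a three-row file, `rows_with_pushdown(3, "active")` keeps no rows while `rows_without_pushdown(3, "active", "active")` keeps all three, which is the kind of mismatch described above.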
