
Parametrize ListArray inner field #15162

Open
comphead opened this issue Mar 11, 2025 · 15 comments
Labels
enhancement New feature or request

Comments

@comphead
Contributor

comphead commented Mar 11, 2025

Is your feature request related to a problem or challenge?

In Apache DataFusion Comet, while implementing support for ARRAY types from Apache Spark, it was found that the hardcoded name of the inner list field differs between Arrow-rs and Apache Spark.

The inner ListType field name is hardcoded to item in https://github.com/apache/arrow-rs/blob/f4fde769ab6e1a9b75f890b7f8b47bc22800830b/arrow-schema/src/field.rs#L130

However, it is element for Apache Spark:

scala> spark.sql("select array(1, 2, 3)").printSchema
root
 |-- array(1, 2, 3): array (nullable = false)
 |    |-- element: integer (containsNull = false)

Because of this discrepancy, schema validation fails when the record batch is created:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 309.0 failed 1 times, most
 recent failure: Lost task 0.0 in stage 309.0 (TID 797) (Mac-1741305812954.local executor driver): 
org.apache.comet.CometNativeException: Invalid argument error: column types must match schema types, 
expected List(Field { name: "element", data_type: Int8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }) but found List(Field { name: "item", data_type: Int8, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }) at column index 0

In DataFusion the list creation method Field::new_list_field, with its hardcoded field name, is heavily used. The idea of this ticket is to find a way to parametrize it. Options considered so far:

  • Replace Field::new_list_field with Field::new, which allows providing a custom name. However, those methods are often called from contexts where no SessionContext exists, so there is no way to reach a config variable from which the new name could be read
  • Make the name parametrizable in arrow-rs; unfortunately, arrow-rs has no external config. ENV vars could be leveraged, but that is usually not a good way to go
  • Change RecordBatch::try_new so that for list types it skips the inner field name check and compares only the inner data type and the other field properties (see the sketch after this list)
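
As a rough illustration of the third option, a relaxed comparison could look like the sketch below (a hypothetical standalone helper, not an existing arrow-rs API):

use arrow_schema::DataType;

// Treat two list types as equal when only the inner field name differs;
// the inner data type and nullability are still compared.
fn list_types_match_ignoring_name(a: &DataType, b: &DataType) -> bool {
    match (a, b) {
        (DataType::List(fa), DataType::List(fb)) => {
            fa.data_type() == fb.data_type() && fa.is_nullable() == fb.is_nullable()
        }
        // all other types keep the strict equality check
        _ => a == b,
    }
}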

Related apache/datafusion-comet#1456

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

@comphead comphead added the enhancement New feature or request label Mar 11, 2025
@comphead
Contributor Author

@alamb @tustvold appreciate it if you can chime in

@tustvold
Contributor

tustvold commented Mar 11, 2025

I'm not familiar with how comet interops with Spark, but it looks like whatever component is wrapping the spark execution is incorrectly exposing the schema of its outputs? Provided the components correctly advertise the schema of their outputs, I would expect DF's coercion machinery to handle the rest...

Edit: Unless the requirement is for the output of the DF plan to match the schema of the equivalent Spark plan??

@comphead
Contributor Author

comphead commented Mar 11, 2025

Thanks @tustvold, the requirement is to customize the hardcoded inner field name for ListType, which is item now.
So DF and arrow-rs create the column schema for ListType data arrays as

List(Field { name: "item", data_type: Int8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} })

Comet uses the DataFusion physical plan expressions directly; there is no coercion phase, and the schema from Apache Spark for the same data comes as

List(Field { name: "element", data_type: Int8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} })

When the RecordBatch is created

https://github.com/apache/arrow-rs/blob/f4fde769ab6e1a9b75f890b7f8b47bc22800830b/arrow-array/src/record_batch.rs#L331

it checks both the schema (easy to modify) and the column arrays' schema, and an error is thrown if the two don't match; in this specific case they differ by the inner list field name, item vs element. Do you see any solution on the Arrow-rs side to let downstream projects redefine the LIST_FIELD_DEFAULT_NAME value to a custom one?
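
For illustration, here is a minimal sketch that reproduces the mismatch with current arrow-rs APIs (the column name c0 and the values are made up):

use std::sync::Arc;
use arrow_array::{ArrayRef, Int8Array, ListArray, RecordBatch};
use arrow_buffer::OffsetBuffer;
use arrow_schema::{DataType, Field, Schema};

// Column built with arrow's default inner field name "item"
let values = Int8Array::from(vec![1i8, 2, 3]);
let list: ArrayRef = Arc::new(ListArray::new(
    Arc::new(Field::new("item", DataType::Int8, true)),
    OffsetBuffer::from_lengths([3]),
    Arc::new(values),
    None,
));

// Schema declared with Spark's inner field name "element"
let schema = Arc::new(Schema::new(vec![Field::new(
    "c0",
    DataType::List(Arc::new(Field::new("element", DataType::Int8, true))),
    true,
)]));

// Fails with "column types must match schema types ..."
assert!(RecordBatch::try_new(schema, vec![list]).is_err());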

@zhuqi-lucas
Contributor

zhuqi-lucas commented Mar 11, 2025

  1. Adding a config to support relaxed name checking is a good solution to me:

If field A and field B differ only by name, and we check everything except the name, the comparison can pass.

  2. Besides the above solution, a workaround may be to call the with_name function when using the field? For example:
    use arrow_schema::*;
    let field = Field::new("c1", DataType::Int64, false)
        .with_name("c2");

    assert_eq!(field.name(), "c2");

I am not sure whether Spark has a chance to call with_name?
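
Illustrative note: with_name as written above renames the outer field; for this issue it is the inner field of DataType::List that carries the item name, so the data type would have to be rebuilt with a renamed inner field, roughly like this sketch:

use std::sync::Arc;
use arrow_schema::{DataType, Field};

// Rename only the inner list field, leaving the outer column name alone
let inner = Field::new_list_field(DataType::Int8, true).with_name("element");
let list_field = Field::new("c1", DataType::List(Arc::new(inner)), true);

assert_eq!(
    list_field.data_type(),
    &DataType::List(Arc::new(Field::new("element", DataType::Int8, true)))
);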

@tustvold
Contributor

tustvold commented Mar 11, 2025

Comet uses the DataFusion physical plan expressions directly; there is no coercion phase, and the schema from Apache Spark for the same data comes as

IMO this is the issue, the spark code is not returning data with the same schema as is then used to construct the RecordBatch. Where does the schema provided to the RecordBatch constructor come from? IMO either this schema needs to be updated to match what spark is actually returning, or the spark code needs to be updated to return the expected schema (e.g. by coercing on output).

Adding a config to support relaxed name checking is a good solution to me

This sounds great in theory, although I've yet to see a coherent approach to how this would work. Ultimately the field name is part of the schema in much the same way as StringViewArray and StringArray are different types; one can layer a logical schema type on top, but at some point something actually needs to coerce the types.

@comphead
Contributor Author

comphead commented Mar 11, 2025

Agree. Arrow-rs is not very configurable, unlike DataFusion, and to be honest I would love to see Arrow-rs support external configs so it could be more flexible in common areas like the Parquet reader (INT96 problem apache/arrow-rs#7220) or for redefining other behavior. But this is another topic. :)

IMO this is the issue, the spark code is not returning data with the same schema as is then used to construct the RecordBatch. Where does the schema provided to the RecordBatch constructor come from? IMO either this schema needs to be updated to match what spark is actually returning, or the spark code needs to be updated to return the expected schema (e.g. by coercing on output).

Apache Spark expects element to come back; this value is hardcoded, the same way item is in Arrow-rs, and Apache Spark users rely on this naming, so changing it would break their queries.

The specific case at hand right now is the make_array function

Arc::new(Field::new_list_field(data_type, true)),

In this particular code the column array's schema is created with item although the batch schema says element, and at this point there is no SessionContext from which we could read external params and parametrize the ListType with a specific field name via Field::new. It is possible to do this in DataFusion, although it is going to be a huge change to cover all array functions, so the first idea was to check whether a solution is possible on the arrow-rs side.
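
For illustration, a parametrized variant could be as small as the following sketch (hypothetical helper; the list_field_name value would have to come from some configuration that does not exist today):

use std::sync::Arc;
use arrow_schema::{DataType, Field, FieldRef};

// Build the list item field with a caller-supplied name instead of the
// hardcoded "item" that Field::new_list_field uses.
fn list_field_with_name(list_field_name: &str, data_type: DataType) -> FieldRef {
    Arc::new(Field::new(list_field_name, data_type, true))
}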

@tustvold
Contributor

tustvold commented Mar 11, 2025

Arrow-rs is not very configurable, unlike DataFusion

If you have a concrete proposal, feel free to raise an issue. FWIW most kernels do take various options to alter their behaviour, a non-trivial number of which specifically exist for Spark compatibility.

It is possible to do in DataFusion although it is gonna be a huge change to cover all array functions, so the first idea was if it is possible to have a solution on arrow-rs

I think you misunderstand what I am saying: arrow can't be opinionated about what people are permitted to use as the list field name, as people can always do something different. Even if you made new_list_field default to using "element" rather than "item" it wouldn't resolve your problem as kernels could still construct lists with different naming.

FWIW arrow-rs doesn't use new_list_field outside of tests for this reason, and in fact I actually objected to its addition for this reason (apache/arrow-rs#4544 (comment)). Kernels that construct lists should make this attribute configurable. If DF has such kernels, then DF should make this behaviour configurable.

(Edit: or decide that use-cases that require a particular schema should coerce on output)

@comphead
Contributor Author

comphead commented Mar 11, 2025

Even if you made new_list_field default to using "element" rather than "item" it wouldn't resolve your problem as kernels could still construct lists with different naming

Good point, yes.

If DF has such kernels, then DF should make this behaviour configurable

Yeah, this is the first option in the list above

@alamb
Contributor

alamb commented Mar 11, 2025

Thank you for bringing this up @comphead -- I think we have struggled with this issue for a while downstream in DataFusion

I think the core fix for this issue is not in how DataType::List is constructed, but rather in how lists are compared

As @tustvold points out, the field name is arbitrary and not consistent across arrow implementations. Plumbing some way to change it around might work, but we'll be forever trying to find all the corner cases.

Thus in my opinion, rather than try and control the name of the field, a better approach is to change the places where DataType::Lists are compared and ignore the field name unless it is important

For example, the specific error that @comphead posted in this issue is

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 309.0 failed 1 times, most
 recent failure: Lost task 0.0 in stage 309.0 (TID 797) (Mac-1741305812954.local executor driver): 
org.apache.comet.CometNativeException: Invalid argument error: column types must match schema types, 
expected List(Field { name: "element", data_type: Int8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }) but found List(Field { name: "item", data_type: Int8, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }) at column index 0

It seems like that error actually comes from RecordBatch construction within arrow-rs

https://github.com/apache/arrow-rs/blob/f4fde769ab6e1a9b75f890b7f8b47bc22800830b/arrow-array/src/record_batch.rs#L333

Perhaps we can relax this check / update RecordBatch::new() to align incoming DataType::List to match the schema 🤔

@comphead
Contributor Author

Thanks @alamb, this option is brought up as option 3 in the option list above.

Change RecordBatch::try_new so that for list types it skips the inner field name check and compares only the inner data type and the other field properties

I'm currently looking for any scenarios to verify if the inner field name check is reasonable.

@tustvold
Contributor

tustvold commented Mar 11, 2025

I'm currently looking for any scenarios to verify if the inner field name check is reasonable.

The major ones are interop boundaries: arrow itself largely doesn't care, but other systems do (as evidenced by this issue). For example, writing a RecordBatch to parquet, sending it over the C data interface, etc. There are quite a lot of places that assume the RecordBatch schema / StructArray schema are the same. I'm pretty certain we can't relax this check without it causing subtle breakage.

IMO if spark has specific schema requirements, I'm not sure I see a way to avoid coercing at the boundary; it will be an indefinite game of whack-a-mole otherwise (not just for lists).

@comphead
Contributor Author

Thanks @tustvold for the explanation.
Besides that, just for my understanding: does the Arrow spec require carrying 2 schemas? What is the reasoning behind it, is it for identifying broken batches at runtime or something like that?

@tustvold
Contributor

I guess it is so that child arrays can be processed independently without needing to somehow propagate their schema down from the parent at every callsite.

But yes it is redundant, and this is why inconsistency causes problems at interop boundaries, as typically these just pick one to propagate.

@alamb
Contributor

alamb commented Mar 13, 2025

IMO if spark has specific schema requirements, I'm not sure I see a way to avoid coercing at the boundary; it will be an indefinite game of whack-a-mole otherwise (not just for lists).

So the proposal as I understand it is to implement something like the following function that is called on all batches prior to returning them to Spark

/// Converts the schema of `batch` to one suitable for Spark's conventions
///
/// Note only converts the schema, no data is copied
///
/// Transformations applied:
/// * The names of the fields in `DataType::List` are changed to "element"
/// ....
fn coerce_schema_for_spark(batch: RecordBatch) -> Result<RecordBatch> {
...
}

@comphead
Contributor Author

Yes, one of the options. As item is heavily used across the DataFusion physical layers it would be difficult to align the schema without changes in DF, so I'll try to do this coercion before calling DataFusion code. The schema would flow like this:
Spark -> element -> coerce -> item -> run DF code -> coerce -> element -> Spark. As we only need to coerce the schema, without copying or modifying the arrays, it should be relatively cheap.
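
A rough sketch of what such a boundary coercion could look like (a hypothetical helper handling only top-level List columns; a real version would recurse through nested types, and how copy-free the cast is depends on the arrow-cast implementation):

use std::sync::Arc;
use arrow_array::RecordBatch;
use arrow_cast::cast;
use arrow_schema::{ArrowError, DataType, Field, Schema};

// Rename the inner List field to "element" before handing a batch back to
// Spark. Only top-level List columns are handled in this sketch.
fn coerce_batch_for_spark(batch: RecordBatch) -> Result<RecordBatch, ArrowError> {
    let fields: Vec<Arc<Field>> = batch
        .schema()
        .fields()
        .iter()
        .map(|f| match f.data_type() {
            DataType::List(inner) => Arc::new(f.as_ref().clone().with_data_type(
                DataType::List(Arc::new(inner.as_ref().clone().with_name("element"))),
            )),
            _ => Arc::clone(f),
        })
        .collect();
    let columns = batch
        .columns()
        .iter()
        .zip(fields.iter())
        // cast rewrites the list metadata to the target type; for lists that
        // differ only by the inner field name the value buffers are reused
        .map(|(col, field)| cast(col, field.data_type()))
        .collect::<Result<Vec<_>, _>>()?;
    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
}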
