[SPARK-51008][SQL] Add ResultStage for AQE #49715

Open: wants to merge 14 commits into base: master

liuzqt
Contributor

@liuzqt liuzqt commented Jan 28, 2025

What changes were proposed in this pull request?

Added ResultQueryStageExec for AQE

What does the query plan look like in the explain string:

AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   ResultQueryStage 2 ------> newly added
   +- *(5) Project [id#26L]
      +- *(5) SortMergeJoin [id#26L], [id#27L], Inner
         :- *(3) Sort [id#26L ASC NULLS FIRST], false, 0
         :  +- AQEShuffleRead coalesced
         :     +- ShuffleQueryStage 0
         :        +- Exchange hashpartitioning(id#26L, 200), ENSURE_REQUIREMENTS, [plan_id=247]
         :           +- *(1) Range (0, 25600, step=1, splits=10)
         +- *(4) Sort [id#27L ASC NULLS FIRST], false, 0
            +- AQEShuffleRead coalesced
               +- ShuffleQueryStage 1
                  +- Exchange hashpartitioning(id#27L, 200), ENSURE_REQUIREMENTS, [plan_id=257]
                     +- *(2) Ran...

What does the query plan look like in the Spark UI:

(Image: Screenshot 2025-02-03 at 4 11 43 PM)

Why are the changes needed?

Currently the AQE framework is not fully self-contained, since not all plan segments can be put into a query stage: the final "stage" is basically executed as a non-AQE plan. This PR adds a result query stage for AQE to unify the framework. With this change, we can build more query-stage-level features; one use case is described in #44013 (comment).

Does this PR introduce any user-facing change?

NO

How was this patch tested?

new unit tests.

Existing tests impacted by this change are also updated to keep their original test semantics.

Was this patch authored or co-authored using generative AI tooling?

NO

@github-actions github-actions bot added the SQL label Jan 28, 2025
@liuzqt liuzqt changed the title [SPARK-51008][SQL][WIP] Add ResultStage for AQE [SPARK-51008][SQL] Add ResultStage for AQE Feb 3, 2025
@liuzqt
Contributor Author

liuzqt commented Feb 4, 2025

@cloud-fan

@cloud-fan
Contributor

cc @ulysses-you

@liuzqt liuzqt requested a review from cloud-fan February 4, 2025 19:34
@@ -588,7 +639,7 @@ case class AdaptiveSparkPlanExec(
if (plan.children.isEmpty) {
CreateStageResult(newPlan = plan, allChildStagesMaterialized = true, newStages = Seq.empty)
} else {
- val results = plan.children.map(createQueryStages)
+ val results = plan.children.map(createQueryStagesInternal)
CreateStageResult(
newPlan = plan.withNewChildren(results.map(_.newPlan)),
allChildStagesMaterialized = results.forall(_.allChildStagesMaterialized),

Contributor

The new code seems a bit hard to read. I'm not sure if there is some development context I'm missing.

Can we create the result query stage here? If the plan is the root query and allChildStagesMaterialized is true, then we wrap it in a ResultQueryStage. Since that is not a materialized stage, AQE will materialize it.
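
For illustration only (not code from this PR): a minimal sketch of the suggestion above, using toy plan classes rather than Spark's. The names `Plan`, `ShuffleStage`, `ResultStage`, `allStagesMaterialized` and `maybeWrapRoot` are all hypothetical. It assumes a materialized stage implies everything below it has already run.

```scala
object ResultStageSketch {
  // Toy stand-ins for physical plan nodes; these are NOT Spark's classes.
  sealed trait Plan { def children: Seq[Plan] }
  case class Leaf(name: String) extends Plan { val children: Seq[Plan] = Nil }
  case class Node(name: String, children: Seq[Plan]) extends Plan
  // A shuffle stage that may or may not have finished running.
  case class ShuffleStage(id: Int, child: Plan, materialized: Boolean) extends Plan {
    val children: Seq[Plan] = Seq(child)
  }
  // The proposed wrapper: it starts out unmaterialized, so the loop would still run it.
  case class ResultStage(id: Int, child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
  }

  // True when no stage in the tree is still pending.
  // Assumption: a materialized stage implies its subtree is done, so we stop there.
  def allStagesMaterialized(plan: Plan): Boolean = plan match {
    case s: ShuffleStage => s.materialized
    case other => other.children.forall(allStagesMaterialized)
  }

  // Wrap the root only once nothing below it is pending; never wrap twice.
  def maybeWrapRoot(root: Plan, nextId: Int): Plan = root match {
    case r: ResultStage => r
    case _ if allStagesMaterialized(root) => ResultStage(nextId, root)
    case _ => root
  }
}
```

In this sketch the wrapper is only added when every child stage is done, which mirrors the "root query and allChildStagesMaterialized" condition in the comment.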

Contributor

sounds reasonable to me, cc @liuzqt

Contributor

@ulysses-you ulysses-you left a comment

So this PR is just one of the stage-level feature PRs?

@cloud-fan
Contributor

@ulysses-you yes, after this PR, we can implement the proposed idea in #44013 (comment) and keep contexts in the AQE query stage.

@@ -579,23 +592,52 @@ case class AdaptiveSparkPlanExec(
allChildStagesMaterialized = false,
newStages = Seq(newStage))

- case q: QueryStageExec =>
+ case q: QueryStageExec if q ne currentPhysicalPlan =>

Contributor

what does this condition protect?

Contributor Author

We can have a plan like this:

ShuffleQueryStage 0
+- Exchange hashpartitioning(key#17, 5), REPARTITION_BY_COL, [plan_id=89]
   +- *(1) SerializeFromObject [invoke(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key()) AS key#17, static_invoke(UTF8String.fromString(invoke(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value()))) AS value#18]
      +- Scan[obj#14]

Here the root plan is a ShuffleQueryStageExec, and we have to create a ResultQueryStage on top of it:
==>

ResultQueryStage 1
+- AQEShuffleRead coalesced
   +- ShuffleQueryStage 0
      +- Exchange hashpartitioning(key#17, 5), REPARTITION_BY_COL, [plan_id=89]
         +- *(1) SerializeFromObject [invoke(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key()) AS key#17, static_invoke(UTF8String.fromString(invoke(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value()))) AS value#18]
            +- S...

This refactor is equivalent to my previous implementation, which used createQueryStagesInternal and created the result stage in the outer createQueryStages.
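
For illustration only: a minimal sketch of what the `q ne currentPhysicalPlan` guard distinguishes, using toy classes instead of Spark's. `Stage`, `ResultStage` and `handleStage` are hypothetical names. The real plan above also places an AQEShuffleRead between the two stages, which this sketch leaves out.

```scala
object RootGuardSketch {
  // Toy stand-ins; NOT Spark's QueryStageExec hierarchy.
  sealed trait Plan
  case class Stage(id: Int) extends Plan
  case class ResultStage(child: Plan) extends Plan

  // Hypothetical helper: decide what to do with a node that is already a stage.
  // `ne` is reference inequality, so a structurally equal copy of the root
  // is still treated as a non-root stage.
  def handleStage(q: Stage, root: Plan): Plan =
    if (q ne root) q       // non-root stage: nothing new to create
    else ResultStage(q)    // root stage: still needs a result stage on top
}
```

Reference comparison matters here because case classes compare structurally with `==`, while the guard needs to know whether the node is the root instance itself.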

Contributor

@cloud-fan cloud-fan Feb 6, 2025

ah I see, let's add comments to explain it

// We can skip creating a new query stage if the given plan is already a query stage.
// Note: if this query stage is the root node, we still need to create a result query stage.

Contributor

actually, the caller always invokes createQueryStages with currentPhysicalPlan, so we know when to deal with the result stage. Now I feel the previous code is clearer. Maybe just name it better? e.g. createQueryStages and createNonResultQueryStages.

Contributor Author

I just noticed an even more complicated pattern from a broken test: we create a new non-result query stage as the root node, and that query stage is immediately materialized due to stage reuse, so we have to create the result stage right after. The current implementation cannot handle this case, and fixing it might be hacky...

So yes, I think separating result and non-result query stage creation is the better option. I'll rename it and add some comments to clarify.
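
For illustration only: a rough sketch of the split discussed above, including the stage-reuse case. Only the names createQueryStages and createNonResultQueryStages come from this thread; the toy classes, the `Created` result type, the `reusable` set and the fixed stage id are assumptions, not Spark's implementation.

```scala
object SplitCreationSketch {
  // Toy stand-ins; NOT Spark's classes.
  sealed trait Plan
  case class Leaf(name: String) extends Plan
  case class Exchange(key: String, child: Plan) extends Plan
  case class Stage(id: Int, plan: Plan, materialized: Boolean) extends Plan
  case class ResultStage(plan: Plan) extends Plan

  // Rewritten plan plus whether everything in it is already materialized.
  case class Created(plan: Plan, allMaterialized: Boolean)

  // Inner pass: never creates a result stage.
  // `reusable` is a hypothetical set of exchange keys already materialized elsewhere.
  def createNonResultQueryStages(plan: Plan, reusable: Set[String]): Created = plan match {
    case Exchange(key, child) =>
      val inner = createNonResultQueryStages(child, reusable)
      if (!inner.allMaterialized) {
        Created(Exchange(key, inner.plan), allMaterialized = false)
      } else {
        // A reused stage counts as materialized the moment it is created.
        val reused = reusable.contains(key)
        Created(Stage(0, Exchange(key, inner.plan), reused), reused)
      }
    case s: Stage => Created(s, s.materialized)
    case other => Created(other, allMaterialized = true)
  }

  // Outer pass: always called on the root, so it alone decides about the result stage.
  def createQueryStages(root: Plan, reusable: Set[String]): Created = {
    val inner = createNonResultQueryStages(root, reusable)
    if (inner.allMaterialized) Created(ResultStage(inner.plan), allMaterialized = false)
    else inner
  }
}
```

Because the outer function inspects the result of the inner pass, a root stage that comes back already materialized through reuse is wrapped in the same call, with no root-identity check inside the recursion.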

assert(plan2.isInstanceOf[ResultQueryStageExec])
assert(plan1 ne plan2)
assert(plan1.asInstanceOf[ResultQueryStageExec].plan
.fastEquals(plan2.asInstanceOf[ResultQueryStageExec].plan))

Contributor

should they be equal? I think these two result stages should have different handler functions?

Contributor Author

Yes, they have different handler functions. But the root plan they wrap should be the same (the original AQE root plan).
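
For illustration only: a small sketch of the property the assertions check, namely two distinct result stages with different handlers over an equal wrapped plan. The classes here are toy stand-ins with hypothetical names, and `==` on a case class stands in for fastEquals.

```scala
object ResultStageEqualitySketch {
  // Toy plan with structural equality (stand-in for the wrapped root plan).
  case class Plan(description: String)

  // Deliberately not a case class: each wrapper keeps its own identity and handler.
  final class ResultStage[T](val plan: Plan, val handler: Plan => T)

  val root: Plan = Plan("Project [id]")

  // Two actions over the same root plan, each with its own handler function.
  val collectStage = new ResultStage(root, (p: Plan) => s"collect(${p.description})")
  val countStage = new ResultStage(root, (p: Plan) => p.description.length)
}
```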

currentPhysicalPlan = newPhysicalPlan
currentLogicalPlan = newLogicalPlan
stagesToReplace = Seq.empty[QueryStageExec]
if (!currentPhysicalPlan.isInstanceOf[ResultQueryStageExec]) {

Contributor

Why do we need to skip ResultQueryStageExec?

Contributor

@cloud-fan cloud-fan Feb 6, 2025

The result stage is already the last step, so there is nothing to reoptimize.

* Run `fun` on finalized physical plan
*/
def withFinalPlanUpdate[T](fun: SparkPlan => T): T = lock.synchronized {
_isFinalPlan = false

Contributor

So when we call df.collect multiple times, we will re-optimize the final stage multiple times. This is because we need to wrap a new ResultQueryStageExec for each call.

Contributor

In this case we construct ResultQueryStageExec directly and won't re-optimize it: https://github.com/apache/spark/pull/49715/files#diff-ec42cd27662f3f528832c298a60fffa1d341feb04aa1d8c80044b70cbe0ebbfcR536

}
_isFinalPlan = true
finalPlanUpdate
currentPhysicalPlan.asInstanceOf[ResultQueryStageExec].resultOption.get().get.asInstanceOf[T]

Contributor

Does this mean we would cache the result data? Is that expected?

Contributor Author

Good point, this is actually a side effect of all QueryStageExec...

We can implement a "fetch-once" semantic which only fetches once at the end of the AQE loop. But we still cannot prevent users from accessing it multiple times as long as they can access the ResultQueryStageExec node from the query plan.

@cloud-fan what do you think?

Contributor

@cloud-fan cloud-fan Feb 7, 2025

This is a good catch! This stops the result from being GCed if the users throw away the result of df.collect() but still keep the df around.

Maybe the final outcome of a ResultStage should be Unit which is only used to trigger the final plan calculation. The caller side is still responsible for running the function to get the result.

Contributor

The proposal above can also simplify things: once a result stage is created, we never need to recreate it as the final plan is finalized. It's similar to the def getFinalPhysicalPlan() style before.
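
For illustration only: a minimal sketch of the Unit-outcome idea, assuming toy classes rather than Spark's. `ResultStage`, `materialize`, `isMaterialized` and `withFinalPlan` are hypothetical names. The point is that the stage records only that finalization happened, and the caller runs its own function each time.

```scala
object UnitResultStageSketch {
  // Toy stand-in for a finalized physical plan.
  final class Plan(val rows: Seq[Int])

  // The stage holds no result data, only a flag saying the plan is final.
  final class ResultStage(val plan: Plan) {
    private var finalized = false
    def materialize(): Unit = synchronized { finalized = true }
    def isMaterialized: Boolean = synchronized(finalized)
  }

  // Hypothetical caller-side helper: finalize at most once, then run `fun`.
  // The value returned by `fun` is never stored on the stage.
  def withFinalPlan[T](stage: ResultStage)(fun: Plan => T): T = {
    if (!stage.isMaterialized) stage.materialize()
    fun(stage.plan)
  }
}
```

With this shape, a second call reuses the same stage without recreating it, and a result the caller discards is not kept reachable through the plan.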

@liuzqt liuzqt requested a review from cloud-fan February 7, 2025 02:41

3 participants