Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark] Check error classes in MERGE tests #4325

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -1676,7 +1676,7 @@
},
"DELTA_MERGE_UNRESOLVED_EXPRESSION" : {
"message" : [
"Cannot resolve <sqlExpr> in <clause> given <cols>"
"Cannot resolve <sqlExpr> in <clause> given columns <cols>."
],
"sqlState" : "42601"
},
Expand Down Expand Up @@ -3187,11 +3187,6 @@
"<optionalPrefixMessage>Found unsupported expression <expression> while parsing target column name parts."
]
},
"_LEGACY_ERROR_TEMP_DELTA_0011" : {
"message" : [
"Failed to resolve plan."
]
},
"_LEGACY_ERROR_TEMP_DELTA_0012" : {
"message" : [
"Could not resolve expression: <expression>"
Expand Down
10 changes: 1 addition & 9 deletions spark/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -306,15 +306,7 @@ class DeltaMergeBuilder private(
val resolvedMergeInto =
ResolveDeltaMergeInto.resolveReferencesAndSchema(mergePlan, sparkSession.sessionState.conf)(
tryResolveReferencesForExpressions(sparkSession))
if (!resolvedMergeInto.resolved) {
throw new ExtendedAnalysisException(
new DeltaAnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_DELTA_0011",
messageParameters = Array.empty
),
resolvedMergeInto
)
}

val strippedMergeInto = resolvedMergeInto.copy(
target = DeltaViewHelper.stripTempViewForMerge(resolvedMergeInto.target, SQLConf.get)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object ResolveDeltaMergeInto {
for (a <- expr.flatMap(_.references).filterNot(_.resolved)) {
// Note: This will throw error only on unresolved attribute issues,
// not other resolution errors like mismatched data types.
val cols = "columns " + plans.flatMap(_.output).map(_.sql).mkString(", ")
val cols = plans.flatMap(_.output).map(_.sql).mkString(", ")
throw new DeltaAnalysisException(
errorClass = "DELTA_MERGE_UNRESOLVED_EXPRESSION",
messageParameters = Array(a.sql, mergeClauseTypeStr, cols),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3240,19 +3240,6 @@ trait DeltaErrorsSuiteBase
s"name parts.")
)
}
{
val e = intercept[DeltaAnalysisException] {
throw new DeltaAnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_DELTA_0011",
messageParameters = Array.empty)
}
checkErrorMessage(
e,
Some("_LEGACY_ERROR_TEMP_DELTA_0011"),
None,
Some("Failed to resolve plan.")
)
}
{
val exprs = Seq("1".expr, "2".expr)
val e = intercept[DeltaAnalysisException] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,17 @@
package org.apache.spark.sql.delta

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.sources.DeltaSQLConf

import org.apache.spark.sql.Row

trait MergeIntoNotMatchedBySourceSuite extends MergeIntoSuiteBase {
import testImplicits._

// All CDC suites run using MergeIntoSQLSuite only. The SQL API for NOT MATCHED BY SOURCE will
// only be available with Spark 3.4. In the meantime, we explicitly run NOT MATCHED BY SOURCE
// tests with CDF enabled and disabled against the Scala API. Use [[testExtendedMerge]]
// instead once we can run tests against the SQL API.
/**
* Variant of `testExtendedMerge` that runs a MERGE INTO command, checks the expected result and
* additionally validates that the CDC produced is correct.
*/
protected def testExtendedMergeWithCDC(
name: String,
namePrefix: String = "not matched by source")(
Expand Down Expand Up @@ -65,74 +63,87 @@ trait MergeIntoNotMatchedBySourceSuite extends MergeIntoSuiteBase {
}

// Test analysis errors with NOT MATCHED BY SOURCE clauses.
testErrorsInUnlimitedClauses(
testMergeAnalysisException(
"error on multiple not matched by source update clauses without condition")(
mergeOn = "s.key = t.key",
updateNotMatched(condition = "t.key == 3", set = "value = 2 * value"),
updateNotMatched(set = "value = 3 * value"),
updateNotMatched(set = "value = 4 * value"))(
errorStrs = "when there are more than one not matched by source clauses in a merge " +
"statement, only the last not matched by source clause can omit the condition" :: Nil)
expectedErrorClass = "NON_LAST_NOT_MATCHED_BY_SOURCE_CLAUSE_OMIT_CONDITION",
expectedMessageParameters = Map.empty)

testErrorsInUnlimitedClauses(
testMergeAnalysisException(
"error on multiple not matched by source update/delete clauses without condition")(
mergeOn = "s.key = t.key",
updateNotMatched(condition = "t.key == 3", set = "value = 2 * value"),
deleteNotMatched(),
updateNotMatched(set = "value = 4 * value"))(
errorStrs = "when there are more than one not matched by source clauses in a merge " +
"statement, only the last not matched by source clause can omit the condition" :: Nil)
expectedErrorClass = "NON_LAST_NOT_MATCHED_BY_SOURCE_CLAUSE_OMIT_CONDITION",
expectedMessageParameters = Map.empty)

testErrorsInUnlimitedClauses(
testMergeAnalysisException(
"error on non-empty condition following empty condition in not matched by source " +
"update clauses")(
mergeOn = "s.key = t.key",
updateNotMatched(set = "value = 2 * value"),
updateNotMatched(condition = "t.key < 3", set = "value = value"))(
errorStrs = "when there are more than one not matched by source clauses in a merge " +
"statement, only the last not matched by source clause can omit the condition" :: Nil)
expectedErrorClass = "NON_LAST_NOT_MATCHED_BY_SOURCE_CLAUSE_OMIT_CONDITION",
expectedMessageParameters = Map.empty)

testErrorsInUnlimitedClauses(
testMergeAnalysisException(
"error on non-empty condition following empty condition in not matched by source " +
"delete clauses")(
mergeOn = "s.key = t.key",
deleteNotMatched(),
deleteNotMatched(condition = "t.key < 3"))(
errorStrs = "when there are more than one not matched by source clauses in a merge " +
"statement, only the last not matched by source clause can omit the condition" :: Nil)
expectedErrorClass = "NON_LAST_NOT_MATCHED_BY_SOURCE_CLAUSE_OMIT_CONDITION",
expectedMessageParameters = Map.empty)

testAnalysisErrorsInExtendedMerge("update not matched condition - unknown reference")(
testMergeAnalysisException("update not matched condition - unknown reference")(
mergeOn = "s.key = t.key",
updateNotMatched(condition = "unknownAttrib > 1", set = "tgtValue = tgtValue + 1"))(
// Should show unknownAttrib as invalid ref and the target columns (key, tgtValue) as valid names.
errorStrs = "UPDATE condition" :: "unknownAttrib" :: "key" :: "tgtValue" :: Nil)
expectedErrorClass = "DELTA_MERGE_UNRESOLVED_EXPRESSION",
expectedMessageParameters = Map(
"sqlExpr" -> "unknownAttrib",
"clause" -> "UPDATE condition",
"cols" -> "t.key, t.tgtValue"))

testAnalysisErrorsInExtendedMerge("update not matched condition - aggregation function")(
testMergeAnalysisException("update not matched condition - aggregation function")(
mergeOn = "s.key = t.key",
updateNotMatched(condition = "max(0) > 0", set = "tgtValue = tgtValue + 1"))(
errorStrs = "UPDATE condition" :: "aggregate functions are not supported" :: Nil)
expectedErrorClass = "DELTA_AGGREGATION_NOT_SUPPORTED",
expectedMessageParameters = Map(
"operation" -> "UPDATE condition of MERGE operation",
"predicate" -> "(condition = (max(0) > 0))."))

testAnalysisErrorsInExtendedMerge("update not matched condition - subquery")(
testMergeAnalysisException("update not matched condition - subquery")(
mergeOn = "s.key = t.key",
updateNotMatched(condition = "s.value in (select value from t)", set = "tgtValue = 1"))(
errorStrs = Nil
) // subqueries fail for unresolved reference to `t`
updateNotMatched(condition = "tgtValue in (select value from t)", set = "tgtValue = 1"))(
expectedErrorClass = "TABLE_OR_VIEW_NOT_FOUND",
expectedMessageParameters = Map("relationName" -> "`t`"))

testAnalysisErrorsInExtendedMerge("delete not matched condition - unknown reference")(
testMergeAnalysisException("delete not matched condition - unknown reference")(
mergeOn = "s.key = t.key",
deleteNotMatched(condition = "unknownAttrib > 1"))(
// Should show unknownAttrib as invalid ref and the target columns (key, tgtValue) as valid names.
errorStrs = "DELETE condition" :: "unknownAttrib" :: "key" :: "tgtValue" :: Nil)
expectedErrorClass = "DELTA_MERGE_UNRESOLVED_EXPRESSION",
expectedMessageParameters = Map(
"sqlExpr" -> "unknownAttrib",
"clause" -> "DELETE condition",
"cols" -> "t.key, t.tgtValue"))

testAnalysisErrorsInExtendedMerge("delete not matched condition - aggregation function")(
testMergeAnalysisException("delete not matched condition - aggregation function")(
mergeOn = "s.key = t.key",
deleteNotMatched(condition = "max(0) > 0"))(
errorStrs = "DELETE condition" :: "aggregate functions are not supported" :: Nil)
expectedErrorClass = "DELTA_AGGREGATION_NOT_SUPPORTED",
expectedMessageParameters = Map(
"operation" -> "DELETE condition of MERGE operation",
"predicate" -> "(condition = (max(0) > 0))."))

testAnalysisErrorsInExtendedMerge("delete not matched condition - subquery")(
testMergeAnalysisException("delete not matched condition - subquery")(
mergeOn = "s.key = t.key",
deleteNotMatched(condition = "s.srcValue in (select tgtValue from t)"))(
errorStrs = Nil) // subqueries fail for unresolved reference to `t`
deleteNotMatched(condition = "tgtValue in (select tgtValue from t)"))(
expectedErrorClass = "TABLE_OR_VIEW_NOT_FOUND",
expectedMessageParameters = Map("relationName" -> "`t`"))

// Test correctness with NOT MATCHED BY SOURCE clauses.
testExtendedMergeWithCDC("all 3 types of match clauses without conditions")(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ class MergeIntoScalaSuite extends MergeIntoSuiteBase
// scalastyle:on line.size.limit
)

// Maps expected error classes to actual error classes. Used to handle error classes that are
// different when running using SQL vs. Scala.
// Keys are the un-prefixed clause-ordering error classes; values are the `DELTA_`-prefixed
// classes this Scala suite substitutes for them. Note that the `..._BY_TARGET_...` key maps to a
// value without `BY_TARGET` in its name.
// NOTE(review): where the base suite applies this mapping is not visible here - confirm there.
override protected val mappedErrorClasses: Map[String, String] = Map(
"NON_LAST_MATCHED_CLAUSE_OMIT_CONDITION" -> "DELTA_NON_LAST_MATCHED_CLAUSE_OMIT_CONDITION",
"NON_LAST_NOT_MATCHED_BY_TARGET_CLAUSE_OMIT_CONDITION" ->
"DELTA_NON_LAST_NOT_MATCHED_CLAUSE_OMIT_CONDITION",
"NON_LAST_NOT_MATCHED_BY_SOURCE_CLAUSE_OMIT_CONDITION" ->
"DELTA_NON_LAST_NOT_MATCHED_BY_SOURCE_CLAUSE_OMIT_CONDITION"
)

test("basic scala API") {
withTable("source") {
Expand Down
Loading
Loading