diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSQLSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSQLSuite.scala index bc47aa9ed1b..2213bfd04d0 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSQLSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSQLSuite.scala @@ -38,6 +38,11 @@ class MergeIntoSQLSuite extends MergeIntoSuiteBase import testImplicits._ + override def excluded: Seq[String] = super.excluded ++ Seq( + // Schema evolution SQL syntax is not yet supported + "schema evolution enabled for the current command" + ) + test("CTE as a source in MERGE") { withTable("source") { Seq((1, 1), (0, 3)).toDF("key1", "value").write.saveAsTable("source") @@ -386,10 +391,8 @@ class MergeIntoSQLSuite extends MergeIntoSuiteBase trait MergeIntoSQLColumnMappingSuiteBase extends DeltaColumnMappingSelectedTestMixin { override protected def runOnlyTests: Seq[String] = - Seq( - "schema evolution - new nested column with update non-* and insert * - " + - "array of struct - longer target - on via DeltaSQLConf" - ) + Seq("schema evolution - new nested column with update non-* and insert * - " + + "array of struct - longer target") } class MergeIntoSQLIdColumnMappingSuite extends MergeIntoSQLSuite diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSchemaEvolutionSuite.scala index 07ab309ad8c..f582b0a4088 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSchemaEvolutionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSchemaEvolutionSuite.scala @@ -88,7 +88,7 @@ trait MergeIntoSchemaEvolutionMixin { } } - test(s"schema evolution - $name - on via DeltaSQLConf") { + test(s"schema evolution - $name") { withSQLConf((confs :+ (DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key, "true")): _*) { executeMergeAndAssert(expected, expectErrorContains) } @@ -1003,6 +1003,39 @@ trait MergeIntoSchemaEvolutionBaseTests { .toDF("key", "value"), expectErrorWithoutEvolutionContains = "cannot resolve s.value in UPDATE clause") + test("schema evolution enabled for the current command") { + withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "false") { + withTable("target", "source") { + Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value") + .write.format("delta").saveAsTable("target") + Seq((1, 1, 1), (2, 2, 2)).toDF("key", "value", "extra") + .write.format("delta").saveAsTable("source") + + // Should fail without schema evolution + val e = intercept[org.apache.spark.sql.AnalysisException] { + executeMerge( + "target", + "source", + "target.key = source.key", + update("extra = -1"), insert("*")) + } + assert(e.getErrorClass === "DELTA_MERGE_UNRESOLVED_EXPRESSION") + assert(e.getMessage.contains("resolve extra in UPDATE clause")) + + // Should succeed with schema evolution + executeMergeWithSchemaEvolution( + "target", + "source", + "target.key = source.key", + update("extra = -1"), insert("*")) + checkAnswer( + spark.table("target"), + Seq[(Integer, Integer, Integer)]((0, 0, null), (1, 10, -1), (2, 2, 2), (3, 30, null)) + .toDF("key", "value", "extra")) + } + } + } + testNestedStructsEvolution("nested field assignment qualified with source alias")( target = Seq("""{ "a": 1, "t": { "a": 2 } }"""), source = Seq("""{ "a": 3, "t": { "a": 5 } }"""), diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoTestUtils.scala index b3465029ad5..6f2839d7dc9 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoTestUtils.scala @@ -48,6 +48,12 @@ trait MergeIntoTestUtils extends DeltaDMLTestUtils with MergeHelpers { cond: String, clauses: MergeClause*): Unit + protected def executeMergeWithSchemaEvolution( + tgt: String, + src: String, + cond: String, + clauses: MergeClause*): Unit + protected def withCrossJoinEnabled(body: => Unit): Unit = { withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { body } } @@ -104,6 +110,15 @@ trait MergeIntoSQLTestUtils extends DeltaSQLTestUtils with MergeIntoTestUtils { val clausesStr = clauses.map(_.sql).mkString("\n") sql(s"MERGE INTO $tgt USING $src ON $cond\n" + clausesStr) } + + override protected def executeMergeWithSchemaEvolution( + tgt: String, + src: String, + cond: String, + clauses: MergeClause*): Unit = { + throw new UnsupportedOperationException( + "The SQL syntax [WITH SCHEMA EVOLUTION] is not yet supported.") + } } trait MergeIntoScalaTestUtils extends MergeIntoTestUtils { @@ -130,6 +145,13 @@ trait MergeIntoScalaTestUtils extends MergeIntoTestUtils { clauses: MergeClause*): Unit = getMergeBuilder(tgt, src, cond, clauses: _*).execute() + override protected def executeMergeWithSchemaEvolution( + tgt: String, + src: String, + cond: String, + clauses: MergeClause*): Unit = + getMergeBuilder(tgt, src, cond, clauses: _*).withSchemaEvolution().execute() + private def getMergeBuilder( tgt: String, src: String,