From 370ed5d9578d66c01802af49c3134c7cbd381272 Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Tue, 19 Mar 2024 09:11:10 +0100 Subject: [PATCH 1/6] Type Evolution in MERGE --- .../sql/delta/ResolveDeltaMergeInto.scala | 15 +- .../apache/spark/sql/delta/TypeWidening.scala | 13 + .../schema/ImplicitMetadataOperation.scala | 12 +- .../sql/delta/schema/SchemaMergingUtils.scala | 9 +- .../spark/sql/delta/DeltaErrorsSuite.scala | 5 +- .../spark/sql/delta/DeltaTestUtils.scala | 22 +- .../DeltaTypeWideningAutomaticSuite.scala | 303 ++++++++++++++++++ .../sql/delta/DeltaTypeWideningSuite.scala | 93 +++--- .../delta/MergeIntoSchemaEvolutionSuite.scala | 4 +- .../spark/sql/delta/MergeIntoTestUtils.scala | 21 -- 10 files changed, 429 insertions(+), 68 deletions(-) create mode 100644 spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningAutomaticSuite.scala diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala index a8d1def58ce..918a4f671f1 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala @@ -234,7 +234,7 @@ object ResolveDeltaMergeInto { // schema before merging it with the target schema. We don't consider NOT MATCHED BY SOURCE // clauses since these can't by definition reference source columns and thus can't introduce // new columns in the target schema. - val actions = (matchedClauses ++ notMatchedClauses).flatMap(_.actions) + val actions = (resolvedMatchedClauses ++ resolvedNotMatchedClauses).flatMap(_.actions) val assignments = actions.collect { case a: DeltaMergeAction => a.targetColNameParts } val containsStarAction = actions.exists { case _: UnresolvedStar => true @@ -278,6 +278,15 @@ object ResolveDeltaMergeInto { }) val migrationSchema = filterSchema(source.schema, Seq.empty) + val allowTypeWidening = EliminateSubqueryAliases(target) match { + case DeltaFullTable(_, index) => + TypeWidening.isEnabled( + index.snapshotAtAnalysis.protocol, + index.snapshotAtAnalysis.metadata + ) + case o => throw DeltaErrors.notADeltaSourceException("MERGE", Some(o)) + } + // The implicit conversions flag allows any type to be merged from source to target if Spark // SQL considers the source type implicitly castable to the target. Normally, mergeSchemas // enforces Parquet-level write compatibility, which would mean an INT source can't be merged @@ -285,7 +294,9 @@ object ResolveDeltaMergeInto { SchemaMergingUtils.mergeSchemas( target.schema, migrationSchema, - allowImplicitConversions = true) + allowImplicitConversions = true, + allowTypeWidening = allowTypeWidening + ) } else { target.schema } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala index 2efbdf11e47..ffcdc83ba54 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala @@ -62,6 +62,19 @@ object TypeWidening { case _ => false } + /** + * Returns whether the given type change is eligible for **automatic** widening. Only a subset of + * supported type changes are considered for automatic widening. 
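+   * With the cases below, this currently means byte -> short and byte/short -> int; a change
+   * where the source and target types are already equal is trivially accepted.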
+ */ + def isAutomaticTypeChangeSupported(fromType: AtomicType, toType: AtomicType): Boolean = + (fromType, toType) match { + case (from, to) if !isTypeChangeSupported(from, to) => false + case (from, to) if from == to => true + case (ByteType, ShortType) => true + case (ByteType | ShortType, IntegerType) => true + case _ => false + } + /** * Filter the given list of files to only keep files that were written before the latest type * change, if any. These older files contain a column or field with a type that is different than diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala index 9b108f9715e..20320440e1f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala @@ -122,7 +122,14 @@ trait ImplicitMetadataOperation extends DeltaLogging { if (rearrangeOnly) { throw DeltaErrors.unexpectedDataChangeException("Change the Delta table schema") } - txn.updateMetadata(txn.metadata.copy(schemaString = mergedSchema.json + + val schemaWithTypeWideningMetadata = TypeWideningMetadata.addTypeWideningMetadata( + txn, + schema = mergedSchema, + oldSchema = txn.metadata.schema + ) + + txn.updateMetadata(txn.metadata.copy(schemaString = schemaWithTypeWideningMetadata.json )) } else if (isNewSchema || isNewPartitioning ) { @@ -201,7 +208,8 @@ object ImplicitMetadataOperation { SchemaMergingUtils.mergeSchemas( txn.metadata.schema, dataSchema, - fixedTypeColumns = fixedTypeColumns) + fixedTypeColumns = fixedTypeColumns, + allowTypeWidening = TypeWidening.isEnabled(txn.protocol, txn.metadata)) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala index e57f181e73e..1bac6adfad4 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala @@ -20,14 +20,13 @@ import java.util.Locale import scala.util.control.NonFatal -import org.apache.spark.sql.delta.DeltaAnalysisException +import org.apache.spark.sql.delta.{DeltaAnalysisException, TypeWidening} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion, UnresolvedAttribute} import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap -import org.apache.spark.sql.types.{ArrayType, ByteType, DataType, DecimalType, IntegerType, MapType, NullType, ShortType, StructField, StructType} +import org.apache.spark.sql.types._ /** * Utils to merge table schema with data schema. 
@@ -168,6 +167,7 @@ object SchemaMergingUtils { dataSchema: StructType, allowImplicitConversions: Boolean = false, keepExistingType: Boolean = false, + allowTypeWidening: Boolean = false, fixedTypeColumns: Set[String] = Set.empty, caseSensitive: Boolean = false): StructType = { checkColumnNameDuplication(dataSchema, "in the data to save", caseSensitive) @@ -232,6 +232,9 @@ object SchemaMergingUtils { // Simply keeps the existing type for primitive types case (current, update) if keepExistingType => current + case (current: AtomicType, update: AtomicType) if allowTypeWidening && + TypeWidening.isAutomaticTypeChangeSupported(current, update) => update + // If implicit conversions are allowed, that means we can use any valid implicit cast to // perform the merge. case (current, update) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala index b7fbad30351..88848ede56a 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala @@ -1432,7 +1432,10 @@ trait DeltaErrorsSuiteBase val e = intercept[DeltaAnalysisException] { val s1 = StructType(Seq(StructField("c0", IntegerType, true))) val s2 = StructType(Seq(StructField("c0", StringType, false))) - SchemaMergingUtils.mergeSchemas(s1, s2, false, false, Set("c0")) + SchemaMergingUtils.mergeSchemas(s1, s2, + allowImplicitConversions = false, + keepExistingType = false, + allowTypeWidening = false, Set("c0")) } checkErrorMessage(e, Some("DELTA_GENERATED_COLUMNS_DATA_TYPE_MISMATCH"), Some("42K09"), Some("Column c0 is a generated column or a column used by a generated " + diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala index 0d684300bfc..abfe74c426b 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala @@ -41,10 +41,11 @@ import org.apache.spark.scheduler.{JobFailed, SparkListener, SparkListenerJobEnd import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.util.quietly +import org.apache.spark.sql.catalyst.util.{quietly, FailFastMode} import org.apache.spark.sql.execution.{FileSourceScanExec, QueryExecution, RDDScanExec, SparkPlan, WholeStageCodegenExec} import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.QueryExecutionListener import org.apache.spark.util.Utils @@ -467,6 +468,8 @@ trait DeltaDMLTestUtils with BeforeAndAfterEach { self: SharedSparkSession => + import testImplicits._ + protected var tempDir: File = _ protected var deltaLog: DeltaLog = _ @@ -515,6 +518,23 @@ trait DeltaDMLTestUtils } } + /** + * Parse the input JSON data into a dataframe, one row per input element. + * Throws an exception on malformed inputs or records that don't comply with the provided schema. 
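+   * For example, readFromJSON(Seq("""{"a": 1}"""), new StructType().add("a", IntegerType))
+   * returns a one-row dataframe with a single integer column `a`.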
+ */ + protected def readFromJSON(data: Seq[String], schema: StructType = null): DataFrame = { + if (schema != null) { + spark.read + .schema(schema) + .option("mode", FailFastMode.name) + .json(data.toDS) + } else { + spark.read + .option("mode", FailFastMode.name) + .json(data.toDS) + } + } + protected def readDeltaTable(path: String): DataFrame = { spark.read.format("delta").load(path) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningAutomaticSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningAutomaticSuite.scala new file mode 100644 index 00000000000..61edf9c3322 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningAutomaticSuite.scala @@ -0,0 +1,303 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode} +import org.apache.spark.sql.functions.{col, lit} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy +import org.apache.spark.sql.types._ + + +/** + * Suite covering widening columns and fields type as part of automatic schema evolution when the + * type widening table feature is supported. + */ +class DeltaTypeWideningAutomaticSuite + extends QueryTest + with DeltaDMLTestUtils + with DeltaSQLCommandTest + with DeltaTypeWideningTestMixin + with DeltaMergeIntoAutomaticTypeEvolutionTests { + + protected override def sparkConf: SparkConf = { + super.sparkConf + .set(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key, "true") + } +} + +/** + * Tests covering type widening during schema evolution in MERGE INTO. + */ +trait DeltaMergeIntoAutomaticTypeEvolutionTests + extends MergeIntoSQLTestUtils + with MergeIntoSchemaEvolutionMixin + with DeltaTypeWideningTestCases { + self: QueryTest with DeltaTypeWideningTestMixin with DeltaDMLTestUtils => + + import testImplicits._ + + for { + testCase <- supportedTestCases + } { + test(s"automatic type widening in merge ${testCase.fromType.sql} -> ${testCase.toType.sql}") { + withTable("source") { + testCase.additionalValuesDF.write.format("delta").saveAsTable("source") + append(testCase.initialValuesDF) + + // We mainly want to ensure type widening is correctly applied to the schema. We use a + // trivial insert only merge to make it easier to validate results. 
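+        // The merge condition "0 = 1" never matches, so every source row is handled by the
+        // NOT MATCHED clause and simply inserted into the target.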
+ executeMerge( + tgt = s"delta.`$tempPath` t", + src = "source", + cond = "0 = 1", + clauses = insert("*")) + + assert(readDeltaTable(tempPath).schema("value").dataType === testCase.toType) + checkAnswer( + readDeltaTable(tempPath).select("value").sort("value"), + testCase.expectedResult.select($"value".cast(testCase.toType)).sort("value")) + } + } + } + + for { + testCase <- unsupportedTestCases + } { + test(s"unsupported automatic type widening in merge " + + s"${testCase.fromType.sql} -> ${testCase.toType.sql}") { + withTable("source") { + testCase.additionalValuesDF.write.format("delta").saveAsTable("source") + append(testCase.initialValuesDF) + + // Test cases for some of the unsupported type changes may overflow while others only have + // values that can be implicitly cast to the narrower type - e.g. double ->float. + // We set storeAssignmentPolicy to LEGACY to ignore overflows, this test only ensures + // that the table schema didn't evolve. + withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> StoreAssignmentPolicy.LEGACY.toString) { + executeMerge( + tgt = s"delta.`$tempPath` t", + src = "source", + cond = "0 = 1", + clauses = insert("*")) + assert(readDeltaTable(tempPath).schema("value").dataType === testCase.fromType) + } + } + } + } + + test("type widening isn't applied in merge when schema evolution is disabled") { + withTable("source") { + sql(s"CREATE TABLE delta.`$tempPath` (a short) USING DELTA") + sql("CREATE TABLE source (a int) USING DELTA") + sql("INSERT INTO source VALUES (1), (2)") + + withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "false") { + // Merge integer values. This should succeed and downcast the values to short. + executeMerge( + tgt = s"delta.`$tempPath` t", + src = "source", + cond = "0 = 1", + clauses = insert("*") + ) + assert(readDeltaTable(tempPath).schema("a").dataType === ShortType) + checkAnswer(readDeltaTable(tempPath), + Seq(1, 2).toDF("a").select($"a".cast(ShortType))) + } + + // Check that we would actually widen if schema evolution was enabled. + withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") { + executeMerge( + tgt = s"delta.`$tempPath` t", + src = "source", + cond = "0 = 1", + clauses = insert("*") + ) + assert(readDeltaTable(tempPath).schema("a").dataType === IntegerType) + checkAnswer(readDeltaTable(tempPath), Seq(1, 2, 1, 2).toDF("a")) + } + } + } + + /** + * Wrapper around testNestedStructsEvolution that constrains the result with and without schema + * evolution to be the same: the schema is different but the values should be the same. 
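+   * That is the behaviour expected from type widening: only the declared column type changes,
+   * the stored values remain the same.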
+ */ + protected def testTypeEvolution(name: String)( + target: Seq[String], + source: Seq[String], + targetSchema: StructType, + sourceSchema: StructType, + cond: String = "t.key = s.key", + clauses: Seq[MergeClause] = Seq.empty, + result: Seq[String], + resultSchema: StructType): Unit = + testNestedStructsEvolution(name)( + target, + source, + targetSchema, + sourceSchema, + cond, + clauses, + result, + resultWithoutEvolution = result, + resultSchema = resultSchema) + + + testTypeEvolution("change top-level column short -> int with update")( + target = Seq("""{ "a": 0 }""", """{ "a": 10 }"""), + source = Seq("""{ "a": 0 }""", """{ "a": 20 }"""), + targetSchema = new StructType().add("a", ShortType), + sourceSchema = new StructType().add("a", IntegerType), + cond = "t.a = s.a", + clauses = update("a = s.a + 1") :: Nil, + result = Seq("""{ "a": 1 }""", """{ "a": 10 }"""), + resultSchema = new StructType() + .add("a", IntegerType, nullable = true, metadata(1, ShortType, IntegerType)) + ) + + testTypeEvolution("change top-level column short -> int with insert")( + target = Seq("""{ "a": 0 }""", """{ "a": 10 }"""), + source = Seq("""{ "a": 0 }""", """{ "a": 20 }"""), + targetSchema = new StructType().add("a", ShortType), + sourceSchema = new StructType().add("a", IntegerType), + cond = "t.a = s.a", + clauses = insert("(a) VALUES (s.a)") :: Nil, + result = Seq("""{ "a": 0 }""", """{ "a": 10 }""", """{ "a": 20 }"""), + resultSchema = new StructType() + .add("a", IntegerType, nullable = true, metadata(1, ShortType, IntegerType)) + ) + + testTypeEvolution("updating using narrower value doesn't evolve schema")( + target = Seq("""{ "a": 0 }""", """{ "a": 10 }"""), + source = Seq("""{ "a": 0 }""", """{ "a": 20 }"""), + targetSchema = new StructType().add("a", IntegerType), + sourceSchema = new StructType().add("a", ShortType), + cond = "t.a = s.a", + clauses = update("a = s.a + 1") :: Nil, + result = Seq("""{ "a": 1 }""", """{ "a": 10 }"""), + resultSchema = new StructType().add("a", IntegerType) + ) + + testTypeEvolution("only columns in assignments are widened")( + target = Seq("""{ "a": 0, "b": 5 }""", """{ "a": 10, "b": 15 }"""), + source = Seq("""{ "a": 0, "b": 6 }""", """{ "a": 20, "b": 16 }"""), + targetSchema = new StructType() + .add("a", ShortType) + .add("b", ShortType), + sourceSchema = new StructType() + .add("a", IntegerType) + .add("b", IntegerType), + cond = "t.a = s.a", + clauses = update("a = s.a + 1") :: Nil, + result = Seq( + """{ "a": 1, "b": 5 }""", """{ "a": 10, "b": 15 }"""), + resultSchema = new StructType() + .add("a", IntegerType, nullable = true, metadata(1, ShortType, IntegerType)) + .add("b", ShortType) + ) + + testTypeEvolution("automatic widening of struct field with struct assignment")( + target = Seq("""{ "s": { "a": 1 } }""", """{ "s": { "a": 10 } }"""), + source = Seq("""{ "s": { "a": 1 } }""", """{ "s": { "a": 20 } }"""), + targetSchema = new StructType() + .add("s", new StructType() + .add("a", ShortType)), + sourceSchema = new StructType() + .add("s", new StructType() + .add("a", IntegerType)), + cond = "t.s.a = s.s.a", + clauses = update("t.s.a = s.s.a + 1") :: Nil, + result = Seq("""{ "s": { "a": 2 } }""", """{ "s": { "a": 10 } }"""), + resultSchema = new StructType() + .add("s", new StructType() + .add("a", IntegerType, nullable = true, metadata(1, ShortType, IntegerType))) + ) + + testTypeEvolution("automatic widening of struct field with field assignment")( + target = Seq("""{ "s": { "a": 1 } }""", """{ "s": { "a": 10 } }"""), + source = Seq("""{ "s": { 
"a": 1 } }""", """{ "s": { "a": 20 } }"""), + targetSchema = new StructType() + .add("s", new StructType() + .add("a", ShortType)), + sourceSchema = new StructType() + .add("s", new StructType() + .add("a", IntegerType)), + cond = "t.s.a = s.s.a", + clauses = update("t.s.a = s.s.a + 1") :: Nil, + result = Seq("""{ "s": { "a": 2 } }""", """{ "s": { "a": 10 } }"""), + resultSchema = new StructType() + .add("s", new StructType() + .add("a", IntegerType, nullable = true, metadata(1, ShortType, IntegerType))) + ) + + testTypeEvolution("automatic widening of map value")( + target = Seq("""{ "m": { "a": 1 } }"""), + source = Seq("""{ "m": { "a": 2 } }"""), + targetSchema = new StructType() + .add("m", MapType(StringType, ShortType)), + sourceSchema = new StructType() + .add("m", MapType(StringType, IntegerType)), + // Can't compare maps + cond = "1 = 1", + clauses = update("t.m = s.m") :: Nil, + result = Seq("""{ "m": { "a": 2 } }"""), + resultSchema = new StructType() + .add("m", + MapType(StringType, IntegerType), + nullable = true, + metadata(1, ShortType, IntegerType, Seq("value"))) + ) + + testTypeEvolution("automatic widening of array element")( + target = Seq("""{ "a": [1, 2] }"""), + source = Seq("""{ "a": [3, 4] }"""), + targetSchema = new StructType() + .add("a", ArrayType(ShortType)), + sourceSchema = new StructType() + .add("a", ArrayType(IntegerType)), + cond = "t.a != s.a", + clauses = update("t.a = s.a") :: Nil, + result = Seq("""{ "a": [3, 4] }"""), + resultSchema = new StructType() + .add("a", + ArrayType(IntegerType), + nullable = true, + metadata(1, ShortType, IntegerType, Seq("element"))) + ) + + testTypeEvolution("multiple automatic widening")( + target = Seq("""{ "a": 1, "b": 2 }"""), + source = Seq("""{ "a": 1, "b": 4 }""", """{ "a": 5, "b": 6 }"""), + targetSchema = new StructType() + .add("a", ByteType) + .add("b", ShortType), + sourceSchema = new StructType() + .add("a", ShortType) + .add("b", IntegerType), + cond = "t.a = s.a", + clauses = update("*") :: insert("*") :: Nil, + result = Seq("""{ "a": 1, "b": 4 }""", """{ "a": 5, "b": 6 }"""), + resultSchema = new StructType() + .add("a", ShortType, nullable = true, metadata(1, ByteType, ShortType)) + .add("b", IntegerType, nullable = true, metadata(1, ShortType, IntegerType)) + ) +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala index 9fb52146ee2..0099aeca5fb 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala @@ -67,15 +67,22 @@ trait DeltaTypeWideningTestMixin extends SharedSparkSession { protected def enableTypeWidening(tablePath: String, enabled: Boolean = true): Unit = sql(s"ALTER TABLE delta.`$tablePath` " + s"SET TBLPROPERTIES('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = '${enabled.toString}')") + + protected def metadata( + version: Long, + fromType: AtomicType, + toType: AtomicType, + path: Seq[String] = Seq.empty): Metadata = + new MetadataBuilder() + .putMetadataArray( + "delta.typeChanges", Array(TypeChange(version, fromType, toType, path).toMetadata)) + .build() } /** - * Trait collecting a subset of tests providing core coverage for type widening using ALTER TABLE - * CHANGE COLUMN TYPE. + * Trait collecting supported and unsupported type change test cases. 
*/ -trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { - self: QueryTest with ParquetTest with DeltaTypeWideningTestMixin with DeltaDMLTestUtils => - +trait DeltaTypeWideningTestCases { self: SharedSparkSession => import testImplicits._ /** @@ -119,7 +126,7 @@ trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { // Type changes that are supported by all Parquet readers. Byte, Short, Int are all stored as // INT32 in parquet so these changes are guaranteed to be supported. - private val supportedTestCases: Seq[TypeEvolutionTestCase] = Seq( + protected val supportedTestCases: Seq[TypeEvolutionTestCase] = Seq( SupportedTypeEvolutionTestCase(ByteType, ShortType, Seq(1, -1, Byte.MinValue, Byte.MaxValue, null.asInstanceOf[Byte]), Seq(4, -4, Short.MinValue, Short.MaxValue, null.asInstanceOf[Short])), @@ -131,35 +138,6 @@ trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { Seq(4, -4, Int.MinValue, Int.MaxValue, null.asInstanceOf[Int])) ) - for { - testCase <- supportedTestCases - partitioned <- BOOLEAN_DOMAIN - } { - test(s"type widening ${testCase.fromType.sql} -> ${testCase.toType.sql}, " + - s"partitioned=$partitioned") { - def writeData(df: DataFrame): Unit = if (partitioned) { - // The table needs to have at least 1 non-partition column, use a dummy one. - append(df.withColumn("dummy", lit(1)), partitionBy = Seq("value")) - } else { - append(df) - } - - writeData(testCase.initialValuesDF) - sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN value TYPE ${testCase.toType.sql}") - withAllParquetReaders { - assert(readDeltaTable(tempPath).schema("value").dataType === testCase.toType) - checkAnswer(readDeltaTable(tempPath).select("value").sort("value"), - testCase.initialValuesDF.select($"value".cast(testCase.toType)).sort("value")) - } - writeData(testCase.additionalValuesDF) - withAllParquetReaders { - checkAnswer( - readDeltaTable(tempPath).select("value").sort("value"), - testCase.expectedResult.sort("value")) - } - } - } - /** * Represents the input of an unsupported type change test. Handles converting the test values * from scala types to a dataframe. Additional values to insert are always empty since the type @@ -183,7 +161,7 @@ trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { } // Test type changes that aren't supported. - private val unsupportedTestCases: Seq[TypeEvolutionTestCase] = Seq( + protected val unsupportedTestCases: Seq[TypeEvolutionTestCase] = Seq( UnsupportedTypeEvolutionTestCase(IntegerType, ByteType, Seq(1, 2, Int.MinValue)), UnsupportedTypeEvolutionTestCase(LongType, IntegerType, @@ -192,6 +170,10 @@ trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { Seq(987654321.987654321d, Double.NaN, Double.NegativeInfinity, Double.PositiveInfinity, Double.MinPositiveValue, Double.MinValue, Double.MaxValue)), + UnsupportedTypeEvolutionTestCase(ByteType, DecimalType(2, 0), + Seq(1, -1, Byte.MinValue)), + UnsupportedTypeEvolutionTestCase(ShortType, DecimalType(4, 0), + Seq(1, -1, Short.MinValue)), UnsupportedTypeEvolutionTestCase(IntegerType, DecimalType(9, 0), Seq(1, -1, Int.MinValue)), UnsupportedTypeEvolutionTestCase(LongType, DecimalType(19, 0), @@ -219,6 +201,45 @@ trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { DecimalType(Decimal.MAX_INT_DIGITS + 3, 1), Seq(BigDecimal("-67.89"), BigDecimal("9" * (Decimal.MAX_LONG_DIGITS - 2) + ".99"))) ) +} + +/** + * Trait collecting a subset of tests providing core coverage for type widening using ALTER TABLE + * CHANGE COLUMN TYPE. 
+ */ +trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase with DeltaTypeWideningTestCases { + self: QueryTest with ParquetTest with DeltaTypeWideningTestMixin with DeltaDMLTestUtils => + + import testImplicits._ + + for { + testCase <- supportedTestCases + partitioned <- BOOLEAN_DOMAIN + } { + test(s"type widening ${testCase.fromType.sql} -> ${testCase.toType.sql}, " + + s"partitioned=$partitioned") { + def writeData(df: DataFrame): Unit = if (partitioned) { + // The table needs to have at least 1 non-partition column, use a dummy one. + append(df.withColumn("dummy", lit(1)), partitionBy = Seq("value")) + } else { + append(df) + } + + writeData(testCase.initialValuesDF) + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN value TYPE ${testCase.toType.sql}") + withAllParquetReaders { + assert(readDeltaTable(tempPath).schema("value").dataType === testCase.toType) + checkAnswer(readDeltaTable(tempPath).select("value").sort("value"), + testCase.initialValuesDF.select($"value".cast(testCase.toType)).sort("value")) + } + writeData(testCase.additionalValuesDF) + withAllParquetReaders { + checkAnswer( + readDeltaTable(tempPath).select("value").sort("value"), + testCase.expectedResult.sort("value")) + } + } + } for { testCase <- unsupportedTestCases diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSchemaEvolutionSuite.scala index 07ab309ad8c..e6997a6b08c 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSchemaEvolutionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSchemaEvolutionSuite.scala @@ -83,13 +83,13 @@ trait MergeIntoSchemaEvolutionMixin { } test(s"schema evolution - $name - with evolution disabled") { - withSQLConf(confs: _*) { + withSQLConf(confs :+ (DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key, "false"): _*) { executeMergeAndAssert(expectedWithoutEvolution, expectErrorWithoutEvolutionContains) } } test(s"schema evolution - $name - on via DeltaSQLConf") { - withSQLConf((confs :+ (DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key, "true")): _*) { + withSQLConf(confs :+ (DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key, "true"): _*) { executeMergeAndAssert(expected, expectErrorContains) } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoTestUtils.scala index b3465029ad5..59ba84e1e94 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoTestUtils.scala @@ -20,10 +20,8 @@ import org.apache.spark.sql.delta.test.DeltaSQLTestUtils import io.delta.tables._ import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.catalyst.util.FailFastMode import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.StructType /** * Base trait collecting helper methods to run MERGE tests. 
Merge test suite will want to mix in @@ -33,8 +31,6 @@ import org.apache.spark.sql.types.StructType trait MergeIntoTestUtils extends DeltaDMLTestUtils with MergeHelpers { self: SharedSparkSession => - import testImplicits._ - protected def executeMerge( target: String, source: String, @@ -51,23 +47,6 @@ trait MergeIntoTestUtils extends DeltaDMLTestUtils with MergeHelpers { protected def withCrossJoinEnabled(body: => Unit): Unit = { withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { body } } - - /** - * Parse the input JSON data into a dataframe, one row per input element. - * Throws an exception on malformed inputs or records that don't comply with the provided schema. - */ - protected def readFromJSON(data: Seq[String], schema: StructType = null): DataFrame = { - if (schema != null) { - spark.read - .schema(schema) - .option("mode", FailFastMode.name) - .json(data.toDS) - } else { - spark.read - .option("mode", FailFastMode.name) - .json(data.toDS) - } - } } trait MergeIntoSQLTestUtils extends DeltaSQLTestUtils with MergeIntoTestUtils { From 7824e9f635a5726a0b66c86e76a528655c6dde49 Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Tue, 19 Mar 2024 10:06:14 +0100 Subject: [PATCH 2/6] Address comments --- .../sql/delta/ResolveDeltaMergeInto.scala | 2 +- .../apache/spark/sql/delta/TypeWidening.scala | 6 +-- .../sql/delta/schema/SchemaMergingUtils.scala | 2 +- .../spark/sql/delta/DeltaErrorsSuite.scala | 4 +- ...ltaTypeWideningSchemaEvolutionSuite.scala} | 39 +++++++++++++------ .../sql/delta/DeltaTypeWideningSuite.scala | 9 +++-- 6 files changed, 40 insertions(+), 22 deletions(-) rename spark/src/test/scala/org/apache/spark/sql/delta/{DeltaTypeWideningAutomaticSuite.scala => DeltaTypeWideningSchemaEvolutionSuite.scala} (88%) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala index 918a4f671f1..d85ce76377e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala @@ -284,7 +284,7 @@ object ResolveDeltaMergeInto { index.snapshotAtAnalysis.protocol, index.snapshotAtAnalysis.metadata ) - case o => throw DeltaErrors.notADeltaSourceException("MERGE", Some(o)) + case other => throw DeltaErrors.notADeltaSourceException("MERGE", Some(other)) } // The implicit conversions flag allows any type to be merged from source to target if Spark diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala index ffcdc83ba54..8a7b0822f83 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala @@ -63,10 +63,10 @@ object TypeWidening { } /** - * Returns whether the given type change is eligible for **automatic** widening. Only a subset of - * supported type changes are considered for automatic widening. + * Returns whether the given type change can be applied during schema evolution. Only a + * subset of supported type changes are considered for schema evolution. 
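+   * A type change may therefore be accepted by ALTER TABLE ... CHANGE COLUMN while still being
+   * rejected here.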
*/ - def isAutomaticTypeChangeSupported(fromType: AtomicType, toType: AtomicType): Boolean = + def isTypeChangeSupportedForSchemaEvolution(fromType: AtomicType, toType: AtomicType): Boolean = (fromType, toType) match { case (from, to) if !isTypeChangeSupported(from, to) => false case (from, to) if from == to => true diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala index 1bac6adfad4..a34a351d45c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala @@ -233,7 +233,7 @@ object SchemaMergingUtils { case (current, update) if keepExistingType => current case (current: AtomicType, update: AtomicType) if allowTypeWidening && - TypeWidening.isAutomaticTypeChangeSupported(current, update) => update + TypeWidening.isTypeChangeSupportedForSchemaEvolution(current, update) => update // If implicit conversions are allowed, that means we can use any valid implicit cast to // perform the merge. diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala index 88848ede56a..22b52bbf442 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala @@ -1435,7 +1435,9 @@ trait DeltaErrorsSuiteBase SchemaMergingUtils.mergeSchemas(s1, s2, allowImplicitConversions = false, keepExistingType = false, - allowTypeWidening = false, Set("c0")) + allowTypeWidening = false, + Set("c0") + ) } checkErrorMessage(e, Some("DELTA_GENERATED_COLUMNS_DATA_TYPE_MISMATCH"), Some("42K09"), Some("Column c0 is a generated column or a column used by a generated " + diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningAutomaticSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala similarity index 88% rename from spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningAutomaticSuite.scala rename to spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala index 61edf9c3322..ddcf1958007 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningAutomaticSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala @@ -31,12 +31,12 @@ import org.apache.spark.sql.types._ * Suite covering widening columns and fields type as part of automatic schema evolution when the * type widening table feature is supported. */ -class DeltaTypeWideningAutomaticSuite +class DeltaTypeWideningSchemaEvolutionSuite extends QueryTest with DeltaDMLTestUtils with DeltaSQLCommandTest with DeltaTypeWideningTestMixin - with DeltaMergeIntoAutomaticTypeEvolutionTests { + with DeltaMergeIntoTypeWideningSchemaEvolutionTests { protected override def sparkConf: SparkConf = { super.sparkConf @@ -47,7 +47,7 @@ class DeltaTypeWideningAutomaticSuite /** * Tests covering type widening during schema evolution in MERGE INTO. 
*/ -trait DeltaMergeIntoAutomaticTypeEvolutionTests +trait DeltaMergeIntoTypeWideningSchemaEvolutionTests extends MergeIntoSQLTestUtils with MergeIntoSchemaEvolutionMixin with DeltaTypeWideningTestCases { @@ -171,7 +171,8 @@ trait DeltaMergeIntoAutomaticTypeEvolutionTests clauses = update("a = s.a + 1") :: Nil, result = Seq("""{ "a": 1 }""", """{ "a": 10 }"""), resultSchema = new StructType() - .add("a", IntegerType, nullable = true, metadata(1, ShortType, IntegerType)) + .add("a", IntegerType, nullable = true, + metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)) ) testTypeEvolution("change top-level column short -> int with insert")( @@ -183,7 +184,8 @@ trait DeltaMergeIntoAutomaticTypeEvolutionTests clauses = insert("(a) VALUES (s.a)") :: Nil, result = Seq("""{ "a": 0 }""", """{ "a": 10 }""", """{ "a": 20 }"""), resultSchema = new StructType() - .add("a", IntegerType, nullable = true, metadata(1, ShortType, IntegerType)) + .add("a", IntegerType, nullable = true, + metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)) ) testTypeEvolution("updating using narrower value doesn't evolve schema")( @@ -211,7 +213,8 @@ trait DeltaMergeIntoAutomaticTypeEvolutionTests result = Seq( """{ "a": 1, "b": 5 }""", """{ "a": 10, "b": 15 }"""), resultSchema = new StructType() - .add("a", IntegerType, nullable = true, metadata(1, ShortType, IntegerType)) + .add("a", IntegerType, nullable = true, + metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)) .add("b", ShortType) ) @@ -229,7 +232,8 @@ trait DeltaMergeIntoAutomaticTypeEvolutionTests result = Seq("""{ "s": { "a": 2 } }""", """{ "s": { "a": 10 } }"""), resultSchema = new StructType() .add("s", new StructType() - .add("a", IntegerType, nullable = true, metadata(1, ShortType, IntegerType))) + .add("a", IntegerType, nullable = true, + metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))) ) testTypeEvolution("automatic widening of struct field with field assignment")( @@ -246,7 +250,8 @@ trait DeltaMergeIntoAutomaticTypeEvolutionTests result = Seq("""{ "s": { "a": 2 } }""", """{ "s": { "a": 10 } }"""), resultSchema = new StructType() .add("s", new StructType() - .add("a", IntegerType, nullable = true, metadata(1, ShortType, IntegerType))) + .add("a", IntegerType, nullable = true, + metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))) ) testTypeEvolution("automatic widening of map value")( @@ -264,7 +269,11 @@ trait DeltaMergeIntoAutomaticTypeEvolutionTests .add("m", MapType(StringType, IntegerType), nullable = true, - metadata(1, ShortType, IntegerType, Seq("value"))) + metadata = typeWideningMetadata( + version = 1, + from = ShortType, + to = IntegerType, + path = Seq("value"))) ) testTypeEvolution("automatic widening of array element")( @@ -281,7 +290,11 @@ trait DeltaMergeIntoAutomaticTypeEvolutionTests .add("a", ArrayType(IntegerType), nullable = true, - metadata(1, ShortType, IntegerType, Seq("element"))) + metadata = typeWideningMetadata( + version = 1, + from = ShortType, + to = IntegerType, + path =Seq("element"))) ) testTypeEvolution("multiple automatic widening")( @@ -297,7 +310,9 @@ trait DeltaMergeIntoAutomaticTypeEvolutionTests clauses = update("*") :: insert("*") :: Nil, result = Seq("""{ "a": 1, "b": 4 }""", """{ "a": 5, "b": 6 }"""), resultSchema = new StructType() - .add("a", ShortType, nullable = true, metadata(1, ByteType, ShortType)) - .add("b", IntegerType, nullable = true, metadata(1, 
ShortType, IntegerType)) + .add("a", ShortType, nullable = true, + metadata = typeWideningMetadata(version = 1, from = ByteType, to = ShortType)) + .add("b", IntegerType, nullable = true, + metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)) ) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala index 0099aeca5fb..a586fc2e905 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala @@ -68,14 +68,15 @@ trait DeltaTypeWideningTestMixin extends SharedSparkSession { sql(s"ALTER TABLE delta.`$tablePath` " + s"SET TBLPROPERTIES('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = '${enabled.toString}')") - protected def metadata( + /** Short-hand to create type widening metadata for struct fields. */ + protected def typeWideningMetadata( version: Long, - fromType: AtomicType, - toType: AtomicType, + from: AtomicType, + to: AtomicType, path: Seq[String] = Seq.empty): Metadata = new MetadataBuilder() .putMetadataArray( - "delta.typeChanges", Array(TypeChange(version, fromType, toType, path).toMetadata)) + "delta.typeChanges", Array(TypeChange(version, from, to, path).toMetadata)) .build() } From 90f639bf87cbadbcd44ac627fc0fb4360f38e6a0 Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Tue, 19 Mar 2024 10:32:18 +0100 Subject: [PATCH 3/6] Improve matching delta table in MERGE resolution --- .../spark/sql/delta/ResolveDeltaMergeInto.scala | 11 ++++------- .../delta/DeltaTypeWideningSchemaEvolutionSuite.scala | 2 +- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala index d85ce76377e..4f441ec50d6 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala @@ -278,13 +278,10 @@ object ResolveDeltaMergeInto { }) val migrationSchema = filterSchema(source.schema, Seq.empty) - val allowTypeWidening = EliminateSubqueryAliases(target) match { - case DeltaFullTable(_, index) => - TypeWidening.isEnabled( - index.snapshotAtAnalysis.protocol, - index.snapshotAtAnalysis.metadata - ) - case other => throw DeltaErrors.notADeltaSourceException("MERGE", Some(other)) + val allowTypeWidening = target.exists { + case DeltaTable(fileIndex) => + TypeWidening.isEnabled(fileIndex.protocol, fileIndex.metadata) + case _ => false } // The implicit conversions flag allows any type to be merged from source to target if Spark diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala index ddcf1958007..e42ac9f5fbc 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala @@ -294,7 +294,7 @@ trait DeltaMergeIntoTypeWideningSchemaEvolutionTests version = 1, from = ShortType, to = IntegerType, - path =Seq("element"))) + path = Seq("element"))) ) testTypeEvolution("multiple automatic widening")( From 1445c6f5494875109cc8fdb3aca55c25f751179f Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Thu, 21 Mar 2024 17:16:50 +0100 
Subject: [PATCH 4/6] Handle concurrent transaction enabling/disabling feature --- .../apache/spark/sql/delta/TypeWidening.scala | 16 ++++- .../sql/delta/commands/MergeIntoCommand.scala | 9 +++ ...eltaTypeWideningSchemaEvolutionSuite.scala | 59 +++++++++++++++++-- 3 files changed, 77 insertions(+), 7 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala index 8a7b0822f83..7a563232647 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala @@ -47,6 +47,20 @@ object TypeWidening { isEnabled } + /** + * Checks that the type widening table property wasn't disabled or enabled between the two given + * states, throws an errors if it was. + */ + def ensureFeatureConsistentlyEnabled( + protocol: Protocol, + metadata: Metadata, + otherProtocol: Protocol, + otherMetadata: Metadata): Unit = { + if (isEnabled(protocol, metadata) != isEnabled(otherProtocol, otherMetadata)) { + throw DeltaErrors.metadataChangedException(None) + } + } + /** * Returns whether the given type change is eligible for widening. This only checks atomic types. * It is the responsibility of the caller to recurse into structs, maps and arrays. @@ -68,8 +82,8 @@ object TypeWidening { */ def isTypeChangeSupportedForSchemaEvolution(fromType: AtomicType, toType: AtomicType): Boolean = (fromType, toType) match { - case (from, to) if !isTypeChangeSupported(from, to) => false case (from, to) if from == to => true + case (from, to) if !isTypeChangeSupported(from, to) => false case (ByteType, ShortType) => true case (ByteType | ShortType, IntegerType) => true case _ => false diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala index d2b4b1b283c..2723f8b0135 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala @@ -91,6 +91,15 @@ case class MergeIntoCommand( atAnalysis = target.schema, latestSchema = deltaTxn.metadata.schema) } + // Check that type widening wasn't enabled/disabled between analysis and the start of the + // transaction. 
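+    // `targetFileIndex` reflects the snapshot used during analysis while `deltaTxn` reflects the
+    // snapshot the transaction commits against; if the property differs between the two, a
+    // metadata-changed conflict error is thrown.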
+ TypeWidening.ensureFeatureConsistentlyEnabled( + protocol = targetFileIndex.protocol, + metadata = targetFileIndex.metadata, + otherProtocol = deltaTxn.protocol, + otherMetadata = deltaTxn.metadata + ) + if (canMergeSchema) { updateMetadata( spark, deltaTxn, migratedSchema.getOrElse(target.schema), diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala index e42ac9f5fbc..58e0e7dff5f 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala @@ -20,8 +20,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.SparkConf -import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode} -import org.apache.spark.sql.functions.{col, lit} +import org.apache.spark.sql.QueryTest import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy import org.apache.spark.sql.types._ @@ -58,7 +57,7 @@ trait DeltaMergeIntoTypeWideningSchemaEvolutionTests for { testCase <- supportedTestCases } { - test(s"automatic type widening in merge ${testCase.fromType.sql} -> ${testCase.toType.sql}") { + test(s"MERGE - automatic type widening ${testCase.fromType.sql} -> ${testCase.toType.sql}") { withTable("source") { testCase.additionalValuesDF.write.format("delta").saveAsTable("source") append(testCase.initialValuesDF) @@ -82,7 +81,7 @@ trait DeltaMergeIntoTypeWideningSchemaEvolutionTests for { testCase <- unsupportedTestCases } { - test(s"unsupported automatic type widening in merge " + + test(s"MERGE - unsupported automatic type widening " + s"${testCase.fromType.sql} -> ${testCase.toType.sql}") { withTable("source") { testCase.additionalValuesDF.write.format("delta").saveAsTable("source") @@ -104,7 +103,28 @@ trait DeltaMergeIntoTypeWideningSchemaEvolutionTests } } - test("type widening isn't applied in merge when schema evolution is disabled") { + test("MERGE - type widening isn't applied when it's disabled") { + withTable("source") { + sql(s"CREATE TABLE delta.`$tempPath` (a short) USING DELTA") + sql("CREATE TABLE source (a int) USING DELTA") + sql("INSERT INTO source VALUES (1), (2)") + enableTypeWidening(tempPath, enabled = false) + withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") { + // Merge integer values. This should succeed and downcast the values to short. 
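+        // Schema evolution is enabled, but the delta.enableTypeWidening table property is off,
+        // so the target column must keep its SHORT type.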
+ executeMerge( + tgt = s"delta.`$tempPath` t", + src = "source", + cond = "0 = 1", + clauses = insert("*") + ) + assert(readDeltaTable(tempPath).schema("a").dataType === ShortType) + checkAnswer(readDeltaTable(tempPath), + Seq(1, 2).toDF("a").select($"a".cast(ShortType))) + } + } + } + + test("MERGE - type widening isn't applied when schema evolution is disabled") { withTable("source") { sql(s"CREATE TABLE delta.`$tempPath` (a short) USING DELTA") sql("CREATE TABLE source (a int) USING DELTA") @@ -150,7 +170,7 @@ trait DeltaMergeIntoTypeWideningSchemaEvolutionTests clauses: Seq[MergeClause] = Seq.empty, result: Seq[String], resultSchema: StructType): Unit = - testNestedStructsEvolution(name)( + testNestedStructsEvolution(s"MERGE - $name")( target, source, targetSchema, @@ -315,4 +335,31 @@ trait DeltaMergeIntoTypeWideningSchemaEvolutionTests .add("b", IntegerType, nullable = true, metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)) ) + + for (enabled <- BOOLEAN_DOMAIN) + test(s"MERGE - fail if type widening gets ${if (enabled) "enabled" else "disabled"} by a " + + "concurrent transaction") { + sql(s"CREATE TABLE delta.`$tempPath` (a short) USING DELTA") + enableTypeWidening(tempPath, !enabled) + val target = io.delta.tables.DeltaTable.forPath(tempPath) + import testImplicits._ + val merge = target.as("target") + .merge( + source = Seq(1L).toDF("a").as("source"), + condition = "target.a = source.a") + .whenNotMatched().insertAll() + + // The MERGE operation was created with type widening enabled, which will apply during analysis. + // Disable type widening so that the actual execution runs with type widening disabled. + enableTypeWidening(tempPath, enabled) + checkError( + exception = intercept[DeltaRuntimeException] { merge.execute() }, + errorClass = "DELTA_TABLE_PROPERTY_CHANGED", + parameters = Map( + "key" -> "delta.enableTypeWidening", + "oldValue" -> (!enabled).toString, + "newValue" -> enabled.toString + ) + ) + } } From 0f1c6690b402ab307891b21783b8dbd967338f03 Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Thu, 21 Mar 2024 17:31:03 +0100 Subject: [PATCH 5/6] Update test --- .../DeltaTypeWideningSchemaEvolutionSuite.scala | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala index 58e0e7dff5f..bf5317952c9 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala @@ -352,14 +352,8 @@ trait DeltaMergeIntoTypeWideningSchemaEvolutionTests // The MERGE operation was created with type widening enabled, which will apply during analysis. // Disable type widening so that the actual execution runs with type widening disabled. 
enableTypeWidening(tempPath, enabled) - checkError( - exception = intercept[DeltaRuntimeException] { merge.execute() }, - errorClass = "DELTA_TABLE_PROPERTY_CHANGED", - parameters = Map( - "key" -> "delta.enableTypeWidening", - "oldValue" -> (!enabled).toString, - "newValue" -> enabled.toString - ) - ) + intercept[MetadataChangedException] { + merge.execute() + } } } From af7f558025e9ab2adbfa8c1a821673a9af883b13 Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Thu, 21 Mar 2024 22:01:58 +0100 Subject: [PATCH 6/6] Undo spurious changes --- .../delta/MergeIntoSchemaEvolutionSuite.scala | 37 ++++++++++++++++++- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSchemaEvolutionSuite.scala index e6997a6b08c..4768370a0e8 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSchemaEvolutionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSchemaEvolutionSuite.scala @@ -88,8 +88,8 @@ trait MergeIntoSchemaEvolutionMixin { } } - test(s"schema evolution - $name - on via DeltaSQLConf") { - withSQLConf(confs :+ (DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key, "true"): _*) { + test(s"schema evolution - $name") { + withSQLConf((confs :+ (DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key, "true")): _*) { executeMergeAndAssert(expected, expectErrorContains) } } @@ -1003,6 +1003,39 @@ trait MergeIntoSchemaEvolutionBaseTests { .toDF("key", "value"), expectErrorWithoutEvolutionContains = "cannot resolve s.value in UPDATE clause") + test("schema evolution enabled for the current command") { + withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "false") { + withTable("target", "source") { + Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value") + .write.format("delta").saveAsTable("target") + Seq((1, 1, 1), (2, 2, 2)).toDF("key", "value", "extra") + .write.format("delta").saveAsTable("source") + + // Should fail without schema evolution + val e = intercept[org.apache.spark.sql.AnalysisException] { + executeMerge( + "target", + "source", + "target.key = source.key", + update("extra = -1"), insert("*")) + } + assert(e.getErrorClass === "DELTA_MERGE_UNRESOLVED_EXPRESSION") + assert(e.getMessage.contains("resolve extra in UPDATE clause")) + + // Should succeed with schema evolution + executeMergeWithSchemaEvolution( + "target", + "source", + "target.key = source.key", + update("extra = -1"), insert("*")) + checkAnswer( + spark.table("target"), + Seq[(Integer, Integer, Integer)]((0, 0, null), (1, 10, -1), (2, 2, 2), (3, 30, null)) + .toDF("key", "value", "extra")) + } + } + } + testNestedStructsEvolution("nested field assignment qualified with source alias")( target = Seq("""{ "a": 1, "t": { "a": 2 } }"""), source = Seq("""{ "a": 3, "t": { "a": 5 } }"""),