Commit: Address comments + more tests
johanl-db committed Mar 22, 2024
1 parent 4c476d1 commit 3d08e11
Showing 1 changed file with 65 additions and 8 deletions.
@@ -16,14 +16,21 @@

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.{DataFrame, Dataset, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap


/**
@@ -350,8 +357,8 @@ trait DeltaMergeIntoTypeWideningSchemaEvolutionTests
condition = "target.a = source.a")
.whenNotMatched().insertAll()

// The MERGE operation was created with type widening enabled, which will apply during analysis.
// Disable type widening so that the actual execution runs with type widening disabled.
// The MERGE operation was created with the previous type widening value, which will apply
// during analysis. Toggle type widening so that the actual MERGE runs with a different setting.
enableTypeWidening(tempPath, enabled)
intercept[MetadataChangedException] {
merge.execute()
@@ -360,17 +367,18 @@ trait DeltaMergeIntoTypeWideningSchemaEvolutionTests
}

/**
* Tests covering type widening during schema evolution in MERGE INTO.
* Tests covering type widening during schema evolution in INSERT.
*/
trait DeltaInsertTypeWideningSchemaEvolutionTests extends DeltaTypeWideningTestCases {
self: QueryTest with DeltaTypeWideningTestMixin with DeltaDMLTestUtils =>

import testImplicits._
import scala.collection.JavaConverters._

for {
testCase <- supportedTestCases
} {
test(s"automatic type widening in insert ${testCase.fromType.sql} -> ${testCase.toType.sql}") {
test(s"INSERT - automatic type widening ${testCase.fromType.sql} -> ${testCase.toType.sql}") {
append(testCase.initialValuesDF)
testCase.additionalValuesDF
.write
@@ -386,7 +394,7 @@ trait DeltaInsertTypeWideningSchemaEvolutionTests extends DeltaTypeWideningTes
for {
testCase <- unsupportedTestCases
} {
test(s"unsupported automatic type widening in insert " +
test(s"INSERT - unsupported automatic type widening " +
s"${testCase.fromType.sql} -> ${testCase.toType.sql}") {
append(testCase.initialValuesDF)
// Test cases for some of the unsupported type changes may overflow while others only have
@@ -401,7 +409,7 @@ trait DeltaInsertTypeWideningSchemaEvolutionTests extends DeltaTypeWideningTes
}
}

test("type widening isn't applied in insert when schema evolution is disabled") {
test("INSERT - type widening isn't applied when schema evolution is disabled") {
sql(s"CREATE TABLE delta.`$tempPath` (a short) USING DELTA")
withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "false") {
// Insert integer values. This should succeed and downcast the values to short.
@@ -419,6 +427,55 @@ trait DeltaInsertTypeWideningSchemaEvolutionTests extends DeltaTypeWideningTes
}
}

test("INSERT - type widening isn't applied when it's disabled") {
sql(s"CREATE TABLE delta.`$tempPath` (a short) USING DELTA")
enableTypeWidening(tempPath, enabled = false)
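// Insert int values. With type widening disabled, they are downcast to short and the
// column type is unchanged.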
sql(s"INSERT INTO delta.`$tempPath` VALUES (1), (2)")
assert(readDeltaTable(tempPath).schema("a").dataType === ShortType)
checkAnswer(readDeltaTable(tempPath),
Seq(1, 2).toDF("a").select($"a".cast(ShortType)))
}

/**
* Shorthand to create a logical plan to insert into the table. This captures the state of the
* table at the time the method is called, e.g. the type widening property value that will be
* used during analysis.
*/
private def createInsertPlan(df: DataFrame): LogicalPlan = {
val relation = DataSourceV2Relation.create(
table = DeltaTableV2(spark, new Path(tempPath)),
catalog = None,
identifier = None,
options = new CaseInsensitiveStringMap(Map.empty[String, String].asJava)
)
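// Build an unresolved by-position append; analysis and execution happen later, when the
// plan is run via Dataset.ofRows.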
AppendData.byPosition(relation, df.queryExecution.logical)
}

test(s"INSERT - fail if type widening gets enabled by a concurrent transaction") {
sql(s"CREATE TABLE delta.`$tempPath` (a short) USING DELTA")
enableTypeWidening(tempPath, enabled = false)
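// Capture the insert plan while type widening is disabled; analysis casts the int values
// down to short.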
val insert = createInsertPlan(Seq(1).toDF("a"))
// Enabling type widening after analysis doesn't impact the insert operation: the data is
// already cast to conform to the current schema.
enableTypeWidening(tempPath, enabled = true)
Dataset.ofRows(spark, insert).collect()
assert(readDeltaTable(tempPath).schema == new StructType().add("a", ShortType))
checkAnswer(readDeltaTable(tempPath), Row(1))
}

test(s"INSERT - fail if type widening gets disabled by a concurrent transaction") {
sql(s"CREATE TABLE delta.`$tempPath` (a short) USING DELTA")
val insert = createInsertPlan(Seq(1).toDF("a"))
// Disabling type widening after analysis means data with a wider type is inserted into the
// table even though type widening is disabled during execution. We still widen the table
// schema in that case because `short` and `int` are both stored as INT32 in Parquet.
enableTypeWidening(tempPath, enabled = false)
Dataset.ofRows(spark, insert).collect()
assert(readDeltaTable(tempPath).schema == new StructType().add("a", IntegerType))
checkAnswer(readDeltaTable(tempPath), Row(1))
}


/**
* There are **many** different ways to run an insert:
* - Using SQL or the dataframe v1 and v2 APIs.
@@ -728,7 +785,7 @@ trait DeltaInsertTypeWideningSchemaEvolutionTests extends DeltaTypeWideningTes
)


testInsertTypeEvolution("nested struct type evolution with field upcast")(
initialSchemaDDL = "key int, s struct<x: int, y: short>",
initialJsonData = Seq("""{ "key": 1, "s": { "x": 1, "y": 2 } }"""),
partitionBy = Seq("key"),