diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json index b40bdb3954e..664d5d6c0ea 100644 --- a/spark/src/main/resources/error/delta-error-classes.json +++ b/spark/src/main/resources/error/delta-error-classes.json @@ -2312,6 +2312,10 @@ "Please check if you want to manually propagate the schema change(s) to the sink table before we proceed with stream processing using the finalized schema at version .", "Once you have fixed the schema of the sink table or have decided there is no need to fix, you can set the following configuration(s) to unblock the non-additive schema change(s) and continue stream processing.", "", + "Using dataframe reader option(s):", + "", + "", + "Using SQL configuration(s):", "To unblock for this particular stream just for this series of schema change(s):", "", "To unblock for this particular stream:", @@ -2331,6 +2335,10 @@ "Please check if you want to update your streaming query before we proceed with stream processing using the finalized schema at version .", "Once you have updated your streaming query or have decided there is no need to update it, you can set the following configuration to unblock the type change(s) and continue stream processing.", "", + "Using dataframe reader option:", + "", + "", + "Using SQL configuration:", "To unblock for this particular stream just for this series of schema change(s):", "", "To unblock for this particular stream:", diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index d6656c59e51..9663c09430d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -3120,7 +3120,11 @@ trait DeltaErrorsBase previousSchemaChangeVersion: Long, currentSchemaChangeVersion: Long, checkpointHash: Int, + readerOptionsUnblock: Seq[String], sqlConfsUnblock: Seq[String]): Throwable = { + val readerOptions = readerOptionsUnblock.map { option => + s""" .option("$option", "true")""" + }.mkString("\n") val unblockChangeConfs = sqlConfsUnblock.map { conf => s""" SET $conf.ckpt_$checkpointHash = $currentSchemaChangeVersion;""" }.mkString("\n") @@ -3138,6 +3142,7 @@ trait DeltaErrorsBase previousSchemaChangeVersion.toString, currentSchemaChangeVersion.toString, currentSchemaChangeVersion.toString, + readerOptions, unblockChangeConfs, unblockStreamConfs, unblockAllConfs @@ -3149,6 +3154,7 @@ trait DeltaErrorsBase previousSchemaChangeVersion: Long, currentSchemaChangeVersion: Long, checkpointHash: Int, + readerOptionsUnblock: Seq[String], sqlConfsUnblock: Seq[String], wideningTypeChanges: Seq[TypeChange]): Throwable = { @@ -3157,6 +3163,9 @@ trait DeltaErrorsBase s"${change.toType.sql}" }.mkString("\n") + val readerOptions = readerOptionsUnblock.map { option => + s""" .option("$option", "true")""" + }.mkString("\n") val unblockChangeConfs = sqlConfsUnblock.map { conf => s""" SET $conf.ckpt_$checkpointHash = $currentSchemaChangeVersion;""" }.mkString("\n") @@ -3174,6 +3183,7 @@ trait DeltaErrorsBase currentSchemaChangeVersion.toString, wideningTypeChangesStr, currentSchemaChangeVersion.toString, + readerOptions, unblockChangeConfs, unblockStreamConfs, unblockAllConfs diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala index dd01fc0ba2f..ebcc978c8df 100644 --- 
a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala @@ -212,6 +212,15 @@ trait DeltaReadOptions extends DeltaOptionParser { val schemaTrackingLocation = options.get(SCHEMA_TRACKING_LOCATION) val sourceTrackingId = options.get(STREAMING_SOURCE_TRACKING_ID) + + val allowSourceColumnRename = options.get(ALLOW_SOURCE_COLUMN_RENAME) + .exists(toBoolean(_, ALLOW_SOURCE_COLUMN_RENAME)) + + val allowSourceColumnDrop = options.get(ALLOW_SOURCE_COLUMN_DROP) + .exists(toBoolean(_, ALLOW_SOURCE_COLUMN_DROP)) + + val allowSourceColumnTypeChange = options.get(ALLOW_SOURCE_COLUMN_TYPE_CHANGE) + .exists(toBoolean(_, ALLOW_SOURCE_COLUMN_TYPE_CHANGE)) } @@ -289,6 +298,10 @@ object DeltaOptions extends DeltaLogging { */ val STREAMING_SOURCE_TRACKING_ID = "streamingSourceTrackingId" + val ALLOW_SOURCE_COLUMN_DROP = "allowSourceColumnDrop" + val ALLOW_SOURCE_COLUMN_RENAME = "allowSourceColumnRename" + val ALLOW_SOURCE_COLUMN_TYPE_CHANGE = "allowSourceColumnTypeChange" + /** * An option to control if delta will write partition columns to data files */ diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala index f9a1e0d1e2c..00d3b6a105a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala @@ -451,7 +451,7 @@ object DeltaDataSource extends DatabricksLogging { DeltaSourceMetadataTrackingLog.create( spark, schemaTrackingLocation, sourceSnapshot, - Option(options.get(DeltaOptions.STREAMING_SOURCE_TRACKING_ID)), + parameters, sourceMetadataPathOpt, mergeConsecutiveSchemaChanges ) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala index 9c9687a17ee..6ea0a99ed6b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala @@ -460,23 +460,28 @@ object DeltaSourceMetadataEvolutionSupport { val isDrop: Boolean val isTypeWidening: Boolean val sqlConfsUnblock: Seq[String] + val readerOptionsUnblock: Seq[String] } // Single types of schema change, typically caused by a single ALTER TABLE operation. 
private case object SchemaChangeRename extends SchemaChangeType { override val name = "RENAME COLUMN" - override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_RENAME) override val (isRename, isDrop, isTypeWidening) = (true, false, false) + override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_RENAME) + override val readerOptionsUnblock: Seq[String] = Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME) } private case object SchemaChangeDrop extends SchemaChangeType { override val name = "DROP COLUMN" override val (isRename, isDrop, isTypeWidening) = (false, true, false) override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_DROP) + override val readerOptionsUnblock: Seq[String] = Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_DROP) } private case object SchemaChangeTypeWidening extends SchemaChangeType { override val name = "TYPE WIDENING" override val (isRename, isDrop, isTypeWidening) = (false, false, true) override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_TYPE_CHANGE) + override val readerOptionsUnblock: Seq[String] = + Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE) } // Combinations of rename, drop and type change -> can be caused by a complete overwrite. @@ -484,24 +489,32 @@ object DeltaSourceMetadataEvolutionSupport { override val name = "RENAME AND DROP COLUMN" override val (isRename, isDrop, isTypeWidening) = (true, true, false) override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_RENAME_DROP) + override val readerOptionsUnblock: Seq[String] = + Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME, DeltaOptions.ALLOW_SOURCE_COLUMN_DROP) } private case object SchemaChangeRenameAndTypeWidening extends SchemaChangeType { override val name = "RENAME AND TYPE WIDENING" override val (isRename, isDrop, isTypeWidening) = (true, false, true) override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_RENAME, SQL_CONF_UNBLOCK_TYPE_CHANGE) + override val readerOptionsUnblock: Seq[String] = + Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME, DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE) } private case object SchemaChangeDropAndTypeWidening extends SchemaChangeType { override val name = "DROP AND TYPE WIDENING" override val (isRename, isDrop, isTypeWidening) = (false, true, true) override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_DROP, SQL_CONF_UNBLOCK_TYPE_CHANGE) + override val readerOptionsUnblock: Seq[String] = + Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_DROP, DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE) } private case object SchemaChangeRenameAndDropAndTypeWidening extends SchemaChangeType { override val name = "RENAME, DROP AND TYPE WIDENING" override val (isRename, isDrop, isTypeWidening) = (true, true, true) override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_RENAME_DROP, SQL_CONF_UNBLOCK_TYPE_CHANGE) + override val readerOptionsUnblock: Seq[String] = + Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME, DeltaOptions.ALLOW_SOURCE_COLUMN_DROP, DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE) } private final val allSchemaChangeTypes = Seq( @@ -546,6 +559,7 @@ object DeltaSourceMetadataEvolutionSupport { private def isChangeUnblocked( spark: SparkSession, change: SchemaChangeType, + options: DeltaOptions, checkpointHash: Int, schemaChangeVersion: Long): Boolean = { @@ -561,11 +575,13 @@ object DeltaSourceMetadataEvolutionSupport { validConfKeysValuePair.exists(p => getConf(p._1).contains(p._2)) } - val isBlockedRename = change.isRename && !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME) && + val isBlockedRename = change.isRename && !options.allowSourceColumnRename && +
!isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME) && !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME_DROP) - val isBlockedDrop = change.isDrop && !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_DROP) && + val isBlockedDrop = change.isDrop && !options.allowSourceColumnDrop && + !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_DROP) && !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME_DROP) - val isBlockedTypeChange = change.isTypeWidening && + val isBlockedTypeChange = change.isTypeWidening && !options.allowSourceColumnTypeChange && !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_TYPE_CHANGE) !isBlockedRename && !isBlockedDrop && !isBlockedTypeChange @@ -620,9 +636,11 @@ object DeltaSourceMetadataEvolutionSupport { // scalastyle:on protected[sources] def validateIfSchemaChangeCanBeUnblockedWithSQLConf( spark: SparkSession, + parameters: Map[String, String], metadataPath: String, currentSchema: PersistedMetadata, previousSchema: PersistedMetadata): Unit = { + val options = new DeltaOptions(parameters, spark.sessionState.conf) val checkpointHash = getCheckpointHash(metadataPath) // The start version of a possible series of consecutive schema changes. @@ -644,7 +662,7 @@ object DeltaSourceMetadataEvolutionSupport { determineNonAdditiveSchemaChangeType( spark, currentSchema.dataSchema, previousSchema.dataSchema).foreach { change => if (!isChangeUnblocked( - spark, change, checkpointHash, currentSchemaChangeVersion)) { + spark, change, options, checkpointHash, currentSchemaChangeVersion)) { // Throw error to prompt user to set the correct confs change match { case SchemaChangeTypeWidening => @@ -656,6 +674,7 @@ object DeltaSourceMetadataEvolutionSupport { previousSchemaChangeVersion, currentSchemaChangeVersion, checkpointHash, + change.readerOptionsUnblock, change.sqlConfsUnblock, wideningTypeChanges) @@ -665,6 +684,7 @@ object DeltaSourceMetadataEvolutionSupport { previousSchemaChangeVersion, currentSchemaChangeVersion, checkpointHash, + change.readerOptionsUnblock, change.sqlConfsUnblock) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataTrackingLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataTrackingLog.scala index df3e4317424..c47074a043b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataTrackingLog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataTrackingLog.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.sources // scalastyle:off import.ordering.noEmptyLine import java.io.InputStream +import scala.collection.JavaConverters._ import scala.util.control.NonFatal import org.apache.spark.sql.delta.streaming.{JsonSchemaSerializer, PartitionAndDataSchema, SchemaTrackingLog} @@ -33,6 +34,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap // scalastyle:on import.ordering.noEmptyLine /** @@ -240,10 +242,12 @@ object DeltaSourceMetadataTrackingLog extends Logging { sparkSession: SparkSession, rootMetadataLocation: String, sourceSnapshot: SnapshotDescriptor, - sourceTrackingId: Option[String] = None, + parameters: Map[String, String], sourceMetadataPathOpt: Option[String] = None, mergeConsecutiveSchemaChanges: Boolean = false, initMetadataLogEagerly: Boolean = true): DeltaSourceMetadataTrackingLog = { + val options = new CaseInsensitiveStringMap(parameters.asJava) + val sourceTrackingId = 
Option(options.get(DeltaOptions.STREAMING_SOURCE_TRACKING_ID)) val metadataTrackingLocation = fullMetadataTrackingLocation( rootMetadataLocation, sourceSnapshot.deltaLog.tableId, sourceTrackingId) val log = new DeltaSourceMetadataTrackingLog( @@ -296,7 +300,8 @@ object DeltaSourceMetadataTrackingLog extends Logging { (log.getPreviousTrackedMetadata, log.getCurrentTrackedMetadata, sourceMetadataPathOpt) match { case (Some(prev), Some(curr), Some(metadataPath)) => DeltaSourceMetadataEvolutionSupport - .validateIfSchemaChangeCanBeUnblockedWithSQLConf(sparkSession, metadataPath, curr, prev) + .validateIfSchemaChangeCanBeUnblockedWithSQLConf( + sparkSession, parameters, metadataPath, curr, prev) case _ => } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala index b86cd20f256..353f59826ed 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala @@ -2720,6 +2720,7 @@ trait DeltaErrorsSuiteBase nonAdditiveSchemaChangeOpType = "RENAME AND TYPE WIDENING", previousSchemaChangeVersion = 0, currentSchemaChangeVersion = 1, + readerOptionsUnblock = Seq("allowSourceColumnRename", "allowSourceColumnTypeChange"), sqlConfsUnblock = Seq( "spark.databricks.delta.streaming.allowSourceColumnRename", "spark.databricks.delta.streaming.allowSourceColumnTypeChange"), @@ -2731,6 +2732,9 @@ trait DeltaErrorsSuiteBase "opType" -> "RENAME AND TYPE WIDENING", "previousSchemaChangeVersion" -> "0", "currentSchemaChangeVersion" -> "1", + "readerOptions" -> + s""" .option("allowSourceColumnRename", "true") + | .option("allowSourceColumnTypeChange", "true")""".stripMargin, "unblockChangeConfs" -> s""" SET spark.databricks.delta.streaming.allowSourceColumnRename.ckpt_15 = 1; | SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_15 = 1;""".stripMargin, @@ -2748,6 +2752,7 @@ trait DeltaErrorsSuiteBase throw DeltaErrors.cannotContinueStreamingTypeWidening( previousSchemaChangeVersion = 0, currentSchemaChangeVersion = 1, + readerOptionsUnblock = Seq("allowSourceColumnTypeChange"), sqlConfsUnblock = Seq("spark.databricks.delta.streaming.allowSourceColumnTypeChange"), checkpointHash = 15, wideningTypeChanges = Seq(TypeChange(None, IntegerType, LongType, Seq("a")))) @@ -2758,6 +2763,7 @@ trait DeltaErrorsSuiteBase "previousSchemaChangeVersion" -> "0", "currentSchemaChangeVersion" -> "1", "wideningTypeChanges" -> " a: INT -> BIGINT", + "readerOptions" -> " .option(\"allowSourceColumnTypeChange\", \"true\")", "unblockChangeConfs" -> " SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_15 = 1;", "unblockStreamConfs" -> diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala index d697052c49c..bf23b924561 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala @@ -222,7 +222,8 @@ trait StreamingSchemaEvolutionSuiteBase extends ColumnMappingStreamingTestUtils initializeEagerly: Boolean = true )(implicit log: DeltaLog): DeltaSourceMetadataTrackingLog = DeltaSourceMetadataTrackingLog.create( - spark, getDefaultSchemaLocation.toString, log.update(), sourceTrackingId, + spark, getDefaultSchemaLocation.toString, log.update(), + parameters = 
sourceTrackingId.map(DeltaOptions.STREAMING_SOURCE_TRACKING_ID -> _).toMap, initMetadataLogEagerly = initializeEagerly) protected def getDefaultCheckpoint(implicit log: DeltaLog): Path = @@ -534,8 +535,10 @@ trait StreamingSchemaEvolutionSuiteBase extends ColumnMappingStreamingTestUtils // of the case; True concurrent execution would require commit service to protected against. val schemaLocation = getDefaultSchemaLocation.toString val snapshot = log.update() - val schemaLog1 = DeltaSourceMetadataTrackingLog.create(spark, schemaLocation, snapshot) - val schemaLog2 = DeltaSourceMetadataTrackingLog.create(spark, schemaLocation, snapshot) + val schemaLog1 = DeltaSourceMetadataTrackingLog.create( + spark, schemaLocation, snapshot, parameters = Map.empty) + val schemaLog2 = DeltaSourceMetadataTrackingLog.create( + spark, schemaLocation, snapshot, Map.empty) val newSchema = PersistedMetadata("1", 1, makeMetadata(new StructType(), partitionSchema = new StructType()), @@ -1607,9 +1610,9 @@ trait StreamingSchemaEvolutionSuiteBase extends ColumnMappingStreamingTestUtils // Both schema log initialized def schemaLog1: DeltaSourceMetadataTrackingLog = DeltaSourceMetadataTrackingLog.create( - spark, schemaLog1Location, log.update()) + spark, schemaLog1Location, log.update(), parameters = Map.empty) def schemaLog2: DeltaSourceMetadataTrackingLog = DeltaSourceMetadataTrackingLog.create( - spark, schemaLog2Location, log.update()) + spark, schemaLog2Location, log.update(), parameters = Map.empty) // The schema log initializes @ v5 with schema testStream(df)( diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala index 4669b1d2666..5ede40cfd13 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala @@ -16,7 +16,7 @@ package org.apache.spark.sql.delta.sources -import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaTestUtilsBase, DeltaThrowable} +import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaOptions, DeltaTestUtilsBase, DeltaThrowable} import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.sql.test.SharedSparkSession @@ -117,28 +117,41 @@ class DeltaSourceMetadataEvolutionSupportSuite unblock: Seq[Seq[String]] = Seq.empty, confs: Seq[(String, String)] = Seq.empty): Unit = test(s"$name") { - def validate(): Unit = + def validate(parameters: Map[String, String]): Unit = DeltaSourceMetadataEvolutionSupport.validateIfSchemaChangeCanBeUnblockedWithSQLConf( spark, - metadataPath = "sourceMetadataPath", + parameters, + metadataPath = "sourceMetadataPath", currentSchema = persistedMetadata(toDDL, toPhysicalNames), previousSchema = persistedMetadata(fromDDL, fromPhysicalNames) ) withSQLConf(confs: _*) { expectedResult match { - case ExpectedResult.Success(_) => validate() + case ExpectedResult.Success(_) => validate(parameters = Map.empty) case ExpectedResult.Failure(checkError) => + // Run first without setting any configuration to unblock and check that the validation + // fails => column dropped, renamed or with changed type. 
val ex = intercept[DeltaThrowable] { - validate() + validate(parameters = Map.empty) } checkError(ex) // Verify that we can unblock using SQL confs for (u <- unblock) { withSQLConfUnblockedChanges(u) { - validate() + validate(parameters = Map.empty) } } + // Verify that we can unblock using dataframe reader options. + for (u <- unblock) { + val parameters = u.flatMap { + case "allowSourceColumnRenameAndDrop" => + Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME -> "true", + DeltaOptions.ALLOW_SOURCE_COLUMN_DROP -> "true") + case option => Seq(option -> "true") + } + validate(parameters.toMap) + } } } } @@ -600,6 +613,19 @@ class DeltaSourceMetadataEvolutionSupportSuite withSQLConfUnblockedChanges(Seq("allowSourceColumnRename", "allowSourceColumnDrop")) { DeltaSourceMetadataEvolutionSupport.validateIfSchemaChangeCanBeUnblockedWithSQLConf( spark, + parameters = Map.empty, + metadataPath = "sourceMetadataPath", + currentSchema = persistedMetadata("a int", Map(Seq("a") -> "b")), + previousSchema = persistedMetadata("a int, b int", Map.empty) + ) + } + } + + test("combining SQL confs and reader options to unblock is supported") { + withSQLConfUnblockedChanges(Seq("allowSourceColumnRename")) { + DeltaSourceMetadataEvolutionSupport.validateIfSchemaChangeCanBeUnblockedWithSQLConf( + spark, + parameters = Map("allowSourceColumnDrop" -> "true"), metadataPath = "sourceMetadataPath", currentSchema = persistedMetadata("a int", Map(Seq("a") -> "b")), previousSchema = persistedMetadata("a int, b int", Map.empty) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningStreamingSourceSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningStreamingSourceSuite.scala index ec93c5c1d43..4a9c813707a 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningStreamingSourceSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningStreamingSourceSuite.scala @@ -470,6 +470,8 @@ trait TypeWideningStreamingSourceTests "opType" -> "DROP AND TYPE WIDENING", "previousSchemaChangeVersion" -> "0", "currentSchemaChangeVersion" -> "2", + "readerOptions" -> ( + ".*allowSourceColumnDrop(.|\\n)*allowSourceColumnTypeChange.*"), "unblockChangeConfs" -> ".*allowSourceColumnDrop(.|\\n)*allowSourceColumnTypeChange.*", "unblockStreamConfs" ->
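
For reference, a minimal usage sketch of the new DataFrame reader options introduced by this change (the paths and the combination of options are illustrative; only the options matching the blocked schema change need to be set). A stream whose source hit, for example, a DROP COLUMN together with a TYPE WIDENING change could be unblocked per reader instead of via session-wide SQL confs:

  spark.readStream
    .format("delta")
    // Schema tracking is required for the streaming source to evolve across non-additive schema changes.
    .option("schemaTrackingLocation", "/tmp/checkpoint/_schema_log")  // illustrative path
    // New options from this change; set only the ones matching the detected change.
    .option("allowSourceColumnDrop", "true")
    .option("allowSourceColumnTypeChange", "true")
    .load("/tmp/delta/source_table")  // illustrative path

The SQL configuration route shown in the error message remains available and can be combined with the reader options, e.g. the change-specific form built in DeltaErrors above: SET spark.databricks.delta.streaming.allowSourceColumnDrop.ckpt_<checkpointHash> = <currentSchemaChangeVersion>;. As implemented here the reader options are plain booleans, so they unblock any change of that kind for the stream, whereas the ckpt_-scoped SQL confs target a specific schema change version.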