[Spark] Add dataframe reader options to unblock non-additive schema changes #4126

Open · wants to merge 1 commit into base: master
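
This PR adds DataFrame reader options as an alternative to the existing `spark.databricks.delta.streaming.*` SQL configurations for unblocking non-additive schema changes (column renames, drops, and type widening) encountered while streaming from a Delta table with schema tracking enabled. A rough sketch of the intended usage (the paths below are placeholders, not part of this change):

```scala
// Sketch only: unblock non-additive schema changes with the new reader options
// instead of the SQL confs. The table and schema tracking paths are placeholders.
val df = spark.readStream
  .format("delta")
  .option("schemaTrackingLocation", "/tmp/checkpoint/_schema_log") // pre-existing option
  .option("allowSourceColumnRename", "true")     // unblock RENAME COLUMN
  .option("allowSourceColumnDrop", "true")       // unblock DROP COLUMN
  .option("allowSourceColumnTypeChange", "true") // unblock TYPE WIDENING
  .load("/tmp/source_table")
```

The non-additive schema change errors now list the applicable reader option(s) alongside the existing SQL configuration(s).
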
8 changes: 8 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -2312,6 +2312,10 @@
"Please check if you want to manually propagate the schema change(s) to the sink table before we proceed with stream processing using the finalized schema at version <currentSchemaChangeVersion>.",
"Once you have fixed the schema of the sink table or have decided there is no need to fix, you can set the following configuration(s) to unblock the non-additive schema change(s) and continue stream processing.",
"",
"Using dataframe reader option(s):",
"<readerOptions>",
"",
"Using SQL configuration(s):",
"To unblock for this particular stream just for this series of schema change(s):",
"<unblockChangeConfs>",
"To unblock for this particular stream:",
@@ -2331,6 +2335,10 @@
"Please check if you want to update your streaming query before we proceed with stream processing using the finalized schema at version <currentSchemaChangeVersion>.",
"Once you have updated your streaming query or have decided there is no need to update it, you can set the following configuration to unblock the type change(s) and continue stream processing.",
"",
"Using dataframe reader option:",
"<readerOptions>",
"",
"Using SQL configuration:",
"To unblock for this particular stream just for this series of schema change(s):",
"<unblockChangeConfs>",
"To unblock for this particular stream:",
10 changes: 10 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -3120,7 +3120,11 @@ trait DeltaErrorsBase
previousSchemaChangeVersion: Long,
currentSchemaChangeVersion: Long,
checkpointHash: Int,
readerOptionsUnblock: Seq[String],
sqlConfsUnblock: Seq[String]): Throwable = {
val readerOptions = readerOptionsUnblock.map { option =>
s""" .option("$option", "true")"""
}.mkString("\n")
val unblockChangeConfs = sqlConfsUnblock.map { conf =>
s""" SET $conf.ckpt_$checkpointHash = $currentSchemaChangeVersion;"""
}.mkString("\n")
@@ -3138,6 +3142,7 @@ trait DeltaErrorsBase
previousSchemaChangeVersion.toString,
currentSchemaChangeVersion.toString,
currentSchemaChangeVersion.toString,
readerOptions,
unblockChangeConfs,
unblockStreamConfs,
unblockAllConfs
@@ -3149,6 +3154,7 @@ trait DeltaErrorsBase
previousSchemaChangeVersion: Long,
currentSchemaChangeVersion: Long,
checkpointHash: Int,
readerOptionsUnblock: Seq[String],
sqlConfsUnblock: Seq[String],
wideningTypeChanges: Seq[TypeChange]): Throwable = {

@@ -3157,6 +3163,9 @@ trait DeltaErrorsBase
s"${change.toType.sql}"
}.mkString("\n")

val readerOptions = readerOptionsUnblock.map { option =>
s""" .option("$option", "true")"""
}.mkString("\n")
val unblockChangeConfs = sqlConfsUnblock.map { conf =>
s""" SET $conf.ckpt_$checkpointHash = $currentSchemaChangeVersion;"""
}.mkString("\n")
@@ -3174,6 +3183,7 @@ trait DeltaErrorsBase
currentSchemaChangeVersion.toString,
wideningTypeChangesStr,
currentSchemaChangeVersion.toString,
readerOptions,
unblockChangeConfs,
unblockStreamConfs,
unblockAllConfs
13 changes: 13 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala
@@ -212,6 +212,15 @@ trait DeltaReadOptions extends DeltaOptionParser {
val schemaTrackingLocation = options.get(SCHEMA_TRACKING_LOCATION)

val sourceTrackingId = options.get(STREAMING_SOURCE_TRACKING_ID)

val allowSourceColumnRename = options.get(ALLOW_SOURCE_COLUMN_RENAME)
.exists(toBoolean(_, ALLOW_SOURCE_COLUMN_RENAME))

val allowSourceColumnDrop = options.get(ALLOW_SOURCE_COLUMN_DROP)
.exists(toBoolean(_, ALLOW_SOURCE_COLUMN_DROP))

val allowSourceColumnTypeChange = options.get(ALLOW_SOURCE_COLUMN_TYPE_CHANGE)
.exists(toBoolean(_, ALLOW_SOURCE_COLUMN_TYPE_CHANGE))
}


@@ -289,6 +298,10 @@ object DeltaOptions extends DeltaLogging {
*/
val STREAMING_SOURCE_TRACKING_ID = "streamingSourceTrackingId"

val ALLOW_SOURCE_COLUMN_DROP = "allowSourceColumnDrop"
val ALLOW_SOURCE_COLUMN_RENAME = "allowSourceColumnRename"
val ALLOW_SOURCE_COLUMN_TYPE_CHANGE = "allowSourceColumnTypeChange"

/**
* An option to control if delta will write partition columns to data files
*/
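
The new keys are parsed like the other boolean read options: an absent key defaults to `false`, and any provided value goes through `toBoolean` validation. A minimal sketch of that behaviour, assuming an active `SparkSession` named `spark` (illustration only, not part of the diff):

```scala
// Hypothetical check of the new DeltaOptions fields, showing the defaulting behaviour.
val opts = new DeltaOptions(Map("allowSourceColumnDrop" -> "true"), spark.sessionState.conf)
assert(opts.allowSourceColumnDrop)        // explicitly enabled
assert(!opts.allowSourceColumnRename)     // absent, defaults to false
assert(!opts.allowSourceColumnTypeChange) // absent, defaults to false
```
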
@@ -451,7 +451,7 @@ object DeltaDataSource extends DatabricksLogging {

DeltaSourceMetadataTrackingLog.create(
spark, schemaTrackingLocation, sourceSnapshot,
Option(options.get(DeltaOptions.STREAMING_SOURCE_TRACKING_ID)),
parameters,
sourceMetadataPathOpt,
mergeConsecutiveSchemaChanges
)
@@ -460,48 +460,61 @@ object DeltaSourceMetadataEvolutionSupport {
val isDrop: Boolean
val isTypeWidening: Boolean
val sqlConfsUnblock: Seq[String]
val readerOptionsUnblock: Seq[String]
}

// Single types of schema change, typically caused by a single ALTER TABLE operation.
private case object SchemaChangeRename extends SchemaChangeType {
override val name = "RENAME COLUMN"
override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_RENAME)
override val (isRename, isDrop, isTypeWidening) = (true, false, false)
override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_RENAME)
override val readerOptionsUnblock: Seq[String] = Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME)
}
private case object SchemaChangeDrop extends SchemaChangeType {
override val name = "DROP COLUMN"
override val (isRename, isDrop, isTypeWidening) = (false, true, false)
override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_DROP)
override val readerOptionsUnblock: Seq[String] = Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_DROP)
}
private case object SchemaChangeTypeWidening extends SchemaChangeType {
override val name = "TYPE WIDENING"
override val (isRename, isDrop, isTypeWidening) = (false, false, true)
override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_TYPE_CHANGE)
override val readerOptionsUnblock: Seq[String] =
Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE)
}

// Combinations of rename, drop and type change -> can be caused by a complete overwrite.
private case object SchemaChangeRenameAndDrop extends SchemaChangeType {
override val name = "RENAME AND DROP COLUMN"
override val (isRename, isDrop, isTypeWidening) = (true, true, false)
override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_RENAME_DROP)
override val readerOptionsUnblock: Seq[String] =
Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME, DeltaOptions.ALLOW_SOURCE_COLUMN_DROP)
}
private case object SchemaChangeRenameAndTypeWidening extends SchemaChangeType {
override val name = "RENAME AND TYPE WIDENING"
override val (isRename, isDrop, isTypeWidening) = (true, false, true)
override val sqlConfsUnblock: Seq[String] =
Seq(SQL_CONF_UNBLOCK_RENAME, SQL_CONF_UNBLOCK_TYPE_CHANGE)
override val readerOptionsUnblock: Seq[String] =
Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME, DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE)
}
private case object SchemaChangeDropAndTypeWidening extends SchemaChangeType {
override val name = "DROP AND TYPE WIDENING"
override val (isRename, isDrop, isTypeWidening) = (false, true, true)
override val sqlConfsUnblock: Seq[String] =
Seq(SQL_CONF_UNBLOCK_DROP, SQL_CONF_UNBLOCK_TYPE_CHANGE)
override val readerOptionsUnblock: Seq[String] =
Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_DROP, DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE)
}
private case object SchemaChangeRenameAndDropAndTypeWidening extends SchemaChangeType {
override val name = "RENAME, DROP AND TYPE WIDENING"
override val (isRename, isDrop, isTypeWidening) = (true, true, true)
override val sqlConfsUnblock: Seq[String] =
Seq(SQL_CONF_UNBLOCK_RENAME_DROP, SQL_CONF_UNBLOCK_TYPE_CHANGE)
override val readerOptionsUnblock: Seq[String] =
Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME, DeltaOptions.ALLOW_SOURCE_COLUMN_DROP,
DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE)
}

private final val allSchemaChangeTypes = Seq(
@@ -546,6 +559,7 @@ object DeltaSourceMetadataEvolutionSupport {
private def isChangeUnblocked(
spark: SparkSession,
change: SchemaChangeType,
options: DeltaOptions,
checkpointHash: Int,
schemaChangeVersion: Long): Boolean = {

@@ -561,11 +575,13 @@
validConfKeysValuePair.exists(p => getConf(p._1).contains(p._2))
}

val isBlockedRename = change.isRename && !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME) &&
val isBlockedRename = change.isRename && !options.allowSourceColumnRename &&
!isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME) &&
!isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME_DROP)
val isBlockedDrop = change.isDrop && !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_DROP) &&
val isBlockedDrop = change.isDrop && !options.allowSourceColumnDrop &&
!isUnblockedBySQLConf(SQL_CONF_UNBLOCK_DROP) &&
!isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME_DROP)
val isBlockedTypeChange = change.isTypeWidening &&
val isBlockedTypeChange = change.isTypeWidening && !options.allowSourceColumnTypeChange &&
!isUnblockedBySQLConf(SQL_CONF_UNBLOCK_TYPE_CHANGE)

!isBlockedRename && !isBlockedDrop && !isBlockedTypeChange
@@ -620,9 +636,11 @@
// scalastyle:on
protected[sources] def validateIfSchemaChangeCanBeUnblockedWithSQLConf(
spark: SparkSession,
parameters: Map[String, String],
metadataPath: String,
currentSchema: PersistedMetadata,
previousSchema: PersistedMetadata): Unit = {
val options = new DeltaOptions(parameters, spark.sessionState.conf)
val checkpointHash = getCheckpointHash(metadataPath)

// The start version of a possible series of consecutive schema changes.
@@ -644,7 +662,7 @@
determineNonAdditiveSchemaChangeType(
spark, currentSchema.dataSchema, previousSchema.dataSchema).foreach { change =>
if (!isChangeUnblocked(
spark, change, checkpointHash, currentSchemaChangeVersion)) {
spark, change, options, checkpointHash, currentSchemaChangeVersion)) {
// Throw error to prompt user to set the correct confs
change match {
case SchemaChangeTypeWidening =>
@@ -656,6 +674,7 @@
previousSchemaChangeVersion,
currentSchemaChangeVersion,
checkpointHash,
change.readerOptionsUnblock,
change.sqlConfsUnblock,
wideningTypeChanges)

@@ -665,6 +684,7 @@
previousSchemaChangeVersion,
currentSchemaChangeVersion,
checkpointHash,
change.readerOptionsUnblock,
change.sqlConfsUnblock)
}
}
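
With this change `isChangeUnblocked` treats a schema change as unblocked when either the matching reader option is set on the stream or one of the pre-existing SQL configurations applies. For comparison, the SQL configuration route that remains supported looks roughly like this; the checkpoint hash and schema change version are placeholders that the actual error message supplies:

```scala
// Placeholder values: the concrete checkpoint hash (ckpt_15) and version (1) are
// printed in the error message raised for the blocked stream.
spark.sql("SET spark.databricks.delta.streaming.allowSourceColumnDrop.ckpt_15 = 1")
```
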
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.sources
// scalastyle:off import.ordering.noEmptyLine
import java.io.InputStream

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.streaming.{JsonSchemaSerializer, PartitionAndDataSchema, SchemaTrackingLog}
@@ -33,6 +34,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
// scalastyle:on import.ordering.noEmptyLine

/**
@@ -240,10 +242,12 @@ object DeltaSourceMetadataTrackingLog extends Logging {
sparkSession: SparkSession,
rootMetadataLocation: String,
sourceSnapshot: SnapshotDescriptor,
sourceTrackingId: Option[String] = None,
parameters: Map[String, String],
sourceMetadataPathOpt: Option[String] = None,
mergeConsecutiveSchemaChanges: Boolean = false,
initMetadataLogEagerly: Boolean = true): DeltaSourceMetadataTrackingLog = {
val options = new CaseInsensitiveStringMap(parameters.asJava)
val sourceTrackingId = Option(options.get(DeltaOptions.STREAMING_SOURCE_TRACKING_ID))
val metadataTrackingLocation = fullMetadataTrackingLocation(
rootMetadataLocation, sourceSnapshot.deltaLog.tableId, sourceTrackingId)
val log = new DeltaSourceMetadataTrackingLog(
@@ -296,7 +300,8 @@ object DeltaSourceMetadataTrackingLog extends Logging {
(log.getPreviousTrackedMetadata, log.getCurrentTrackedMetadata, sourceMetadataPathOpt) match {
case (Some(prev), Some(curr), Some(metadataPath)) =>
DeltaSourceMetadataEvolutionSupport
.validateIfSchemaChangeCanBeUnblockedWithSQLConf(sparkSession, metadataPath, curr, prev)
.validateIfSchemaChangeCanBeUnblockedWithSQLConf(
sparkSession, parameters, metadataPath, curr, prev)
case _ =>
}

@@ -2720,6 +2720,7 @@ trait DeltaErrorsSuiteBase
nonAdditiveSchemaChangeOpType = "RENAME AND TYPE WIDENING",
previousSchemaChangeVersion = 0,
currentSchemaChangeVersion = 1,
readerOptionsUnblock = Seq("allowSourceColumnRename", "allowSourceColumnTypeChange"),
sqlConfsUnblock = Seq(
"spark.databricks.delta.streaming.allowSourceColumnRename",
"spark.databricks.delta.streaming.allowSourceColumnTypeChange"),
@@ -2731,6 +2732,9 @@
"opType" -> "RENAME AND TYPE WIDENING",
"previousSchemaChangeVersion" -> "0",
"currentSchemaChangeVersion" -> "1",
"readerOptions" ->
s""" .option("allowSourceColumnRename", "true")
| .option("allowSourceColumnTypeChange", "true")""".stripMargin,
"unblockChangeConfs" ->
s""" SET spark.databricks.delta.streaming.allowSourceColumnRename.ckpt_15 = 1;
| SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_15 = 1;""".stripMargin,
@@ -2748,6 +2752,7 @@
throw DeltaErrors.cannotContinueStreamingTypeWidening(
previousSchemaChangeVersion = 0,
currentSchemaChangeVersion = 1,
readerOptionsUnblock = Seq("allowSourceColumnTypeChange"),
sqlConfsUnblock = Seq("spark.databricks.delta.streaming.allowSourceColumnTypeChange"),
checkpointHash = 15,
wideningTypeChanges = Seq(TypeChange(None, IntegerType, LongType, Seq("a"))))
@@ -2758,6 +2763,7 @@
"previousSchemaChangeVersion" -> "0",
"currentSchemaChangeVersion" -> "1",
"wideningTypeChanges" -> " a: INT -> BIGINT",
"readerOptions" -> " .option(\"allowSourceColumnTypeChange\", \"true\")",
"unblockChangeConfs" ->
" SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_15 = 1;",
"unblockStreamConfs" ->
@@ -222,7 +222,8 @@ trait StreamingSchemaEvolutionSuiteBase extends ColumnMappingStreamingTestUtils
initializeEagerly: Boolean = true
)(implicit log: DeltaLog): DeltaSourceMetadataTrackingLog =
DeltaSourceMetadataTrackingLog.create(
spark, getDefaultSchemaLocation.toString, log.update(), sourceTrackingId,
spark, getDefaultSchemaLocation.toString, log.update(),
parameters = sourceTrackingId.map(DeltaOptions.STREAMING_SOURCE_TRACKING_ID -> _).toMap,
initMetadataLogEagerly = initializeEagerly)

protected def getDefaultCheckpoint(implicit log: DeltaLog): Path =
@@ -534,8 +535,10 @@ trait StreamingSchemaEvolutionSuiteBase extends ColumnMappingStreamingTestUtils
// of the case; True concurrent execution would require the commit service to protect against.
val schemaLocation = getDefaultSchemaLocation.toString
val snapshot = log.update()
val schemaLog1 = DeltaSourceMetadataTrackingLog.create(spark, schemaLocation, snapshot)
val schemaLog2 = DeltaSourceMetadataTrackingLog.create(spark, schemaLocation, snapshot)
val schemaLog1 = DeltaSourceMetadataTrackingLog.create(
spark, schemaLocation, snapshot, parameters = Map.empty)
val schemaLog2 = DeltaSourceMetadataTrackingLog.create(
spark, schemaLocation, snapshot, Map.empty)
val newSchema =
PersistedMetadata("1", 1,
makeMetadata(new StructType(), partitionSchema = new StructType()),
@@ -1607,9 +1610,9 @@ trait StreamingSchemaEvolutionSuiteBase extends ColumnMappingStreamingTestUtils

// Both schema log initialized
def schemaLog1: DeltaSourceMetadataTrackingLog = DeltaSourceMetadataTrackingLog.create(
spark, schemaLog1Location, log.update())
spark, schemaLog1Location, log.update(), parameters = Map.empty)
def schemaLog2: DeltaSourceMetadataTrackingLog = DeltaSourceMetadataTrackingLog.create(
spark, schemaLog2Location, log.update())
spark, schemaLog2Location, log.update(), parameters = Map.empty)

// The schema log initializes @ v5 with schema <a, b>
testStream(df)(