
Commit bd1c935

[Spark] Fix Utils.isTesting calls to use Delta implementation (#4074)
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Some classes were using `org.apache.spark.util.Utils` instead of `org.apache.spark.sql.delta.util.Utils`. The Spark implementation keys off the SPARK_TESTING environment variable, while the Delta implementation keys off DELTA_TESTING.

Spark: https://github.com/apache/spark/blob/51fb84a54982719209c19136b1d72d2ef44726ee/core/src/main/scala/org/apache/spark/util/Utils.scala#L1878
Delta: https://github.com/delta-io/delta/blob/221d95cd69fdb9ff3f69cdd842c5c13ed47fd687/spark/src/main/scala/org/apache/spark/sql/delta/util/Utils.scala#L58

As a result, the unit tests currently exercise the non-test code path, because the build only sets DELTA_TESTING:
https://github.com/delta-io/delta/blob/221d95cd69fdb9ff3f69cdd842c5c13ed47fd687/build.sbt#L466
https://github.com/delta-io/delta/blob/221d95cd69fdb9ff3f69cdd842c5c13ed47fd687/run-tests.py#L93

## How was this patch tested?

Unit tests.

## Does this PR introduce _any_ user-facing changes?

No.

Signed-off-by: Felipe Pessoto <[email protected]>
Co-authored-by: Venki Korukanti <[email protected]>
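For context, here is a minimal sketch of how the two `isTesting` checks differ, simplified from the sources linked above; the object and method names below are illustrative stand-ins, and the exact property keys in the real classes may differ slightly:

```scala
// Sketch only: simplified stand-ins for the two real utilities.
object IsTestingSketch {
  // org.apache.spark.util.Utils.isTesting (Spark): keyed on SPARK_TESTING /
  // the spark.testing system property, which Delta's build does not set.
  def sparkIsTesting: Boolean =
    sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")

  // org.apache.spark.sql.delta.util.Utils.isTesting (Delta): keyed on the
  // DELTA_TESTING environment variable that build.sbt and run-tests.py do set.
  def deltaIsTesting: Boolean = sys.env.contains("DELTA_TESTING")
}
```

Because only DELTA_TESTING is exported by Delta's build, a call to the Spark variant returns false under Delta's unit tests, which is why the call sites in the diff below are switched to the aliased `DeltaUtils`.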
1 parent 961d39a commit bd1c935

11 files changed (+33 −27 lines)

spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala (+3 −2)

@@ -34,6 +34,7 @@ import org.apache.spark.sql.delta.storage.LogStore
 import org.apache.spark.sql.delta.util.{DeltaFileOperations, DeltaLogGroupingIterator, FileNames}
 import org.apache.spark.sql.delta.util.FileNames._
 import org.apache.spark.sql.delta.util.JsonUtils
+import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.mapred.{JobConf, TaskAttemptContextImpl, TaskAttemptID}
@@ -280,7 +281,7 @@ trait Checkpoints extends DeltaLogging {
 data = Map("exception" -> e.getMessage(), "stackTrace" -> e.getStackTrace())
 )
 logWarning(log"Error when writing checkpoint-related files", e)
-val throwError = Utils.isTesting ||
+val throwError = DeltaUtils.isTesting ||
 spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED)
 if (throwError) throw e
 }
@@ -1081,7 +1082,7 @@ object Checkpoints
 // overrides the final path even if it already exists. So we use exists here to handle that
 // case.
 // TODO: Remove isTesting and fs.exists check after fixing LocalFS
-if (Utils.isTesting && fs.exists(finalPath)) {
+if (DeltaUtils.isTesting && fs.exists(finalPath)) {
 false
 } else {
 fs.rename(tempPath, finalPath)

spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala (+3 −2)

@@ -35,6 +35,7 @@ import org.apache.spark.sql.delta.stats.DeletedRecordCountsHistogram
 import org.apache.spark.sql.delta.stats.FileSizeHistogram
 import org.apache.spark.sql.delta.storage.LogStore
 import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
+import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.fs.Path
@@ -440,7 +441,7 @@ trait RecordChecksum extends DeltaLogging {
 deltaLog,
 opType = "delta.allFilesInCrc.checksumMismatch.aggregated",
 data = eventData)
-if (Utils.isTesting) {
+if (DeltaUtils.isTesting) {
 throw new IllegalStateException("Incrementally Computed State failed checksum check" +
 s" for commit $attemptVersion [$eventData]")
 }
@@ -825,7 +826,7 @@ trait ValidateChecksum extends DeltaLogging { self: Snapshot =>
 this.deltaLog,
 opType = "delta.allFilesInCrc.checksumMismatch.differentAllFiles",
 data = eventData)
-if (Utils.isTesting) throw new IllegalStateException(message)
+if (DeltaUtils.isTesting) throw new IllegalStateException(message)
 false
 }
 /**

spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala (+3 −1)

@@ -38,6 +38,7 @@ import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
 import org.apache.spark.sql.delta.sources._
 import org.apache.spark.sql.delta.storage.LogStoreProvider
 import org.apache.spark.sql.delta.util.FileNames
+import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
 import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -152,7 +153,8 @@ class DeltaLog private(

 private[delta] def shouldVerifyIncrementalCommit: Boolean = {
 spark.conf.get(DeltaSQLConf.INCREMENTAL_COMMIT_VERIFY) ||
-(Utils.isTesting && spark.conf.get(DeltaSQLConf.INCREMENTAL_COMMIT_FORCE_VERIFY_IN_TESTS))
+(DeltaUtils.isTesting
+  && spark.conf.get(DeltaSQLConf.INCREMENTAL_COMMIT_FORCE_VERIFY_IN_TESTS))
 }

 /** The unique identifier for this table. */

spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala (+7 −6)

@@ -35,6 +35,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.util.FileNames._
 import org.apache.spark.sql.delta.util.JsonUtils
 import org.apache.spark.sql.delta.util.threads.DeltaThreadPool
+import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
 import com.fasterxml.jackson.annotation.JsonIgnore
 import io.delta.storage.commit.{Commit, GetCommitsResponse}
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
@@ -44,7 +45,7 @@ import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.ThreadUtils

 /**
  * Wraps the most recently updated snapshot along with the timestamp the update was started.
@@ -270,7 +271,7 @@ trait SnapshotManagement { self: DeltaLog =>
 deltaLog = this,
 opType = CoordinatedCommitsUsageLogs.FS_COMMIT_COORDINATOR_LISTING_UNEXPECTED_GAPS,
 data = eventData)
-if (Utils.isTesting) {
+if (DeltaUtils.isTesting) {
 throw new IllegalStateException(
 s"Delta table at $dataPath unexpectedly still requires additional file-system listing " +
 s"after an additional file-system listing was already performed to reconcile the gap " +
@@ -646,7 +647,7 @@ trait SnapshotManagement { self: DeltaLog =>
 deltaLog = this,
 opType = "delta.getLogSegmentForVersion.compactedDeltaValidationFailed",
 data = eventData)
-if (Utils.isTesting) {
+if (DeltaUtils.isTesting) {
 assert(false, s"Validation around Compacted deltas failed while creating Snapshot. " +
 s"[${JsonUtils.toJson(eventData)}]")
 }
@@ -1071,7 +1072,7 @@ trait SnapshotManagement { self: DeltaLog =>
 catalogTableOpt)
 }
 } catch {
-case NonFatal(e) if !Utils.isTesting =>
+case NonFatal(e) if !DeltaUtils.isTesting =>
 // Failed to schedule the future -- fail in testing, but just log it in prod.
 recordDeltaEvent(this, "delta.snapshot.asyncUpdateFailed", data = Map("exception" -> e))
 }
@@ -1200,7 +1201,7 @@ trait SnapshotManagement { self: DeltaLog =>
 /** Installs the given `newSnapshot` as the `currentSnapshot` */
 protected def installSnapshot(newSnapshot: Snapshot, updateTimestamp: Long): Snapshot = {
 if (!snapshotLock.isHeldByCurrentThread) {
-if (Utils.isTesting) {
+if (DeltaUtils.isTesting) {
 throw new RuntimeException("DeltaLog snapshot replaced without taking lock")
 }
 recordDeltaEvent(this, "delta.update.unsafeReplace")
@@ -1292,7 +1293,7 @@ trait SnapshotManagement { self: DeltaLog =>
 // NOTE: Validation is a no-op with incremental commit disabled.
 newSnapshot.validateChecksum(Map("context" -> checksumContext))
 } catch {
-case _: IllegalStateException if !Utils.isTesting => false
+case _: IllegalStateException if !DeltaUtils.isTesting => false
 }

 if (!crcIsValid) {

spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala (+2 −1)

@@ -39,6 +39,7 @@ import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils, De
 import org.apache.spark.sql.delta.stats.StatisticsCollection
 import org.apache.spark.sql.delta.tablefeatures.DropFeature
 import org.apache.spark.sql.delta.util.PartitionUtils
+import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
 import org.apache.spark.sql.util.ScalaExtensions._
 import org.apache.hadoop.fs.Path

@@ -155,7 +156,7 @@ class DeltaCatalog extends DelegatingCatalogExtension
 // Note: Spark generates the table location for managed tables in
 // `DeltaCatalog#delegate#createTable`, so `isManagedLocation` should never be true if
 // Unity Catalog is not involved. For safety we also check `isUnityCatalog` here.
-val respectManagedLoc = isUnityCatalog || org.apache.spark.util.Utils.isTesting
+val respectManagedLoc = isUnityCatalog || DeltaUtils.isTesting
 val tableType = if (location.isEmpty || (isManagedLocation && respectManagedLoc)) {
 CatalogTableType.MANAGED
 } else {

spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala (+1 −1)

@@ -768,7 +768,7 @@ trait VacuumCommandImpl extends DeltaCommand {
 protected def setCommitClock(deltaLog: DeltaLog, version: Long) = {
 // This is done to make sure that the commit timestamp reflects the one provided by the clock
 // object.
-if (Utils.isTesting) {
+if (DeltaUtils.isTesting) {
 val fs = deltaLog.logPath.getFileSystem(deltaLog.newDeltaHadoopConf())
 val filePath = DeltaCommitFileProvider(deltaLog.update()).deltaFile(version)
 if (fs.exists(filePath)) {

spark/src/main/scala/org/apache/spark/sql/delta/files/DeltaFileFormatWriter.scala (+4 −3)

@@ -20,6 +20,7 @@ import java.util.{Date, UUID}

 import org.apache.spark.sql.delta.DeltaOptions
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
+import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
 import org.apache.hadoop.mapreduce._
@@ -188,7 +189,7 @@ object DeltaFileFormatWriter extends LoggingShims {
 // 1) When the planned write config is disabled.
 // 2) When the concurrent writers are enabled (in this case the required ordering of a
 // V1 write command will be empty).
-if (Utils.isTesting) outputOrderingMatched = orderingMatched
+if (DeltaUtils.isTesting) outputOrderingMatched = orderingMatched

 if (writeFilesOpt.isDefined) {
 // build `WriteFilesSpec` for `WriteFiles`
@@ -248,7 +249,7 @@ object DeltaFileFormatWriter extends LoggingShims {
 }

 // In testing, this is the only way to get hold of the actually executed plan written to file
-if (Utils.isTesting) executedPlan = Some(planToExecute)
+if (DeltaUtils.isTesting) executedPlan = Some(planToExecute)

 val rdd = planToExecute.execute()

@@ -331,7 +332,7 @@ object DeltaFileFormatWriter extends LoggingShims {
 val description = writeFilesSpec.description

 // In testing, this is the only way to get hold of the actually executed plan written to file
-if (Utils.isTesting) executedPlan = Some(planForWrites)
+if (DeltaUtils.isTesting) executedPlan = Some(planForWrites)

 writeAndCommit(job, description, committer) {
 val rdd = planForWrites.executeWrite(writeFilesSpec)

spark/src/main/scala/org/apache/spark/sql/delta/metering/DeltaLogging.scala (+2 −2)

@@ -33,13 +33,13 @@ import org.apache.spark.sql.delta.actions.Metadata
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.util.DeltaProgressReporter
 import org.apache.spark.sql.delta.util.JsonUtils
+import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
 import org.apache.spark.sql.util.ScalaExtensions._

 import org.apache.hadoop.fs.Path

 import org.apache.spark.SparkThrowable
 import org.apache.spark.internal.{LoggingShims, MDC, MessageWithContext}
-import org.apache.spark.util.Utils

 /**
  * Convenience wrappers for logging that include delta specific options and
@@ -153,7 +153,7 @@ trait DeltaLogging
 data: AnyRef = null,
 path: Option[Path] = None)
 : Unit = {
-if (Utils.isTesting) {
+if (DeltaUtils.isTesting) {
 assert(check, msg)
 } else if (!check) {
 recordDeltaEvent(

spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala (+2 −2)

@@ -23,6 +23,7 @@ import scala.util.control.NonFatal
 import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingMode, DeltaErrors, DeltaLog, GeneratedColumn, NoMapping, TypeWidening, TypeWideningMode}
 import org.apache.spark.sql.delta.{RowCommitVersion, RowId}
 import org.apache.spark.sql.delta.ClassicColumnConversions._
+import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
 import org.apache.spark.sql.delta.actions.Protocol
 import org.apache.spark.sql.delta.commands.cdc.CDCReader
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
@@ -43,7 +44,6 @@ import org.apache.spark.sql.execution.streaming.IncrementalExecution
 import org.apache.spark.sql.functions.{col, struct}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils

 object SchemaUtils extends DeltaLogging {
 // We use case insensitive resolution while writing into Delta
@@ -290,7 +290,7 @@ def normalizeColumnNamesInDataType(
 // The integral types can be cast to each other later on.
 sourceDataType
 case _ =>
-if (Utils.isTesting) {
+if (DeltaUtils.isTesting) {
 assert(sourceDataType == tableDataType,
 s"Types without nesting should match but $sourceDataType != $tableDataType")
 } else if (sourceDataType != tableDataType) {

spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/ZCube.scala (+2 −3)

@@ -20,18 +20,17 @@ import scala.collection.mutable.ArrayBuffer

 import org.apache.spark.sql.delta.actions.AddFile
 import org.apache.spark.sql.delta.commands.optimize.AddFileWithNumRecords
+import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
 import org.apache.spark.sql.delta.zorder.ZCubeInfo
 import org.apache.spark.sql.delta.zorder.ZCubeInfo.{getForFile => getZCubeInfo}

-import org.apache.spark.util.Utils
-
 /**
  * Collection of files that were produced by the same job in a run of the clustering command.
  */
 case class ZCube(files: Seq[AddFile]) {
 require(files.nonEmpty)

-if (Utils.isTesting) {
+if (DeltaUtils.isTesting) {
 assert(files.forall(getZCubeInfo(_) == Some(zCubeInfo)))
 }

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala (+4 −4)

@@ -24,7 +24,7 @@ import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.FileSourceOptions
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}

 /**
  * [[SQLConf]] entries for Delta features.
@@ -2423,21 +2423,21 @@ trait DeltaSQLConfBase {
 .internal()
 .doc("If true, post-commit hooks will by default throw an exception when they fail.")
 .booleanConf
-.createWithDefault(Utils.isTesting)
+.createWithDefault(DeltaUtils.isTesting)

 val TEST_FILE_NAME_PREFIX =
 buildStaticConf("testOnly.dataFileNamePrefix")
 .internal()
 .doc("[TEST_ONLY]: The prefix to use for the names of all Parquet data files.")
 .stringConf
-.createWithDefault(if (Utils.isTesting) "test%file%prefix-" else "")
+.createWithDefault(if (DeltaUtils.isTesting) "test%file%prefix-" else "")

 val TEST_DV_NAME_PREFIX =
 buildStaticConf("testOnly.dvFileNamePrefix")
 .internal()
 .doc("[TEST_ONLY]: The prefix to use for the names of all Deletion Vector files.")
 .stringConf
-.createWithDefault(if (Utils.isTesting) "test%dv%prefix-" else "")
+.createWithDefault(if (DeltaUtils.isTesting) "test%dv%prefix-" else "")

 ///////////
 // UTC TIMESTAMP PARTITION VALUES
