Skip to content

Commit 34f9f0b

Browse files
authored
[Spark] Extend streaming tests with coordinated commits (1/2) (#4039)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> + Unify the `modifyCommitTimestamp` helper functions to avoid code duplication. + Extend existing streaming tests to run with coordinated commits ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> Run the new tests ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. 
If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No
1 parent bd1c935 commit 34f9f0b

10 files changed

+161 −132 lines changed

spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,8 @@ object CoordinatedCommitsUtils extends DeltaLogging {
165165
protocol: Protocol,
166166
failIfImplUnavailable: Boolean): Option[CommitCoordinatorClient] = {
167167
metadata.coordinatedCommitsCoordinatorName.flatMap { commitCoordinatorStr =>
168-
assert(protocol.isFeatureSupported(CoordinatedCommitsTableFeature))
168+
assert(protocol.isFeatureSupported(CoordinatedCommitsTableFeature),
169+
"coordinated commits table feature is not supported")
169170
val coordinatorConf = metadata.coordinatedCommitsCoordinatorConf
170171
val coordinatorOpt = CommitCoordinatorProvider.getCommitCoordinatorClientOpt(
171172
commitCoordinatorStr, coordinatorConf, spark)

spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSQLSuite.scala

+4-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta
1818

1919
import java.util.Date
2020

21+
import org.apache.spark.sql.delta.DeltaTestUtils.modifyCommitTimestamp
2122
import org.apache.spark.sql.delta.actions.Protocol
2223
import org.apache.spark.sql.delta.sources.DeltaSQLConf
2324
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
@@ -226,9 +227,9 @@ class DeltaCDCSQLSuite extends DeltaCDCSuiteBase with DeltaColumnMappingTestUtil
226227
val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tbl))
227228

228229
val currentTime = new Date().getTime
229-
modifyDeltaTimestamp(deltaLog, 0, currentTime - 100000)
230-
modifyDeltaTimestamp(deltaLog, 1, currentTime)
231-
modifyDeltaTimestamp(deltaLog, 2, currentTime + 100000)
230+
modifyCommitTimestamp(deltaLog, 0, currentTime - 100000)
231+
modifyCommitTimestamp(deltaLog, 1, currentTime)
232+
modifyCommitTimestamp(deltaLog, 2, currentTime + 100000)
232233

233234
val readDf = sql(s"SELECT * FROM table_changes('$tbl', 0, now())")
234235
checkCDCAnswer(

spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala

+22-18
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,23 @@
1616

1717
package org.apache.spark.sql.delta
1818

19-
import java.io.File
2019
import java.sql.Timestamp
2120
import java.text.SimpleDateFormat
2221
import java.util.Date
2322

2423
import scala.language.implicitConversions
2524

25+
import org.apache.spark.sql.delta.DeltaTestUtils.modifyCommitTimestamp
2626
import org.apache.spark.sql.delta.actions.AddCDCFile
2727
import org.apache.spark.sql.delta.commands.cdc.CDCReader
2828
import org.apache.spark.sql.delta.sources.{DeltaSourceOffset, DeltaSQLConf}
2929
import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
3030
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
31-
import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
31+
import org.apache.spark.sql.delta.util.JsonUtils
3232
import io.delta.tables._
3333
import org.apache.hadoop.fs.Path
3434

35-
import org.apache.spark.{SparkConf, SparkThrowable}
36-
import org.apache.spark.sql.DataFrame
35+
import org.apache.spark.SparkConf
3736
import org.apache.spark.sql.functions._
3837
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException, StreamTest, Trigger}
3938
import org.apache.spark.sql.types.StructType
@@ -48,16 +47,6 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
4847
override protected def sparkConf: SparkConf = super.sparkConf
4948
.set(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey, "true")
5049

51-
/** Modify timestamp for a delta commit, used to test timestamp querying */
52-
def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = {
53-
val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
54-
file.setLastModified(time)
55-
val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
56-
if (crc.exists()) {
57-
crc.setLastModified(time)
58-
}
59-
}
60-
6150
/**
6251
* Create two tests for maxFilesPerTrigger and maxBytesPerTrigger
6352
*/
@@ -198,11 +187,11 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
198187
// version 0
199188
Seq(1, 2, 3).toDF("id").write.delta(inputDir.toString)
200189
val deltaLog = DeltaLog.forTable(spark, inputDir.getAbsolutePath)
201-
modifyDeltaTimestamp(deltaLog, 0, 1000)
190+
modifyCommitTimestamp(deltaLog, 0, 1000)
202191

203192
// version 1
204193
Seq(-1).toDF("id").write.mode("append").delta(inputDir.toString)
205-
modifyDeltaTimestamp(deltaLog, 1, 2000)
194+
modifyCommitTimestamp(deltaLog, 1, 2000)
206195

207196
val deltaTable = io.delta.tables.DeltaTable.forPath(inputDir.getAbsolutePath)
208197
val startTs = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
@@ -231,7 +220,7 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
231220
// version 0
232221
Seq(1, 2, 3, 4, 5, 6).toDF("id").write.delta(inputDir.toString)
233222
val deltaLog = DeltaLog.forTable(spark, inputDir.getAbsolutePath)
234-
modifyDeltaTimestamp(deltaLog, 0, 1000)
223+
modifyCommitTimestamp(deltaLog, 0, 1000)
235224

236225
val df1 = spark.readStream
237226
.option(DeltaOptions.CDC_READ_OPTION, "true")
@@ -278,7 +267,7 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
278267
Seq(1, 2, 3).toDF("id").write.delta(inputDir.toString)
279268
val inputPath = inputDir.getAbsolutePath
280269
val deltaLog = DeltaLog.forTable(spark, inputPath)
281-
modifyDeltaTimestamp(deltaLog, 0, 1000)
270+
modifyCommitTimestamp(deltaLog, 0, 1000)
282271

283272
val deltaTable = io.delta.tables.DeltaTable.forPath(inputPath)
284273

@@ -1065,6 +1054,21 @@ class DeltaCDCStreamDeletionVectorSuite extends DeltaCDCStreamSuite
10651054
}
10661055

10671056
class DeltaCDCStreamSuite extends DeltaCDCStreamSuiteBase
1057+
class DeltaCDCStreamWithCoordinatedCommitsBatch1Suite
1058+
extends DeltaCDCStreamSuite {
1059+
override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(1)
1060+
}
1061+
1062+
class DeltaCDCStreamWithCoordinatedCommitsBatch10Suite
1063+
extends DeltaCDCStreamSuite {
1064+
override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(10)
1065+
}
1066+
1067+
class DeltaCDCStreamWithCoordinatedCommitsBatch100Suite
1068+
extends DeltaCDCStreamSuite {
1069+
override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(100)
1070+
}
1071+
10681072
abstract class DeltaCDCStreamColumnMappingSuiteBase extends DeltaCDCStreamSuite
10691073
with ColumnMappingStreamingBlockedWorkflowSuiteBase with DeltaColumnMappingSelectedTestMixin {
10701074

spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala

+36-63
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import java.util.Date
2323
import scala.collection.JavaConverters._
2424

2525
// scalastyle:off import.ordering.noEmptyLine
26-
import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
26+
import org.apache.spark.sql.delta.DeltaTestUtils.{modifyCommitTimestamp, BOOLEAN_DOMAIN}
2727
import org.apache.spark.sql.delta.commands.cdc.CDCReader._
2828
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
2929
import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -108,16 +108,6 @@ abstract class DeltaCDCSuiteBase
108108
schemaMode: Option[DeltaBatchCDFSchemaMode] = Some(BatchCDFSchemaLegacy),
109109
readerOptions: Map[String, String] = Map.empty): DataFrame
110110

111-
/** Modify timestamp for a delta commit, used to test timestamp querying */
112-
def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = {
113-
val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
114-
file.setLastModified(time)
115-
val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
116-
if (crc.exists()) {
117-
crc.setLastModified(time)
118-
}
119-
}
120-
121111
/** Create table utility method */
122112
def ctas(srcTbl: String, dstTbl: String, disableCDC: Boolean = false): Unit = {
123113
val readDf = cdcRead(new TableName(srcTbl), StartingVersion("0"), EndingVersion("1"))
@@ -252,14 +242,14 @@ abstract class DeltaCDCSuiteBase
252242

253243
// modify timestamps
254244
// version 0
255-
modifyDeltaTimestamp(deltaLog, 0, 0)
245+
modifyCommitTimestamp(deltaLog, 0, 0)
256246
val tsAfterV0 = dateFormat.format(new Date(1))
257247

258248
// version 1
259-
modifyDeltaTimestamp(deltaLog, 1, 1000)
249+
modifyCommitTimestamp(deltaLog, 1, 1000)
260250
val tsAfterV1 = dateFormat.format(new Date(1001))
261251

262-
modifyDeltaTimestamp(deltaLog, 2, 2000)
252+
modifyCommitTimestamp(deltaLog, 2, 2000)
263253

264254
val readDf = cdcRead(
265255
new TablePath(tempDir.getAbsolutePath),
@@ -278,9 +268,9 @@ abstract class DeltaCDCSuiteBase
278268
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
279269
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
280270

281-
modifyDeltaTimestamp(deltaLog, 0, 0)
282-
modifyDeltaTimestamp(deltaLog, 1, 10000)
283-
modifyDeltaTimestamp(deltaLog, 2, 20000)
271+
modifyCommitTimestamp(deltaLog, 0, 0)
272+
modifyCommitTimestamp(deltaLog, 1, 10000)
273+
modifyCommitTimestamp(deltaLog, 2, 20000)
284274

285275
val ts0 = dateFormat.format(new Date(2000))
286276
val readDf = cdcRead(
@@ -299,9 +289,9 @@ abstract class DeltaCDCSuiteBase
299289
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
300290
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
301291

302-
modifyDeltaTimestamp(deltaLog, 0, 0)
303-
modifyDeltaTimestamp(deltaLog, 1, 1000)
304-
modifyDeltaTimestamp(deltaLog, 2, 2000)
292+
modifyCommitTimestamp(deltaLog, 0, 0)
293+
modifyCommitTimestamp(deltaLog, 1, 1000)
294+
modifyCommitTimestamp(deltaLog, 2, 2000)
305295

306296
val ts0 = dateFormat.format(new Date(0))
307297
val readDf = cdcRead(
@@ -320,9 +310,9 @@ abstract class DeltaCDCSuiteBase
320310
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
321311
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
322312

323-
modifyDeltaTimestamp(deltaLog, 0, 4000)
324-
modifyDeltaTimestamp(deltaLog, 1, 8000)
325-
modifyDeltaTimestamp(deltaLog, 2, 12000)
313+
modifyCommitTimestamp(deltaLog, 0, 4000)
314+
modifyCommitTimestamp(deltaLog, 1, 8000)
315+
modifyCommitTimestamp(deltaLog, 2, 12000)
326316

327317
val ts0 = dateFormat.format(new Date(1000))
328318
val ts1 = dateFormat.format(new Date(3000))
@@ -341,9 +331,9 @@ abstract class DeltaCDCSuiteBase
341331
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
342332
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
343333

344-
modifyDeltaTimestamp(deltaLog, 0, 0)
345-
modifyDeltaTimestamp(deltaLog, 1, 4000)
346-
modifyDeltaTimestamp(deltaLog, 2, 8000)
334+
modifyCommitTimestamp(deltaLog, 0, 0)
335+
modifyCommitTimestamp(deltaLog, 1, 4000)
336+
modifyCommitTimestamp(deltaLog, 2, 8000)
347337

348338
val ts0 = dateFormat.format(new Date(1000))
349339
val ts1 = dateFormat.format(new Date(3000))
@@ -363,9 +353,9 @@ abstract class DeltaCDCSuiteBase
363353
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
364354
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
365355

366-
modifyDeltaTimestamp(deltaLog, 0, 0)
367-
modifyDeltaTimestamp(deltaLog, 1, 4000)
368-
modifyDeltaTimestamp(deltaLog, 2, 8000)
356+
modifyCommitTimestamp(deltaLog, 0, 0)
357+
modifyCommitTimestamp(deltaLog, 1, 4000)
358+
modifyCommitTimestamp(deltaLog, 2, 8000)
369359

370360
val ts0 = dateFormat.format(new Date(3000))
371361
val ts1 = dateFormat.format(new Date(5000))
@@ -385,9 +375,9 @@ abstract class DeltaCDCSuiteBase
385375
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
386376
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
387377

388-
modifyDeltaTimestamp(deltaLog, 0, 0)
389-
modifyDeltaTimestamp(deltaLog, 1, 4000)
390-
modifyDeltaTimestamp(deltaLog, 2, 8000)
378+
modifyCommitTimestamp(deltaLog, 0, 0)
379+
modifyCommitTimestamp(deltaLog, 1, 4000)
380+
modifyCommitTimestamp(deltaLog, 2, 8000)
391381

392382
val ts0 = dateFormat.format(new Date(3000))
393383
val ts1 = dateFormat.format(new Date(1000))
@@ -406,9 +396,9 @@ abstract class DeltaCDCSuiteBase
406396
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
407397
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
408398

409-
modifyDeltaTimestamp(deltaLog, 0, 0)
410-
modifyDeltaTimestamp(deltaLog, 1, 4000)
411-
modifyDeltaTimestamp(deltaLog, 2, 8000)
399+
modifyCommitTimestamp(deltaLog, 0, 0)
400+
modifyCommitTimestamp(deltaLog, 1, 4000)
401+
modifyCommitTimestamp(deltaLog, 2, 8000)
412402

413403
val ts0 = dateFormat.format(new Date(5000))
414404
val ts1 = dateFormat.format(new Date(3000))
@@ -449,7 +439,7 @@ abstract class DeltaCDCSuiteBase
449439
// Set commit time during Daylight savings time change.
450440
val restoreDate = "2022-11-06 01:42:44"
451441
val timestamp = dateFormat.parse(s"$restoreDate -0800").getTime
452-
modifyDeltaTimestamp(deltaLog, 0, timestamp)
442+
modifyCommitTimestamp(deltaLog, 0, timestamp)
453443

454444
// Verify DST is respected.
455445
val e = intercept[Exception] {
@@ -558,9 +548,9 @@ abstract class DeltaCDCSuiteBase
558548
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
559549
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
560550

561-
modifyDeltaTimestamp(deltaLog, 0, 0)
562-
modifyDeltaTimestamp(deltaLog, 1, 1000)
563-
modifyDeltaTimestamp(deltaLog, 2, 2000)
551+
modifyCommitTimestamp(deltaLog, 0, 0)
552+
modifyCommitTimestamp(deltaLog, 1, 1000)
553+
modifyCommitTimestamp(deltaLog, 2, 2000)
564554

565555
val ts0 = dateFormat.format(new Date(2000))
566556
val ts1 = dateFormat.format(new Date(1))
@@ -795,13 +785,13 @@ abstract class DeltaCDCSuiteBase
795785

796786
// modify timestamps
797787
// version 0
798-
modifyDeltaTimestamp(deltaLog, 0, 0)
788+
modifyCommitTimestamp(deltaLog, 0, 0)
799789

800790
// version 1
801-
modifyDeltaTimestamp(deltaLog, 1, 1000)
791+
modifyCommitTimestamp(deltaLog, 1, 1000)
802792

803793
// version 2
804-
modifyDeltaTimestamp(deltaLog, 2, 2000)
794+
modifyCommitTimestamp(deltaLog, 2, 2000)
805795
val tsStart = dateFormat.format(new Date(3000))
806796
val tsEnd = dateFormat.format(new Date(4000))
807797

@@ -825,13 +815,13 @@ abstract class DeltaCDCSuiteBase
825815

826816
// modify timestamps
827817
// version 0
828-
modifyDeltaTimestamp(deltaLog, 0, 0)
818+
modifyCommitTimestamp(deltaLog, 0, 0)
829819

830820
// version 1
831-
modifyDeltaTimestamp(deltaLog, 1, 1000)
821+
modifyCommitTimestamp(deltaLog, 1, 1000)
832822

833823
// version 2
834-
modifyDeltaTimestamp(deltaLog, 2, 2000)
824+
modifyCommitTimestamp(deltaLog, 2, 2000)
835825

836826
val tsStart = dateFormat.format(new Date(0))
837827
val tsEnd = dateFormat.format(new Date(4000))
@@ -1107,23 +1097,6 @@ class DeltaCDCScalaWithDeletionVectorsSuite extends DeltaCDCScalaSuite
11071097
}
11081098

11091099
class DeltaCDCScalaSuiteWithCoordinatedCommitsBatch10 extends DeltaCDCScalaSuite
1110-
with CoordinatedCommitsBaseSuite {
1111-
1112-
/** Modify timestamp for a delta commit, used to test timestamp querying */
1113-
override def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = {
1114-
val fileProvider = DeltaCommitFileProvider(deltaLog.snapshot)
1115-
val file = new File(fileProvider.deltaFile(version).toUri)
1116-
InCommitTimestampTestUtils.overwriteICTInDeltaFile(
1117-
deltaLog,
1118-
new Path(file.getPath),
1119-
Some(time))
1120-
file.setLastModified(time)
1121-
val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
1122-
if (crc.exists()) {
1123-
InCommitTimestampTestUtils.overwriteICTInCrc(deltaLog, version, Some(time))
1124-
crc.setLastModified(time)
1125-
}
1126-
}
1127-
1100+
with CoordinatedCommitsBaseSuite {
11281101
override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(10)
11291102
}

spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala

+2-27
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@ import scala.language.implicitConversions
2929
import com.databricks.spark.util.Log4jUsageLogger
3030
import org.apache.spark.sql.delta.DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED
3131
import org.apache.spark.sql.delta.DeltaHistoryManagerSuiteShims._
32-
import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
32+
import org.apache.spark.sql.delta.DeltaTestUtils.{createTestAddFile, modifyCommitTimestamp}
3333
import org.apache.spark.sql.delta.catalog.DeltaTableV2
3434
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
3535
import org.apache.spark.sql.delta.sources.DeltaSQLConf
3636
import org.apache.spark.sql.delta.stats.StatsUtils
3737
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
3838
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
39-
import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames}
39+
import org.apache.spark.sql.delta.util.FileNames
4040
import org.scalatest.GivenWhenThen
4141

4242
import org.apache.spark.{SparkConf, SparkException}
@@ -64,31 +64,6 @@ trait DeltaTimeTravelTests extends QueryTest
6464

6565
protected val timeFormatter = new SimpleDateFormat("yyyyMMddHHmmssSSS")
6666

67-
protected def modifyCommitTimestamp(deltaLog: DeltaLog, version: Long, ts: Long): Unit = {
68-
val filePath = DeltaCommitFileProvider(deltaLog.update()).deltaFile(version)
69-
val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
70-
if (isICTEnabledForNewTables) {
71-
InCommitTimestampTestUtils.overwriteICTInDeltaFile(deltaLog, filePath, Some(ts))
72-
if (FileNames.isUnbackfilledDeltaFile(filePath)) {
73-
// Also change the ICT in the backfilled file if it exists.
74-
val backfilledFilePath = FileNames.unsafeDeltaFile(deltaLog.logPath, version)
75-
val fs = backfilledFilePath.getFileSystem(deltaLog.newDeltaHadoopConf())
76-
if (fs.exists(backfilledFilePath)) {
77-
InCommitTimestampTestUtils.overwriteICTInDeltaFile(deltaLog, backfilledFilePath, Some(ts))
78-
}
79-
}
80-
if (crc.exists()) {
81-
InCommitTimestampTestUtils.overwriteICTInCrc(deltaLog, version, Some(ts))
82-
}
83-
} else {
84-
val file = new File(filePath.toUri)
85-
file.setLastModified(ts)
86-
if (crc.exists()) {
87-
crc.setLastModified(ts)
88-
}
89-
}
90-
}
91-
9267
protected def versionAsOf(table: String, version: Long): String = {
9368
s"$table version as of $version"
9469
}

0 commit comments

Comments (0)