Keep the latest expired checkpoint
Signed-off-by: Felipe Pessoto <[email protected]>
felipepessoto committed Feb 11, 2025
1 parent bd1c935 commit 0394394
Showing 2 changed files with 310 additions and 1 deletion.
@@ -155,12 +155,38 @@ trait MetadataCleanup extends DeltaLogging {
  private def listExpiredDeltaLogs(fileCutOffTime: Long): Iterator[FileStatus] = {
    val latestCheckpoint = readLastCheckpointFile()
    if (latestCheckpoint.isEmpty) return Iterator.empty
    val threshold = latestCheckpoint.get.version - 1L

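    // Helper: lists the delta and checkpoint files that are older than fileCutOffTime,
    // up to the given version threshold.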
    def listExpiredDeltaLogsInternal(threshold: Long): Iterator[FileStatus] = {
      val files = store.listFrom(listingPrefix(logPath, 0), newDeltaHadoopConf())
        .filter(f => isCheckpointFile(f) || isDeltaFile(f))

      new BufferingLogDeletionIterator(
        files, fileCutOffTime, threshold, getDeltaFileOrCheckpointVersion)
    }

    // Get the latest expired checkpoint version.
    val latestExpiredCheckpointVersion = {
      val cutOffCheckpoint = listExpiredDeltaLogsInternal(latestCheckpoint.get.version - 1)
        .filter(f => isCheckpointFile(f))
        .map(FileNames.checkpointVersion)
        .foldLeft(Option.empty[Long]) { (a, c) => if (a.getOrElse(-1L) < c) Some(c) else a }
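      // cutOffCheckpoint holds the newest expired checkpoint version found, or None when no
      // checkpoint has expired yet.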

logInfo("Cut off checkpoint version: " + cutOffCheckpoint)

recordDeltaEvent(this, "delta.log.cleanup.stats", data = Map(
"cutOffCheckpoint" -> cutOffCheckpoint.getOrElse(-1L),
"latestCheckpoint" -> latestCheckpoint.get.version))

if(cutOffCheckpoint.isEmpty) {
logInfo("No expired checkpoint file found. Returning empty iterator.")
return Iterator.empty
} else {
// Math.min for sanity check
Math.min(cutOffCheckpoint.get, latestCheckpoint.get.version) - 1L
}
}

listExpiredDeltaLogsInternal(latestExpiredCheckpointVersion)
}
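
  // Worked example (hypothetical numbers): if the latest checkpoint is at version 20 and the
  // newest expired checkpoint is at version 10, the threshold becomes min(10, 20) - 1 = 9, so
  // versions 0 through 9 (and any checkpoints among them) may be deleted, while checkpoint 10
  // is kept as the new start of the log.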

/**
@@ -507,6 +507,289 @@ class DeltaRetentionSuite extends QueryTest
}
}

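// Run the cleanup test against several checkpoint configurations: the session default,
// classic checkpoints, multi-part classic checkpoints (part size 1), and each V2 checkpoint
// file format.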
(Seq(("Default", Seq.empty[(String, String)])) ++ CheckpointPolicy.ALL.map {
case CheckpointPolicy.Classic =>
Seq(
("Classic", Seq(DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.Classic.name)),
("Multipart", Seq(DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.Classic.name,
DeltaSQLConf.DELTA_CHECKPOINT_PART_SIZE.key -> "1"))
)
case CheckpointPolicy.V2 =>
V2Checkpoint.Format.ALL_AS_STRINGS.map { v2CheckpointFormat =>
(s"V2 $v2CheckpointFormat",
Seq(DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.V2.name,
DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT.key -> v2CheckpointFormat))
}
}.flatten).foreach { case (chkConfigName, chkConfig) =>
test(s"cleanup does not delete the latest checkpoint before the cutoff. Config: $chkConfigName.") {
withSQLConf(chkConfig: _*) {
withTempDir { tempDir =>
val startTime = getStartTimeForRetentionTest
val clock = new ManualClock(startTime)
val actualTestStartTime = System.currentTimeMillis()
val tableReference = s"delta.`${tempDir.getCanonicalPath()}`"
val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock)
val logPath = new File(log.logPath.toUri)
val minChksCount = if (chkConfigName == "Multipart") { 2 } else { 1 }

// commit 0
spark.sql(
s"""CREATE TABLE $tableReference (id Int) USING delta
| TBLPROPERTIES('delta.enableChangeDataFeed' = true)
""".stripMargin)
// Set time for commit 0 to ensure that the commits don't need timestamp adjustment.
val commit0Time = clock.getTimeMillis()
new File(FileNames.unsafeDeltaFile(log.logPath, 0).toUri).setLastModified(commit0Time)
new File(FileNames.checksumFile(log.logPath, 0).toUri).setLastModified(commit0Time)

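// Helper: commits one row as `version`, then pins the delta and CRC file timestamps to the
// manual clock; every 10th version is expected to produce a checkpoint, whose files get the
// same timestamp.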
def commitNewVersion(version: Long) = {
spark.sql(s"INSERT INTO $tableReference VALUES (1)")

val deltaFile = new File(FileNames.unsafeDeltaFile(log.logPath, version).toUri)
val time = clock.getTimeMillis() + version * 1000
deltaFile.setLastModified(time)
val crcFile = new File(FileNames.checksumFile(log.logPath, version).toUri)
crcFile.setLastModified(time)
val chks = getCheckpointFiles(logPath)
.filter(f => FileNames.checkpointVersion(new Path(f.getCanonicalPath)) == version)

if (version % 10 == 0) {
assert(chks.length >= minChksCount)
chks.foreach { chk =>
assert(chk.exists())
chk.setLastModified(time)
}
} else { assert(chks.isEmpty) }
}

// Day 0: Add commits 1 to 15 --> creates 1 checkpoint at Day 0 for version 10
(1L to 15L).foreach(commitNewVersion)

// ensure that the checkpoint at version 10 exists
val checkpoint10Files = getCheckpointFiles(logPath)
.filter(f => FileNames.checkpointVersion(new Path(f.getCanonicalPath)) == 10)
assert(checkpoint10Files.length >= minChksCount)
assert(checkpoint10Files.forall(_.exists))
val deltaFiles = (0 to 15).map { i =>
new File(FileNames.unsafeDeltaFile(log.logPath, i).toUri)
}
deltaFiles.foreach { f =>
assert(f.exists())
}

// Day 35: Add commits 16 to 25 --> creates a checkpoint at Day 35 for version 20
clock.setTime(day(startTime, 35))
(16L to 25L).foreach(commitNewVersion)

assert(checkpoint10Files.forall(_.exists))
deltaFiles.foreach { f =>
assert(f.exists())
}

// Auto cleanup is disabled in DeltaRetentionSuiteBase, so the test controls when it runs.
cleanUpExpiredLogs(log)

// assert that the checkpoint from day 0 (at version 10) and all the commits after
// that are still there
assert(checkpoint10Files.forall(_.exists))
deltaFiles.foreach { f =>
val version = FileNames.deltaVersion(new Path(f.toString()))
if (version < 10) {
assert(!f.exists, version)
} else {
assert(f.exists, version)
}
}

// Validate we can time travel to version >=10
val earliestExpectedChkVersion = 10
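// Versions below the retained checkpoint (10) can no longer be reconstructed, so time travel
// to them must fail with VersionNotFoundException.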
(0 to 25).map { version =>
val sqlCommand = s"SELECT * FROM $tableReference VERSION AS OF $version"
if (version < earliestExpectedChkVersion) {
val ex = intercept[org.apache.spark.sql.delta.VersionNotFoundException] {
spark.sql(sqlCommand).collect()
}
assert(ex.userVersion === version)
assert(ex.earliest === earliestExpectedChkVersion)
assert(ex.latest === 25)
} else {
spark.sql(sqlCommand).collect()
}
}

// Validate CDF - SELECT * FROM table_changes_by_path('table', X, Y)
(0 to 24).map { version =>
val sqlCommand = s"SELECT * FROM table_changes_by_path('${tempDir.getCanonicalPath}', $version, 25)"
if (version < earliestExpectedChkVersion) {
val ex = intercept[org.apache.spark.sql.delta.DeltaFileNotFoundException] {
spark.sql(sqlCommand).collect()
}
} else {
spark.sql(sqlCommand).collect()
}
}
}
}
}
}

test(s"cleanup does not delete the logs if no checkpoints exist") {
withTempDir { tempDir =>
val startTime = getStartTimeForRetentionTest
val clock = new ManualClock(startTime)
val actualTestStartTime = System.currentTimeMillis()
val tableReference = s"delta.`${tempDir.getCanonicalPath()}`"
val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock)
val logPath = new File(log.logPath.toUri)

// commit 0
spark.sql(
s"""CREATE TABLE $tableReference (id Int) USING delta
| TBLPROPERTIES('delta.enableChangeDataFeed' = true)
""".stripMargin)
// Set time for commit 0 to ensure that the commits don't need timestamp adjustment.
val commit0Time = clock.getTimeMillis()
new File(FileNames.unsafeDeltaFile(log.logPath, 0).toUri).setLastModified(commit0Time)
new File(FileNames.checksumFile(log.logPath, 0).toUri).setLastModified(commit0Time)

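// Helper: commits one row as `version` and pins the delta and CRC file timestamps to the
// manual clock.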
def commitNewVersion(version: Long) = {
spark.sql(s"INSERT INTO $tableReference VALUES (1)")

val deltaFile = new File(FileNames.unsafeDeltaFile(log.logPath, version).toUri)
val time = clock.getTimeMillis() + version * 1000
deltaFile.setLastModified(time)
val crcFile = new File(FileNames.checksumFile(log.logPath, version).toUri)
crcFile.setLastModified(time)
}

// Day 0: Add commits 1 to 9
(1L to 9L).foreach(commitNewVersion)

// Day 35: Add commit 10 --> creates a checkpoint at Day 35 for version 10
clock.setTime(day(startTime, 35))
commitNewVersion(10L)

// checkpoint is created
val checkpoint10Files = getCheckpointFiles(logPath)
.filter(f => FileNames.checkpointVersion(new Path(f.getCanonicalPath)) == 10)
assert(checkpoint10Files.length == 1)
assert(checkpoint10Files.forall(_.exists))

val deltaFiles = (0 to 10).map { i =>
new File(FileNames.unsafeDeltaFile(log.logPath, i).toUri)
}

// ensure that versions 0 to 10 exist before the cleanup
deltaFiles.foreach { f =>
assert(f.exists())
}

cleanUpExpiredLogs(log)

// ensure that versions 0 to 10 still exist after the cleanup: there is no expired
// checkpoint before the cutoff, so nothing is deleted
deltaFiles.foreach { f =>
val version = FileNames.deltaVersion(new Path(f.toString()))
assert(f.exists, version)
}

// Validate we can time travel to all versions
val earliestExpectedChkVersion = 0
(0 to 10).map { version =>
val sqlCommand = s"SELECT * FROM $tableReference VERSION AS OF $version"
if (version < earliestExpectedChkVersion) {
val ex = intercept[org.apache.spark.sql.delta.VersionNotFoundException] {
spark.sql(sqlCommand).collect()
}
assert(ex.userVersion === version)
assert(ex.earliest === earliestExpectedChkVersion)
assert(ex.latest === 10)
} else {
spark.sql(sqlCommand).collect()
}
}

// Validate CDF - SELECT * FROM table_changes_by_path('table', X, Y)
(0 to 9).map { version =>
val sqlCommand = s"SELECT * FROM table_changes_by_path('${tempDir.getCanonicalPath}', $version, 10)"
if (version < earliestExpectedChkVersion) {
val ex = intercept[org.apache.spark.sql.delta.DeltaFileNotFoundException] {
spark.sql(sqlCommand).collect()
}
} else {
spark.sql(sqlCommand).collect()
}
}
}
}

test(s"cleanup does not delete the latest checkpoint before the cutoff, even if not strictly required") {
withTempDir { tempDir =>
val startTime = getStartTimeForRetentionTest
val clock = new ManualClock(startTime)
val actualTestStartTime = System.currentTimeMillis()
val tableReference = s"delta.`${tempDir.getCanonicalPath()}`"
val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock)
val logPath = new File(log.logPath.toUri)

// commit 0
spark.sql(s"CREATE TABLE $tableReference (id Int) USING delta")
// Set time for commit 0 to ensure that the commits don't need timestamp adjustment.
val commit0Time = clock.getTimeMillis()
new File(FileNames.unsafeDeltaFile(log.logPath, 0).toUri).setLastModified(commit0Time)
new File(FileNames.checksumFile(log.logPath, 0).toUri).setLastModified(commit0Time)

def commitNewVersion(version: Long) = {
spark.sql(s"INSERT INTO $tableReference VALUES (1)")

val deltaFile = new File(FileNames.unsafeDeltaFile(log.logPath, version).toUri)
val time = clock.getTimeMillis() + version * 1000
deltaFile.setLastModified(time)
val crcFile = new File(FileNames.checksumFile(log.logPath, version).toUri)
crcFile.setLastModified(time)
}

// Day 0: Add commits 1 to 9
(1L to 9L).foreach(commitNewVersion)

// Creates a checkpoint for version 9
log.checkpoint()
val chk9 = getCheckpointFiles(logPath)
.filter(f => FileNames.checkpointVersion(new Path(f.getCanonicalPath)) == 9).head
assert(chk9.exists())
chk9.setLastModified(clock.getTimeMillis() + 9 * 1000)

// Day 35: Add commit 10 --> creates a checkpoint at Day 35 for version 10
clock.setTime(day(startTime, 35))
commitNewVersion(10L)

// checkpoint is created
val chk10 = getCheckpointFiles(logPath)
.filter(f => FileNames.checkpointVersion(new Path(f.getCanonicalPath)) == 10).head
assert(chk10.exists())

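// Checkpoint 9 is already past the retention cutoff, but it is the newest expired
// checkpoint, so cleanup must keep it together with every commit from version 9 onward.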
val lastExpiredCheckpoint = 9

val deltaFiles = (lastExpiredCheckpoint to 10).map { i =>
new File(FileNames.unsafeDeltaFile(log.logPath, i).toUri)
}

// ensure that versions 9 and 10 exist before the cleanup
deltaFiles.foreach { f =>
assert(f.exists())
}

cleanUpExpiredLogs(log)

// ensure that versions 9 and 10, including checkpoint 9,
// still exist after the cleanup
deltaFiles.foreach { f =>
val version = FileNames.deltaVersion(new Path(f.toString()))
assert(f.exists, version)
}
assert(chk9.exists)
}
}

test("Metadata cleanup respects requireCheckpointProtectionBeforeVersion") {
// Commits should be cleaned up to the latest checkpoint.
testRequireCheckpointProtectionBeforeVersion(
