diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala b/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
index d88a248b335..ca3bcab9d6b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
@@ -155,12 +155,38 @@ trait MetadataCleanup extends DeltaLogging {
   private def listExpiredDeltaLogs(fileCutOffTime: Long): Iterator[FileStatus] = {
     val latestCheckpoint = readLastCheckpointFile()
     if (latestCheckpoint.isEmpty) return Iterator.empty
-    val threshold = latestCheckpoint.get.version - 1L
+
+    def listExpiredDeltaLogsInternal(threshold: Long): Iterator[FileStatus] = {
       val files = store.listFrom(listingPrefix(logPath, 0), newDeltaHadoopConf())
         .filter(f => isCheckpointFile(f) || isDeltaFile(f))
       new BufferingLogDeletionIterator(
         files, fileCutOffTime, threshold, getDeltaFileOrCheckpointVersion)
+    }
+
+    // Get latest expired checkpoint version
+    val latestExpiredCheckpointVersion = {
+      val cutOffCheckpoint = listExpiredDeltaLogsInternal(latestCheckpoint.get.version - 1)
+        .filter(f => isCheckpointFile(f))
+        .map(FileNames.checkpointVersion)
+        .foldLeft(Option.empty[Long]) { (a, c) => if (a.getOrElse(-1L) < c) { Some(c) } else { a } }
+
+      logInfo("Cut off checkpoint version: " + cutOffCheckpoint)
+
+      recordDeltaEvent(this, "delta.log.cleanup.stats", data = Map(
+        "cutOffCheckpoint" -> cutOffCheckpoint.getOrElse(-1L),
+        "latestCheckpoint" -> latestCheckpoint.get.version))
+
+      if (cutOffCheckpoint.isEmpty) {
+        logInfo("No expired checkpoint file found. Returning empty iterator.")
+        return Iterator.empty
+      } else {
+        // Math.min for sanity check
+        Math.min(cutOffCheckpoint.get, latestCheckpoint.get.version) - 1L
+      }
+    }
+
+    listExpiredDeltaLogsInternal(latestExpiredCheckpointVersion)
   }
 
   /**
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala
index 22acd234b94..ca64b05d593 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala
@@ -507,6 +507,289 @@ class DeltaRetentionSuite extends QueryTest
     }
   }
 
+  (Seq(("Default", Seq.empty[(String, String)])) ++ CheckpointPolicy.ALL.map {
+    case CheckpointPolicy.Classic =>
+      Seq(
+        ("Classic", Seq(DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.Classic.name)),
+        ("Multipart", Seq(DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.Classic.name,
+          DeltaSQLConf.DELTA_CHECKPOINT_PART_SIZE.key -> "1"))
+      )
+    case CheckpointPolicy.V2 =>
+      V2Checkpoint.Format.ALL_AS_STRINGS.map { v2CheckpointFormat =>
+        (s"V2 $v2CheckpointFormat",
+          Seq(DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.V2.name,
+            DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT.key -> v2CheckpointFormat))
+      }
+  }.flatten).foreach { case (chkConfigName, chkConfig) =>
+    test(s"cleanup does not delete the latest checkpoint before the cutoff. " +
+      s"Config: $chkConfigName.") {
+      withSQLConf(chkConfig: _*) {
+        withTempDir { tempDir =>
+          val startTime = getStartTimeForRetentionTest
+          val clock = new ManualClock(startTime)
+          val actualTestStartTime = System.currentTimeMillis()
+          val tableReference = s"delta.`${tempDir.getCanonicalPath()}`"
+          val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock)
+          val logPath = new File(log.logPath.toUri)
+          val minChksCount = if (chkConfigName == "Multipart") { 2 } else { 1 }
+
+          // commit 0
+          spark.sql(
+            s"""CREATE TABLE $tableReference (id Int) USING delta
+               | TBLPROPERTIES('delta.enableChangeDataFeed' = true)
+             """.stripMargin)
+          // Set time for commit 0 to ensure that the commits don't need timestamp adjustment.
+          val commit0Time = clock.getTimeMillis()
+          new File(FileNames.unsafeDeltaFile(log.logPath, 0).toUri).setLastModified(commit0Time)
+          new File(FileNames.checksumFile(log.logPath, 0).toUri).setLastModified(commit0Time)
+
+          def commitNewVersion(version: Long) = {
+            spark.sql(s"INSERT INTO $tableReference VALUES (1)")
+
+            val deltaFile = new File(FileNames.unsafeDeltaFile(log.logPath, version).toUri)
+            val time = clock.getTimeMillis() + version * 1000
+            deltaFile.setLastModified(time)
+            val crcFile = new File(FileNames.checksumFile(log.logPath, version).toUri)
+            crcFile.setLastModified(time)
+            val chks = getCheckpointFiles(logPath)
+              .filter(f => FileNames.checkpointVersion(new Path(f.getCanonicalPath)) == version)
+
+            if (version % 10 == 0) {
+              assert(chks.length >= minChksCount)
+              chks.foreach { chk =>
+                assert(chk.exists())
+                chk.setLastModified(time)
+              }
+            } else { assert(chks.isEmpty) }
+          }
+
+          // Day 0: Add commits 1 to 15 --> creates 1 checkpoint at Day 0 for version 10
+          (1L to 15L).foreach(commitNewVersion)
+
+          // ensure that the checkpoint at version 10 exists
+          val checkpoint10Files = getCheckpointFiles(logPath)
+            .filter(f => FileNames.checkpointVersion(new Path(f.getCanonicalPath)) == 10)
+          assert(checkpoint10Files.length >= minChksCount)
+          assert(checkpoint10Files.forall(_.exists))
+          val deltaFiles = (0 to 15).map { i =>
+            new File(FileNames.unsafeDeltaFile(log.logPath, i).toUri)
+          }
+          deltaFiles.foreach { f =>
+            assert(f.exists())
+          }
+
+          // Day 35: Add commits 16 to 25 --> creates a checkpoint at Day 35 for version 20
+          clock.setTime(day(startTime, 35))
+          (16L to 25L).foreach(commitNewVersion)
+
+          assert(checkpoint10Files.forall(_.exists))
+          deltaFiles.foreach { f =>
+            assert(f.exists())
+          }
+
+          // Auto cleanup is disabled in DeltaRetentionSuiteBase so that tests control when it runs.
+          cleanUpExpiredLogs(log)
+
+          // assert that the checkpoint from day 0 (at version 10) and all the commits after
+          // that are still there
+          assert(checkpoint10Files.forall(_.exists))
+          deltaFiles.foreach { f =>
+            val version = FileNames.deltaVersion(new Path(f.toString()))
+            if (version < 10) {
+              assert(!f.exists, version)
+            } else {
+              assert(f.exists, version)
+            }
+          }
+
+          // Validate we can time travel to version >= 10
+          val earliestExpectedChkVersion = 10
+          (0 to 25).map { version =>
+            val sqlCommand = s"SELECT * FROM $tableReference VERSION AS OF $version"
+            if (version < earliestExpectedChkVersion) {
+              val ex = intercept[org.apache.spark.sql.delta.VersionNotFoundException] {
+                spark.sql(sqlCommand).collect()
+              }
+              assert(ex.userVersion === version)
+              assert(ex.earliest === earliestExpectedChkVersion)
+              assert(ex.latest === 25)
+            } else {
+              spark.sql(sqlCommand).collect()
+            }
+          }
+
+          // Validate CDF - SELECT * FROM table_changes_by_path('table', X, Y)
+          (0 to 24).map { version =>
+            val sqlCommand =
+              s"SELECT * FROM table_changes_by_path('${tempDir.getCanonicalPath}', $version, 25)"
+            if (version < earliestExpectedChkVersion) {
+              val ex = intercept[org.apache.spark.sql.delta.DeltaFileNotFoundException] {
+                spark.sql(sqlCommand).collect()
+              }
+            } else {
+              spark.sql(sqlCommand).collect()
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test(s"cleanup does not delete the logs if no checkpoints exist") {
+    withTempDir { tempDir =>
+      val startTime = getStartTimeForRetentionTest
+      val clock = new ManualClock(startTime)
+      val actualTestStartTime = System.currentTimeMillis()
+      val tableReference = s"delta.`${tempDir.getCanonicalPath()}`"
+      val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock)
+      val logPath = new File(log.logPath.toUri)
+
+      // commit 0
+      spark.sql(
+        s"""CREATE TABLE $tableReference (id Int) USING delta
+           | TBLPROPERTIES('delta.enableChangeDataFeed' = true)
+         """.stripMargin)
+      // Set time for commit 0 to ensure that the commits don't need timestamp adjustment.
+      val commit0Time = clock.getTimeMillis()
+      new File(FileNames.unsafeDeltaFile(log.logPath, 0).toUri).setLastModified(commit0Time)
+      new File(FileNames.checksumFile(log.logPath, 0).toUri).setLastModified(commit0Time)
+
+      def commitNewVersion(version: Long) = {
+        spark.sql(s"INSERT INTO $tableReference VALUES (1)")
+
+        val deltaFile = new File(FileNames.unsafeDeltaFile(log.logPath, version).toUri)
+        val time = clock.getTimeMillis() + version * 1000
+        deltaFile.setLastModified(time)
+        val crcFile = new File(FileNames.checksumFile(log.logPath, version).toUri)
+        crcFile.setLastModified(time)
+      }
+
+      // Day 0: Add commits 1 to 9
+      (1L to 9L).foreach(commitNewVersion)
+
+      // Day 35: Add commit 10 --> creates a checkpoint at Day 35 for version 10
+      clock.setTime(day(startTime, 35))
+      commitNewVersion(10L)
+
+      // checkpoint is created
+      val checkpoint10Files = getCheckpointFiles(logPath)
+        .filter(f => FileNames.checkpointVersion(new Path(f.getCanonicalPath)) == 10)
+      assert(checkpoint10Files.length == 1)
+      assert(checkpoint10Files.forall(_.exists))
+
+      val deltaFiles = (0 to 10).map { i =>
+        new File(FileNames.unsafeDeltaFile(log.logPath, i).toUri)
+      }
+
+      // ensure that versions 0 to 10 exist before the cleanup
+      deltaFiles.foreach { f =>
+        assert(f.exists())
+      }
+
+      cleanUpExpiredLogs(log)
+
+      // ensure that versions 0 to 10 still exist after the cleanup
+      deltaFiles.foreach { f =>
+        val version = FileNames.deltaVersion(new Path(f.toString()))
+        assert(f.exists, version)
+      }
+
+      // Validate we can time travel to all versions
+      val earliestExpectedChkVersion = 0
+      (0 to 10).map { version =>
+        val sqlCommand = s"SELECT * FROM $tableReference VERSION AS OF $version"
+        if (version < earliestExpectedChkVersion) {
+          val ex = intercept[org.apache.spark.sql.delta.VersionNotFoundException] {
+            spark.sql(sqlCommand).collect()
+          }
+          assert(ex.userVersion === version)
+          assert(ex.earliest === earliestExpectedChkVersion)
+          assert(ex.latest === 10)
+        } else {
+          spark.sql(sqlCommand).collect()
+        }
+      }
+
+      // Validate CDF - SELECT * FROM table_changes_by_path('table', X, Y)
+      (0 to 9).map { version =>
+        val sqlCommand =
+          s"SELECT * FROM table_changes_by_path('${tempDir.getCanonicalPath}', $version, 10)"
+        if (version < earliestExpectedChkVersion) {
+          val ex = intercept[org.apache.spark.sql.delta.DeltaFileNotFoundException] {
+            spark.sql(sqlCommand).collect()
+          }
+        } else {
+          spark.sql(sqlCommand).collect()
+        }
+      }
+    }
+  }
+
+  test(s"cleanup does not delete the latest checkpoint before the cutoff, " +
+    "even if not strictly required") {
+    withTempDir { tempDir =>
+      val startTime = getStartTimeForRetentionTest
+      val clock = new ManualClock(startTime)
+      val actualTestStartTime = System.currentTimeMillis()
+      val tableReference = s"delta.`${tempDir.getCanonicalPath()}`"
+      val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock)
+      val logPath = new File(log.logPath.toUri)
+
+      // commit 0
+      spark.sql(s"CREATE TABLE $tableReference (id Int) USING delta")
+      // Set time for commit 0 to ensure that the commits don't need timestamp adjustment.
+      val commit0Time = clock.getTimeMillis()
+      new File(FileNames.unsafeDeltaFile(log.logPath, 0).toUri).setLastModified(commit0Time)
+      new File(FileNames.checksumFile(log.logPath, 0).toUri).setLastModified(commit0Time)
+
+      def commitNewVersion(version: Long) = {
+        spark.sql(s"INSERT INTO $tableReference VALUES (1)")
+
+        val deltaFile = new File(FileNames.unsafeDeltaFile(log.logPath, version).toUri)
+        val time = clock.getTimeMillis() + version * 1000
+        deltaFile.setLastModified(time)
+        val crcFile = new File(FileNames.checksumFile(log.logPath, version).toUri)
+        crcFile.setLastModified(time)
+      }
+
+      // Day 0: Add commits 1 to 9
+      (1L to 9L).foreach(commitNewVersion)
+
+      // Creates a checkpoint for version 9
+      log.checkpoint()
+      val chk9 = getCheckpointFiles(logPath)
+        .filter(f => FileNames.checkpointVersion(new Path(f.getCanonicalPath)) == 9).head
+      assert(chk9.exists())
+      chk9.setLastModified(clock.getTimeMillis() + 9 * 1000)
+
+      // Day 35: Add commit 10 --> creates a checkpoint at Day 35 for version 10
+      clock.setTime(day(startTime, 35))
+      commitNewVersion(10L)
+
+      // checkpoint is created
+      val chk10 = getCheckpointFiles(logPath)
+        .filter(f => FileNames.checkpointVersion(new Path(f.getCanonicalPath)) == 10).head
+      assert(chk10.exists())
+
+      val lastExpiredCheckpoint = 9
+
+      val deltaFiles = (lastExpiredCheckpoint to 10).map { i =>
+        new File(FileNames.unsafeDeltaFile(log.logPath, i).toUri)
+      }
+
+      // ensure that versions 9 and 10 exist before the cleanup
+      deltaFiles.foreach { f =>
+        assert(f.exists())
+      }
+
+      cleanUpExpiredLogs(log)
+
+      // ensure that versions 9 and 10, including checkpoint 9,
+      // still exist after checkpoint 10 is created and the cleanup runs
+      deltaFiles.foreach { f =>
+        val version = FileNames.deltaVersion(new Path(f.toString()))
+        assert(f.exists, version)
+      }
+      assert(chk9.exists)
+    }
+  }
+
   test("Metadata cleanup respects requireCheckpointProtectionBeforeVersion") {
     // Commits should be cleaned up to the latest checkpoint.
     testRequireCheckpointProtectionBeforeVersion(