
Commit 2e4eae2

Commit message: first
1 parent a6db4e0 commit 2e4eae2

File tree

3 files changed: +145, -67 lines

spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala (+7, -5)

@@ -283,7 +283,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
       val defaultType = if (isLiteVacuumEnabled) VacuumType.LITE else VacuumType.FULL
       val vacuumType = vacuumTypeOpt.map(VacuumType.withName).getOrElse(defaultType)
       val latestCommitVersionOutsideOfRetentionWindowOpt: Option[Long] =
-        if (vacuumType == VacuumType.LITE) {
+        if (spark.sessionState.conf.getConf(DeltaSQLConf.PERSIST_VACUUM_INFO_ENABLED)) {
           try {
             val timestamp = new Timestamp(deleteBeforeTimestamp)
             val commit = new DeltaHistoryManager(deltaLog).getActiveCommitAtTime(
@@ -529,10 +529,12 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
       //    from 0
       // 2. Last vacuum info is there but metadata cleanup has cleaned up commit files since
       //    the last Vacuum's latest commit version outside of the retention window.
-      if (earliestCommitVersion != 0 &&
-        latestCommitVersionOutsideOfRetentionWindowAsOfLastVacuumOpt
-          .forall(_ < earliestCommitVersion)) {
-        throw DeltaErrors.deltaCannotVacuumLite()
+      if (spark.conf.get(DatabricksSQLConf.LITE_VACUUM_ERROR_ENABLED)) {
+        if (earliestCommitVersion != 0 &&
+          latestCommitVersionOutsideOfRetentionWindowAsOfLastVacuumOpt
+            .forall(_ < earliestCommitVersion)) {
+          throw DeltaErrors.deltaCannotVacuumLite()
+        }
       }

       // The start and the end commit versions give the range of commit files we want to look into
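
Taken together, the change makes the computation of latestCommitVersionOutsideOfRetentionWindowOpt depend on the new PERSIST_VACUUM_INFO_ENABLED flag (default true) instead of on the vacuum type, so a FULL vacuum now records that version in the vacuum info file too, while the VACUUM LITE failure is additionally gated behind DatabricksSQLConf.LITE_VACUUM_ERROR_ENABLED. A minimal sketch of toggling the persist flag programmatically, assuming only a live SparkSession named spark and this commit's DeltaSQLConf on the classpath:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.sources.DeltaSQLConf

val spark = SparkSession.builder().getOrCreate()

// setConf with a typed ConfigEntry mirrors the getConf call added above.
// Disabling the flag restores the old behavior, where the version outside
// the retention window was computed for LITE vacuum only.
spark.sessionState.conf.setConf(DeltaSQLConf.PERSIST_VACUUM_INFO_ENABLED, false)

// Re-enable it; true is the default this commit ships with.
spark.sessionState.conf.setConf(DeltaSQLConf.PERSIST_VACUUM_INFO_ENABLED, true)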

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala (+7)

@@ -536,6 +536,13 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(false)

+  val PERSIST_VACUUM_INFO_ENABLED =
+    buildConf("vacuum.info.persist.enabled")
+      .doc("Computes latestCommitVersionOutsideOfRetentionWindow and persists it in the" +
+        " vacuum info file even for FULL vacuum.")
+      .booleanConf
+      .createWithDefault(true)
+
   val DELTA_VACUUM_RETENTION_CHECK_ENABLED =
     buildConf("retentionDurationCheck.enabled")
       .doc("Adds a check preventing users from running vacuum with a very short retention " +

spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala (+131, -62)

@@ -1486,6 +1486,53 @@ class DeltaVacuumSuite extends DeltaVacuumSuiteBase with DeltaSQLCommandTest {
       }
     }
   }
+
+  test(s"Ensure vacuum info is persisted even after full Vacuum") {
+    val tableName = "testTable"
+    withTable(tableName) {
+      withSQLConf(
+        DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED.key -> "false",
+        DeltaSQLConf.LITE_VACUUM_ENABLED.key -> "false"
+      ) {
+        spark.range(0, 50, step = 1, numPartitions = 5).write.format("delta")
+          .saveAsTable(tableName) // version 0
+        spark.sql(s"DELETE from $tableName") // version 1
+        spark.range(0, 50, step = 1, numPartitions = 5).write.format("delta").mode("append")
+          .saveAsTable(tableName) // version 2
+
+        val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName))
+        val basePath = deltaLog.dataPath.toString
+        val table = DeltaTableV2(spark, new Path(basePath))
+
+        assertNumFiles(deltaLog, addFiles = 5, addFilesWithDVs = 0, dvFiles = 0,
+          dataFiles = 10)
+
+        // Vacuums until version 2 and persists that info in the last Vacuum info
+        spark.sql(s"Vacuum $tableName FULL RETAIN 0 HOURS")
+        assertNumFiles(deltaLog, addFiles = 5, addFilesWithDVs = 0, dvFiles = 0,
+          dataFiles = 5)
+
+        spark.sql(s"DELETE from $tableName") // version 3
+
+        spark.range(0, 50, step = 1, numPartitions = 5).write.format("delta").mode("append")
+          .saveAsTable(tableName) // version 4
+
+        assertNumFiles(deltaLog, addFiles = 5, addFilesWithDVs = 0, dvFiles = 0,
+          dataFiles = 10)
+
+        // Checkpoints will allow us to construct the table snapshot
+        deltaLog.createCheckpointAtVersion(2L)
+        deleteCommitFile(table, 0L) // delete version 0
+        deleteCommitFile(table, 1L) // delete version 1
+
+        // If vacuum info were not persisted after the full Vacuum, the following
+        // LITE vacuum would fail.
+        spark.sql(s"Vacuum $tableName LITE RETAIN 0 HOURS")
+        assertNumFiles(deltaLog, addFiles = 5, addFilesWithDVs = 0, dvFiles = 0,
+          dataFiles = 5)
+      }
+    }
+  }
 }

 class DeltaVacuumWithCoordinatedCommitsBatch100Suite extends DeltaVacuumSuite {
@@ -1509,75 +1556,97 @@ class DeltaLiteVacuumSuite
     super.afterAll()
   }

-  test("lite vacuum not possible - commit 0 is missing") {
-    withSQLConf(
-      DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED.key -> "false"
-    ) {
-      withTempDir { dir =>
-        // create table versions 0 and 1
-        spark.range(10)
-          .write
-          .format("delta")
-          .save(dir.getAbsolutePath)
-        spark.range(10)
-          .write
-          .format("delta")
-          .mode("append")
-          .save(dir.getAbsolutePath)
-        val deltaTable = io.delta.tables.DeltaTable.forPath(dir.getAbsolutePath)
-        val table = DeltaTableV2(spark, new Path(dir.getAbsolutePath))
-        deltaTable.delete()
-        // Checkpoints will allow us to construct the table snapshot
-        table.deltaLog.createCheckpointAtVersion(2L)
-        deleteCommitFile(table, 0L) // delete version 0
-
-        val e = intercept[DeltaIllegalStateException] {
-          VacuumCommand.gc(spark, table, dryRun = true, retentionHours = Some(0))
+  Seq(true, false).foreach { errorEnabled =>
+    test(s"lite vacuum not possible - commit 0 is missing: errorEnabled: $errorEnabled") {
+      withSQLConf(
+        DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED.key -> "false",
+        DatabricksSQLConf.LITE_VACUUM_ERROR_ENABLED.key -> errorEnabled.toString
+      ) {
+        withTempDir { dir =>
+          // create table versions 0 and 1
+          spark.range(10)
+            .write
+            .format("delta")
+            .save(dir.getAbsolutePath)
+          spark.range(10)
+            .write
+            .format("delta")
+            .mode("append")
+            .save(dir.getAbsolutePath)
+          val deltaTable = io.delta.tables.DeltaTable.forPath(dir.getAbsolutePath)
+          val table = DeltaTableV2(spark, new Path(dir.getAbsolutePath))
+          deltaTable.delete()
+          assertNumFiles(table.deltaLog, addFiles = 0, addFilesWithDVs = 0, dvFiles = 0,
+            dataFiles = 4)
+          // Checkpoints will allow us to construct the table snapshot
+          table.deltaLog.createCheckpointAtVersion(2L)
+          deleteCommitFile(table, 0L) // delete version 0
+
+          if (errorEnabled) {
+            val e = intercept[DeltaIllegalStateException] {
+              VacuumCommand.gc(spark, table, dryRun = false, retentionHours = Some(0))
+            }
+            assert(e.getMessage.contains("VACUUM LITE cannot delete all eligible files as some files" +
+              " are not referenced by the Delta log. Please run VACUUM FULL."))
+          } else {
+            VacuumCommand.gc(spark, table, dryRun = false, retentionHours = Some(0))
+            assertNumFiles(table.deltaLog, addFiles = 0, addFilesWithDVs = 0, dvFiles = 0,
+              dataFiles = 0)
+          }
         }
-        assert(e.getMessage.contains("VACUUM LITE cannot delete all eligible files as some files" +
-          " are not referenced by the Delta log. Please run VACUUM FULL."))
       }
     }
   }

-  test("lite vacuum not possible - commits since last vacuum is missing") {
-    withSQLConf(
-      DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED.key -> "false"
-    ) {
-      withTempDir { dir =>
-        // create table - version 0
-        spark.range(10)
-          .write
-          .format("delta")
-          .save(dir.getAbsolutePath)
-        val deltaTable = io.delta.tables.DeltaTable.forPath(dir.getAbsolutePath)
-        val table = DeltaTableV2(spark, new Path(dir.getAbsolutePath))
-        deltaTable.delete() // version 1
-        // The following Vacuum saves latestCommitVersionOutsideOfRetentionWindow as 1
-        VacuumCommand.gc(spark, table, dryRun = false, retentionHours = Some(0))
-        spark.range(10)
-          .write
-          .format("delta")
-          .mode("append")
-          .save(dir.getAbsolutePath) // version 2
-        deltaTable.delete() // version 3
-        // Checkpoint will allow us to construct the table snapshot
-        table.deltaLog.createCheckpointAtVersion(3L)
-        // Deleting version 0 shouldn't fail the vacuum since
-        // latestCommitVersionOutsideOfRetentionWindow is already at 1
-        deleteCommitFile(table, 0L) // delete version 0.
-        VacuumCommand.gc(spark, table, dryRun = true, retentionHours = Some(0))
-        // Since commit versions 1 and 2 are required for lite vacuum, deleting them will
-        // fail the command.
-        for (i <- 1 to 2) {
-          deleteCommitFile(table, i)
-        }
-
-        val e = intercept[DeltaIllegalStateException] {
+  Seq(true, false).foreach { errorEnabled =>
+    test(s"lite vacuum not possible - commits since last vacuum is missing:" +
+        s" errorEnabled: $errorEnabled") {
+      withSQLConf(
+        DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED.key -> "false",
+        DatabricksSQLConf.LITE_VACUUM_ERROR_ENABLED.key -> errorEnabled.toString
+      ) {
+        withTempDir { dir =>
+          // create table - version 0
+          spark.range(10)
+            .write
+            .format("delta")
+            .save(dir.getAbsolutePath)
+          val deltaTable = io.delta.tables.DeltaTable.forPath(dir.getAbsolutePath)
+          val table = DeltaTableV2(spark, new Path(dir.getAbsolutePath))
+          deltaTable.delete() // version 1
+          // The following Vacuum saves latestCommitVersionOutsideOfRetentionWindow as 1
+          VacuumCommand.gc(spark, table, dryRun = false, retentionHours = Some(0))
+          spark.range(10)
+            .write
+            .format("delta")
+            .mode("append")
+            .save(dir.getAbsolutePath) // version 2
+          deltaTable.delete() // version 3
+          // Checkpoint will allow us to construct the table snapshot
+          table.deltaLog.createCheckpointAtVersion(3L)
+          assertNumFiles(table.deltaLog, addFiles = 0, addFilesWithDVs = 0, dvFiles = 0,
+            dataFiles = 2)
+          // Deleting version 0 shouldn't fail the vacuum since
+          // latestCommitVersionOutsideOfRetentionWindow is already at 1
+          deleteCommitFile(table, 0L) // delete version 0.
           VacuumCommand.gc(spark, table, dryRun = true, retentionHours = Some(0))
+          // Since commit versions 1 and 2 are required for lite vacuum, deleting them will
+          // fail the command.
+          for (i <- 1 to 2) {
+            deleteCommitFile(table, i)
+          }
+          if (errorEnabled) {
+            val e = intercept[DeltaIllegalStateException] {
+              VacuumCommand.gc(spark, table, dryRun = false, retentionHours = Some(0))
+            }
+            assert(e.getMessage.contains("VACUUM LITE cannot delete all eligible files as some files" +
+              " are not referenced by the Delta log. Please run VACUUM FULL."))
+          } else {
+            VacuumCommand.gc(spark, table, dryRun = false, retentionHours = Some(0))
+            assertNumFiles(table.deltaLog, addFiles = 0, addFilesWithDVs = 0, dvFiles = 0,
+              dataFiles = 0)
+          }
         }
-        assert(e.getMessage.contains("VACUUM LITE cannot delete all eligible files as some files" +
-          " are not referenced by the Delta log. Please run VACUUM FULL."))
       }
     }
   }
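
The new tests encode the end-to-end scenario this commit enables. A condensed sketch of that flow, with a hypothetical table name t and the retention check disabled as in the suite so that RETAIN 0 HOURS is accepted:

// FULL vacuum now also computes and persists vacuum info (new default).
spark.sql("VACUUM t FULL RETAIN 0 HOURS")
// ... metadata cleanup later removes the earliest commit files ...
// LITE vacuum can still determine its starting commit from the persisted
// info, so it succeeds where it previously threw deltaCannotVacuumLite.
spark.sql("VACUUM t LITE RETAIN 0 HOURS")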
