File tree 2 files changed +39
-3
lines changed
2 files changed +39
-3
lines changed Original file line number Diff line number Diff line change @@ -28,6 +28,12 @@ static CHECKPOINT_FILE_PATTERN: LazyLock<Regex> =
28
28
static DELTA_FILE_PATTERN : LazyLock < Regex > = LazyLock :: new ( || Regex :: new ( r"^\d+\.json$" ) . unwrap ( ) ) ;
29
29
static CRC_FILE_PATTERN : LazyLock < Regex > =
30
30
LazyLock :: new ( || Regex :: new ( r"^(\.\d+(\.crc|\.json)|\d+)\.crc$" ) . unwrap ( ) ) ;
31
+ static LAST_CHECKPOINT_FILE_PATTERN : LazyLock < Regex > =
32
+ LazyLock :: new ( || Regex :: new ( r"^_last_checkpoint$" ) . unwrap ( ) ) ;
33
+ static LAST_VACUUM_INFO_FILE_PATTERN : LazyLock < Regex > =
34
+ LazyLock :: new ( || Regex :: new ( r"^_last_vacuum_info$" ) . unwrap ( ) ) ;
35
+ static DELETION_VECTOR_FILE_PATTERN : LazyLock < Regex > =
36
+ LazyLock :: new ( || Regex :: new ( r".*\.bin$" ) . unwrap ( ) ) ;
31
37
pub ( super ) static TOMBSTONE_SCHEMA : LazyLock < StructType > =
32
38
LazyLock :: new ( || StructType :: new ( vec ! [ ActionType :: Remove . schema_field( ) . clone( ) ] ) ) ;
33
39
@@ -66,6 +72,24 @@ pub(crate) trait PathExt {
66
72
. map ( |name| CRC_FILE_PATTERN . captures ( name) . is_some ( ) )
67
73
. unwrap ( )
68
74
}
75
+
76
+ fn is_last_checkpoint_file ( & self ) -> bool {
77
+ self . filename ( )
78
+ . map ( |name| LAST_CHECKPOINT_FILE_PATTERN . captures ( name) . is_some ( ) )
79
+ . unwrap_or ( false )
80
+ }
81
+
82
+ fn is_last_vacuum_info_file ( & self ) -> bool {
83
+ self . filename ( )
84
+ . map ( |name| LAST_VACUUM_INFO_FILE_PATTERN . captures ( name) . is_some ( ) )
85
+ . unwrap_or ( false )
86
+ }
87
+
88
+ fn is_deletion_vector_file ( & self ) -> bool {
89
+ self . filename ( )
90
+ . map ( |name| DELETION_VECTOR_FILE_PATTERN . captures ( name) . is_some ( ) )
91
+ . unwrap_or ( false )
92
+ }
69
93
}
70
94
71
95
impl PathExt for Path {
Original file line number Diff line number Diff line change @@ -267,11 +267,23 @@ pub trait LogStore: Send + Sync + AsAny {
267
267
while let Some ( res) = stream. next ( ) . await {
268
268
match res {
269
269
Ok ( meta) => {
270
- // crc files are valid files according to the protocol
271
- if meta. location . is_crc_file ( ) {
270
+ // Valid but optional files: they do not decide the result either way, so skip them and inspect the next entry.
271
+ if meta. location . is_crc_file ( )
272
+ || meta. location . is_last_checkpoint_file ( )
273
+ || meta. location . is_last_vacuum_info_file ( )
274
+ || meta. location . is_deletion_vector_file ( )
275
+ {
272
276
continue ;
273
277
}
274
- return Ok ( meta. location . is_commit_file ( ) || meta. location . is_checkpoint_file ( ) ) ;
278
+ let is_valid =
279
+ meta. location . is_commit_file ( ) || meta. location . is_checkpoint_file ( ) ;
280
+ if !is_valid {
281
+ warn ! (
282
+ "Expected a valid delta file. Found {}" ,
283
+ meta. location. filename( ) . unwrap_or( "<empty>" )
284
+ )
285
+ }
286
+ return Ok ( is_valid) ;
275
287
}
276
288
Err ( ObjectStoreError :: NotFound { .. } ) => return Ok ( false ) ,
277
289
Err ( err) => return Err ( err. into ( ) ) ,
You can’t perform that action at this time.
0 commit comments