@@ -164,7 +164,7 @@ impl LogReplayScanner {
164
164
let mut add_paths = HashSet :: default ( ) ;
165
165
let mut has_cdc_action = false ;
166
166
let mut timestamp = commit_file. location . last_modified ;
167
- for actions in action_iter {
167
+ for ( i , actions) in action_iter. enumerate ( ) {
168
168
let actions = actions?;
169
169
170
170
let mut visitor = PreparePhaseVisitor {
@@ -174,6 +174,7 @@ impl LogReplayScanner {
174
174
commit_timestamp : & mut timestamp,
175
175
protocol : None ,
176
176
metadata_info : None ,
177
+ is_first_batch : i == 0 ,
177
178
} ;
178
179
visitor. visit_rows_of ( actions. as_ref ( ) ) ?;
179
180
@@ -276,6 +277,7 @@ struct PreparePhaseVisitor<'a> {
276
277
add_paths : & ' a mut HashSet < String > ,
277
278
remove_dvs : & ' a mut HashMap < String , DvInfo > ,
278
279
commit_timestamp : & ' a mut i64 ,
280
+ is_first_batch : bool ,
279
281
}
280
282
impl PreparePhaseVisitor < ' _ > {
281
283
fn schema ( ) -> Arc < StructType > {
@@ -362,7 +364,9 @@ impl RowVisitor for PreparePhaseVisitor<'_> {
362
364
} else if let Some ( timestamp) =
363
365
getters[ 16 ] . get_long ( i, "commitInfo.inCommitTimestamp" ) ?
364
366
{
365
- * self . commit_timestamp = timestamp;
367
+ if self . is_first_batch && i == 0 {
368
+ * self . commit_timestamp = timestamp;
369
+ }
366
370
}
367
371
}
368
372
Ok ( ( ) )
0 commit comments