Skip to content

Commit 436852b

Browse files
committed
Update to new table_configuration
1 parent f3f26fe commit 436852b

File tree

3 files changed

+35
-17
lines changed

3 files changed

+35
-17
lines changed

kernel/src/table_changes/log_replay.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,12 @@ fn process_cdf_commit(
189189
(p, m) => {
190190
let p = p.unwrap_or_else(|| table_configuration.protocol().clone());
191191
let m = m.unwrap_or_else(|| table_configuration.metadata().clone());
192-
*table_configuration = TableConfiguration::try_new(m, p)?;
192+
*table_configuration = TableConfiguration::try_new(
193+
m,
194+
p,
195+
table_configuration.table_root().clone(),
196+
commit_file.version,
197+
)?;
193198
if !table_configuration.is_cdf_read_supported() {
194199
return Err(Error::change_data_feed_unsupported(commit_file.version));
195200
}

kernel/src/table_changes/log_replay/tests.rs

+28-14
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ fn get_schema() -> StructType {
2828
])
2929
}
3030

31-
fn table_config() -> TableConfiguration {
31+
fn table_config(path: &Path) -> TableConfiguration {
32+
let table_root = url::Url::from_directory_path(path).unwrap();
3233
let schema_string = serde_json::to_string(&get_schema()).unwrap();
3334
let metadata = Metadata {
3435
schema_string,
@@ -49,7 +50,7 @@ fn table_config() -> TableConfiguration {
4950
Some([WriterFeatures::ColumnMapping]),
5051
)
5152
.unwrap();
52-
TableConfiguration::try_new(metadata, protocol).unwrap()
53+
TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap()
5354
}
5455

5556
fn get_segment(
@@ -111,8 +112,9 @@ async fn metadata_protocol() {
111112
.unwrap()
112113
.into_iter();
113114

115+
let table_config = table_config(mock_table.table_root());
114116
let scan_batches =
115-
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config())
117+
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
116118
.unwrap();
117119
let sv = result_to_sv(scan_batches);
118120
assert_eq!(sv, &[false, false]);
@@ -137,8 +139,9 @@ async fn cdf_not_enabled() {
137139
.unwrap()
138140
.into_iter();
139141

142+
let table_config = table_config(mock_table.table_root());
140143
let res: DeltaResult<Vec<_>> =
141-
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config())
144+
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
142145
.unwrap()
143146
.try_collect();
144147

@@ -168,8 +171,9 @@ async fn unsupported_reader_feature() {
168171
.unwrap()
169172
.into_iter();
170173

174+
let table_config = table_config(mock_table.table_root());
171175
let res: DeltaResult<Vec<_>> =
172-
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config())
176+
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
173177
.unwrap()
174178
.try_collect();
175179

@@ -199,8 +203,9 @@ async fn column_mapping_should_fail() {
199203
.unwrap()
200204
.into_iter();
201205

206+
let table_config = table_config(mock_table.table_root());
202207
let res: DeltaResult<Vec<_>> =
203-
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config())
208+
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
204209
.unwrap()
205210
.try_collect();
206211

@@ -230,8 +235,9 @@ async fn incompatible_schemas_fail() {
230235
.unwrap()
231236
.into_iter();
232237

238+
let table_config = table_config(mock_table.table_root());
233239
let res: DeltaResult<Vec<_>> =
234-
table_changes_action_iter(engine, commits, cdf_schema.into(), None, table_config())
240+
table_changes_action_iter(engine, commits, cdf_schema.into(), None, table_config)
235241
.unwrap()
236242
.try_collect();
237243

@@ -320,7 +326,8 @@ async fn add_remove() {
320326
.unwrap()
321327
.into_iter();
322328

323-
let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config())
329+
let table_config = table_config(mock_table.table_root());
330+
let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
324331
.unwrap()
325332
.flat_map(|scan_data| {
326333
let scan_data = scan_data.unwrap();
@@ -370,7 +377,8 @@ async fn filter_data_change() {
370377
.unwrap()
371378
.into_iter();
372379

373-
let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config())
380+
let table_config = table_config(mock_table.table_root());
381+
let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
374382
.unwrap()
375383
.flat_map(|scan_data| {
376384
let scan_data = scan_data.unwrap();
@@ -416,7 +424,8 @@ async fn cdc_selection() {
416424
.unwrap()
417425
.into_iter();
418426

419-
let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config())
427+
let table_config = table_config(mock_table.table_root());
428+
let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
420429
.unwrap()
421430
.flat_map(|scan_data| {
422431
let scan_data = scan_data.unwrap();
@@ -482,7 +491,9 @@ async fn dv() {
482491
},
483492
)])
484493
.into();
485-
let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config())
494+
495+
let table_config = table_config(mock_table.table_root());
496+
let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
486497
.unwrap()
487498
.flat_map(|scan_data| {
488499
let scan_data = scan_data.unwrap();
@@ -559,12 +570,13 @@ async fn data_skipping_filter() {
559570
.unwrap()
560571
.into_iter();
561572

573+
let table_config = table_config(mock_table.table_root());
562574
let sv = table_changes_action_iter(
563575
engine,
564576
commits,
565577
logical_schema.into(),
566578
predicate,
567-
table_config(),
579+
table_config,
568580
)
569581
.unwrap()
570582
.flat_map(|scan_data| {
@@ -610,8 +622,9 @@ async fn failing_protocol() {
610622
.unwrap()
611623
.into_iter();
612624

625+
let table_config = table_config(mock_table.table_root());
613626
let res: DeltaResult<Vec<_>> =
614-
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config())
627+
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
615628
.unwrap()
616629
.try_collect();
617630

@@ -637,11 +650,12 @@ async fn file_meta_timestamp() {
637650

638651
let commit = commits.next().unwrap();
639652
let file_meta_ts = commit.location.last_modified;
653+
let mut table_config = table_config(mock_table.table_root());
640654
let processed_commit = process_cdf_commit(
641655
engine.as_ref(),
642656
commit,
643657
&get_schema().into(),
644-
&mut table_config(),
658+
&mut table_config,
645659
)
646660
.unwrap();
647661
assert_eq!(processed_commit.timestamp, file_meta_ts);

kernel/src/table_changes/scan_file.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,6 @@ mod tests {
251251
use crate::schema::{DataType, StructField, StructType};
252252
use crate::table_changes::log_replay::table_changes_action_iter;
253253
use crate::table_configuration::TableConfiguration;
254-
use crate::table_features::{ReaderFeatures, WriterFeatures};
255254
use crate::utils::test_utils::{Action, LocalMockTable};
256255
use crate::Engine;
257256

@@ -361,7 +360,7 @@ mod tests {
361360
Some::<Vec<String>>(vec![]),
362361
)
363362
.unwrap();
364-
let table_config = TableConfiguration::try_new(metadata, protocol).unwrap();
363+
let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap();
365364

366365
let scan_data = table_changes_action_iter(
367366
Arc::new(engine),

0 commit comments

Comments
 (0)