
refactor: Make CDF use TableConfiguration and refactor log replay #645

Open · wants to merge 19 commits into base: main
34 changes: 12 additions & 22 deletions kernel/src/actions/visitors.rs
@@ -37,30 +37,20 @@ impl MetadataVisitor {
getters.len()
))
);
let name: Option<String> = getters[1].get_opt(row_index, "metadata.name")?;
let description: Option<String> = getters[2].get_opt(row_index, "metadata.description")?;
// get format out of primitives
let format_provider: String = getters[3].get(row_index, "metadata.format.provider")?;
// options for format is always empty, so skip getters[4]
let schema_string: String = getters[5].get(row_index, "metadata.schema_string")?;
let partition_columns: Vec<_> = getters[6].get(row_index, "metadata.partition_list")?;
let created_time: Option<i64> = getters[7].get_opt(row_index, "metadata.created_time")?;
let configuration_map_opt: Option<HashMap<_, _>> =
getters[8].get_opt(row_index, "metadata.configuration")?;
let configuration = configuration_map_opt.unwrap_or_else(HashMap::new);

let configuration_map_opt = getters[8].get_opt(row_index, "metaData.configuration")?;
Ok(Metadata {
id,
name,
description,
name: getters[1].get_opt(row_index, "metaData.name")?,
Collaborator:
interesting i just noticed we had all lowercase before.. guess we are case-insensitive?

description: getters[2].get_opt(row_index, "metaData.description")?,
format: Format {
provider: format_provider,
options: HashMap::new(),
provider: getters[3].get(row_index, "metaData.format.provider")?,
options: HashMap::new(), // options for format is always empty, so skip getters[4]
},
schema_string,
partition_columns,
created_time,
configuration,
schema_string: getters[5].get(row_index, "metaData.schemaString")?,
partition_columns: getters[6].get(row_index, "metaData.partitionColumns")?,
created_time: getters[7].get_opt(row_index, "metaData.created_time")?,
configuration: configuration_map_opt.unwrap_or_else(HashMap::new),
})
}
}
@@ -75,9 +65,9 @@ impl RowVisitor for MetadataVisitor {
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
for i in 0..row_count {
// Since id column is required, use it to detect presence of a metadata action
if let Some(id) = getters[0].get_opt(i, "metadata.id")? {
if let Some(id) = getters[0].get_opt(i, "metaData.id")? {
self.metadata = Some(Self::visit_metadata(i, id, getters)?);
break;
break; // A commit has at most one metaData action
}
}
Ok(())
@@ -159,7 +149,7 @@ impl RowVisitor for ProtocolVisitor {
// Since minReaderVersion column is required, use it to detect presence of a Protocol action
if let Some(mrv) = getters[0].get_opt(i, "protocol.min_reader_version")? {
self.protocol = Some(Self::visit_protocol(i, mrv, getters)?);
break;
break; // A commit has at most one Protocol action
}
}
Ok(())
437 changes: 227 additions & 210 deletions kernel/src/table_changes/log_replay.rs

Large diffs are not rendered by default.

95 changes: 73 additions & 22 deletions kernel/src/table_changes/log_replay/tests.rs
@@ -1,5 +1,5 @@
use super::table_changes_action_iter;
use super::TableChangesScanData;
use super::{process_cdf_commit, table_changes_action_iter};
use crate::actions::deletion_vector::DeletionVectorDescriptor;
use crate::actions::{Add, Cdc, Metadata, Protocol, Remove};
use crate::engine::sync::SyncEngine;
@@ -10,8 +10,8 @@ use crate::path::ParsedLogPath;
use crate::scan::state::DvInfo;
use crate::scan::PhysicalPredicate;
use crate::schema::{DataType, StructField, StructType};
use crate::table_changes::log_replay::LogReplayScanner;
use crate::table_features::ReaderFeatures;
use crate::table_configuration::TableConfiguration;
use crate::table_features::{ReaderFeatures, WriterFeatures};
use crate::utils::test_utils::{Action, LocalMockTable};
use crate::Expression;
use crate::{DeltaResult, Engine, Error, Version};
@@ -28,6 +28,31 @@ fn get_schema() -> StructType {
])
}

fn table_config(path: &Path) -> TableConfiguration {
let table_root = url::Url::from_directory_path(path).unwrap();
let schema_string = serde_json::to_string(&get_schema()).unwrap();
let metadata = Metadata {
schema_string,
Collaborator:
Out of curiosity, why is Metadata infallible while Protocol is fallible? Seems like any number of things could go wrong with it, such as partition column names that aren't actually in the schema?

Collaborator:
Part of me wonders if we should make a clean break between P&M as "plain old data" vs. table configuration as the one stop shop for validation of that raw data? So then, any time you see a P or M in isolation, you have to assume it's broken in arbitrary ways. It's only "for sure" self-consistent and valid if a TC successfully wrapped it.

Collaborator (author):
I think Metadata just doesn't have a constructor for kernel/rust data that does these checks. All it has is pub fn try_new_from_data(data: &dyn EngineData)

Collaborator (author):
> Part of me wonders if we should make a clean break between P&M as "plain old data" vs. table configuration as the one stop shop for validation of that raw data?

Ohhh so proposing that we move the validation that we do in Protocol::try_new to TC? Then this Metadata validation could also live there.

Moreover if we think of protocol as just "raw, unchecked data", I'm starting to wonder if we should move ensure_read_supported, has_writer_feature and has_reader_feature to TC?

Collaborator:
Yeah... TC becomes the logical heart of the table, with P&M as simple raw information underneath

Collaborator (author):
Note for anyone reading this thread, I'm tracking table config stuff in #650

configuration: HashMap::from([
("delta.enableChangeDataFeed".to_string(), "true".to_string()),
(
"delta.enableDeletionVectors".to_string(),
"true".to_string(),
),
("delta.columnMapping.mode".to_string(), "none".to_string()),
]),
Comment on lines +36 to +42 — Collaborator:
aside: I wish rust had better (more transparent) handling of String vs. &str... code like this gets so ugly and unwieldy.

Collaborator (author):
yep the to_string everywhere is kind of gross, but at least we're aware of where the allocations are happening

..Default::default()
};
let protocol = Protocol::try_new(
3,
7,
Some([ReaderFeatures::DeletionVectors]),
Some([WriterFeatures::ColumnMapping]),
)
.unwrap();
TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap()
}

fn get_segment(
engine: &dyn Engine,
path: &Path,
@@ -87,8 +112,10 @@ async fn metadata_protocol() {
.unwrap()
.into_iter();

let table_config = table_config(mock_table.table_root());
let scan_batches =
table_changes_action_iter(engine, commits, get_schema().into(), None).unwrap();
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
.unwrap();
let sv = result_to_sv(scan_batches);
assert_eq!(sv, &[false, false]);
}
@@ -112,8 +139,9 @@ async fn cdf_not_enabled() {
.unwrap()
.into_iter();

let table_config = table_config(mock_table.table_root());
let res: DeltaResult<Vec<_>> =
table_changes_action_iter(engine, commits, get_schema().into(), None)
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
.unwrap()
.try_collect();

@@ -143,8 +171,9 @@ async fn unsupported_reader_feature() {
.unwrap()
.into_iter();

let table_config = table_config(mock_table.table_root());
let res: DeltaResult<Vec<_>> =
table_changes_action_iter(engine, commits, get_schema().into(), None)
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
.unwrap()
.try_collect();

@@ -174,8 +203,9 @@ async fn column_mapping_should_fail() {
.unwrap()
.into_iter();

let table_config = table_config(mock_table.table_root());
let res: DeltaResult<Vec<_>> =
table_changes_action_iter(engine, commits, get_schema().into(), None)
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
.unwrap()
.try_collect();

@@ -205,8 +235,9 @@ async fn incompatible_schemas_fail() {
.unwrap()
.into_iter();

let table_config = table_config(mock_table.table_root());
let res: DeltaResult<Vec<_>> =
table_changes_action_iter(engine, commits, cdf_schema.into(), None)
table_changes_action_iter(engine, commits, cdf_schema.into(), None, table_config)
.unwrap()
.try_collect();

@@ -295,7 +326,8 @@ async fn add_remove() {
.unwrap()
.into_iter();

let sv = table_changes_action_iter(engine, commits, get_schema().into(), None)
let table_config = table_config(mock_table.table_root());
let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
.unwrap()
.flat_map(|scan_data| {
let scan_data = scan_data.unwrap();
@@ -345,7 +377,8 @@ async fn filter_data_change() {
.unwrap()
.into_iter();

let sv = table_changes_action_iter(engine, commits, get_schema().into(), None)
let table_config = table_config(mock_table.table_root());
let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
.unwrap()
.flat_map(|scan_data| {
let scan_data = scan_data.unwrap();
@@ -391,7 +424,8 @@ async fn cdc_selection() {
.unwrap()
.into_iter();

let sv = table_changes_action_iter(engine, commits, get_schema().into(), None)
let table_config = table_config(mock_table.table_root());
let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
.unwrap()
.flat_map(|scan_data| {
let scan_data = scan_data.unwrap();
@@ -457,7 +491,9 @@ async fn dv() {
},
)])
.into();
let sv = table_changes_action_iter(engine, commits, get_schema().into(), None)

let table_config = table_config(mock_table.table_root());
let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
.unwrap()
.flat_map(|scan_data| {
let scan_data = scan_data.unwrap();
@@ -534,13 +570,20 @@ async fn data_skipping_filter() {
.unwrap()
.into_iter();

let sv = table_changes_action_iter(engine, commits, logical_schema.into(), predicate)
.unwrap()
.flat_map(|scan_data| {
let scan_data = scan_data.unwrap();
scan_data.selection_vector
})
.collect_vec();
let table_config = table_config(mock_table.table_root());
let sv = table_changes_action_iter(
engine,
commits,
logical_schema.into(),
predicate,
table_config,
)
.unwrap()
.flat_map(|scan_data| {
let scan_data = scan_data.unwrap();
scan_data.selection_vector
})
.collect_vec();

// Note: since the first pair is a dv operation, remove action will always be filtered
assert_eq!(sv, &[false, true, false, false, true]);
@@ -579,8 +622,9 @@ async fn failing_protocol() {
.unwrap()
.into_iter();

let table_config = table_config(mock_table.table_root());
let res: DeltaResult<Vec<_>> =
table_changes_action_iter(engine, commits, get_schema().into(), None)
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
.unwrap()
.try_collect();

@@ -606,6 +650,13 @@ async fn file_meta_timestamp() {

let commit = commits.next().unwrap();
let file_meta_ts = commit.location.last_modified;
let scanner = LogReplayScanner::try_new(engine.as_ref(), commit, &get_schema().into()).unwrap();
assert_eq!(scanner.timestamp, file_meta_ts);
let mut table_config = table_config(mock_table.table_root());
let processed_commit = process_cdf_commit(
engine.as_ref(),
commit,
&get_schema().into(),
&mut table_config,
)
.unwrap();
assert_eq!(processed_commit.timestamp, file_meta_ts);
}
54 changes: 5 additions & 49 deletions kernel/src/table_changes/mod.rs
@@ -31,20 +31,16 @@
//! let table_change_batches = table_changes_scan.execute(engine.clone())?;
//! # Ok::<(), Error>(())
//! ```
use std::collections::HashSet;
use std::sync::{Arc, LazyLock};

use scan::TableChangesScanBuilder;
use url::Url;

use crate::actions::{ensure_supported_features, Protocol};
use crate::log_segment::LogSegment;
use crate::path::AsUrl;
use crate::schema::{DataType, Schema, StructField, StructType};
use crate::snapshot::Snapshot;
use crate::table_features::{ColumnMappingMode, ReaderFeatures};
use crate::table_properties::TableProperties;
use crate::utils::require;
use crate::table_configuration::TableConfiguration;
use crate::{DeltaResult, Engine, Error, Version};

mod log_replay;
@@ -114,6 +110,7 @@ pub struct TableChanges {
end_snapshot: Snapshot,
start_version: Version,
schema: Schema,
start_table_config: TableConfiguration,
}

impl TableChanges {
@@ -154,13 +151,8 @@ impl TableChanges {
let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?;

// Verify CDF is enabled at the beginning and end of the interval using
// [`check_cdf_table_properties`] to fail early. This also ensures that column mapping is
// disabled.
//
// We also check the [`Protocol`] using [`ensure_cdf_read_supported`] to verify that
// we support CDF with those features enabled.
//
// Note: We must still check each metadata and protocol action in the CDF range.
// [`TableConfiguration::is_cdf_read_supported`] to fail early. This also ensures that
// column mapping is disabled.
let check_table_config = |snapshot: &Snapshot| {
if snapshot.table_configuration().is_cdf_read_supported() {
Ok(())
@@ -195,6 +187,7 @@ impl TableChanges {
log_segment,
start_version,
schema,
start_table_config: start_snapshot.table_configuration().clone(),
Collaborator:
can we avoid the clone? aren't we throwing away the start_snapshot? oh i guess if this returns ref we need to clone.. hm maybe there's a case for some into_config conversions etc.? (thinking out loud - not to solve here)

})
}

@@ -220,7 +213,6 @@ impl TableChanges {
pub(crate) fn partition_columns(&self) -> &Vec<String> {
&self.end_snapshot.metadata().partition_columns
}

/// Create a [`TableChangesScanBuilder`] for an `Arc<TableChanges>`.
pub fn scan_builder(self: Arc<Self>) -> TableChangesScanBuilder {
TableChangesScanBuilder::new(self)
@@ -232,42 +224,6 @@ impl TableChanges {
}
}

/// Ensures that change data feed is enabled in `table_properties`. See the documentation
/// of [`TableChanges`] for more details.
fn check_cdf_table_properties(table_properties: &TableProperties) -> DeltaResult<()> {
require!(
table_properties.enable_change_data_feed.unwrap_or(false),
Error::unsupported("Change data feed is not enabled")
);
require!(
matches!(
table_properties.column_mapping_mode,
None | Some(ColumnMappingMode::None)
),
Error::unsupported("Change data feed not supported when column mapping is enabled")
);
Ok(())
}

/// Ensures that Change Data Feed is supported for a table with this [`Protocol`] .
/// See the documentation of [`TableChanges`] for more details.
fn ensure_cdf_read_supported(protocol: &Protocol) -> DeltaResult<()> {
static CDF_SUPPORTED_READER_FEATURES: LazyLock<HashSet<ReaderFeatures>> =
LazyLock::new(|| HashSet::from([ReaderFeatures::DeletionVectors]));
match &protocol.reader_features() {
// if min_reader_version = 3 and all reader features are subset of supported => OK
Some(reader_features) if protocol.min_reader_version() == 3 => {
ensure_supported_features(reader_features, &CDF_SUPPORTED_READER_FEATURES)
}
// if min_reader_version = 1 and there are no reader features => OK
None if protocol.min_reader_version() == 1 => Ok(()),
// any other protocol is not supported
_ => Err(Error::unsupported(
"Change data feed not supported on this protocol",
)),
}
}

#[cfg(test)]
mod tests {
use crate::engine::sync::SyncEngine;
9 changes: 8 additions & 1 deletion kernel/src/table_changes/scan.rs
@@ -198,7 +198,14 @@ impl TableChangesScan {
PhysicalPredicate::None => None,
};
let schema = self.table_changes.end_snapshot.schema().clone().into();
let it = table_changes_action_iter(engine, commits, schema, physical_predicate)?;
let table_configuration = self.table_changes.start_table_config.clone();
let it = table_changes_action_iter(
engine,
commits,
schema,
physical_predicate,
table_configuration,
)?;
Ok(Some(it).into_iter().flatten())
}

28 changes: 27 additions & 1 deletion kernel/src/table_changes/scan_file.rs
@@ -244,12 +244,13 @@ mod tests {

use super::{scan_data_to_scan_file, CdfScanFile, CdfScanFileType};
use crate::actions::deletion_vector::DeletionVectorDescriptor;
use crate::actions::{Add, Cdc, Remove};
use crate::actions::{Add, Cdc, Metadata, Protocol, Remove};
use crate::engine::sync::SyncEngine;
use crate::log_segment::LogSegment;
use crate::scan::state::DvInfo;
use crate::schema::{DataType, StructField, StructType};
use crate::table_changes::log_replay::table_changes_action_iter;
use crate::table_configuration::TableConfiguration;
use crate::utils::test_utils::{Action, LocalMockTable};
use crate::Engine;

@@ -333,15 +334,40 @@ mod tests {
None,
)
.unwrap();

let table_schema = StructType::new([
StructField::nullable("id", DataType::INTEGER),
StructField::nullable("value", DataType::STRING),
]);

let schema_string = serde_json::to_string(&table_schema).unwrap();
let metadata = Metadata {
schema_string,
configuration: HashMap::from([
("delta.enableChangeDataFeed".to_string(), "true".to_string()),
(
"delta.enableDeletionVectors".to_string(),
"true".to_string(),
),
("delta.columnMapping.mode".to_string(), "none".to_string()),
]),
..Default::default()
};
let protocol = Protocol::try_new(
3,
7,
Some::<Vec<String>>(vec![]),
Some::<Vec<String>>(vec![]),
Comment on lines +359 to +360 — Collaborator (author):

Note: Creating Protocol from None is actually pretty challenging since it doesn't know the type of T in Option::<T>::None. Also, the type try_new takes for reader/writer features is Option<impl IntoIterator<Item = impl Into<String>>> shouldn't we be using impl ToString?
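The inference problem described here can be reproduced with a simplified stand-in for `Protocol::try_new`'s features parameter (a sketch; `count_features` is hypothetical):

```rust
// Same parameter shape as the features arguments discussed above.
fn count_features(features: Option<impl IntoIterator<Item = impl Into<String>>>) -> usize {
    features.map(|f| f.into_iter().count()).unwrap_or(0)
}

fn main() {
    // `count_features(None)` does not compile: nothing pins down the
    // iterator type, so the compiler cannot choose `T` for `Option<T>`.
    // A turbofish on the `None` (or `Some::<Vec<String>>(vec![])`, as in
    // the diff above) resolves it:
    assert_eq!(count_features(None::<Vec<String>>), 0);

    // With a populated `Some`, the element type is inferred from the literal.
    assert_eq!(count_features(Some(vec!["deletionVectors"])), 1);
}
```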

Collaborator:

Into<String> and ToString have different characteristics, especially wrt String:

  • Into<String> for String is a move
  • ToString for String makes a copy
  • More types impl ToString than impl Into<String>

Dunno which we "should" use here...

Collaborator (author):

Hmm i guess we should prefer moves. Though this is affected by the proposed changes to move all logic to TableConfiguration, so I'll leave this for now and address it in #650

)
.unwrap();
let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap();

let scan_data = table_changes_action_iter(
Arc::new(engine),
log_segment.ascending_commit_files.clone(),
table_schema.into(),
None,
table_config,
)
.unwrap();
let scan_files: Vec<_> = scan_data_to_scan_file(scan_data).try_collect().unwrap();
2 changes: 1 addition & 1 deletion kernel/src/table_configuration.rs
@@ -33,7 +33,7 @@ use crate::{DeltaResult, Version};
/// `try_new` successfully returns `TableConfiguration`, it is also guaranteed that reading the
/// table is supported.
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) struct TableConfiguration {
metadata: Metadata,
protocol: Protocol,