refactor: Make CDF use TableConfiguration and refactor log replay #645

Status: Open
Wants to merge 19 commits into base: main
34 changes: 12 additions & 22 deletions kernel/src/actions/visitors.rs
@@ -37,30 +37,20 @@ impl MetadataVisitor {
getters.len()
))
);
let name: Option<String> = getters[1].get_opt(row_index, "metadata.name")?;
let description: Option<String> = getters[2].get_opt(row_index, "metadata.description")?;
// get format out of primitives
let format_provider: String = getters[3].get(row_index, "metadata.format.provider")?;
// options for format is always empty, so skip getters[4]
let schema_string: String = getters[5].get(row_index, "metadata.schema_string")?;
let partition_columns: Vec<_> = getters[6].get(row_index, "metadata.partition_list")?;
let created_time: Option<i64> = getters[7].get_opt(row_index, "metadata.created_time")?;
let configuration_map_opt: Option<HashMap<_, _>> =
getters[8].get_opt(row_index, "metadata.configuration")?;
let configuration = configuration_map_opt.unwrap_or_else(HashMap::new);

let configuration_map_opt = getters[8].get_opt(row_index, "metaData.configuration")?;
Ok(Metadata {
id,
name,
description,
name: getters[1].get_opt(row_index, "metaData.name")?,
Collaborator: interesting i just noticed we had all lowercase before.. guess we are case-insensitive?
description: getters[2].get_opt(row_index, "metaData.description")?,
format: Format {
provider: format_provider,
options: HashMap::new(),
provider: getters[3].get(row_index, "metaData.format.provider")?,
options: HashMap::new(), // options for format is always empty, so skip getters[4]
},
schema_string,
partition_columns,
created_time,
configuration,
schema_string: getters[5].get(row_index, "metaData.schemaString")?,
partition_columns: getters[6].get(row_index, "metaData.partitionColumns")?,
created_time: getters[7].get_opt(row_index, "metaData.created_time")?,
configuration: configuration_map_opt.unwrap_or_else(HashMap::new),
})
}
}
@@ -75,9 +65,9 @@ impl RowVisitor for MetadataVisitor {
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
for i in 0..row_count {
// Since id column is required, use it to detect presence of a metadata action
if let Some(id) = getters[0].get_opt(i, "metadata.id")? {
if let Some(id) = getters[0].get_opt(i, "metaData.id")? {
self.metadata = Some(Self::visit_metadata(i, id, getters)?);
break;
break; // A commit has at most one metaData action
}
}
Ok(())
@@ -159,7 +149,7 @@ impl RowVisitor for ProtocolVisitor {
// Since minReaderVersion column is required, use it to detect presence of a Protocol action
if let Some(mrv) = getters[0].get_opt(i, "protocol.min_reader_version")? {
self.protocol = Some(Self::visit_protocol(i, mrv, getters)?);
break;
break; // A commit has at most one Protocol action
}
}
Ok(())
437 changes: 227 additions & 210 deletions kernel/src/table_changes/log_replay.rs

Large diffs are not rendered by default.
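Since the log_replay.rs diff is collapsed here, the following is a rough sketch of the new entry point inferred only from its call sites in the tests below; the parameter list matches the test call, while the return type name and the body are assumptions, not copied from the diff.

```rust
// Sketch inferred from the tests below; not copied from the collapsed diff.
// `ProcessedCdfCommit` is an assumed name for whatever struct carries at least
// a `timestamp` field (see the `file_meta_timestamp` test).
pub(crate) fn process_cdf_commit(
    engine: &dyn Engine,
    commit: ParsedLogPath,
    schema: &SchemaRef,
    table_config: &mut TableConfiguration,
) -> DeltaResult<ProcessedCdfCommit> {
    // Presumably: visit the commit's actions, update `table_config` when
    // metaData/protocol actions appear, track add/remove/cdc files, and record
    // the commit file's last-modified time as `timestamp`.
    todo!()
}
```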

95 changes: 73 additions & 22 deletions kernel/src/table_changes/log_replay/tests.rs
@@ -1,5 +1,5 @@
use super::table_changes_action_iter;
use super::TableChangesScanData;
use super::{process_cdf_commit, table_changes_action_iter};
use crate::actions::deletion_vector::DeletionVectorDescriptor;
use crate::actions::{Add, Cdc, Metadata, Protocol, Remove};
use crate::engine::sync::SyncEngine;
@@ -10,8 +10,8 @@ use crate::path::ParsedLogPath;
use crate::scan::state::DvInfo;
use crate::scan::PhysicalPredicate;
use crate::schema::{DataType, StructField, StructType};
use crate::table_changes::log_replay::LogReplayScanner;
use crate::table_features::ReaderFeatures;
use crate::table_configuration::TableConfiguration;
use crate::table_features::{ReaderFeatures, WriterFeatures};
use crate::utils::test_utils::{Action, LocalMockTable};
use crate::Expression;
use crate::{DeltaResult, Engine, Error, Version};
@@ -28,6 +28,31 @@ fn get_schema() -> StructType {
])
}

fn table_config(path: &Path) -> TableConfiguration {
let table_root = url::Url::from_directory_path(path).unwrap();
let schema_string = serde_json::to_string(&get_schema()).unwrap();
let metadata = Metadata {
schema_string,
Collaborator: Out of curiosity, why is Metadata infallible while Protocol is fallible? Seems like any number of things could go wrong with it, such as partition column names that aren't actually in the schema?

Collaborator: Part of me wonders if we should make a clean break between P&M as "plain old data" vs. table configuration as the one stop shop for validation of that raw data? So then, any time you see a P or M in isolation, you have to assume it's broken in arbitrary ways. It's only "for sure" self-consistent and valid if a TC successfully wrapped it.

Collaborator (Author): I think Metadata just doesn't have a constructor for kernel/rust data that does these checks. All it has is pub fn try_new_from_data(data: &dyn EngineData).

Collaborator (Author): > Part of me wonders if we should make a clean break between P&M as "plain old data" vs. table configuration as the one stop shop for validation of that raw data?

Ohhh so proposing that we move the validation that we do in Protocol::try_new to TC? Then this Metadata validation could also live there. Moreover, if we think of protocol as just "raw, unchecked data", I'm starting to wonder if we should move ensure_read_supported, has_writer_feature and has_reader_feature to TC?

Collaborator: Yeah... TC becomes the logical heart of the table, with P&M as simple raw information underneath.

Collaborator (Author): Note for anyone reading this thread, I'm tracking table config stuff in #650.
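As a follow-on to the thread above, here is a minimal sketch (not part of this PR) of what pulling a Metadata-level check into TableConfiguration::try_new could look like; the schema-parsing step, the fields iterator, and the error constructor are assumptions about the surrounding APIs.

```rust
use url::Url;

use crate::actions::{Metadata, Protocol};
use crate::schema::StructType;
use crate::table_configuration::TableConfiguration;
use crate::{DeltaResult, Error, Version};

impl TableConfiguration {
    pub(crate) fn try_new(
        metadata: Metadata,
        protocol: Protocol,
        table_root: Url,
        version: Version,
    ) -> DeltaResult<Self> {
        // Parse the raw schema string once, up front.
        let schema: StructType = serde_json::from_str(&metadata.schema_string)
            .map_err(|e| Error::generic(format!("invalid schema string: {e}")))?;

        // Example Metadata-level check from the thread above: every declared
        // partition column must actually exist in the schema.
        for col in &metadata.partition_columns {
            if !schema.fields().any(|f| f.name() == col) {
                return Err(Error::generic(format!(
                    "partition column '{col}' not found in table schema"
                )));
            }
        }

        // Protocol-level checks (ensure_read_supported, feature gating) could
        // migrate here as well, per the discussion above.
        todo!("assemble Self from the validated metadata, protocol, table_root, and version")
    }
}
```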

configuration: HashMap::from([
("delta.enableChangeDataFeed".to_string(), "true".to_string()),
(
"delta.enableDeletionVectors".to_string(),
"true".to_string(),
),
("delta.columnMapping.mode".to_string(), "none".to_string()),
]),
Comment on lines +36 to +42
Collaborator: aside: I wish rust had better (more transparent) handling of String vs. &str... code like this gets so ugly and unwieldy.

Collaborator (Author): yep the to_string everywhere is kind of gross, but at least we're aware of where the allocations are happening.
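On the String vs. &str aside: a tiny, purely illustrative helper (not in this PR) that would keep the allocations in one obvious place instead of repeating .to_string() through the literal.

```rust
use std::collections::HashMap;

/// Hypothetical test helper: build an owned String map from borrowed pairs.
fn string_map<const N: usize>(pairs: [(&str, &str); N]) -> HashMap<String, String> {
    pairs
        .into_iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}

// The fixture above could then read:
// configuration: string_map([
//     ("delta.enableChangeDataFeed", "true"),
//     ("delta.enableDeletionVectors", "true"),
//     ("delta.columnMapping.mode", "none"),
// ]),
```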

..Default::default()
};
let protocol = Protocol::try_new(
3,
7,
Some([ReaderFeatures::DeletionVectors]),
Some([WriterFeatures::ColumnMapping]),
)
.unwrap();
TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap()
}

fn get_segment(
engine: &dyn Engine,
path: &Path,
@@ -87,8 +112,10 @@ async fn metadata_protocol() {
.unwrap()
.into_iter();

let table_config = table_config(mock_table.table_root());
let scan_batches =
table_changes_action_iter(engine, commits, get_schema().into(), None).unwrap();
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
.unwrap();
let sv = result_to_sv(scan_batches);
assert_eq!(sv, &[false, false]);
}
@@ -112,8 +139,9 @@ async fn cdf_not_enabled() {
.unwrap()
.into_iter();

let table_config = table_config(mock_table.table_root());
let res: DeltaResult<Vec<_>> =
table_changes_action_iter(engine, commits, get_schema().into(), None)
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
.unwrap()
.try_collect();

@@ -143,8 +171,9 @@ async fn unsupported_reader_feature() {
.unwrap()
.into_iter();

let table_config = table_config(mock_table.table_root());
let res: DeltaResult<Vec<_>> =
table_changes_action_iter(engine, commits, get_schema().into(), None)
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
.unwrap()
.try_collect();

@@ -174,8 +203,9 @@ async fn column_mapping_should_fail() {
.unwrap()
.into_iter();

let table_config = table_config(mock_table.table_root());
let res: DeltaResult<Vec<_>> =
table_changes_action_iter(engine, commits, get_schema().into(), None)
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
.unwrap()
.try_collect();

@@ -205,8 +235,9 @@ async fn incompatible_schemas_fail() {
.unwrap()
.into_iter();

let table_config = table_config(mock_table.table_root());
let res: DeltaResult<Vec<_>> =
table_changes_action_iter(engine, commits, cdf_schema.into(), None)
table_changes_action_iter(engine, commits, cdf_schema.into(), None, table_config)
.unwrap()
.try_collect();

@@ -295,7 +326,8 @@ async fn add_remove() {
.unwrap()
.into_iter();

let sv = table_changes_action_iter(engine, commits, get_schema().into(), None)
let table_config = table_config(mock_table.table_root());
let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
.unwrap()
.flat_map(|scan_data| {
let scan_data = scan_data.unwrap();
@@ -345,7 +377,8 @@ async fn filter_data_change() {
.unwrap()
.into_iter();

let sv = table_changes_action_iter(engine, commits, get_schema().into(), None)
let table_config = table_config(mock_table.table_root());
let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
.unwrap()
.flat_map(|scan_data| {
let scan_data = scan_data.unwrap();
@@ -391,7 +424,8 @@ async fn cdc_selection() {
.unwrap()
.into_iter();

let sv = table_changes_action_iter(engine, commits, get_schema().into(), None)
let table_config = table_config(mock_table.table_root());
let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
.unwrap()
.flat_map(|scan_data| {
let scan_data = scan_data.unwrap();
@@ -457,7 +491,9 @@ async fn dv() {
},
)])
.into();
let sv = table_changes_action_iter(engine, commits, get_schema().into(), None)

let table_config = table_config(mock_table.table_root());
let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
.unwrap()
.flat_map(|scan_data| {
let scan_data = scan_data.unwrap();
@@ -534,13 +570,20 @@ async fn data_skipping_filter() {
.unwrap()
.into_iter();

let sv = table_changes_action_iter(engine, commits, logical_schema.into(), predicate)
.unwrap()
.flat_map(|scan_data| {
let scan_data = scan_data.unwrap();
scan_data.selection_vector
})
.collect_vec();
let table_config = table_config(mock_table.table_root());
let sv = table_changes_action_iter(
engine,
commits,
logical_schema.into(),
predicate,
table_config,
)
.unwrap()
.flat_map(|scan_data| {
let scan_data = scan_data.unwrap();
scan_data.selection_vector
})
.collect_vec();

// Note: since the first pair is a dv operation, remove action will always be filtered
assert_eq!(sv, &[false, true, false, false, true]);
@@ -579,8 +622,9 @@ async fn failing_protocol() {
.unwrap()
.into_iter();

let table_config = table_config(mock_table.table_root());
let res: DeltaResult<Vec<_>> =
table_changes_action_iter(engine, commits, get_schema().into(), None)
table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
.unwrap()
.try_collect();

@@ -606,6 +650,13 @@ async fn file_meta_timestamp() {

let commit = commits.next().unwrap();
let file_meta_ts = commit.location.last_modified;
let scanner = LogReplayScanner::try_new(engine.as_ref(), commit, &get_schema().into()).unwrap();
assert_eq!(scanner.timestamp, file_meta_ts);
let mut table_config = table_config(mock_table.table_root());
let processed_commit = process_cdf_commit(
engine.as_ref(),
commit,
&get_schema().into(),
&mut table_config,
)
.unwrap();
assert_eq!(processed_commit.timestamp, file_meta_ts);
}
54 changes: 5 additions & 49 deletions kernel/src/table_changes/mod.rs
@@ -31,20 +31,16 @@
//! let table_change_batches = table_changes_scan.execute(engine.clone())?;
//! # Ok::<(), Error>(())
//! ```
use std::collections::HashSet;
use std::sync::{Arc, LazyLock};

use scan::TableChangesScanBuilder;
use url::Url;

use crate::actions::{ensure_supported_features, Protocol};
use crate::log_segment::LogSegment;
use crate::path::AsUrl;
use crate::schema::{DataType, Schema, StructField, StructType};
use crate::snapshot::Snapshot;
use crate::table_features::{ColumnMappingMode, ReaderFeatures};
use crate::table_properties::TableProperties;
use crate::utils::require;
use crate::table_configuration::TableConfiguration;
use crate::{DeltaResult, Engine, Error, Version};

mod log_replay;
@@ -114,6 +110,7 @@ pub struct TableChanges {
end_snapshot: Snapshot,
start_version: Version,
schema: Schema,
start_table_config: TableConfiguration,
}

impl TableChanges {
@@ -154,13 +151,8 @@ impl TableChanges {
let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?;

// Verify CDF is enabled at the beginning and end of the interval using
// [`check_cdf_table_properties`] to fail early. This also ensures that column mapping is
// disabled.
//
// We also check the [`Protocol`] using [`ensure_cdf_read_supported`] to verify that
// we support CDF with those features enabled.
//
// Note: We must still check each metadata and protocol action in the CDF range.
// [`TableConfiguration::is_cdf_read_supported`] to fail early. This also ensures that
// column mapping is disabled.
let check_table_config = |snapshot: &Snapshot| {
if snapshot.table_configuration().is_cdf_read_supported() {
Ok(())
@@ -195,6 +187,7 @@ impl TableChanges {
log_segment,
start_version,
schema,
start_table_config: start_snapshot.table_configuration().clone(),
Collaborator: can we avoid the clone? aren't we throwing away the start_snapshot? oh i guess if this returns ref we need to clone.. hm maybe there's a case for some into_config conversions etc.? (thinking out loud - not to solve here)

})
}
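A minimal sketch of the into_config idea from the comment above; the method and the field it returns are hypothetical and not part of this PR or (as far as this diff shows) the current Snapshot API.

```rust
impl Snapshot {
    /// Hypothetical: consume the snapshot and return its TableConfiguration by
    /// value, so a caller that is done with the snapshot (as `try_new` is with
    /// `start_snapshot`) can skip the clone.
    pub(crate) fn into_table_configuration(self) -> TableConfiguration {
        // Assumes the snapshot owns its configuration in a field named
        // `table_configuration`; the real field name may differ.
        self.table_configuration
    }
}

// Call-site sketch, replacing the clone above:
// start_table_config: start_snapshot.into_table_configuration(),
```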

@@ -220,7 +213,6 @@ impl TableChanges {
pub(crate) fn partition_columns(&self) -> &Vec<String> {
&self.end_snapshot.metadata().partition_columns
}

/// Create a [`TableChangesScanBuilder`] for an `Arc<TableChanges>`.
pub fn scan_builder(self: Arc<Self>) -> TableChangesScanBuilder {
TableChangesScanBuilder::new(self)
@@ -232,42 +224,6 @@ impl TableChanges {
}
}

/// Ensures that change data feed is enabled in `table_properties`. See the documentation
/// of [`TableChanges`] for more details.
fn check_cdf_table_properties(table_properties: &TableProperties) -> DeltaResult<()> {
require!(
table_properties.enable_change_data_feed.unwrap_or(false),
Error::unsupported("Change data feed is not enabled")
);
require!(
matches!(
table_properties.column_mapping_mode,
None | Some(ColumnMappingMode::None)
),
Error::unsupported("Change data feed not supported when column mapping is enabled")
);
Ok(())
}

/// Ensures that Change Data Feed is supported for a table with this [`Protocol`] .
/// See the documentation of [`TableChanges`] for more details.
fn ensure_cdf_read_supported(protocol: &Protocol) -> DeltaResult<()> {
static CDF_SUPPORTED_READER_FEATURES: LazyLock<HashSet<ReaderFeatures>> =
LazyLock::new(|| HashSet::from([ReaderFeatures::DeletionVectors]));
match &protocol.reader_features() {
// if min_reader_version = 3 and all reader features are subset of supported => OK
Some(reader_features) if protocol.min_reader_version() == 3 => {
ensure_supported_features(reader_features, &CDF_SUPPORTED_READER_FEATURES)
}
// if min_reader_version = 1 and there are no reader features => OK
None if protocol.min_reader_version() == 1 => Ok(()),
// any other protocol is not supported
_ => Err(Error::unsupported(
"Change data feed not supported on this protocol",
)),
}
}
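For context on where these two deleted helpers went: a hedged sketch of how their logic might read once folded into TableConfiguration::is_cdf_read_supported, the method the new check_table_config closure calls. The protocol and table_properties accessors on TableConfiguration are assumptions; the checks themselves mirror the deleted code above.

```rust
use std::collections::HashSet;
use std::sync::LazyLock;

use crate::actions::ensure_supported_features;
use crate::table_features::{ColumnMappingMode, ReaderFeatures};

impl TableConfiguration {
    /// Sketch only: combines the deleted `check_cdf_table_properties` and
    /// `ensure_cdf_read_supported` checks into a single boolean query.
    pub(crate) fn is_cdf_read_supported(&self) -> bool {
        static CDF_SUPPORTED_READER_FEATURES: LazyLock<HashSet<ReaderFeatures>> =
            LazyLock::new(|| HashSet::from([ReaderFeatures::DeletionVectors]));

        let protocol_supported = match self.protocol.reader_features() {
            // min_reader_version = 3: every reader feature must be supported for CDF
            Some(features) if self.protocol.min_reader_version() == 3 => {
                ensure_supported_features(features, &CDF_SUPPORTED_READER_FEATURES).is_ok()
            }
            // min_reader_version = 1 with no reader features is fine
            None if self.protocol.min_reader_version() == 1 => true,
            // any other protocol is not supported
            _ => false,
        };

        let cdf_enabled = self
            .table_properties
            .enable_change_data_feed
            .unwrap_or(false);
        let column_mapping_disabled = matches!(
            self.table_properties.column_mapping_mode,
            None | Some(ColumnMappingMode::None)
        );

        protocol_supported && cdf_enabled && column_mapping_disabled
    }
}
```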

#[cfg(test)]
mod tests {
use crate::engine::sync::SyncEngine;
9 changes: 8 additions & 1 deletion kernel/src/table_changes/scan.rs
@@ -198,7 +198,14 @@ impl TableChangesScan {
PhysicalPredicate::None => None,
};
let schema = self.table_changes.end_snapshot.schema().clone().into();
let it = table_changes_action_iter(engine, commits, schema, physical_predicate)?;
let table_configuration = self.table_changes.start_table_config.clone();
let it = table_changes_action_iter(
engine,
commits,
schema,
physical_predicate,
table_configuration,
)?;
Ok(Some(it).into_iter().flatten())
}
