diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs
index 957befe80..853b90594 100644
--- a/kernel/src/actions/visitors.rs
+++ b/kernel/src/actions/visitors.rs
@@ -37,30 +37,20 @@ impl MetadataVisitor {
                 getters.len()
             ))
         );
-        let name: Option<String> = getters[1].get_opt(row_index, "metadata.name")?;
-        let description: Option<String> = getters[2].get_opt(row_index, "metadata.description")?;
         // get format out of primitives
-        let format_provider: String = getters[3].get(row_index, "metadata.format.provider")?;
-        // options for format is always empty, so skip getters[4]
-        let schema_string: String = getters[5].get(row_index, "metadata.schema_string")?;
-        let partition_columns: Vec<_> = getters[6].get(row_index, "metadata.partition_list")?;
-        let created_time: Option<i64> = getters[7].get_opt(row_index, "metadata.created_time")?;
-        let configuration_map_opt: Option<HashMap<String, String>> =
-            getters[8].get_opt(row_index, "metadata.configuration")?;
-        let configuration = configuration_map_opt.unwrap_or_else(HashMap::new);
-
+        let configuration_map_opt = getters[8].get_opt(row_index, "metaData.configuration")?;
         Ok(Metadata {
             id,
-            name,
-            description,
+            name: getters[1].get_opt(row_index, "metaData.name")?,
+            description: getters[2].get_opt(row_index, "metaData.description")?,
             format: Format {
-                provider: format_provider,
-                options: HashMap::new(),
+                provider: getters[3].get(row_index, "metaData.format.provider")?,
+                options: HashMap::new(), // options for format is always empty, so skip getters[4]
             },
-            schema_string,
-            partition_columns,
-            created_time,
-            configuration,
+            schema_string: getters[5].get(row_index, "metaData.schemaString")?,
+            partition_columns: getters[6].get(row_index, "metaData.partitionColumns")?,
+            created_time: getters[7].get_opt(row_index, "metaData.created_time")?,
+            configuration: configuration_map_opt.unwrap_or_else(HashMap::new),
         })
     }
 }
@@ -75,9 +65,9 @@ impl RowVisitor for MetadataVisitor {
     fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
         for i in 0..row_count {
             // Since id column is required, use it to detect presence of a metadata action
-            if let Some(id) = getters[0].get_opt(i, "metadata.id")? {
+            if let Some(id) = getters[0].get_opt(i, "metaData.id")? {
                 self.metadata = Some(Self::visit_metadata(i, id, getters)?);
-                break;
+                break; // A commit has at most one metaData action
             }
         }
         Ok(())
@@ -159,7 +149,7 @@ impl RowVisitor for ProtocolVisitor {
             // Since minReaderVersion column is required, use it to detect presence of a Protocol action
             if let Some(mrv) = getters[0].get_opt(i, "protocol.min_reader_version")? {
                 self.protocol = Some(Self::visit_protocol(i, mrv, getters)?);
-                break;
+                break; // A commit has at most one Protocol action
             }
         }
         Ok(())
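The pipeline in `table_changes_action_iter` below composes the two phases with two `map`s and `Itertools::flatten_ok`, and the `// Iterator-Result-...` comments track the nesting at each step. A minimal, self-contained model of that composition, using toy types and only std plus the `itertools` crate (not the kernel's API):

```rust
use itertools::Itertools;

type Result<T> = std::result::Result<T, String>;

// Toy stand-ins for ProcessedCdfCommit and TableChangesScanData.
struct Processed(u64);
struct ScanData(u64);

// Phase 1: eager per-commit pre-processing (may fail).
fn process(v: u64) -> Result<Processed> {
    Ok(Processed(v))
}

// Phase 2: lazily yields per-batch results for one processed commit.
fn to_batches(p: Processed) -> Result<impl Iterator<Item = Result<ScanData>>> {
    Ok((0..2).map(move |i| Ok(ScanData(p.0 * 10 + i))))
}

fn main() -> Result<()> {
    let out: Vec<ScanData> = [1u64, 2]
        .into_iter()
        .map(process) // Iterator of Result<Processed>
        .map(|p| to_batches(p?)) // Iterator of Result<Iterator<Result<ScanData>>>
        .flatten_ok() // Iterator of Result<Result<ScanData>>
        .map(|x| x?) // Iterator of Result<ScanData>
        .collect::<Result<_>>()?;
    assert_eq!(out.len(), 4);
    assert_eq!(out[0].0, 10);
    Ok(())
}
```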
diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs
index 89951a39b..cedc02f54 100644
--- a/kernel/src/table_changes/log_replay.rs
+++ b/kernel/src/table_changes/log_replay.rs
@@ -1,6 +1,10 @@
-//! Defines [`LogReplayScanner`] used by [`TableChangesScan`] to process commit files and extract
-//! the metadata needed to generate the Change Data Feed.
-
+//! This module handles the log replay for Change Data Feed. This is done in two phases:
+//! [`process_cdf_commit`] and [`cdf_commit_to_scan_batches`]. The first phase pre-processes the
+//! commit file to validate the [`TableConfiguration`] and to collect information about the
+//! commit's timestamp and the presence of `cdc` actions. The [`TableChangesScanData`] are then
+//! generated in the second phase.
+//!
+//! Note: As a consequence of the two phases, we must iterate over each action in the commit
+//! twice. This may also use an unbounded amount of memory, proportional to the number of
+//! `add` + `remove` actions in the _single_ commit.
 use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, LazyLock};
@@ -17,8 +21,7 @@
 use crate::scan::data_skipping::DataSkippingFilter;
 use crate::scan::state::DvInfo;
 use crate::schema::{ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructType};
 use crate::table_changes::scan_file::{cdf_scan_row_expression, cdf_scan_row_schema};
-use crate::table_changes::{check_cdf_table_properties, ensure_cdf_read_supported};
-use crate::table_properties::TableProperties;
+use crate::table_configuration::TableConfiguration;
 use crate::utils::require;
 use crate::{DeltaResult, Engine, EngineData, Error, ExpressionRef, RowVisitor};
@@ -52,76 +55,55 @@ pub(crate) fn table_changes_action_iter(
     commit_files: impl IntoIterator<Item = ParsedLogPath>,
     table_schema: SchemaRef,
     physical_predicate: Option<(ExpressionRef, SchemaRef)>,
+    mut table_configuration: TableConfiguration,
 ) -> DeltaResult<impl Iterator<Item = DeltaResult<TableChangesScanData>>> {
     let filter = DataSkippingFilter::new(engine.as_ref(), physical_predicate).map(Arc::new);
+    let process_engine_ref = engine.clone();
     let result = commit_files
         .into_iter()
-        .map(move |commit_file| -> DeltaResult<_> {
-            let scanner = LogReplayScanner::try_new(engine.as_ref(), commit_file, &table_schema)?;
-            scanner.into_scan_batches(engine.clone(), filter.clone())
+        .map(move |commit_file| -> DeltaResult<ProcessedCdfCommit> {
+            process_cdf_commit(
+                process_engine_ref.as_ref(),
+                commit_file,
+                &table_schema,
+                &mut table_configuration,
+            )
+        })
+        .map(move |processed_commit| -> DeltaResult<_> {
+            cdf_commit_to_scan_batches(processed_commit?, engine.clone(), filter.clone())
+        }) // Iterator-Result-Iterator-Result
         .flatten_ok() // Iterator-Result-Result
         .map(|x| x?); // Iterator-Result
     Ok(result)
 }

-/// Processes a single commit file from the log to generate an iterator of [`TableChangesScanData`].
-/// The scanner operates in two phases that _must_ be performed in the following order:
-/// 1. Prepare phase [`LogReplayScanner::try_new`]: This iterates over every action in the commit.
-///    In this phase, we do the following:
-///    - Determine if there exist any `cdc` actions. We determine this in the first phase because
-///      the selection vectors for actions are lazily constructed in phase 2. We must know ahead
-///      of time whether to filter out add/remove actions.
-///    - Constructs the remove deletion vector map from paths belonging to `remove` actions to the
-///      action's corresponding [`DvInfo`]. This map will be filtered to only contain paths that
-///      exists in another `add` action _within the same commit_. We store the result in `remove_dvs`.
-///      Deletion vector resolution affects whether a remove action is selected in the second
-///      phase, so we must perform it ahead of time in phase 1.
-///    - Ensure that reading is supported on any protocol updates.
-///    - Ensure that Change Data Feed is enabled for any metadata update. See [`TableProperties`]
-///    - Ensure that any schema update is compatible with the provided `schema`. Currently, schema
-///      compatibility is checked through schema equality. This will be expanded in the future to
-///      allow limited schema evolution.
-///
-/// Note: We check the protocol, change data feed enablement, and schema compatibility in phase 1
-/// in order to detect errors and fail early.
-///
-/// Note: The reader feature [`ReaderFeatures::DeletionVectors`] controls whether the table is
-/// allowed to contain deletion vectors. [`TableProperties`].enable_deletion_vectors only
-/// determines whether writers are allowed to create _new_ deletion vectors. Hence, we do not need
-/// to check the table property for deletion vector enablement.
-///
-/// See https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vectors
-///
-/// TODO: When the kernel supports in-commit timestamps, we will also have to inspect CommitInfo
-/// actions to find the timestamp. These are generated when incommit timestamps is enabled.
-/// This must be done in the first phase because the second phase lazily transforms engine data with
-/// an extra timestamp column. Thus, the timestamp must be known ahead of time.
-/// See https://github.com/delta-io/delta-kernel-rs/issues/559
-///
-/// 2. Scan file generation phase [`LogReplayScanner::into_scan_batches`]: This iterates over every
-///    action in the commit, and generates [`TableChangesScanData`]. It does so by transforming the
-///    actions using [`add_transform_expr`], and generating selection vectors with the following rules:
-///    - If a `cdc` action was found in the prepare phase, only `cdc` actions are selected
-///    - Otherwise, select `add` and `remove` actions. Note that only `remove` actions that do not
-///      share a path with an `add` action are selected.
-///
-/// Note: As a consequence of the two phases, LogReplayScanner will iterate over each action in the
-/// commit twice. It also may use an unbounded amount of memory, proportional to the number of
-/// `add` + `remove` actions in the _single_ commit.
-struct LogReplayScanner {
-    // True if a `cdc` action was found after running [`LogReplayScanner::try_new`]
+/// Represents a single commit file that's been processed by [`process_cdf_commit`]. If successfully
+/// constructed, the [`ProcessedCdfCommit`] will hold:
+/// - The `timestamp` of the commit. Currently this is the file modification timestamp. When the
+///   kernel supports in-commit timestamps, we will also have to inspect CommitInfo actions to find
+///   the in-commit timestamp. These are generated when the in-commit timestamps table property is
+///   enabled. This must be done in [`process_cdf_commit`] instead of [`cdf_commit_to_scan_batches`]
+///   because the latter lazily transforms engine data with an extra timestamp column. Thus, the
+///   timestamp must be known before the next phase.
+///   See <https://github.com/delta-io/delta-kernel-rs/issues/559>
+/// - `remove_dvs`, a map from each remove action's path to its [`DvInfo`]. This will be used to
+///   resolve the deletion vectors to find the rows that were changed in this commit.
+///   See [`crate::table_changes::resolve_dvs`]
+/// - `has_cdc_action`, which is `true` when there is a `cdc` action present in the commit. This is
+///   used in [`cdf_commit_to_scan_batches`] to correctly generate a selection vector over the
+///   actions in the commit.
+struct ProcessedCdfCommit {
+    // True if a `cdc` action was found in the commit
     has_cdc_action: bool,
     // A map from path to the deletion vector from the remove action. It is guaranteed that there
     // is an add action with the same path in this commit
     remove_dvs: HashMap<String, DvInfo>,
-    // The commit file that this replay scanner will operate on.
+    // The commit file that this [`ProcessedCdfCommit`] represents.
     commit_file: ParsedLogPath,
     // The timestamp associated with this commit. This is the file modification time
    // from the commit's [`FileMeta`].
     //
-    //
-    // TODO when incommit timestamps are supported: If there is a [`CommitInfo`] with a timestamp
+    // TODO when in-commit timestamps are supported: If there is a [`CommitInfo`] with a timestamp
     // generated by in-commit timestamps, that timestamp will be used instead.
     //
     // Note: This will be used once an expression is introduced to transform the engine data in
@@ -129,174 +111,202 @@ struct LogReplayScanner {
     timestamp: i64,
 }

-impl LogReplayScanner {
-    /// Constructs a LogReplayScanner, performing the Prepare phase detailed in [`LogReplayScanner`].
-    /// This iterates over each action in the commit. It performs the following:
-    /// 1. Check the commits for the presence of a `cdc` action.
-    /// 2. Construct a map from path to deletion vector of remove actions that share the same path
-    ///    as an add action.
-    /// 3. Perform validation on each protocol and metadata action in the commit.
-    ///
-    /// For more details, see the documentation for [`LogReplayScanner`].
-    fn try_new(
-        engine: &dyn Engine,
-        commit_file: ParsedLogPath,
-        table_schema: &SchemaRef,
-    ) -> DeltaResult<Self> {
-        let visitor_schema = PreparePhaseVisitor::schema();
+/// This processes a commit file to prepare for generating [`TableChangesScanData`]. To do so, it
+/// performs the following:
+/// - Determine if there exist any `cdc` actions. We determine this in this phase because the
+///   selection vectors for actions are lazily constructed in the following phase. We must know
+///   ahead of time whether to filter out add/remove actions.
+/// - Construct a map from each `remove` action's path to its [`DvInfo`]. This map will be
+///   filtered to only contain paths that exist in another `add` action _within the same commit_.
+///   We store the result in `remove_dvs`. Deletion vector resolution affects whether a remove
+///   action is selected in the second phase, so we must perform it before
+///   [`cdf_commit_to_scan_batches`].
+/// - Ensure that reading is supported on any protocol updates.
+/// - Ensure that Change Data Feed is enabled for any metadata update. See [`TableProperties`].
+/// - Ensure that any schema update is compatible with the provided `schema`. Currently, schema
+///   compatibility is checked through schema equality. This will be expanded in the future to
+///   allow limited schema evolution.
+///
+/// Note: We check the protocol, change data feed enablement, and schema compatibility in
+/// [`process_cdf_commit`] in order to detect errors and fail early.
+///
+/// Note: The reader feature [`ReaderFeatures::DeletionVectors`] controls whether the table is
+/// allowed to contain deletion vectors. [`TableProperties`].enable_deletion_vectors only
+/// determines whether writers are allowed to create _new_ deletion vectors. Hence, we do not need
+/// to check the table property for deletion vector enablement.
+///
+/// See https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vectors
+fn process_cdf_commit(
+    engine: &dyn Engine,
+    commit_file: ParsedLogPath,
+    table_schema: &SchemaRef,
+    table_configuration: &mut TableConfiguration,
+) -> DeltaResult<ProcessedCdfCommit> {
+    let visitor_schema = ProcessCdfCommitVisitor::schema();

-        // Note: We do not perform data skipping yet because we need to visit all add and
-        // remove actions for deletion vector resolution to be correct.
-        //
-        // Consider a scenario with a pair of add/remove actions with the same path. The add
-        // action has file statistics, while the remove action does not (stats is optional for
-        // remove). In this scenario we might skip the add action, while the remove action remains.
-        // As a result, we would read the file path for the remove action, which is unnecessary because
-        // all of the rows will be filtered by the predicate. Instead, we wait until deletion
-        // vectors are resolved so that we can skip both actions in the pair.
-        let action_iter = engine.get_json_handler().read_json_files(
-            &[commit_file.location.clone()],
-            visitor_schema,
-            None, // not safe to apply data skipping yet
-        )?;
+    // Note: We do not perform data skipping yet because we need to visit all add and
+    // remove actions for deletion vector resolution to be correct.
+    //
+    // Consider a scenario with a pair of add/remove actions with the same path. The add
+    // action has file statistics, while the remove action does not (stats is optional for
+    // remove). In this scenario we might skip the add action, while the remove action remains.
+    // As a result, we would read the file path for the remove action, which is unnecessary because
+    // all of the rows will be filtered by the predicate. Instead, we wait until deletion
+    // vectors are resolved so that we can skip both actions in the pair.
+    let action_iter = engine.get_json_handler().read_json_files(
+        &[commit_file.location.clone()],
+        visitor_schema,
+        None, // not safe to apply data skipping yet
+    )?;

-        let mut remove_dvs = HashMap::default();
-        let mut add_paths = HashSet::default();
-        let mut has_cdc_action = false;
-        for actions in action_iter {
-            let actions = actions?;
+    let mut remove_dvs = HashMap::default();
+    let mut add_paths = HashSet::default();
+    let mut has_cdc_action = false;
+    for actions in action_iter {
+        let actions = actions?;

-            let mut visitor = PreparePhaseVisitor {
-                add_paths: &mut add_paths,
-                remove_dvs: &mut remove_dvs,
-                has_cdc_action: &mut has_cdc_action,
-                protocol: None,
-                metadata_info: None,
-            };
-            visitor.visit_rows_of(actions.as_ref())?;
+        let mut visitor = ProcessCdfCommitVisitor {
+            add_paths: &mut add_paths,
+            remove_dvs: &mut remove_dvs,
+            has_cdc_action: &mut has_cdc_action,
+            protocol: None,
+            metadata: None,
+        };

-            if let Some(protocol) = visitor.protocol {
-                ensure_cdf_read_supported(&protocol)
-                    .map_err(|_| Error::change_data_feed_unsupported(commit_file.version))?;
-            }
-            if let Some((schema, configuration)) = visitor.metadata_info {
-                let schema: StructType = serde_json::from_str(&schema)?;
-                // Currently, schema compatibility is defined as having equal schema types. In the
-                // future, more permisive schema evolution will be supported.
-                // See: https://github.com/delta-io/delta-kernel-rs/issues/523
-                require!(
-                    table_schema.as_ref() == &schema,
-                    Error::change_data_feed_incompatible_schema(table_schema, &schema)
-                );
-                let table_properties = TableProperties::from(configuration);
-                check_cdf_table_properties(&table_properties)
-                    .map_err(|_| Error::change_data_feed_unsupported(commit_file.version))?;
+        visitor.visit_rows_of(actions.as_ref())?;
+        let metadata_changed = visitor.metadata.is_some();
+        match (visitor.protocol, visitor.metadata) {
+            (None, None) => {} // no change
+            (protocol, metadata) => {
+                // at least one of protocol and metadata changed, so update the table configuration
+                *table_configuration = TableConfiguration::try_new(
+                    metadata.unwrap_or_else(|| table_configuration.metadata().clone()),
+                    protocol.unwrap_or_else(|| table_configuration.protocol().clone()),
+                    table_configuration.table_root().clone(),
+                    commit_file.version,
+                )?;
+                if !table_configuration.is_cdf_read_supported() {
+                    return Err(Error::change_data_feed_unsupported(commit_file.version));
+                }
             }
         }
-        // We resolve the remove deletion vector map after visiting the entire commit.
-        if has_cdc_action {
-            remove_dvs.clear();
-        } else {
-            // The only (path, deletion_vector) pairs we must track are ones whose path is the
-            // same as an `add` action.
-            remove_dvs.retain(|rm_path, _| add_paths.contains(rm_path));
+        if metadata_changed {
+            require!(
+                table_schema.as_ref() == table_configuration.schema(),
+                Error::change_data_feed_incompatible_schema(
+                    table_schema,
+                    table_configuration.schema()
+                )
+            );
+        }
     }
-        Ok(LogReplayScanner {
-            timestamp: commit_file.location.last_modified,
-            commit_file,
-            has_cdc_action,
-            remove_dvs,
-        })
-    }

-    /// Generates an iterator of [`TableChangesScanData`] by iterating over each action of the
-    /// commit, generating a selection vector, and transforming the engine data. This performs
-    /// phase 2 of [`LogReplayScanner`].
-    fn into_scan_batches(
-        self,
-        engine: Arc<dyn Engine>,
-        filter: Option<Arc<DataSkippingFilter>>,
-    ) -> DeltaResult<impl Iterator<Item = DeltaResult<TableChangesScanData>>> {
-        let Self {
-            has_cdc_action,
-            remove_dvs,
-            commit_file,
-            // TODO: Add the timestamp as a column with an expression
-            timestamp,
-        } = self;
-        let remove_dvs = Arc::new(remove_dvs);
+    // We resolve the remove deletion vector map after visiting the entire commit.
+    if has_cdc_action {
+        remove_dvs.clear();
+    } else {
+        // The only (path, deletion_vector) pairs we must track are ones whose path is the
+        // same as an `add` action.
+        remove_dvs.retain(|rm_path, _| add_paths.contains(rm_path));
+    }
+    Ok(ProcessedCdfCommit {
+        timestamp: commit_file.location.last_modified,
+        commit_file,
+        has_cdc_action,
+        remove_dvs,
+    })
+}

-        let schema = FileActionSelectionVisitor::schema();
-        let action_iter = engine.get_json_handler().read_json_files(
-            &[commit_file.location.clone()],
-            schema,
-            None,
-        )?;
-        let commit_version = commit_file
-            .version
-            .try_into()
-            .map_err(|_| Error::generic("Failed to convert commit version to i64"))?;
-        let evaluator = engine.get_expression_handler().get_evaluator(
-            get_log_add_schema().clone(),
-            cdf_scan_row_expression(timestamp, commit_version),
-            cdf_scan_row_schema().into(),
-        );

+/// Generates an iterator of [`TableChangesScanData`] by consuming a [`ProcessedCdfCommit`] and
+/// iterating over each action in the commit. This generates a selection vector and transforms the
+/// actions using [`cdf_scan_row_expression`].
+/// Selection vectors are generated using the following rules:
+/// - If a `cdc` action was found in [`process_cdf_commit`], only `cdc` actions are selected
+/// - Otherwise, select `add` and `remove` actions. Note that only `remove` actions that do not
+///   share a path with an `add` action are selected.
+fn cdf_commit_to_scan_batches(
+    processed_commit: ProcessedCdfCommit,
+    engine: Arc<dyn Engine>,
+    filter: Option<Arc<DataSkippingFilter>>,
+) -> DeltaResult<impl Iterator<Item = DeltaResult<TableChangesScanData>>> {
+    let ProcessedCdfCommit {
+        has_cdc_action,
+        remove_dvs,
+        commit_file,
+        timestamp,
+    } = processed_commit;
+    let remove_dvs = Arc::new(remove_dvs);

-        let result = action_iter.map(move |actions| -> DeltaResult<_> {
-            let actions = actions?;
+    let schema = FileActionSelectionVisitor::schema();
+    let action_iter =
+        engine
+            .get_json_handler()
+            .read_json_files(&[commit_file.location.clone()], schema, None)?;
+    let commit_version = commit_file
+        .version
+        .try_into()
+        .map_err(|_| Error::generic("Failed to convert commit version to i64"))?;
+    let evaluator = engine.get_expression_handler().get_evaluator(
+        get_log_add_schema().clone(),
+        cdf_scan_row_expression(timestamp, commit_version),
+        cdf_scan_row_schema().into(),
+    );

-            // Apply data skipping to get back a selection vector for actions that passed skipping.
-            // We start our selection vector based on what was filtered. We will add to this vector
-            // below if a file has been removed. Note: None implies all files passed data skipping.
-            let selection_vector = match &filter {
-                Some(filter) => filter.apply(actions.as_ref())?,
-                None => vec![true; actions.len()],
-            };
+    let result = action_iter.map(move |actions| -> DeltaResult<_> {
+        let actions = actions?;

-            let mut visitor =
-                FileActionSelectionVisitor::new(&remove_dvs, selection_vector, has_cdc_action);
-            visitor.visit_rows_of(actions.as_ref())?;
-            let scan_data = evaluator.evaluate(actions.as_ref())?;
-            Ok(TableChangesScanData {
-                scan_data,
-                selection_vector: visitor.selection_vector,
-                remove_dvs: remove_dvs.clone(),
-            })
-        });
-        Ok(result)
-    }

+        // Apply data skipping to get back a selection vector for actions that passed skipping.
+        // We start our selection vector based on what was filtered. We will add to this vector
+        // below if a file has been removed. Note: None implies all files passed data skipping.
+        let selection_vector = match &filter {
+            Some(filter) => filter.apply(actions.as_ref())?,
+            None => vec![true; actions.len()],
+        };
+
+        let mut visitor =
+            FileActionSelectionVisitor::new(&remove_dvs, selection_vector, has_cdc_action);
+        visitor.visit_rows_of(actions.as_ref())?;
+        let scan_data = evaluator.evaluate(actions.as_ref())?;
+        Ok(TableChangesScanData {
+            scan_data,
+            selection_vector: visitor.selection_vector,
+            remove_dvs: remove_dvs.clone(),
+        })
+    });
+    Ok(result)
 }

-// This is a visitor used in the prepare phase of [`LogReplayScanner`]. See
-// [`LogReplayScanner::try_new`] for details usage.
-struct PreparePhaseVisitor<'a> {
+// This is a visitor used in [`process_cdf_commit`].
+struct ProcessCdfCommitVisitor<'a> {
     protocol: Option<Protocol>,
-    metadata_info: Option<(String, HashMap<String, String>)>,
+    metadata: Option<Metadata>,
     has_cdc_action: &'a mut bool,
     add_paths: &'a mut HashSet<String>,
     remove_dvs: &'a mut HashMap<String, DvInfo>,
 }

-impl PreparePhaseVisitor<'_> {
-    fn schema() -> Arc<StructType> {
-        Arc::new(StructType::new(vec![
-            Option::<Add>::get_struct_field(ADD_NAME),
-            Option::<Remove>::get_struct_field(REMOVE_NAME),
-            Option::<Cdc>::get_struct_field(CDC_NAME),
-            Option::<Metadata>::get_struct_field(METADATA_NAME),
-            Option::<Protocol>::get_struct_field(PROTOCOL_NAME),
-        ]))
+impl ProcessCdfCommitVisitor<'_> {
+    fn schema() -> SchemaRef {
+        static PREPARE_PHASE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
+            Arc::new(StructType::new(vec![
+                Option::<Add>::get_struct_field(ADD_NAME),
+                Option::<Remove>::get_struct_field(REMOVE_NAME),
+                Option::<Cdc>::get_struct_field(CDC_NAME),
+                Option::<Metadata>::get_struct_field(METADATA_NAME),
+                Option::<Protocol>::get_struct_field(PROTOCOL_NAME),
+            ]))
+        });
+        PREPARE_PHASE_SCHEMA.clone()
     }
 }

-impl RowVisitor for PreparePhaseVisitor<'_> {
+impl RowVisitor for ProcessCdfCommitVisitor<'_> {
     fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
-        // NOTE: The order of the names and types is based on [`PreparePhaseVisitor::schema`]
+        // NOTE: The order of the names and types is based on [`ProcessCdfCommitVisitor::schema`]
         static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
             const STRING: DataType = DataType::STRING;
             const INTEGER: DataType = DataType::INTEGER;
             const LONG: DataType = DataType::LONG;
             const BOOLEAN: DataType = DataType::BOOLEAN;
-            let string_list: DataType = ArrayType::new(STRING, false).into();
-            let string_string_map = MapType::new(STRING, STRING, false).into();
+            let str_list: DataType = ArrayType::new(STRING, false).into();
+            let str_str_map: DataType = MapType::new(STRING, STRING, false).into();
             let types_and_names = vec![
                 (STRING, column_name!("add.path")),
                 (BOOLEAN, column_name!("add.dataChange")),
@@ -308,12 +318,14 @@ impl RowVisitor for PreparePhaseVisitor<'_> {
                 (INTEGER, column_name!("remove.deletionVector.sizeInBytes")),
                 (LONG, column_name!("remove.deletionVector.cardinality")),
                 (STRING, column_name!("cdc.path")),
+                (STRING, column_name!("metaData.id")),
                 (STRING, column_name!("metaData.schemaString")),
-                (string_string_map, column_name!("metaData.configuration")),
+                (str_list.clone(), column_name!("metaData.partitionColumns")),
+                (str_str_map, column_name!("metaData.configuration")),
                 (INTEGER, column_name!("protocol.minReaderVersion")),
                 (INTEGER, column_name!("protocol.minWriterVersion")),
-                (string_list.clone(), column_name!("protocol.readerFeatures")),
-                (string_list, column_name!("protocol.writerFeatures")),
+                (str_list.clone(), column_name!("protocol.readerFeatures")),
+                (str_list, column_name!("protocol.writerFeatures")),
             ];
             let (types, names) = types_and_names.into_iter().unzip();
             (names, types).into()
@@ -323,9 +335,9 @@
     fn visit<'b>(&mut self, row_count: usize, getters: &[&'b dyn GetData<'b>]) -> DeltaResult<()> {
         require!(
-            getters.len() == 16,
+            getters.len() == 18,
             Error::InternalError(format!(
-                "Wrong number of PreparePhaseVisitor getters: {}",
+                "Wrong number of ProcessCdfCommitVisitor getters: {}",
                 getters.len()
             ))
         );
@@ -344,15 +356,20 @@
                 }
             } else if getters[9].get_str(i, "cdc.path")?.is_some() {
                 *self.has_cdc_action = true;
-            } else if let Some(schema) = getters[10].get_str(i, "metaData.schemaString")?
-            {
-                let configuration_map_opt = getters[11].get_opt(i, "metadata.configuration")?;
-                let configuration = configuration_map_opt.unwrap_or_else(HashMap::new);
-                self.metadata_info = Some((schema.to_string(), configuration));
+            } else if let Some(id) = getters[10].get_opt(i, "metaData.id")? {
+                let configuration_map_opt = getters[13].get_opt(i, "metaData.configuration")?;
+                self.metadata = Some(Metadata {
+                    id,
+                    schema_string: getters[11].get(i, "metaData.schemaString")?,
+                    partition_columns: getters[12].get(i, "metaData.partitionColumns")?,
+                    configuration: configuration_map_opt.unwrap_or_else(HashMap::new),
+                    ..Default::default() // Other fields are ignored
+                });
             } else if let Some(min_reader_version) =
-                getters[12].get_int(i, "protocol.min_reader_version")?
+                getters[14].get_int(i, "protocol.min_reader_version")?
             {
                 let protocol =
-                    ProtocolVisitor::visit_protocol(i, min_reader_version, &getters[12..=15])?;
+                    ProtocolVisitor::visit_protocol(i, min_reader_version, &getters[14..=17])?;
                 self.protocol = Some(protocol);
             }
         }
@@ -360,8 +377,8 @@
     }
 }

-// This visitor generates selection vectors based on the rules specified in [`LogReplayScanner`].
-// See [`LogReplayScanner::into_scan_batches`] for usage.
+// This visitor generates selection vectors based on the rules specified in
+// [`cdf_commit_to_scan_batches`].
 struct FileActionSelectionVisitor<'a> {
     selection_vector: Vec<bool>,
     has_cdc_action: bool,
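The deletion-vector resolution rule that `process_cdf_commit` applies after the visitor pass is small enough to state on its own. A minimal, std-only model of that rule, using a toy `DvInfo` rather than the kernel's type:

```rust
use std::collections::{HashMap, HashSet};

/// Toy stand-in for the kernel's `DvInfo`.
#[derive(Debug, PartialEq)]
struct DvInfo(&'static str);

/// Keep only the remove-action deletion vectors that pair with an add action in
/// the same commit; a cdc action makes the add/remove bookkeeping irrelevant.
fn resolve_remove_dvs(
    mut remove_dvs: HashMap<String, DvInfo>,
    add_paths: &HashSet<String>,
    has_cdc_action: bool,
) -> HashMap<String, DvInfo> {
    if has_cdc_action {
        remove_dvs.clear();
    } else {
        remove_dvs.retain(|rm_path, _| add_paths.contains(rm_path));
    }
    remove_dvs
}

fn main() {
    let remove_dvs = HashMap::from([
        ("a.parquet".to_string(), DvInfo("dv1")), // paired with an add below, kept
        ("b.parquet".to_string(), DvInfo("dv2")), // plain remove, dropped
    ]);
    let add_paths = HashSet::from(["a.parquet".to_string()]);
    let resolved = resolve_remove_dvs(remove_dvs, &add_paths, false);
    assert_eq!(resolved.len(), 1);
    assert!(resolved.contains_key("a.parquet"));
}
```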
diff --git a/kernel/src/table_changes/log_replay/tests.rs b/kernel/src/table_changes/log_replay/tests.rs
index 35c4a99f8..6e3f4bf71 100644
--- a/kernel/src/table_changes/log_replay/tests.rs
+++ b/kernel/src/table_changes/log_replay/tests.rs
@@ -1,5 +1,5 @@
-use super::table_changes_action_iter;
 use super::TableChangesScanData;
+use super::{process_cdf_commit, table_changes_action_iter};
 use crate::actions::deletion_vector::DeletionVectorDescriptor;
 use crate::actions::{Add, Cdc, Metadata, Protocol, Remove};
 use crate::engine::sync::SyncEngine;
@@ -10,8 +10,8 @@ use crate::path::ParsedLogPath;
 use crate::scan::state::DvInfo;
 use crate::scan::PhysicalPredicate;
 use crate::schema::{DataType, StructField, StructType};
-use crate::table_changes::log_replay::LogReplayScanner;
-use crate::table_features::ReaderFeatures;
+use crate::table_configuration::TableConfiguration;
+use crate::table_features::{ReaderFeatures, WriterFeatures};
 use crate::utils::test_utils::{Action, LocalMockTable};
 use crate::Expression;
 use crate::{DeltaResult, Engine, Error, Version};
@@ -28,6 +28,31 @@ fn get_schema() -> StructType {
     ])
 }

+fn table_config(path: &Path) -> TableConfiguration {
+    let table_root = url::Url::from_directory_path(path).unwrap();
+    let schema_string = serde_json::to_string(&get_schema()).unwrap();
+    let metadata = Metadata {
+        schema_string,
+        configuration: HashMap::from([
+            ("delta.enableChangeDataFeed".to_string(), "true".to_string()),
+            (
+                "delta.enableDeletionVectors".to_string(),
+                "true".to_string(),
+            ),
+            ("delta.columnMapping.mode".to_string(), "none".to_string()),
+        ]),
+        ..Default::default()
+    };
+    let protocol = Protocol::try_new(
+        3,
+        7,
+        Some([ReaderFeatures::DeletionVectors]),
+        Some([WriterFeatures::ColumnMapping]),
+    )
+    .unwrap();
+    TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap()
+}
+
 fn get_segment(
     engine: &dyn Engine,
     path: &Path,
@@ -87,8 +112,10 @@ async fn metadata_protocol() {
         .unwrap()
         .into_iter();

+    let table_config = table_config(mock_table.table_root());
     let scan_batches =
-        table_changes_action_iter(engine, commits, get_schema().into(), None).unwrap();
+        table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
+            .unwrap();
     let sv = result_to_sv(scan_batches);
     assert_eq!(sv, &[false, false]);
 }
@@ -112,8 +139,9 @@ async fn cdf_not_enabled() {
         .unwrap()
         .into_iter();

+    let table_config = table_config(mock_table.table_root());
     let res: DeltaResult<Vec<TableChangesScanData>> =
-        table_changes_action_iter(engine, commits, get_schema().into(), None)
+        table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
             .unwrap()
             .try_collect();

@@ -143,8 +171,9 @@ async fn unsupported_reader_feature() {
         .unwrap()
         .into_iter();

+    let table_config = table_config(mock_table.table_root());
     let res: DeltaResult<Vec<TableChangesScanData>> =
-        table_changes_action_iter(engine, commits, get_schema().into(), None)
+        table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
             .unwrap()
             .try_collect();

@@ -174,8 +203,9 @@ async fn column_mapping_should_fail() {
         .unwrap()
         .into_iter();

+    let table_config = table_config(mock_table.table_root());
     let res: DeltaResult<Vec<TableChangesScanData>> =
-        table_changes_action_iter(engine, commits, get_schema().into(), None)
+        table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
             .unwrap()
             .try_collect();

@@ -205,8 +235,9 @@ async fn incompatible_schemas_fail() {
         .unwrap()
         .into_iter();

+    let table_config = table_config(mock_table.table_root());
     let res: DeltaResult<Vec<TableChangesScanData>> =
-        table_changes_action_iter(engine, commits, cdf_schema.into(), None)
+        table_changes_action_iter(engine, commits, cdf_schema.into(), None, table_config)
             .unwrap()
             .try_collect();

@@ -295,7 +326,8 @@ async fn add_remove() {
         .unwrap()
         .into_iter();

-    let sv = table_changes_action_iter(engine, commits, get_schema().into(), None)
+    let table_config = table_config(mock_table.table_root());
+    let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
         .unwrap()
         .flat_map(|scan_data| {
             let scan_data = scan_data.unwrap();
@@ -345,7 +377,8 @@ async fn filter_data_change() {
         .unwrap()
         .into_iter();

-    let sv = table_changes_action_iter(engine, commits, get_schema().into(), None)
+    let table_config = table_config(mock_table.table_root());
+    let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
         .unwrap()
         .flat_map(|scan_data| {
             let scan_data = scan_data.unwrap();
@@ -391,7 +424,8 @@ async fn cdc_selection() {
         .unwrap()
         .into_iter();

-    let sv = table_changes_action_iter(engine, commits, get_schema().into(), None)
+    let table_config = table_config(mock_table.table_root());
+    let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
         .unwrap()
         .flat_map(|scan_data| {
             let scan_data = scan_data.unwrap();
@@ -457,7 +491,9 @@ async fn dv() {
         },
     )])
     .into();
-    let sv = table_changes_action_iter(engine, commits, get_schema().into(), None)
+
+    let table_config = table_config(mock_table.table_root());
+    let sv = table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
         .unwrap()
         .flat_map(|scan_data| {
             let scan_data = scan_data.unwrap();
@@ -534,13 +570,20 @@ async fn data_skipping_filter() {
         .unwrap()
         .into_iter();

-    let sv = table_changes_action_iter(engine, commits, logical_schema.into(), predicate)
-        .unwrap()
-        .flat_map(|scan_data| {
-            let scan_data = scan_data.unwrap();
-            scan_data.selection_vector
-        })
-        .collect_vec();
+    let table_config = table_config(mock_table.table_root());
+    let sv = table_changes_action_iter(
+        engine,
+        commits,
+        logical_schema.into(),
+        predicate,
+        table_config,
+    )
+    .unwrap()
+    .flat_map(|scan_data| {
+        let scan_data = scan_data.unwrap();
+        scan_data.selection_vector
+    })
+    .collect_vec();

     // Note: since the first pair is a dv operation, remove action will always be filtered
     assert_eq!(sv, &[false, true, false, false, true]);
@@ -579,8 +622,9 @@ async fn failing_protocol() {
         .unwrap()
         .into_iter();

+    let table_config = table_config(mock_table.table_root());
     let res: DeltaResult<Vec<TableChangesScanData>> =
-        table_changes_action_iter(engine, commits, get_schema().into(), None)
+        table_changes_action_iter(engine, commits, get_schema().into(), None, table_config)
             .unwrap()
             .try_collect();

@@ -606,6 +650,13 @@ async fn file_meta_timestamp() {
     let commit = commits.next().unwrap();
     let file_meta_ts = commit.location.last_modified;
-    let scanner = LogReplayScanner::try_new(engine.as_ref(), commit, &get_schema().into()).unwrap();
-    assert_eq!(scanner.timestamp, file_meta_ts);
+    let mut table_config = table_config(mock_table.table_root());
+    let processed_commit = process_cdf_commit(
+        engine.as_ref(),
+        commit,
+        &get_schema().into(),
+        &mut table_config,
+    )
+    .unwrap();
+    assert_eq!(processed_commit.timestamp, file_meta_ts);
 }
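For reference, the protocol gate that `TableConfiguration::is_cdf_read_supported` now owns can be restated as a small standalone predicate. This is a hedged, std-only sketch of the rule, based on the removed `ensure_cdf_read_supported` in the mod.rs diff below; the struct here is a simplified stand-in, not the kernel's `Protocol`:

```rust
use std::collections::HashSet;

/// Simplified stand-in for the kernel's `Protocol` action.
struct Protocol {
    min_reader_version: i32,
    reader_features: Option<HashSet<String>>,
}

/// CDF reads are supported when the protocol is legacy (reader version 1, no
/// reader features) or version 3 with features limited to deletionVectors.
fn cdf_read_supported(protocol: &Protocol) -> bool {
    let supported: HashSet<String> = HashSet::from(["deletionVectors".to_string()]);
    match (&protocol.reader_features, protocol.min_reader_version) {
        (Some(features), 3) => features.is_subset(&supported),
        (None, 1) => true,
        _ => false,
    }
}

fn main() {
    let legacy = Protocol { min_reader_version: 1, reader_features: None };
    assert!(cdf_read_supported(&legacy));

    let with_dvs = Protocol {
        min_reader_version: 3,
        reader_features: Some(HashSet::from(["deletionVectors".to_string()])),
    };
    assert!(cdf_read_supported(&with_dvs));

    let column_mapped = Protocol {
        min_reader_version: 3,
        reader_features: Some(HashSet::from(["columnMapping".to_string()])),
    };
    assert!(!cdf_read_supported(&column_mapped));
}
```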
diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs
index e65b0ae53..3dace50f9 100644
--- a/kernel/src/table_changes/mod.rs
+++ b/kernel/src/table_changes/mod.rs
@@ -31,20 +31,16 @@
 //!     let table_change_batches = table_changes_scan.execute(engine.clone())?;
 //! #   Ok::<(), Error>(())
 //! ```
-use std::collections::HashSet;
 use std::sync::{Arc, LazyLock};

 use scan::TableChangesScanBuilder;
 use url::Url;

-use crate::actions::{ensure_supported_features, Protocol};
 use crate::log_segment::LogSegment;
 use crate::path::AsUrl;
 use crate::schema::{DataType, Schema, StructField, StructType};
 use crate::snapshot::Snapshot;
-use crate::table_features::{ColumnMappingMode, ReaderFeatures};
-use crate::table_properties::TableProperties;
-use crate::utils::require;
+use crate::table_configuration::TableConfiguration;
 use crate::{DeltaResult, Engine, Error, Version};

 mod log_replay;
@@ -114,6 +110,7 @@ pub struct TableChanges {
     end_snapshot: Snapshot,
     start_version: Version,
     schema: Schema,
+    start_table_config: TableConfiguration,
 }

 impl TableChanges {
@@ -154,13 +151,8 @@ impl TableChanges {
         let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?;

         // Verify CDF is enabled at the beginning and end of the interval using
-        // [`check_cdf_table_properties`] to fail early. This also ensures that column mapping is
-        // disabled.
-        //
-        // We also check the [`Protocol`] using [`ensure_cdf_read_supported`] to verify that
-        // we support CDF with those features enabled.
-        //
-        // Note: We must still check each metadata and protocol action in the CDF range.
+        // [`TableConfiguration::is_cdf_read_supported`] to fail early. This also ensures that
+        // column mapping is disabled.
         let check_table_config = |snapshot: &Snapshot| {
             if snapshot.table_configuration().is_cdf_read_supported() {
                 Ok(())
@@ -195,6 +187,7 @@ impl TableChanges {
             log_segment,
             start_version,
             schema,
+            start_table_config: start_snapshot.table_configuration().clone(),
         })
     }

@@ -220,7 +213,6 @@ impl TableChanges {
     pub(crate) fn partition_columns(&self) -> &Vec<String> {
         &self.end_snapshot.metadata().partition_columns
     }
-
     /// Create a [`TableChangesScanBuilder`] for an `Arc<TableChanges>`.
     pub fn scan_builder(self: Arc<Self>) -> TableChangesScanBuilder {
         TableChangesScanBuilder::new(self)
@@ -232,42 +224,6 @@ impl TableChanges {
     }
 }

-/// Ensures that change data feed is enabled in `table_properties`. See the documentation
-/// of [`TableChanges`] for more details.
-fn check_cdf_table_properties(table_properties: &TableProperties) -> DeltaResult<()> {
-    require!(
-        table_properties.enable_change_data_feed.unwrap_or(false),
-        Error::unsupported("Change data feed is not enabled")
-    );
-    require!(
-        matches!(
-            table_properties.column_mapping_mode,
-            None | Some(ColumnMappingMode::None)
-        ),
-        Error::unsupported("Change data feed not supported when column mapping is enabled")
-    );
-    Ok(())
-}
-
-/// Ensures that Change Data Feed is supported for a table with this [`Protocol`] .
-/// See the documentation of [`TableChanges`] for more details.
-fn ensure_cdf_read_supported(protocol: &Protocol) -> DeltaResult<()> {
-    static CDF_SUPPORTED_READER_FEATURES: LazyLock<HashSet<ReaderFeatures>> =
-        LazyLock::new(|| HashSet::from([ReaderFeatures::DeletionVectors]));
-    match &protocol.reader_features() {
-        // if min_reader_version = 3 and all reader features are subset of supported => OK
-        Some(reader_features) if protocol.min_reader_version() == 3 => {
-            ensure_supported_features(reader_features, &CDF_SUPPORTED_READER_FEATURES)
-        }
-        // if min_reader_version = 1 and there are no reader features => OK
-        None if protocol.min_reader_version() == 1 => Ok(()),
-        // any other protocol is not supported
-        _ => Err(Error::unsupported(
-            "Change data feed not supported on this protocol",
-        )),
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use crate::engine::sync::SyncEngine;
diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs
index 0c2ff3eed..345f98445 100644
--- a/kernel/src/table_changes/scan.rs
+++ b/kernel/src/table_changes/scan.rs
@@ -198,7 +198,14 @@ impl TableChangesScan {
             PhysicalPredicate::None => None,
         };
         let schema = self.table_changes.end_snapshot.schema().clone().into();
-        let it = table_changes_action_iter(engine, commits, schema, physical_predicate)?;
+        let table_configuration = self.table_changes.start_table_config.clone();
+        let it = table_changes_action_iter(
+            engine,
+            commits,
+            schema,
+            physical_predicate,
+            table_configuration,
+        )?;
         Ok(Some(it).into_iter().flatten())
     }
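Since `TableChangesScan::execute` now seeds the log replay with the start snapshot's `TableConfiguration` (above), crate-internal code that builds one by hand follows the shape of the `table_config` test helper. A condensed sketch under those assumptions; note `TableConfiguration` is `pub(crate)` unless the developer-visibility feature is enabled, and the `Protocol::try_new` argument types are inferred from the test usage in this patch:

```rust
// Crate-internal sketch, mirroring the `table_config` helper in the tests above.
use std::collections::HashMap;

use crate::actions::{Metadata, Protocol};
use crate::table_configuration::TableConfiguration;
use crate::table_features::ReaderFeatures;
use crate::DeltaResult;

fn cdf_start_config(
    table_root: url::Url,
    schema_string: String,
) -> DeltaResult<TableConfiguration> {
    let metadata = Metadata {
        schema_string,
        // CDF reads require this table property to be set.
        configuration: HashMap::from([(
            "delta.enableChangeDataFeed".to_string(),
            "true".to_string(),
        )]),
        ..Default::default()
    };
    // Reader version 3 with only deletionVectors keeps CDF reads supported.
    let protocol = Protocol::try_new(
        3,
        7,
        Some([ReaderFeatures::DeletionVectors]),
        Some::<Vec<String>>(vec![]),
    )?;
    // Version 0 is a placeholder; use the start snapshot's actual version.
    TableConfiguration::try_new(metadata, protocol, table_root, 0)
}
```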
"true".to_string()), + ( + "delta.enableDeletionVectors".to_string(), + "true".to_string(), + ), + ("delta.columnMapping.mode".to_string(), "none".to_string()), + ]), + ..Default::default() + }; + let protocol = Protocol::try_new( + 3, + 7, + Some::>(vec![]), + Some::>(vec![]), + ) + .unwrap(); + let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap(); + let scan_data = table_changes_action_iter( Arc::new(engine), log_segment.ascending_commit_files.clone(), table_schema.into(), None, + table_config, ) .unwrap(); let scan_files: Vec<_> = scan_data_to_scan_file(scan_data).try_collect().unwrap(); diff --git a/kernel/src/table_configuration.rs b/kernel/src/table_configuration.rs index 565546d52..cf4d55624 100644 --- a/kernel/src/table_configuration.rs +++ b/kernel/src/table_configuration.rs @@ -33,7 +33,7 @@ use crate::{DeltaResult, Version}; /// `try_new` successfully returns `TableConfiguration`, it is also guaranteed that reading the /// table is supported. #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct TableConfiguration { metadata: Metadata, protocol: Protocol,