feat: Add in-commit timestamp support for change data feed #617

Open · wants to merge 1 commit into main
56 changes: 32 additions & 24 deletions kernel/src/table_changes/log_replay.rs
@@ -7,15 +7,17 @@ use std::sync::{Arc, LazyLock};
use crate::actions::schemas::GetStructField;
use crate::actions::visitors::{visit_deletion_vector_at, ProtocolVisitor};
use crate::actions::{
get_log_add_schema, Add, Cdc, Metadata, Protocol, Remove, ADD_NAME, CDC_NAME, METADATA_NAME,
PROTOCOL_NAME, REMOVE_NAME,
get_log_add_schema, Add, Cdc, Metadata, Protocol, Remove, ADD_NAME, CDC_NAME, COMMIT_INFO_NAME,
METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME,
};
use crate::engine_data::{GetData, TypedGetData};
use crate::expressions::{column_name, ColumnName};
use crate::path::ParsedLogPath;
use crate::scan::data_skipping::DataSkippingFilter;
use crate::scan::state::DvInfo;
use crate::schema::{ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructType};
use crate::schema::{
ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType,
};
use crate::table_changes::scan_file::{cdf_scan_row_expression, cdf_scan_row_schema};
use crate::table_changes::{check_cdf_table_properties, ensure_cdf_read_supported};
use crate::table_properties::TableProperties;
@@ -78,6 +80,12 @@ pub(crate) fn table_changes_action_iter(
/// Deletion vector resolution affects whether a remove action is selected in the second
/// phase, so we must perform it ahead of time in phase 1.
/// - Ensure that reading is supported on any protocol updates.
/// - Extract the in-commit timestamps from [`CommitInfo`] actions if they are present. These are
/// generated when the in-commit timestamps (ICT) table feature is enabled. This must be done in the
/// first phase because the second phase lazily transforms engine data with an extra timestamp
/// column, so the timestamp must be known ahead of time. Note that when ICT is enabled, CommitInfo
/// should be the first action in every commit.
/// See: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#in-commit-timestamps
/// - Ensure that Change Data Feed is enabled for any metadata update. See [`TableProperties`]
/// - Ensure that any schema update is compatible with the provided `schema`. Currently, schema
/// compatibility is checked through schema equality. This will be expanded in the future to
@@ -93,12 +101,6 @@ pub(crate) fn table_changes_action_iter(
///
/// See https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vectors
///
/// TODO: When the kernel supports in-commit timestamps, we will also have to inspect CommitInfo
/// actions to find the timestamp. These are generated when incommit timestamps is enabled.
/// This must be done in the first phase because the second phase lazily transforms engine data with
/// an extra timestamp column. Thus, the timestamp must be known ahead of time.
/// See https://github.com/delta-io/delta-kernel-rs/issues/559
///
/// 2. Scan file generation phase [`LogReplayScanner::into_scan_batches`]: This iterates over every
/// action in the commit, and generates [`TableChangesScanData`]. It does so by transforming the
/// actions using [`add_transform_expr`], and generating selection vectors with the following rules:
@@ -118,14 +120,8 @@ struct LogReplayScanner {
// The commit file that this replay scanner will operate on.
commit_file: ParsedLogPath,
// The timestamp associated with this commit. This is the file modification time
// from the commit's [`FileMeta`].
//
//
// TODO when incommit timestamps are supported: If there is a [`CommitInfo`] with a timestamp
// generated by in-commit timestamps, that timestamp will be used instead.
//
// Note: This will be used once an expression is introduced to transform the engine data in
// [`TableChangesScanData`]
// from the commit's [`FileMeta`]. If the in-commit timestamps feature is enabled, this will be the
// in-commit timestamp from the [`CommitInfo`] action.
timestamp: i64,
}

@@ -136,15 +132,14 @@ impl LogReplayScanner {
/// 2. Construct a map from path to deletion vector of remove actions that share the same path
/// as an add action.
/// 3. Perform validation on each protocol and metadata action in the commit.
/// 4. Extract the in-commit timestamp from [`CommitInfo`] if it is present.
Collaborator:
I can't comment on L130 above but I think we need to do some comment updates?

Author:
Good catch! I went through every mention of ICT and I think I got them all.

///
/// For more details, see the documentation for [`LogReplayScanner`].
fn try_new(
engine: &dyn Engine,
commit_file: ParsedLogPath,
table_schema: &SchemaRef,
) -> DeltaResult<Self> {
let visitor_schema = PreparePhaseVisitor::schema();

// Note: We do not perform data skipping yet because we need to visit all add and
// remove actions for deletion vector resolution to be correct.
//
@@ -156,22 +151,25 @@ impl LogReplayScanner {
// vectors are resolved so that we can skip both actions in the pair.
let action_iter = engine.get_json_handler().read_json_files(
&[commit_file.location.clone()],
visitor_schema,
PreparePhaseVisitor::schema(),
None, // not safe to apply data skipping yet
)?;

let mut remove_dvs = HashMap::default();
let mut add_paths = HashSet::default();
let mut has_cdc_action = false;
for actions in action_iter {
let mut timestamp = commit_file.location.last_modified;
for (i, actions) in action_iter.enumerate() {
let actions = actions?;

let mut visitor = PreparePhaseVisitor {
add_paths: &mut add_paths,
remove_dvs: &mut remove_dvs,
has_cdc_action: &mut has_cdc_action,
commit_timestamp: &mut timestamp,
Collaborator:
would this be clearer?

Suggested change:
- commit_timestamp: &mut timestamp,
+ in_commit_timestamp: &mut timestamp,

Author:
We initialize this field with the file modification timestamp, so it would be inaccurate to call it that. I do like the update you made below, though, where we actually read the ICT from a CommitInfo.

protocol: None,
metadata_info: None,
is_first_batch: i == 0,
};
visitor.visit_rows_of(actions.as_ref())?;

@@ -202,7 +200,7 @@ impl LogReplayScanner {
remove_dvs.retain(|rm_path, _| add_paths.contains(rm_path));
}
Ok(LogReplayScanner {
timestamp: commit_file.location.last_modified,
timestamp,
commit_file,
has_cdc_action,
remove_dvs,
@@ -220,7 +218,6 @@
has_cdc_action,
remove_dvs,
commit_file,
// TODO: Add the timestamp as a column with an expression
timestamp,
} = self;
let remove_dvs = Arc::new(remove_dvs);
@@ -274,15 +271,19 @@ struct PreparePhaseVisitor<'a> {
has_cdc_action: &'a mut bool,
add_paths: &'a mut HashSet<String>,
remove_dvs: &'a mut HashMap<String, DvInfo>,
commit_timestamp: &'a mut i64,
is_first_batch: bool,
}
impl PreparePhaseVisitor<'_> {
fn schema() -> Arc<StructType> {
let ict_type = StructField::new("inCommitTimestamp", DataType::LONG, true);
Arc::new(StructType::new(vec![
Option::<Add>::get_struct_field(ADD_NAME),
Option::<Remove>::get_struct_field(REMOVE_NAME),
Option::<Cdc>::get_struct_field(CDC_NAME),
Option::<Metadata>::get_struct_field(METADATA_NAME),
Option::<Protocol>::get_struct_field(PROTOCOL_NAME),
StructField::new(COMMIT_INFO_NAME, StructType::new([ict_type]), true),
]))
}
}
@@ -314,6 +315,7 @@ impl RowVisitor for PreparePhaseVisitor<'_> {
(INTEGER, column_name!("protocol.minWriterVersion")),
(string_list.clone(), column_name!("protocol.readerFeatures")),
(string_list, column_name!("protocol.writerFeatures")),
(LONG, column_name!("commitInfo.inCommitTimestamp")),
];
let (types, names) = types_and_names.into_iter().unzip();
(names, types).into()
@@ -323,7 +325,7 @@

fn visit<'b>(&mut self, row_count: usize, getters: &[&'b dyn GetData<'b>]) -> DeltaResult<()> {
require!(
getters.len() == 16,
getters.len() == 17,
Error::InternalError(format!(
"Wrong number of PreparePhaseVisitor getters: {}",
getters.len()
@@ -354,6 +356,12 @@ impl RowVisitor for PreparePhaseVisitor<'_> {
let protocol =
ProtocolVisitor::visit_protocol(i, min_reader_version, &getters[12..=15])?;
self.protocol = Some(protocol);
} else if let Some(in_commit_timestamp) =
getters[16].get_long(i, "commitInfo.inCommitTimestamp")?
{
if self.is_first_batch && i == 0 {
*self.commit_timestamp = in_commit_timestamp;
}
}
}
Ok(())
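To restate the rule this file now implements: the scanner's timestamp defaults to the commit file's modification time and is replaced by the in-commit timestamp only when a CommitInfo carrying one is the first action of the commit; phase 2 then stamps that resolved value onto every change row for the commit. A minimal sketch of the resolution rule, using a made-up Action enum rather than the kernel's row-visitor API:

// Hypothetical, simplified action type for illustration; the kernel visits raw
// engine data with PreparePhaseVisitor instead of materializing an enum.
#[allow(dead_code)]
enum Action {
    CommitInfo { in_commit_timestamp: Option<i64> },
    Add { path: String },
    Remove { path: String },
}

// Resolve a commit's timestamp: default to the log file's modification time,
// and use the in-commit timestamp only when a CommitInfo carrying one is the
// first action, as the Delta protocol requires when ICT is enabled.
fn resolve_commit_timestamp(file_modification_time: i64, actions: &[Action]) -> i64 {
    match actions.first() {
        Some(Action::CommitInfo { in_commit_timestamp: Some(ict) }) => *ict,
        _ => file_modification_time,
    }
}

fn main() {
    let with_ict = vec![
        Action::CommitInfo { in_commit_timestamp: Some(12345678) },
        Action::Add { path: "fake_path_1".into() },
    ];
    // The leading CommitInfo wins over the file modification time.
    assert_eq!(resolve_commit_timestamp(42, &with_ict), 12345678);

    let without_ict = vec![Action::Add { path: "fake_path_2".into() }];
    // No leading CommitInfo: fall back to the file modification time.
    assert_eq!(resolve_commit_timestamp(42, &without_ict), 42);
}
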
35 changes: 35 additions & 0 deletions kernel/src/table_changes/log_replay/tests.rs
@@ -1,6 +1,7 @@
use super::table_changes_action_iter;
use super::TableChangesScanData;
use crate::actions::deletion_vector::DeletionVectorDescriptor;
use crate::actions::CommitInfo;
use crate::actions::{Add, Cdc, Metadata, Protocol, Remove};
use crate::engine::sync::SyncEngine;
use crate::expressions::Scalar;
@@ -609,3 +610,37 @@ async fn file_meta_timestamp() {
let scanner = LogReplayScanner::try_new(engine.as_ref(), commit, &get_schema().into()).unwrap();
assert_eq!(scanner.timestamp, file_meta_ts);
}

#[tokio::test]
async fn table_changes_in_commit_timestamp() {
let engine = Arc::new(SyncEngine::new());
let mut mock_table = LocalMockTable::new();

let timestamp = 12345678;

mock_table
.commit([
Action::CommitInfo(CommitInfo {
in_commit_timestamp: Some(timestamp),
..Default::default()
}),
Review thread on lines +623 to +626:

Collaborator:
What happens if commit info isn't first? Do we still read it? I know the protocol says it must be first with ICT enabled, but I wonder what the expected behavior is when it isn't first. Do we do the right thing?

Collaborator:
(but probably don't solve here)

Author:
Discussed a little here: #581 (comment)

I'm still quite certain that delta-spark doesn't care about the ordering, because it goes through all the actions in the commit looking for CommitInfo:

        var commitInfo: Option[CommitInfo] = None
        actions.foreach {
          case c: AddCDCFile =>
            cdcActions.append(c)
            totalFiles += 1L
            totalBytes += c.size
          case a: AddFile =>
            totalFiles += 1L
            totalBytes += a.size
          case r: RemoveFile =>
            totalFiles += 1L
            totalBytes += r.size.getOrElse(0L)
          case i: CommitInfo => commitInfo = Some(i)
          case _ => // do nothing
        }

I've added a check that only uses the ICT if it is the first action in the log, but that raises a question: should we fail if it isn't the first action?

Author:
I can also revert the check that CommitInfo is first and revisit that in a future PR.

Action::Add(Add {
path: "fake_path_1".into(),
data_change: true,
..Default::default()
}),
])
.await;

let mut commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
.unwrap()
.into_iter();

let commit = commits.next().unwrap();
let scanner = LogReplayScanner::try_new(engine.as_ref(), commit, &get_schema().into()).unwrap();
assert_eq!(scanner.timestamp, timestamp);

let iter = scanner.into_scan_batches(engine, None).unwrap();
let sv = result_to_sv(iter);
assert_eq!(sv, vec![false, true]);
}
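
For contrast with the first-action-only check added in this PR, the delta-spark behavior quoted in the review thread above scans the entire commit for a CommitInfo, regardless of its position. A rough Rust equivalent of that Scala loop, again using a simplified, hypothetical Action type:

// Hypothetical, simplified action type for illustration only.
#[allow(dead_code)]
enum Action {
    CommitInfo { in_commit_timestamp: Option<i64> },
    Add { size: u64 },
    Remove { size: Option<u64> },
    Cdc { size: u64 },
}

// delta-spark-style resolution: take the in-commit timestamp from whichever
// CommitInfo appears anywhere in the commit, not just the first action.
fn find_in_commit_timestamp(actions: &[Action]) -> Option<i64> {
    actions.iter().find_map(|action| match action {
        Action::CommitInfo { in_commit_timestamp } => *in_commit_timestamp,
        _ => None,
    })
}

fn main() {
    // CommitInfo is not the first action here. This policy still finds the
    // timestamp, whereas the check added in this PR would fall back to the
    // commit file's modification time instead.
    let actions = vec![
        Action::Add { size: 100 },
        Action::CommitInfo { in_commit_timestamp: Some(12345678) },
    ];
    assert_eq!(find_in_commit_timestamp(&actions), Some(12345678));
}
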
17 changes: 14 additions & 3 deletions kernel/src/table_changes/scan_file.rs
@@ -244,7 +244,7 @@ mod tests {

use super::{scan_data_to_scan_file, CdfScanFile, CdfScanFileType};
use crate::actions::deletion_vector::DeletionVectorDescriptor;
use crate::actions::{Add, Cdc, Remove};
use crate::actions::{Add, Cdc, CommitInfo, Remove};
use crate::engine::sync::SyncEngine;
use crate::log_segment::LogSegment;
use crate::scan::state::DvInfo;
@@ -312,14 +312,25 @@ mod tests {
..Default::default()
};

let cdc_timestamp = 12345678;
let commit_info = CommitInfo {
in_commit_timestamp: Some(cdc_timestamp),
..Default::default()
};

mock_table
.commit([
Action::Remove(remove_paired.clone()),
Action::Add(add_paired.clone()),
Action::Remove(remove.clone()),
])
.await;
mock_table.commit([Action::Cdc(cdc.clone())]).await;
mock_table
.commit([
Action::CommitInfo(commit_info.clone()),
Action::Cdc(cdc.clone()),
])
.await;
mock_table
.commit([Action::Remove(remove_no_partition.clone())])
.await;
@@ -386,7 +397,7 @@
},
partition_values: cdc.partition_values,
commit_version: 1,
commit_timestamp: timestamps[1],
commit_timestamp: cdc_timestamp,
remove_dv: None,
},
CdfScanFile {
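The scan_file tests above show the end-to-end effect: the timestamp resolved in phase 1 (the ICT when a leading CommitInfo provides one, otherwise the file modification time) becomes the commit_timestamp reported for every scan file of that commit, and phase 2 lazily attaches it to each batch of change rows. In kernel terms that transformation is an expression over engine data; its effect is roughly the following toy sketch (made-up Batch type, not the kernel's expression API):

// Toy columnar batch used only for illustration: named columns of i64 values.
struct Batch {
    columns: Vec<(String, Vec<i64>)>,
    num_rows: usize,
}

// Append a constant timestamp column so every change row in this commit's
// batch carries the timestamp resolved during phase 1 of log replay.
fn with_commit_timestamp(mut batch: Batch, timestamp: i64) -> Batch {
    let column = vec![timestamp; batch.num_rows];
    batch.columns.push(("commit_timestamp".to_string(), column));
    batch
}

fn main() {
    let batch = Batch {
        columns: vec![("commit_version".to_string(), vec![1, 1])],
        num_rows: 2,
    };
    let stamped = with_commit_timestamp(batch, 12345678);
    // Both rows now share the commit's resolved timestamp.
    assert_eq!(stamped.columns.last().unwrap().1, vec![12345678, 12345678]);
}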