
Basic Transaction Snapshot Isolation Support & Storage Namespace Interface Implementation #455

Merged: 14 commits into ray-project:2.0 on Jan 29, 2025

Conversation

@pdames (Member) commented Jan 18, 2025

Summary

Adds basic concurrent transaction conflict detection via MVCC-based snapshot isolation, with corresponding passing unit tests. Also adds a main metastore storage interface implementation with namespace creation/retrieval support.

Checklist

  • Unit tests covering the changes have been added
    • If this is a bugfix, regression tests have been added
  • E2E testing has been performed

@pdames requested a review from @wuisawesome on January 21, 2025 16:58
txn_operation_type.value,
f"{txn_id}{extension}",
]
sorted_metafile_paths = MetafileRevisionInfo._sorted_file_paths(
@pdames (Member Author) commented Jan 21, 2025

I think we can improve performance here by writing a file like _latest.mpk in each rev directory, populated with the latest metafile revision, and then putting each revision file in its own revision-number directory. That way, this method only needs to list the contents of the latest revision-number directory to check for conflicts, and metafile reads don't need to list all rev dir contents to discover the latest metafile revision to read; they just go directly to reading _latest.mpk.
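
A minimal sketch of that read path, assuming the proposed layout (a _latest.mpk copy per rev directory plus one zero-padded directory per revision number; all function names here are illustrative, not part of this PR):

# Illustrative sketch (not part of the PR):
import pyarrow.fs as pafs

def read_latest_metafile(fs: pafs.FileSystem, rev_dir: str) -> bytes:
    # Reads go straight to the _latest.mpk copy, with no directory listing
    # needed to discover the latest metafile revision.
    with fs.open_input_stream(f"{rev_dir}/_latest.mpk") as stream:
        return stream.read()

def list_latest_revision_dir(fs: pafs.FileSystem, rev_dir: str, latest_rev: int):
    # Conflict checks only list the single latest revision-number directory
    # (e.g., <rev_dir>/00000000000000000042/) instead of every revision file.
    selector = pafs.FileSelector(f"{rev_dir}/{latest_rev:020d}", allow_not_found=True)
    return fs.get_file_info(selector)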

@thesalus (Collaborator) left a comment:

Leaving some early comments (partway through metafile.py)

Resolved (outdated) review threads:
  • deltacat/tests/storage/model/test_metafile_io.py
  • deltacat/storage/model/metafile.py (5 threads)
@pdames changed the title from "Basic Transaction Snapshot Isolation Support" to "Basic Transaction Snapshot Isolation Support & Storage Namespace Interface Implementation" on Jan 24, 2025
Resolved (outdated) review thread: deltacat/catalog/main/impl.py

# table functions
def write_to_table(
data: Union[LocalTable, LocalDataset, DistributedDataset],
Collaborator commented:

We're going to have to reconcile dataset.py and some functions here. I'm wondering if it makes sense to spin out a separate catalog impl just for table [version?]. It's worth taking a look at methods in rivulet's dataset.py, like from_parquet, export, and add_schema, and thinking about where they fit into deltacat.

@pdames (Member Author) commented:

We could always propose a separate catalog interface just to see what it looks like. Today, in DeltaCAT 1.X, only the storage interface is used extensively, so we're pretty free to evolve or propose a new catalog interface in 2.0.


def read_table(
table: str, namespace: Optional[str] = None, *args, **kwargs
) -> DistributedDataset:
Collaborator commented:

How does the user signal if they want to read as a ray dataset or daft dataframe?

We should compare this with the rivulet dataset scan and think about how to integrate that into this interface

@pdames (Member Author) commented:

This is just an old/out-of-date interface. The dataset type they want to read the table as needs to be exposed as an explicit argument here.
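
For illustration only, the explicit argument might look something like this (the DatasetType enum, its values, and the default are hypothetical, not part of this PR):

# Illustrative sketch (not part of the PR):
from enum import Enum
from typing import Optional

class DatasetType(str, Enum):
    RAY_DATASET = "ray_dataset"
    DAFT = "daft"
    PANDAS = "pandas"

def read_table(
    table: str,
    namespace: Optional[str] = None,
    dataset_type: DatasetType = DatasetType.RAY_DATASET,
    *args,
    **kwargs,
):
    # Dispatch to a reader for the requested in-memory/distributed dataset type.
    raise NotImplementedError("read_table not implemented")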

raise NotImplementedError("read_table not implemented")


def alter_table(
Collaborator commented:

Can you explain how table versions and streams are layered in here? My understanding was that a user can explicitly manage multiple versions of a table, and that within a table version they can explicitly manage multiple different streams (representing "variants" of the same data).

@pdames (Member Author) commented Jan 25, 2025

The current thought is that there is only a fixed set of streams per table version, where each stream is uniquely identified by the metastore type it's associated with (either Iceberg, Hudi, Delta Lake, or the main DeltaCAT stream). Non-DeltaCAT-native stream types will have metadata writes to the DeltaCAT main stream translated and synchronized to them, either directly (e.g., Iceberg) or transitively (e.g., from Iceberg to Hudi/Delta Lake via XTable). A user would need to specify explicit configuration stating that they'd like to synchronize their DeltaCAT writes to Iceberg/Hudi/Delta Lake/etc. in order for those streams to be created and maintained.

The TableVersion then holds properties that are intended to be common to all streams, like the latest Arrow Schema associated with the Stream that the Iceberg/Hudi/Delta schema has ultimately been derived from (with no derivation needed for the main DeltaCAT stream, since it uses Arrow Schemas natively).

I think TableVersion creation will largely be a backend decision (e.g., by default, we should put a backwards-incompatible schema change into a new table version), but could also be an optional, explicit decision from the end-user (e.g., the user effectively wants to create a new evolutionary "branch" of their table, and hold that branch in a new table version).

By default, table reads will resolve to the latest active table version, but a user can specify if they'd like to explicitly read a different table version.
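
A rough sketch of the layering described above (the enum and the structure comments are hypothetical visual aids; only the concepts come from this thread):

# Illustrative sketch (not part of the PR):
from enum import Enum

class StreamFormat(str, Enum):
    DELTACAT = "deltacat"      # main stream; uses Arrow schemas natively
    ICEBERG = "iceberg"        # synced directly from the main stream
    HUDI = "hudi"              # synced transitively (e.g., via XTable)
    DELTA_LAKE = "delta_lake"  # synced transitively (e.g., via XTable)

# Conceptually, a table version then holds at most one stream per format,
# plus properties common to all of its streams:
#
#   TableVersion
#     schema: latest Arrow schema     # shared; derived formats translate it
#     streams:
#       DELTACAT                      # always present
#       ICEBERG / HUDI / DELTA_LAKE   # present only if sync is configured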

return _list_metafiles(
*args,
metafile=Namespace.of(NamespaceLocator.of("placeholder")),
txn_op_type=TransactionOperationType.READ_SIBLINGS,
Collaborator commented:

why is READ_SIBLINGS a transaction type?

A transaction type like READ_LATEST makes sense to me, because you are essentially locking to a transaction start time and reading with respect to that snapshot. Does READ_SIBLINGS just list sibling directories, and in that case does it even have anything to do with transactions?

This may be a smell that the scope of transaction management / commits is too large, if you need to go through transaction management to basically do any metastore reads

@pdames (Member Author) commented Jan 24, 2025

You don't strictly need to go through a transaction manager for any metastore reads, but the point of MVCC Snapshot Isolation is to use read transactions whenever you want to "freeze" the database in a consistent state during a read, such that you don't interleave partial state from multiple in-progress transactions into your read.

So reads outside the context of a transaction are perfectly valid, but you'll see the evolving "streaming" state of the metastore instead of the "frozen" state of the metastore captured at the beginning of the transaction, and associated ACID compliance guarantees will be lost (e.g., you may return a namespace that was created as part of an incomplete transaction, and should have never been exposed to a reader).

However, I think it could be argued that READ_SIBLINGS in particular is unnecessary when you already have READ_CHILDREN (e.g., we could treat all namespaces as children of the root catalog, which presumably has no catalog-level siblings of its own).

@pdames (Member Author) commented Jan 25, 2025

Reading your comment again, I should also be clear that READ_SIBLINGS reads the latest committed revision of all siblings for the current transaction's snapshot (i.e., it will read the latest committed version of every namespace in the catalog as of the current transaction snapshot, not just return whatever namespace revisions it happens to find on disk).
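
A hedged sketch of the difference, using a hypothetical transaction API (none of these names come from this PR):

# Illustrative sketch (not part of the PR):
def read_namespaces(catalog_root, Transaction, list_namespaces):
    # Non-transactional read: sees the live, evolving state of the metastore,
    # potentially including revisions written by in-progress transactions.
    live_view = list_namespaces(catalog_root)

    # Transactional read: every read resolves against the snapshot captured
    # at transaction start, so partial state from concurrent writers never
    # leaks in. READ_SIBLINGS returns the latest committed revision of every
    # namespace as of that snapshot.
    with Transaction.start(catalog_root) as txn:
        snapshot_view = txn.list_namespaces()

    return live_view, snapshot_view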

table_name: str,
partition_values: Optional[PartitionValues] = None,
table_version: Optional[str] = None,
first_stream_position: Optional[int] = None,
Collaborator commented:

Are stream positions comparable across partitions? Or is this only relevant within a single partition?

@pdames (Member Author) commented:

Stream positions are not currently assumed to be comparable across partitions.

partition_scheme_id: Optional[str] = None,
*args,
**kwargs,
) -> ListResult[Delta]:
Collaborator commented:

So having paginated listing everywhere means we need a custom implementation of pyarrow filesystems? We should bring this up with Anshuman, who is refactoring the filesystem interfaces.

@pdames (Member Author) commented:

Perhaps, but it may not be the only solution to the problem (e.g., we could also entertain creating a "delta page" metafile that stores N delta locators inside of it).

Either way, I definitely agree that the default pyarrow filesystems will become vastly inefficient (esp. for S3 and other cloud storage systems) with the current metastore layout as the number of objects grows into the thousands and beyond, since pyarrow fs will attempt to eagerly list all child partitions of a table stream, all child deltas of a stream partition, etc. Even if we can paginate the loading of the full Table/Partition/Delta metafile contents, the exhaustive listing itself is undesirable.
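
For example, a hypothetical "delta page" metafile might look like this (all names illustrative):

# Illustrative sketch (not part of the PR):
from dataclasses import dataclass
from typing import List

@dataclass
class DeltaPage:
    first_stream_position: int  # first stream position covered by this page
    last_stream_position: int   # last stream position covered by this page
    delta_locators: List[str]   # locators of every delta in this range

    def next_page_path(self) -> str:
        # Pages chain by stream position, so a paginated listing walks
        # O(num_deltas / page_size) page objects instead of every delta file.
        return f"delta_pages/{self.last_stream_position + 1:020d}.mpk"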

raise NotImplementedError("get_latest_delta not implemented")


def download_delta(
Collaborator commented:

Does this belong in the storage interface? Playing devil's advocate - the storage interface should be just a dumb wrapper for manipulating the metastore. Once you get into actually using metadata files to download data, that feels more like the catalog interface or some level of abstraction between catalog and storage

@pdames (Member Author) commented Jan 24, 2025

For compactors and similar system maintenance jobs, the answer is "yes", since they're arguably one of few job types that need to be delta aware (e.g., a compactor will explicitly download all of the deltas that arrived since the last time it ran, then merge them into a singular new replacement delta). The assumption here is that the average end-user shouldn't need to understand the CDC log of deltas that are ultimately merged into the final table/dataset they read/write, and should instead just read/write at the table/dataset abstraction level instead of at the delta level.
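
A hedged sketch of that compaction pattern (method names loosely follow the storage interface under review; merge_fn and the position-tracking argument are hypothetical):

# Illustrative sketch (not part of the PR):
def compact_partition(storage, partition, last_compacted_position, merge_fn):
    # Download every delta that arrived since the last compaction run...
    deltas = storage.list_deltas(
        partition,
        first_stream_position=last_compacted_position + 1,
    ).all_items()
    tables = [storage.download_delta(delta) for delta in deltas]

    # ...merge them (dedupe/sort on the table's merge keys), then stage and
    # commit a single replacement delta in their place.
    merged = merge_fn(tables)
    staged = storage.stage_delta(merged, partition)
    return storage.commit_delta(staged)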

@pdames (Member Author) commented:

So, in short, I don't disagree with the purity of keeping the storage interface restricted to metadata manipulation only, but we would need to find a way to bridge the gap between exposing system-level API parameters for delta-level reads/writes to backend procedures like compaction, without also unnecessarily exposing the internals of the metastore format to end users.

Collaborator commented:

Ok that makes sense, I'm aligned with this after reading your comment


def create_table(
table: str,
namespace: Optional[str] = None,
Collaborator commented:

How does creating a table work when namespace isn't specified? Does it create a default namespace?

@pdames (Member Author) commented:

Yes - tables land in a global system namespace if the namespace isn't specified, so the namespace effectively becomes implicit.

raise NotImplementedError("get_partition not implemented")


def stage_delta(
Collaborator commented:

I've been meaning to ask: where in the deltacat metastore are data files stored? Presumably staging a delta will choose where to write data files?

@pdames (Member Author) commented:

Right now the data file storage location is auto-resolved by the catalog. This may be an area where we need some additional arguments if we'd like to support staging deltas to multiple disparate locations with rivulet though.

raise NotImplementedError("get_partition not implemented")


def stage_delta(
Collaborator commented:

Given the large number of arguments, should we do what you've done in other parts of deltacat and make most parameters keyword only?

@pdames (Member Author) commented:

This would break backwards compatibility with 1.X, so while I think this is probably the right approach eventually, I'd prefer to do it in a subsequent iteration after we've merged 2.0 back to main.
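
For reference, the keyword-only form would just add a bare * separator after the leading positional parameters (parameter names below are illustrative); any 1.X caller passing the later arguments positionally would break:

# Illustrative sketch (not part of the PR):
def stage_delta(
    data,
    partition,
    *,  # everything after this marker must be passed by keyword
    delta_type=None,
    max_records_per_entry=None,
    content_type=None,
    **kwargs,
):
    ...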

@pdames force-pushed the 2.0 branch 2 times, most recently from 678ad7b to 3a05a6d on January 25, 2025 03:07
@pdames merged commit 825a48c into ray-project:2.0 on Jan 29, 2025
3 checks passed