Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor rivulet manifest files to use deltacat #464

Merged
merged 15 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ lint: install
test: install
venv/bin/pytest -m "not integration"

unit-test: install
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just added this to run tests more rapidly. Note that some of the unit tests for compaction take a while, so we should consider moving them to benchmarks.

venv/bin/pytest -m "not integration and not benchmark"

test-integration: install
docker-compose -f dev/iceberg-integration/docker-compose-integration.yml kill
docker-compose -f dev/iceberg-integration/docker-compose-integration.yml rm -f
Expand All @@ -51,5 +54,5 @@ test-integration-rebuild:
benchmark-aws: install
venv/bin/pytest deltacat/benchmarking/benchmark_parquet_reads.py --benchmark-only --benchmark-group-by=group,param:name

benchmark:
benchmark: install
pytest -m benchmark deltacat/benchmarking
2 changes: 1 addition & 1 deletion deltacat/storage/model/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def of(
f"'{entry_content_type}'"
)
raise ValueError(msg)
entry_content_encoding = meta["content_encoding"]
entry_content_encoding = meta.get("content_encoding", None)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was raising a ValueError for manifests with no content encoding (like in Rivulet...). We don't populate lots of fields currently for rivulet deltas/manifests so we will have to update code to populate them (see: #476)

if entry_content_encoding != content_encoding:
msg = (
f"Expected all manifest entries to have content "
Expand Down
2 changes: 1 addition & 1 deletion deltacat/storage/rivulet/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def __init__(

self._file_store = FileStore(self._metadata_path, filesystem)
self._file_provider = FileProvider(self._metadata_path, self._file_store)
self._metastore = DatasetMetastore(self._file_provider)
self._metastore = DatasetMetastore(self._metadata_path, self._file_provider)

# Initialize accessors
self.fields = FieldsAccessor(self)
Expand Down
6 changes: 3 additions & 3 deletions deltacat/storage/rivulet/fs/file_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class FileStore:
method: `list_files`: Lists all files within a specified directory URI.
"""

def __init__(self, path: str, filesystem: FileSystem):
def __init__(self, path: str, filesystem: Optional[FileSystem] = None):
"""
Serves as the source of truth for all file operations, ensuring that
all paths and operations are relative to the specified filesystem,
Expand All @@ -34,8 +34,8 @@ def __init__(self, path: str, filesystem: FileSystem):
param: path (str): The base URI or path for the filesystem.
param: filesystem (FileSystem): A PyArrow filesystem instance.
"""
_, filesystem = FileStore.filesystem(path, filesystem)
self.filesystem = filesystem
_, fs = FileStore.filesystem(path, filesystem)
self.filesystem = filesystem or fs

@staticmethod
def filesystem(
Expand Down
189 changes: 189 additions & 0 deletions deltacat/storage/rivulet/metastore/delta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
from __future__ import annotations

from typing import Protocol, NamedTuple, List
import time

from deltacat.storage import (
ManifestMeta,
EntryType,
DeltaLocator,
Delta,
DeltaType,
Transaction,
TransactionType,
TransactionOperation,
TransactionOperationType,
)
from deltacat.storage.model.manifest import Manifest, ManifestEntryList, ManifestEntry
from deltacat.storage.model.transaction import TransactionOperationList

from deltacat.storage.rivulet import Schema

# Both aliases are plain `int`; the distinct names only document intent in signatures.
StreamPosition = int
"""The stream position for creating a consistent ordering of manifests."""
TreeLevel = int
"""The level of the manifest in the LSM-tree."""


class DeltaContext(NamedTuple):
    """Minimal amount of manifest context that may need to be circulated independently or alongside individual files

    Fields (in positional order):
        schema: Schema needed to understand which field group was added when writing the manifest.
        stream_position: Stream position used to create a consistent ordering of manifests.
        level: Level of the manifest in the LSM-tree.
    """

    # Schema needed to understand which field group was added when writing manifest
    # TODO in the future we should use something like a field group id and keep schema in dataset-level metadata
    schema: Schema
    # Ordering key for manifests (see the StreamPosition alias).
    stream_position: StreamPosition
    # LSM-tree level of the manifest (see the TreeLevel alias).
    level: TreeLevel


class RivuletDelta(dict):
    """
    Temporary class during merging of deltacat/rivulet metadata formats

    This class currently serves two purposes:
    1. Avoid big bang refactor in which consumers of RivuletDelta have to update their code to consume deltacat Delta/Manifest
    2. Provide more time to figure out how to represent SST files / schema / etc within deltacat constructs

    All state lives in the underlying dict under these keys:
        "dcDelta": the wrapped deltacat Delta.
        "DeltaContext": the DeltaContext associated with the delta.
        "sst_files": list of SST file URIs; populated lazily by `sst_files`
            or explicitly through its setter.
    """

    context: DeltaContext

    @staticmethod
    def of(delta: Delta) -> RivuletDelta:
        """
        Wrap a deltacat Delta in a RivuletDelta.

        The schema is rebuilt with `Schema.from_dict` from the delta's
        "schema" entry, and the context is assembled from that schema, the
        delta's `stream_position`, and the delta's "level" entry.

        param: delta (Delta): The deltacat delta to wrap.
        returns: A new RivuletDelta holding the delta and its DeltaContext.
        """
        riv_delta = RivuletDelta()
        riv_delta["dcDelta"] = delta
        schema = Schema.from_dict(delta.get("schema"))
        riv_delta["DeltaContext"] = DeltaContext(
            schema, delta.stream_position, delta.get("level")
        )

        return riv_delta

    @property
    def dcDelta(self) -> Delta:
        """The wrapped deltacat Delta, or None if it has not been set."""
        return self.get("dcDelta")

    @property
    def sst_files(self) -> List[str]:
        """
        URIs of the SST files referenced by this delta.

        On first access the list is derived from the `uri` of every entry in
        the wrapped delta's manifest and cached under the "sst_files" key, so
        later accesses (and the setter) operate on the cached value.
        """
        # Membership is tested on the dict itself; building a keys() view is unnecessary.
        if "sst_files" not in self:
            self["sst_files"] = [m.uri for m in self.dcDelta.manifest.entries]
        return self["sst_files"]

    @sst_files.setter
    def sst_files(self, files: List[str]):
        """Replace the cached list of SST file URIs."""
        self["sst_files"] = files

    @property
    def context(self) -> DeltaContext:
        """The DeltaContext stored for this delta (raises KeyError if unset)."""
        return self["DeltaContext"]

    @context.setter
    def context(self, mc: DeltaContext):
        """Replace the stored DeltaContext."""
        self["DeltaContext"] = mc


class ManifestIO(Protocol):
    """
    Smallest contract an object must satisfy to persist and load manifest files
    """

    def write(self, sst_files: List[str], schema: Schema, level: TreeLevel) -> str:
        """
        Persist a manifest describing the given SST files.

        param: sst_files (List[str]): SST files the manifest should reference.
        param: schema (Schema): Schema to record with the manifest.
        param: level (TreeLevel): LSM-tree level to record with the manifest.
        returns: A string identifying what was written.
        """

    def read(self, file: str) -> RivuletDelta:
        """
        Load a previously written manifest.

        param: file (str): Location of the manifest to read.
        returns: The manifest contents as a RivuletDelta.
        """


class DeltacatManifestIO(ManifestIO):
"""
Writes manifest data, but by writing to a Deltacat metastore using Deltacat delta/manifest classes
"""

def __init__(self, root: str) -> None:
    """
    Remember the root that write transactions are committed against.

    param: root (str): Passed unchanged to `Transaction.commit` by `write`.
    """
    self.root = root

def write(
self,
sst_files: List[str],
schema: Schema,
level: TreeLevel,
) -> str:
# Build the Deltacat Manifest entries:
entry_list = ManifestEntryList()
"""
Currently, we use the "data files" manifest entry field for SST files
This is a bit of a hack - we should consider how to better model SST files
(e.g.: add Manifest entry of type "SST") and decide whether we also need to record data files separately
even though they're referenced by SST
Ticket: https://github.com/ray-project/deltacat/issues/469
"""
for sst_uri in sst_files:
entry_list.append(
ManifestEntry.of(
url=sst_uri,
# TODO have rivulet writer populate these values
# see: https://github.com/ray-project/deltacat/issues/476
meta=ManifestMeta.of(
record_count=None, # or known
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aside: What does this comment mean?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It means that, e.g., when writing data, the rivulet writer will know the record count and propagate it to manifests. Not updating the code since I think the comment and linked ticket are sufficient.

content_length=None,
content_type=None,
content_encoding=None,
entry_type=EntryType.DATA,
),
)
)
dc_manifest = Manifest.of(entries=entry_list)

# Create delta and transaction which writes manifest to root
# TODO replace this with higher level storage interface for deltacat

delta_locator = DeltaLocator.at(
namespace=None,
table_name=None,
table_version=None,
stream_id=None,
stream_format=None,
partition_values=None,
partition_id=None,
# Using nanosecond precision timestamp (time.time_ns()) as stream position
# TODO consider having storage interface auto assign stream position
stream_position=time.time_ns(),
Comment on lines +152 to +154
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: this appears to be nanosecond precision.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea. Not updating for now since we will move automatic stream position generation behind the storage/catalog interface.

)

delta = Delta.of(
locator=delta_locator,
delta_type=DeltaType.APPEND,
meta=None,
properties={},
manifest=dc_manifest,
)
# TODO later support multiple schemas (https://github.com/ray-project/deltacat/issues/468)
delta["schema"] = schema.to_dict()
# TODO consider if level should be added as first class key to delta or
# kept as specific to storage interface
delta["level"] = level

tx_results = Transaction.of(
txn_type=TransactionType.APPEND,
txn_operations=TransactionOperationList.of(
[
TransactionOperation.of(
operation_type=TransactionOperationType.CREATE,
dest_metafile=delta,
)
]
),
).commit(self.root)
paths = tx_results[0]
assert (
len(paths) == 1
), "expected delta commit transaction to write exactly 1 metafile"
Comment on lines +182 to +184
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point, the transaction has been committed. What (w|sh)ould be the right way to handle this AssertionError? (Is there a way for the caller to recover from such a scenario?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more of a sanity-check exception - we only added one metafile, and if the transaction committed more than one then something is seriously wrong.

We are abstracting the low level Tx handling behind storage interface in near future

return paths[0]

def read(self, file: str) -> RivuletDelta:
    """
    Read a deltacat delta metafile and wrap it as a RivuletDelta.

    The return annotation matches the `ManifestIO.read` protocol method
    this class implements.

    param: file (str): Location of the delta metafile, passed to `Delta.read`.
    returns: The delta wrapped via `RivuletDelta.of`.
    """
    delta = Delta.read(file)
    return RivuletDelta.of(delta)
111 changes: 0 additions & 111 deletions deltacat/storage/rivulet/metastore/manifest.py

This file was deleted.

8 changes: 4 additions & 4 deletions deltacat/storage/rivulet/metastore/sst_interval_tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from intervaltree import Interval, IntervalTree

from deltacat.storage.rivulet.metastore.manifest import ManifestContext
from deltacat.storage.rivulet.metastore.delta import DeltaContext
from deltacat.storage.rivulet.metastore.sst import SSTable, SSTableRow
from deltacat.storage.rivulet import Schema

Expand All @@ -24,7 +24,7 @@ def pairwise(iterable):

class Block(NamedTuple):
row: SSTableRow
context: ManifestContext
context: DeltaContext
"""Context from the manifest around the placement of this row in the LSM-Tree"""


Expand Down Expand Up @@ -124,14 +124,14 @@ def __init__(self):
self.tree: IntervalTree = IntervalTree()
self.max_key_map: Dict[Any, List[Interval]] = {}

def add_sst_table(self, sst: SSTable, context: ManifestContext):
def add_sst_table(self, sst: SSTable, context: DeltaContext):
"""
Add intervals to SSTree which use primary key min and max as intervals
The data for each interval is a tuple of (schema, SSTableRow)
"""
self.add_sst_rows(sst.rows, context)

def add_sst_rows(self, sst_rows: Iterable[SSTableRow], context: ManifestContext):
def add_sst_rows(self, sst_rows: Iterable[SSTableRow], context: DeltaContext):
"""
Add individual SSTable rows to tree
"""
Expand Down
Loading