Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support pyarrow filessytem backend #466

Merged
merged 2 commits into from
Jan 27, 2025
Merged

Conversation

anshumankomawar
Copy link
Collaborator

@anshumankomawar anshumankomawar commented Jan 24, 2025

Summary

This PR removes the custom filesystem abstraction and relies directly on PyArrow's filesystem support. By doing so, we gain compatibility with local, S3, GCP, and other fsspec-backed filesystems.

This switch also sets us up for integration with deltacat in the future.

Changes

The updated filesystem structure for Rivulet is as follows:

Component Responsibility Details
FileProvider Manages URIs for data and metadata files Acts as the interface for generating and managing file. Relies on FileStore for file operations.
FileStore Maintains the resolved filesystem instance Handles all file operations like creation, reading, and listing. Key method: resolve_filesystem (vital to deltacat integration).

Impact

Users can use other fsspec filesystems like S3 without additional configuration or code changes.

Testing

make test
Did manual testing with s3 filesystem.

Checklist

  • Unit tests covering the changes have been added

    • If this is a bugfix, regression tests have been added
  • E2E testing has been performed

@anshumankomawar anshumankomawar force-pushed the anshumankomawar/fsspec branch 11 times, most recently from 20be3e9 to 8364904 Compare January 27, 2025 01:34
@anshumankomawar anshumankomawar marked this pull request as ready for review January 27, 2025 02:17
@anshumankomawar anshumankomawar force-pushed the anshumankomawar/fsspec branch 3 times, most recently from 03a1364 to 44e2b95 Compare January 27, 2025 08:56
Copy link
Collaborator

@flliver flliver left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still feels like there is leftover complexity having _metadata_path, _file_store, _file_provider, and _metastore all be instantiated by the dataset class, and almost never actually used in the class itself.

Seeing lots of of over-abstraction as well with file_provider and file_store that we are trying to get rid of by using the pyarrow fsspec. I strongly suspect both of those classes can just be completely deleted and folded into the metastore so we don't have three half classes that are all strongly interrelated and passed around eachother all the time.

Separately, leaking filesystem into every public facing constructor is a no-no, public facing constructors need to stay bare minimum/cleanest possible impl.

Comment on lines -173 to +180
# Initialize metadata handling
self._file_store = FileStore()
self._location_provider = FileLocationProvider(
self._metadata_path, self._file_store
# TODO: integrate with deltacat catalog to infer filesystem
self._metadata_folder = f".riv-meta-{dataset_name}"
# determine root filesystem
path, filesystem = FileStore.filesystem(
metadata_uri or self._metadata_folder, filesystem
)
self._metastore = DatasetMetastore(self._location_provider, self._file_store)

self._metadata_path = posixpath.join(path, self._metadata_folder)

self._file_store = FileStore(self._metadata_path, filesystem)
self._file_provider = FileProvider(self._metadata_path, self._file_store)
self._metastore = DatasetMetastore(self._file_provider)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check with Patrick if you haven't already, one of these file path libs is cross-system safe and one is not. I don't rember if it's posixpath or another way that makes it all safe, Patrick has a link to Ray data which I believe is the safest impl we're aware of.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe posixpath is the same join being used in deltacat, and it still requires that paths are resolved to a filesystem before joining. Made sure to take care of this with the changes.

Comment on lines -178 to +180
self._metastore = DatasetMetastore(self._location_provider, self._file_store)

self._metadata_path = posixpath.join(path, self._metadata_folder)

self._file_store = FileStore(self._metadata_path, filesystem)
self._file_provider = FileProvider(self._metadata_path, self._file_store)
self._metastore = DatasetMetastore(self._file_provider)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is getting into Java-y wrapper class voodoo again. _metadata_path is only used by FileStore/FileProvider, the logic for all this could move into a single class (probably (DatasetMetastore)), FileProvider could probably just go away entirely, then dataset only passes a path to the Metastore and everything works great. Everything about the metastore is in the metastore class, everything about datasets is in the dataset class, instead of right now the wiring for metastore/filestore/fileprovider/metadata_uri is all in the dataset class, even though it shouldn't have to care about any of it.

Comment on lines 196 to 201
metadata_uri: Optional[str] = None,
schema_mode: str = "union",
filesystem: Optional[pyarrow.fs.FileSystem] = None,
) -> Dataset:
"""
Create a Dataset from parquet files.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Be careful not to clutter customer facing classes w/ internal stuff like this. Why would we want a customer to provide a filesystem when they've already provided a file_uri and metadata_uri (which already contains a filesystem)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more of a niche case where a customer might want to pass in their own Fsspec implementation. As long as it is correctly implemented pyarrow should handle everything for us. I don't expect this to be used but is an option for customers.

Comment on lines 217 to 222
# TODO: integrate this with filesystem from deltacat catalog
file_uri, file_fs = FileStore.filesystem(file_uri, filesystem=filesystem)
metadata_uri, metadata_fs = FileStore.filesystem(
metadata_uri or posixpath.join(file_uri, "riv-meta")
)
pyarrow_dataset = pyarrow.dataset.dataset(file_uri, filesystem=filesystem)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this duplicative FS management code, it's too easy to have this become bifurcated from the main dataset constructor.

Also, i think I wrote already messed this up when I wrote this, where this constructor uses riv-meta as the file-dir, while the main constructor uses .riv-meta-${dataset_name}.

I think it's time to restructure this by creating an empty dataset first, then getting the metastore/fs from the dataset, and using that FS to open the files and read schema. That would let us delete all the code in this function for FS handling.

Comment on lines 224 to 228
# TODO: when integrating deltacat consider if we can support multiple filesystems
if file_fs != metadata_fs:
raise ValueError(
"File URI and metadata URI must be on the same filesystem."
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the right thing to be checking, but at the wrong place. This should be checked anytime any dataset is constructed, not just in the static from_parquet constructor. Points to that there's a code smell and FS handling needs to be moved out of these functions to a central place.

Comment on lines 319 to 365
metadata_uri = metadata_uri or os.path.join(file_uri, "riv-meta")
# TODO: integrate this with filesystem from deltacat catalog
file_uri, file_fs = FileStore.filesystem(file_uri, filesystem=filesystem)
metadata_uri, metadata_fs = FileStore.filesystem(
metadata_uri or posixpath.join(file_uri, "riv-meta")
)

# TODO: when integrating deltacat consider if we can support multiple filesystems
if file_fs != metadata_fs:
raise ValueError(
"File URI and metadata URI must be on the same filesystem."
)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All this repetitive code definitely a bad code smell and time for refactor.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed will create a story to combine the from_file methods into a wrapper class.

Comment on lines +8 to +27

class FileProvider:
"""
Manages the generation of URIs for data and metadata files and facilitates the creation of files at those URIs.
All files are generated relative to the root of the storage location.

This class is inspired by the Iceberg `LocationProvider` and provides methods
to generate paths for various types of files (e.g., data files, SSTs, and manifests)
while maintaining a clear structure within the dataset.

TODO (deltacat): FileProvider will be replaced/refactored once we are able to integrate with Deltacat.
TODO: Incorporate additional file naming conventions, such as including
partitionId, taskId, and operationId, to improve traceability and
idempotency.
"""

uri: str

def __init__(self, uri: str, file_store: FileStore):
"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like half a class, it's clearly only ever intended to be used by the metastore to read/write metastore files, and it's heavily dependent on the filesystem.

Comment on lines 11 to +53
class FileStore:
"""Entrypoint for storing and retrieving files"""
"""
Manages the filesystem and low-level file operations.
This class is designed to work with any filesystem supported by PyArrow; local, S3, HDFS, GCP,
and other fsspec-compatible filesystems.

TODO: Add better error consolidation between filesystems. Will be handled by deltacat implementation?

method: `filesystem`: Resolves and normalizes a given path and filesystem.
method: `file_exists`: Checks if a file exists at a given URI.
method: `create_file`: Creates a new file for writing at a specified URI.
method: `read_file`: Reads an existing file from a specified URI.
method: `list_files`: Lists all files within a specified directory URI.
"""

def __init__(self, path: str, filesystem: FileSystem):
"""
Serves as the source of truth for all file operations, ensuring that
all paths and operations are relative to the specified filesystem,
providing consistency and compatibility across fsspec supported backends.

TODO (deltacat): maybe rely on deltacat catalog as a source of truth for rivulet filesystem.

param: path (str): The base URI or path for the filesystem.
param: filesystem (FileSystem): A PyArrow filesystem instance.
"""
_, filesystem = FileStore.filesystem(path, filesystem)
self.filesystem = filesystem

@staticmethod
def filesystem(
path: str, filesystem: Optional[FileSystem] = None
) -> Tuple[str, FileSystem]:
"""
Resolves and normalizes the given path and filesystem.

param: path (str): The URI or path to resolve.
param: filesystem (Optional[FileSystem]): An optional filesystem instance.
returns: Tuple[str, FileSystem]: The normalized path and filesystem.
raises: AssertionError: If multiple paths are resolved.
"""
paths, filesystem = _resolve_paths_and_filesystem(
paths=path, filesystem=filesystem
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since moving to fsspec/pyarrow, I'm not convinced this whole class actually adds any value. Have you thought of what the code looks like if you just delete this class, move the filesystem into metastore, and go from there?

It feels like there's a much simpler/more straightforward implementation possible if the metastore handles the locations of the file read/write, and uses a pyarrow fs to do all the reads/writes directly.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After offline chat we agreed that most of the FileStore and FileProvider should be behind the Deltacat storage interface and isn't something we need anymore. This pr just focuses on migrating to pyarrow fs which will help setup for the pr @mcember is working on, which should iterate on this pr and unify with deltacat storage.

@anshumankomawar anshumankomawar merged commit 2f038d6 into 2.0 Jan 27, 2025
3 checks passed
@anshumankomawar anshumankomawar deleted the anshumankomawar/fsspec branch January 31, 2025 23:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants