diff --git a/.flake8 b/.flake8 index b4f70a67..a9bdedd5 100644 --- a/.flake8 +++ b/.flake8 @@ -18,6 +18,7 @@ show_source = true count = true per-file-ignores = upath/__init__.py: F401 + upath/_parser.py: E501 exclude = .noxfile, .nox, diff --git a/pyproject.toml b/pyproject.toml index d3de7bc5..3898bcb2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ maintainers = [ requires-python = ">=3.8" dependencies = [ "fsspec >=2022.1.0,!=2024.3.1", + "pathlib-abc ==0.3.1", ] classifiers = [ "Programming Language :: Python :: 3", @@ -160,6 +161,10 @@ ignore_missing_imports = true module = "webdav4.*" ignore_missing_imports = true +[[tool.mypy.overrides]] +module = "pathlib_abc.*" +ignore_missing_imports = true + [tool.pylint.format] max-line-length = 88 diff --git a/typesafety/test_upath_interface.yml b/typesafety/test_upath_interface.yml index 116411fb..a88629ff 100644 --- a/typesafety/test_upath_interface.yml +++ b/typesafety/test_upath_interface.yml @@ -556,15 +556,15 @@ main: | from upath import UPath - UPath("abc").walk # E: "UPath" has no attribute "walk" [attr-defined] + reveal_type(UPath("abc").walk()) # N: Revealed type is "typing.Generator[Tuple[upath.core.UPath, builtins.list[builtins.str], builtins.list[builtins.str]], None, None]" - case: upath_walk_py312plus disable_cache: false mypy_config: python_version = 3.12 main: | - from upath import UPath + from upath import UPath - reveal_type(UPath("abc").walk()) # N: Revealed type is "typing.Iterator[tuple[upath.core.UPath, builtins.list[builtins.str], builtins.list[builtins.str]]]" + reveal_type(UPath("abc").walk()) # N: Revealed type is "typing.Generator[tuple[upath.core.UPath, builtins.list[builtins.str], builtins.list[builtins.str]], None, None]" - case: upath_rename_extra_kwargs disable_cache: false diff --git a/upath/_abc.py b/upath/_abc.py new file mode 100644 index 00000000..0c582f43 --- /dev/null +++ b/upath/_abc.py @@ -0,0 +1,18 @@ +"""upath._abc + +Re-export of the `pathlib_abc` base 
classes `PathBase`, `PurePathBase`, +and `ParserBase`. This allows for type hinting of these classes more +easily via the stub file `upath/_abc.pyi`. +""" + +from pathlib_abc import ParserBase as ParserBase +from pathlib_abc import PathBase as PathBase +from pathlib_abc import PurePathBase as PurePathBase +from pathlib_abc import UnsupportedOperation as UnsupportedOperation + +__all__ = [ + "ParserBase", + "PurePathBase", + "PathBase", + "UnsupportedOperation", +] diff --git a/upath/_abc.pyi b/upath/_abc.pyi new file mode 100644 index 00000000..67ad889b --- /dev/null +++ b/upath/_abc.pyi @@ -0,0 +1,163 @@ +"""upath._abc type stubs + +Type stubs for the pathlib-abc classes we use in universal-pathlib. +""" + +import sys +from typing import IO +from typing import Any +from typing import Callable +from typing import Generator +from typing import Sequence + +if sys.version_info >= (3, 11): + from typing import Self +elif sys.version_info >= (3, 8): + + from typing_extensions import Self +else: + from typing_extensions import Self + +from upath._stat import _StatResultType as StatResultType + +class UnsupportedOperation(NotImplementedError): ... + +class ParserBase: + sep: str + def join(self, path: str, *paths: str) -> str: ... + def split(self, path: str) -> tuple[str, str]: ... + def splitdrive(self, path: str) -> tuple[str, str]: ... + def splitext(self, path: str) -> tuple[str, str]: ... + def normcase(self, path: str) -> str: ... + def isabs(self, path: str) -> bool: ... + +class PurePathBase: + _raw_path: str + _resolving: bool + + @classmethod + def _unsupported_msg(cls, attribute) -> str: ... + def __init__(self, path, *paths) -> None: ... + def with_segments(self, *pathsegments: str) -> Self: ... + def __str__(self) -> str: ... + def as_posix(self) -> str: ... + drive: str + root: str + anchor: str + name: str + suffix: str + suffixes: list[str] + stem: str + def with_name(self, name: str) -> Self: ... + def with_stem(self, stem: str) -> Self: ... 
+ def with_suffix(self, suffix: str) -> Self: ... + def relative_to( + self, other: str | PurePathBase, *, walk_up: bool = False + ) -> Self: ... + def is_relative_to(self, other: str | PurePathBase) -> bool: ... + parts: tuple[str, ...] + def joinpath(self, *pathsegments: str) -> Self: ... + def __truediv__(self, other: str) -> Self: ... + def __rtruediv__(self, other: str) -> Self: ... + _stack: tuple[str, list[str]] + parent: Self + parents: Sequence[Self] + def is_absolute(self) -> bool: ... + _pattern_str: str + def match( + self, path_pattern: str, *, case_sensitive: bool | None = None + ) -> bool: ... + def full_match( + self, pattern: str, *, case_sensitive: bool | None = None + ) -> bool: ... + +class PathBase(PurePathBase): + _max_symlinks: int + + def stat(self, *, follow_symlinks: bool = True) -> StatResultType: ... + def lstat(self) -> StatResultType: ... + def exists(self, *, follow_symlinks: bool = True) -> bool: ... + def is_dir(self, *, follow_symlinks: bool = True) -> bool: ... + def is_file(self, *, follow_symlinks: bool = True) -> bool: ... + def is_mount(self) -> bool: ... + def is_symlink(self) -> bool: ... + def is_junction(self) -> bool: ... + def is_block_device(self) -> bool: ... + def is_char_device(self) -> bool: ... + def is_fifo(self) -> bool: ... + def is_socket(self) -> bool: ... + def samefile(self, other_path: str | Self) -> bool: ... + def open( + self, + mode: str = "r", + buffering: int = -1, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, + ) -> IO[Any]: ... + def read_bytes(self) -> bytes: ... + def read_text( + self, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, + ) -> str: ... + def write_bytes(self, data: bytes) -> int: ... + def write_text( + self, + data: str, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, + ) -> int: ... + def iterdir(self) -> Generator[Self, None, None]: ... 
+ def _glob_selector( + self, parts: list[str], case_sensitive: bool | None, recurse_symlinks: bool + ) -> Callable[[Self], Generator[Self, None, None]]: ... + def glob( + self, + pattern: str, + *, + case_sensitive: bool | None = None, + recurse_symlinks: bool = True, + ) -> Generator[Self, None, None]: ... + def rglob( + self, + pattern: str, + *, + case_sensitive: bool | None = None, + recurse_symlinks: bool = True, + ) -> Generator[Self, None, None]: ... + def walk( + self, + top_down: bool = True, + on_error: Callable[[Exception], None] | None = None, + follow_symlinks: bool = False, + ) -> Generator[tuple[Self, list[str], list[str]], None, None]: ... + def absolute(self) -> Self: ... + @classmethod + def cwd(cls) -> Self: ... + def expanduser(self) -> Self: ... + @classmethod + def home(cls) -> Self: ... + def readlink(self) -> Self: ... + def resolve(self, strict: bool = False) -> Self: ... + def symlink_to( + self, target: str | Self, target_is_directory: bool = False + ) -> None: ... + def hardlink_to(self, target: str | Self) -> None: ... + def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None: ... + def mkdir( + self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False + ) -> None: ... + def rename(self, target: str | Self) -> Self: ... + def replace(self, target: str | Self) -> Self: ... + def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None: ... + def lchmod(self, mode: int) -> None: ... + def unlink(self, missing_ok: bool = False) -> None: ... + def rmdir(self) -> None: ... + def owner(self, *, follow_symlinks: bool = True) -> str: ... + def group(self, *, follow_symlinks: bool = True) -> str: ... + @classmethod + def from_uri(cls, uri: str) -> Self: ... + def as_uri(self) -> str: ... 
diff --git a/upath/_compat.py b/upath/_compat.py index 334888f3..be3f0065 100644 --- a/upath/_compat.py +++ b/upath/_compat.py @@ -1,373 +1,20 @@ from __future__ import annotations -import ntpath -import os -import posixpath import sys import warnings -from collections.abc import Sequence from functools import wraps -from pathlib import Path -from pathlib import PurePath -from typing import TYPE_CHECKING from typing import Any from typing import Callable from typing import TypeVar -from urllib.parse import SplitResult - -from fsspec import get_filesystem_class - -if TYPE_CHECKING: - from upath import UPath __all__ = [ - "PathlibPathShim", "str_remove_prefix", "str_remove_suffix", - "FSSpecAccessorShim", "deprecated", + "make_instance", ] -if sys.version_info >= (3, 12): # noqa: C901 - - class PathlibPathShim: - """no need to shim pathlib.Path in Python 3.12+""" - - __slots__ = () - __missing_py312_slots__ = () - - def __init__(self, *args): - super().__init__(*args) - -else: - - def _get_missing_py312_pathlib_slots(): - """Return a tuple of slots that are present in Python 3.12's - pathlib.Path but not in the current version of pathlib.Path - """ - py312_slots = ( - "_raw_paths", - "_drv", - "_root", - "_tail_cached", - "_str", - "_str_normcase_cached", - "_parts_normcase_cached", - "_lines_cached", - "_hash", - ) - current_slots = [ - slot for cls in Path.__mro__ for slot in getattr(cls, "__slots__", []) - ] - return tuple([slot for slot in py312_slots if slot not in current_slots]) - - class PathlibPathShim: - """A compatibility shim for python < 3.12 - - Basically vendoring the functionality of pathlib.Path from Python 3.12 - that's not overwritten in upath.core.UPath - - """ - - __slots__ = () - __missing_py312_slots__ = _get_missing_py312_pathlib_slots() - - def __init__(self, *args): - paths = [] - for arg in args: - if isinstance(arg, PurePath) and hasattr(arg, "_raw_paths"): - if arg._flavour is ntpath and self._flavour is posixpath: - # GH-103631: Convert 
separators for backwards compatibility. - paths.extend(path.replace("\\", "/") for path in arg._raw_paths) - else: - paths.extend(arg._raw_paths) - else: - try: - path = os.fspath(arg) - except TypeError: - path = arg - if not isinstance(path, str): - raise TypeError( - "argument should be a str or an os.PathLike " - "object where __fspath__ returns a str, " - f"not {type(path).__name__!r}" - ) - paths.append(path) - self._raw_paths = paths - - @classmethod - def _parse_path(cls, path): - if not path: - return "", "", [] - sep = cls._flavour.sep - altsep = cls._flavour.altsep - if altsep: - path = path.replace(altsep, sep) - drv, root, rel = cls._flavour.splitroot(path) - if not root and drv.startswith(sep) and not drv.endswith(sep): - drv_parts = drv.split(sep) - if len(drv_parts) == 4 and drv_parts[2] not in "?.": - # e.g. //server/share - root = sep - elif len(drv_parts) == 6: - # e.g. //?/unc/server/share - root = sep - parsed = [sys.intern(str(x)) for x in rel.split(sep) if x and x != "."] - return drv, root, parsed - - def _load_parts(self): - paths = self._raw_paths - if len(paths) == 0: - path = "" - elif len(paths) == 1: - path = paths[0] - else: - path = self._flavour.join(*paths) - drv, root, tail = self._parse_path(path) - self._drv = drv - self._root = root - self._tail_cached = tail - - def _from_parsed_parts(self, drv, root, tail): - path_str = self._format_parsed_parts(drv, root, tail) - path = self.with_segments(path_str) - path._str = path_str or "." - path._drv = drv - path._root = root - path._tail_cached = tail - return path - - @classmethod - def _format_parsed_parts(cls, drv, root, tail): - if drv or root: - return drv + root + cls._flavour.sep.join(tail) - elif tail and cls._flavour.splitdrive(tail[0])[0]: - tail = ["."] + tail - return cls._flavour.sep.join(tail) - - def __str__(self): - try: - return self._str - except AttributeError: - self._str = ( - self._format_parsed_parts(self.drive, self.root, self._tail) or "." 
- ) - return self._str - - @property - def drive(self): - try: - return self._drv - except AttributeError: - self._load_parts() - return self._drv - - @property - def root(self): - try: - return self._root - except AttributeError: - self._load_parts() - return self._root - - @property - def _tail(self): - try: - return self._tail_cached - except AttributeError: - self._load_parts() - return self._tail_cached - - @property - def anchor(self): - anchor = self.drive + self.root - return anchor - - @property - def name(self): - tail = self._tail - if not tail: - return "" - return tail[-1] - - @property - def suffix(self): - name = self.name - i = name.rfind(".") - if 0 < i < len(name) - 1: - return name[i:] - else: - return "" - - @property - def suffixes(self): - name = self.name - if name.endswith("."): - return [] - name = name.lstrip(".") - return ["." + suffix for suffix in name.split(".")[1:]] - - @property - def stem(self): - name = self.name - i = name.rfind(".") - if 0 < i < len(name) - 1: - return name[:i] - else: - return name - - def with_name(self, name): - if not self.name: - raise ValueError(f"{self!r} has an empty name") - f = self._flavour - if ( - not name - or f.sep in name - or (f.altsep and f.altsep in name) - or name == "." 
- ): - raise ValueError("Invalid name %r" % (name)) - return self._from_parsed_parts( - self.drive, self.root, self._tail[:-1] + [name] - ) - - def with_stem(self, stem): - return self.with_name(stem + self.suffix) - - def with_suffix(self, suffix): - f = self._flavour - if f.sep in suffix or f.altsep and f.altsep in suffix: - raise ValueError(f"Invalid suffix {suffix!r}") - if suffix and not suffix.startswith(".") or suffix == ".": - raise ValueError("Invalid suffix %r" % (suffix)) - name = self.name - if not name: - raise ValueError(f"{self!r} has an empty name") - old_suffix = self.suffix - if not old_suffix: - name = name + suffix - else: - name = name[: -len(old_suffix)] + suffix - return self._from_parsed_parts( - self.drive, self.root, self._tail[:-1] + [name] - ) - - def relative_to(self, other, /, *_deprecated, walk_up=False): - if _deprecated: - msg = ( - "support for supplying more than one positional argument " - "to pathlib.PurePath.relative_to() is deprecated and " - "scheduled for removal in Python 3.14" - ) - warnings.warn( - f"pathlib.PurePath.relative_to(*args) {msg}", - DeprecationWarning, - stacklevel=2, - ) - other = self.with_segments(other, *_deprecated) - for step, path in enumerate([other] + list(other.parents)): # noqa: B007 - if self.is_relative_to(path): - break - elif not walk_up: - raise ValueError( - f"{str(self)!r} is not in the subpath of {str(other)!r}" - ) - elif path.name == "..": - raise ValueError(f"'..' 
segment in {str(other)!r} cannot be walked") - else: - raise ValueError( - f"{str(self)!r} and {str(other)!r} have different anchors" - ) - parts = [".."] * step + self._tail[len(path._tail) :] - return self.with_segments(*parts) - - def is_relative_to(self, other, /, *_deprecated): - if _deprecated: - msg = ( - "support for supplying more than one argument to " - "pathlib.PurePath.is_relative_to() is deprecated and " - "scheduled for removal in Python 3.14" - ) - warnings.warn( - f"pathlib.PurePath.is_relative_to(*args) {msg}", - DeprecationWarning, - stacklevel=2, - ) - other = self.with_segments(other, *_deprecated) - return other == self or other in self.parents - - @property - def parts(self): - if self.drive or self.root: - return (self.drive + self.root,) + tuple(self._tail) - else: - return tuple(self._tail) - - @property - def parent(self): - drv = self.drive - root = self.root - tail = self._tail - if not tail: - return self - return self._from_parsed_parts(drv, root, tail[:-1]) - - @property - def parents(self): - return _PathParents(self) - - def _make_child_relpath(self, name): - path_str = str(self) - tail = self._tail - if tail: - path_str = f"{path_str}{self._flavour.sep}{name}" - elif path_str != ".": - path_str = f"{path_str}{name}" - else: - path_str = name - path = self.with_segments(path_str) - path._str = path_str - path._drv = self.drive - path._root = self.root - path._tail_cached = tail + [name] - return path - - def lchmod(self, mode): - """ - Like chmod(), except if the path points to a symlink, the symlink's - permissions are changed, rather than its target's. 
- """ - self.chmod(mode, follow_symlinks=False) - - class _PathParents(Sequence): - __slots__ = ("_path", "_drv", "_root", "_tail") - - def __init__(self, path): - self._path = path - self._drv = path.drive - self._root = path.root - self._tail = path._tail - - def __len__(self): - return len(self._tail) - - def __getitem__(self, idx): - if isinstance(idx, slice): - return tuple(self[i] for i in range(*idx.indices(len(self)))) - - if idx >= len(self) or idx < -len(self): - raise IndexError(idx) - if idx < 0: - idx += len(self) - return self._path._from_parsed_parts( - self._drv, self._root, self._tail[: -idx - 1] - ) - - def __repr__(self): - return f"<{type(self._path).__name__}.parents>" - - if sys.version_info >= (3, 9): str_remove_suffix = str.removesuffix str_remove_prefix = str.removeprefix @@ -387,92 +34,12 @@ def str_remove_prefix(s: str, prefix: str) -> str: return s -class FSSpecAccessorShim: - """this is a compatibility shim and will be removed""" +C = TypeVar("C") - def __init__(self, parsed_url: SplitResult | None, **kwargs: Any) -> None: - if parsed_url and parsed_url.scheme: - cls = get_filesystem_class(parsed_url.scheme) - url_kwargs = cls._get_kwargs_from_urls(parsed_url.geturl()) - else: - cls = get_filesystem_class(None) - url_kwargs = {} - url_kwargs.update(kwargs) - self._fs = cls(**url_kwargs) - - def __init_subclass__(cls, **kwargs): - warnings.warn( - "All _FSSpecAccessor subclasses have been deprecated. 
" - " Please follow the universal_pathlib==0.2.0 migration guide at" - " https://github.com/fsspec/universal_pathlib for more" - " information.", - DeprecationWarning, - stacklevel=2, - ) - - @classmethod - def from_path(cls, path: UPath) -> FSSpecAccessorShim: - """internal accessor for backwards compatibility""" - url = path._url._replace(scheme=path.protocol) - obj = cls(url, **path.storage_options) - obj.__dict__["_fs"] = path.fs - return obj - - def _format_path(self, path: UPath) -> str: - return path.path - - def open(self, path, mode="r", *args, **kwargs): - return path.fs.open(self._format_path(path), mode, *args, **kwargs) - - def stat(self, path, **kwargs): - return path.fs.stat(self._format_path(path), **kwargs) - - def listdir(self, path, **kwargs): - p_fmt = self._format_path(path) - contents = path.fs.listdir(p_fmt, **kwargs) - if len(contents) == 0 and not path.fs.isdir(p_fmt): - raise NotADirectoryError(str(self)) - elif ( - len(contents) == 1 - and contents[0]["name"] == p_fmt - and contents[0]["type"] == "file" - ): - raise NotADirectoryError(str(self)) - return contents - - def glob(self, _path, path_pattern, **kwargs): - return _path.fs.glob(self._format_path(path_pattern), **kwargs) - - def exists(self, path, **kwargs): - return path.fs.exists(self._format_path(path), **kwargs) - def info(self, path, **kwargs): - return path.fs.info(self._format_path(path), **kwargs) - - def rm(self, path, recursive, **kwargs): - return path.fs.rm(self._format_path(path), recursive=recursive, **kwargs) - - def mkdir(self, path, create_parents=True, **kwargs): - return path.fs.mkdir( - self._format_path(path), create_parents=create_parents, **kwargs - ) - - def makedirs(self, path, exist_ok=False, **kwargs): - return path.fs.makedirs(self._format_path(path), exist_ok=exist_ok, **kwargs) - - def touch(self, path, **kwargs): - return path.fs.touch(self._format_path(path), **kwargs) - - def mv(self, path, target, recursive=False, maxdepth=None, **kwargs): - if 
hasattr(target, "_accessor"): - target = target._accessor._format_path(target) - return path.fs.mv( - self._format_path(path), - target, - recursive=recursive, - maxdepth=maxdepth, - **kwargs, - ) +def make_instance(cls: type[C], args: tuple[Any, ...], kwargs: dict[str, Any]) -> C: + """helper for pickling UPath instances""" + return cls(*args, **kwargs) RT = TypeVar("RT") @@ -501,15 +68,3 @@ def wrapper(*args, **kwargs): return func return deprecated_decorator - - -class method_and_classmethod: - """Allow a method to be used as both a method and a classmethod""" - - def __init__(self, method): - self.method = method - - def __get__(self, instance, owner): - if instance is None: - return self.method.__get__(owner) - return self.method.__get__(instance) diff --git a/upath/_flavour.py b/upath/_flavour.py deleted file mode 100644 index 944ba809..00000000 --- a/upath/_flavour.py +++ /dev/null @@ -1,506 +0,0 @@ -from __future__ import annotations - -import os.path -import posixpath -import sys -import warnings -from functools import lru_cache -from typing import TYPE_CHECKING -from typing import Any -from typing import Mapping -from typing import Sequence -from typing import TypedDict -from typing import Union -from urllib.parse import SplitResult -from urllib.parse import urlsplit - -if sys.version_info >= (3, 12): - from typing import TypeAlias -else: - TypeAlias = Any - -from fsspec.registry import known_implementations -from fsspec.registry import registry as _class_registry -from fsspec.spec import AbstractFileSystem - -from upath._compat import deprecated -from upath._compat import str_remove_prefix -from upath._compat import str_remove_suffix -from upath._flavour_sources import FileSystemFlavourBase -from upath._flavour_sources import flavour_registry -from upath._protocol import get_upath_protocol -from upath._protocol import normalize_empty_netloc - -if TYPE_CHECKING: - from upath.core import UPath - -__all__ = [ - "LazyFlavourDescriptor", - "default_flavour", 
- "upath_urijoin", - "upath_get_kwargs_from_url", -] - -class_registry: Mapping[str, type[AbstractFileSystem]] = _class_registry -PathOrStr: TypeAlias = Union[str, "os.PathLike[str]"] - - -class AnyProtocolFileSystemFlavour(FileSystemFlavourBase): - sep = "/" - protocol = () - root_marker = "/" - - @classmethod - def _strip_protocol(cls, path: str) -> str: - protocol = get_upath_protocol(path) - if path.startswith(protocol + "://"): - path = path[len(protocol) + 3 :] - elif path.startswith(protocol + "::"): - path = path[len(protocol) + 2 :] - path = path.rstrip("/") - return path or cls.root_marker - - @staticmethod - def _get_kwargs_from_urls(path: str) -> dict[str, Any]: - return {} - - @classmethod - def _parent(cls, path): - path = cls._strip_protocol(path) - if "/" in path: - parent = path.rsplit("/", 1)[0].lstrip(cls.root_marker) - return cls.root_marker + parent - else: - return cls.root_marker - - -class ProtocolConfig(TypedDict): - netloc_is_anchor: set[str] - supports_empty_parts: set[str] - meaningful_trailing_slash: set[str] - root_marker_override: dict[str, str] - - -class WrappedFileSystemFlavour: # (pathlib_abc.FlavourBase) - """flavour class for universal_pathlib - - **INTERNAL AND VERY MUCH EXPERIMENTAL** - - Implements the fsspec compatible low-level lexical operations on - PurePathBase-like objects. - - Note: - In case you find yourself in need of subclassing this class, - please open an issue in the universal_pathlib issue tracker: - https://github.com/fsspec/universal_pathlib/issues - Ideally we can find a way to make your use-case work by adding - more functionality to this class. - - """ - - # Note: - # It would be ideal if there would be a way to avoid the need for - # indicating the following settings via the protocol. This is a - # workaround to be able to implement the flavour correctly. - # TODO: - # These settings should be configured on the UPath class?!? 
- protocol_config: ProtocolConfig = { - "netloc_is_anchor": { - "http", - "https", - "s3", - "s3a", - "smb", - "gs", - "gcs", - "az", - "adl", - "abfs", - "abfss", - "webdav+http", - "webdav+https", - }, - "supports_empty_parts": { - "http", - "https", - "s3", - "s3a", - "gs", - "gcs", - "az", - "adl", - "abfs", - }, - "meaningful_trailing_slash": { - "http", - "https", - }, - "root_marker_override": { - "ssh": "/", - "sftp": "/", - }, - } - - def __init__( - self, - spec: type[AbstractFileSystem | FileSystemFlavourBase] | AbstractFileSystem, - *, - netloc_is_anchor: bool = False, - supports_empty_parts: bool = False, - meaningful_trailing_slash: bool = False, - root_marker_override: str | None = None, - ) -> None: - """initialize the flavour with the given fsspec""" - self._spec = spec - - # netloc is considered an anchor, influences: - # - splitdrive - # - join - self.netloc_is_anchor = bool(netloc_is_anchor) - - # supports empty parts, influences: - # - join - # - UPath._parse_path - self.supports_empty_parts = bool(supports_empty_parts) - - # meaningful trailing slash, influences: - # - join - # - UPath._parse_path - self.has_meaningful_trailing_slash = bool(meaningful_trailing_slash) - - # some filesystems require UPath to enforce a specific root marker - if root_marker_override is None: - self.root_marker_override = None - else: - self.root_marker_override = str(root_marker_override) - - @classmethod - @lru_cache(maxsize=None) - def from_protocol( - cls, - protocol: str, - ) -> WrappedFileSystemFlavour: - """return the fsspec flavour for the given protocol""" - - _c = cls.protocol_config - config: dict[str, Any] = { - "netloc_is_anchor": protocol in _c["netloc_is_anchor"], - "supports_empty_parts": protocol in _c["supports_empty_parts"], - "meaningful_trailing_slash": protocol in _c["meaningful_trailing_slash"], - "root_marker_override": _c["root_marker_override"].get(protocol), - } - - # first try to get an already imported fsspec filesystem class - try: - 
return cls(class_registry[protocol], **config) - except KeyError: - pass - # next try to get the flavour from the generated flavour registry - # to avoid imports - try: - return cls(flavour_registry[protocol], **config) - except KeyError: - pass - # finally fallback to a default flavour for the protocol - if protocol in known_implementations: - warnings.warn( - f"Could not find default for known protocol {protocol!r}." - " Creating a default flavour for it. Please report this" - " to the universal_pathlib issue tracker.", - UserWarning, - stacklevel=2, - ) - return cls(AnyProtocolFileSystemFlavour, **config) - - def __repr__(self): - if isinstance(self._spec, type): - return f"" - else: - return f"" - - # === fsspec.AbstractFileSystem =================================== - - @property - def protocol(self) -> tuple[str, ...]: - if isinstance(self._spec.protocol, str): - return (self._spec.protocol,) - else: - return self._spec.protocol - - @property - def root_marker(self) -> str: - if self.root_marker_override is not None: - return self.root_marker_override - else: - return self._spec.root_marker - - @property - def local_file(self) -> bool: - return bool(getattr(self._spec, "local_file", False)) - - @staticmethod - def stringify_path(pth: PathOrStr) -> str: - if isinstance(pth, str): - out = pth - elif getattr(pth, "__fspath__", None) is not None: - out = pth.__fspath__() - elif isinstance(pth, os.PathLike): - out = str(pth) - elif hasattr(pth, "path"): # type: ignore[unreachable] - out = pth.path - else: - out = str(pth) - return normalize_empty_netloc(out) - - def strip_protocol(self, pth: PathOrStr) -> str: - pth = self.stringify_path(pth) - return self._spec._strip_protocol(pth) - - def get_kwargs_from_url(self, url: PathOrStr) -> dict[str, Any]: - # NOTE: the public variant is _from_url not _from_urls - if hasattr(url, "storage_options"): - return dict(url.storage_options) - url = self.stringify_path(url) - return self._spec._get_kwargs_from_urls(url) - - def 
parent(self, path: PathOrStr) -> str: - path = self.stringify_path(path) - return self._spec._parent(path) - - # === pathlib_abc.FlavourBase ===================================== - - @property - def sep(self) -> str: - return self._spec.sep - - @property - def altsep(self) -> str | None: - return None - - def isabs(self, path: PathOrStr) -> bool: - path = self.strip_protocol(path) - if self.local_file: - return os.path.isabs(path) - else: - return path.startswith(self.root_marker) - - def join(self, path: PathOrStr, *paths: PathOrStr) -> str: - if self.netloc_is_anchor: - drv, p0 = self.splitdrive(path) - pN = list(map(self.stringify_path, paths)) - if not drv and not p0: - path, *pN = pN - drv, p0 = self.splitdrive(path) - p0 = p0 or self.sep - else: - p0 = str(self.strip_protocol(path)) or self.root_marker - pN = list(map(self.stringify_path, paths)) - drv = "" - if self.supports_empty_parts: - return drv + self.sep.join([str_remove_suffix(p0, self.sep), *pN]) - else: - return drv + posixpath.join(p0, *pN) - - def split(self, path: PathOrStr): - stripped_path = self.strip_protocol(path) - head = self.parent(stripped_path) or self.root_marker - if head: - return head, stripped_path[len(head) + 1 :] - else: - return "", stripped_path - - def splitdrive(self, path: PathOrStr) -> tuple[str, str]: - path = self.strip_protocol(path) - if self.netloc_is_anchor: - u = urlsplit(path) - if u.scheme: - # cases like: "http://example.com/foo/bar" - drive = u._replace(path="", query="", fragment="").geturl() - rest = u._replace(scheme="", netloc="").geturl() - if ( - u.path.startswith("//") - and SplitResult("", "", "//", "", "").geturl() == "////" - ): - # see: fsspec/universal_pathlib#233 - rest = rest[2:] - return drive, rest or self.root_marker or self.sep - else: - # cases like: "bucket/some/special/key - drive, root, tail = path.partition(self.sep) - return drive, root + tail - elif self.local_file: - return os.path.splitdrive(path) - else: - # all other cases don't have 
a drive - return "", path - - def normcase(self, path: PathOrStr) -> str: - if self.local_file: - return os.path.normcase(self.stringify_path(path)) - else: - return self.stringify_path(path) - - # === Python3.12 pathlib flavour ================================== - - def splitroot(self, path: PathOrStr) -> tuple[str, str, str]: - drive, tail = self.splitdrive(path) - if self.netloc_is_anchor: - root_marker = self.root_marker or self.sep - else: - root_marker = self.root_marker - return drive, root_marker, str_remove_prefix(tail, self.sep) - - # === deprecated backwards compatibility =========================== - - @deprecated(python_version=(3, 12)) - def casefold(self, s: str) -> str: - if self.local_file: - return s - else: - return s.lower() - - @deprecated(python_version=(3, 12)) - def parse_parts(self, parts: Sequence[str]) -> tuple[str, str, list[str]]: - parsed = [] - sep = self.sep - drv = root = "" - it = reversed(parts) - for part in it: - if part: - drv, root, rel = self.splitroot(part) - if not root or root and rel: - for x in reversed(rel.split(sep)): - parsed.append(sys.intern(x)) - if drv or root: - parsed.append(drv + root) - parsed.reverse() - return drv, root, parsed - - @deprecated(python_version=(3, 12)) - def join_parsed_parts( - self, - drv: str, - root: str, - parts: list[str], - drv2: str, - root2: str, - parts2: list[str], - ) -> tuple[str, str, list[str]]: - if root2: - if not drv2 and drv: - return drv, root2, [drv + root2] + parts2[1:] - elif drv2: - if drv2 == drv or self.casefold(drv2) == self.casefold(drv): - # Same drive => second path is relative to the first - return drv, root, parts + parts2[1:] - else: - # Second path is non-anchored (common case) - return drv, root, parts + parts2 - return drv2, root2, parts2 - - -default_flavour = WrappedFileSystemFlavour(AnyProtocolFileSystemFlavour) - - -class LazyFlavourDescriptor: - """descriptor to lazily get the flavour for a given protocol""" - - def __init__(self) -> None: - 
self._owner: type[UPath] | None = None - - def __set_name__(self, owner: type[UPath], name: str) -> None: - # helper to provide a more informative repr - self._owner = owner - self._default_protocol: str | None - try: - self._default_protocol = self._owner.protocols[0] # type: ignore - except (AttributeError, IndexError): - self._default_protocol = None - - def __get__(self, instance: UPath, owner: type[UPath]) -> WrappedFileSystemFlavour: - if instance is not None: - return WrappedFileSystemFlavour.from_protocol(instance.protocol) - elif self._default_protocol: # type: ignore - return WrappedFileSystemFlavour.from_protocol(self._default_protocol) - else: - return default_flavour - - def __repr__(self): - cls_name = f"{type(self).__name__}" - if self._owner is None: - return f"" - else: - return f"<{cls_name} of {self._owner.__name__}>" - - -def upath_strip_protocol(pth: PathOrStr) -> str: - if protocol := get_upath_protocol(pth): - return WrappedFileSystemFlavour.from_protocol(protocol).strip_protocol(pth) - return WrappedFileSystemFlavour.stringify_path(pth) - - -def upath_get_kwargs_from_url(url: PathOrStr) -> dict[str, Any]: - if protocol := get_upath_protocol(url): - return WrappedFileSystemFlavour.from_protocol(protocol).get_kwargs_from_url(url) - return {} - - -def upath_urijoin(base: str, uri: str) -> str: - """Join a base URI and a possibly relative URI to form an absolute - interpretation of the latter.""" - # see: - # https://github.com/python/cpython/blob/ae6c01d9d2/Lib/urllib/parse.py#L539-L605 - # modifications: - # - removed allow_fragments parameter - # - all schemes are considered to allow relative paths - # - all schemes are considered to allow netloc (revisit this) - # - no bytes support (removes encoding and decoding) - if not base: - return uri - if not uri: - return base - - bs = urlsplit(base, scheme="") - us = urlsplit(uri, scheme=bs.scheme) - - if us.scheme != bs.scheme: # or us.scheme not in uses_relative: - return uri - # if us.scheme in 
uses_netloc: - if us.netloc: - return us.geturl() - else: - us = us._replace(netloc=bs.netloc) - # end if - if not us.path and not us.fragment: - us = us._replace(path=bs.path, fragment=bs.fragment) - if not us.query: - us = us._replace(query=bs.query) - return us.geturl() - - base_parts = bs.path.split("/") - if base_parts[-1] != "": - del base_parts[-1] - - if us.path[:1] == "/": - segments = us.path.split("/") - else: - segments = base_parts + us.path.split("/") - segments[1:-1] = filter(None, segments[1:-1]) - - resolved_path: list[str] = [] - - for seg in segments: - if seg == "..": - try: - resolved_path.pop() - except IndexError: - pass - elif seg == ".": - continue - else: - resolved_path.append(seg) - - if segments[-1] in (".", ".."): - resolved_path.append("") - - return us._replace(path="/".join(resolved_path) or "/").geturl() diff --git a/upath/_parser.py b/upath/_parser.py new file mode 100644 index 00000000..8f4310de --- /dev/null +++ b/upath/_parser.py @@ -0,0 +1,376 @@ +"""upath._parser + +Provides a pathlib_abc.ParserBase implementation for fsspec filesystems. 
+""" + +from __future__ import annotations + +import os +import posixpath +import sys +import warnings +from functools import lru_cache +from typing import TYPE_CHECKING +from typing import Any +from typing import Literal +from typing import Protocol +from typing import TypedDict +from urllib.parse import SplitResult +from urllib.parse import urlsplit + +from fsspec import AbstractFileSystem +from fsspec.registry import known_implementations +from fsspec.registry import registry as _class_registry +from pathlib_abc import ParserBase + +from upath._compat import str_remove_suffix +from upath._flavour_sources import flavour_registry as _flavour_registry +from upath._uris import normalize_empty_netloc + +if sys.version_info >= (3, 11): + from typing import Self +else: + from typing_extensions import Self + +if TYPE_CHECKING: + from upath.core import PureUPath + from upath.core import UPathLike + + +__all__ = [ + "FSSpecParserDescriptor", + "FSSpecParser", +] + +# note: this changed in Python https://github.com/python/cpython/pull/113563 +URLSPLIT_NORMALIZES_DOUBLE_SLASH = ( + SplitResult._make(("", "", "//", "", "")).geturl() == "////" +) + + +class ParserProtocol(Protocol): + """parser interface of fsspec filesystems""" + + protocol: str | tuple[str, ...] + root_marker: Literal["/", ""] + sep: Literal["/"] + + @classmethod + def _strip_protocol(cls, path: UPathLike) -> str: ... + + @staticmethod + def _get_kwargs_from_urls(path: UPathLike) -> dict[str, Any]: ... + + @classmethod + def _parent(cls, path: UPathLike) -> str: ... 
+ + +class ProtocolConfig(TypedDict): + netloc_is_anchor: set[str] + supports_empty_parts: set[str] + meaningful_trailing_slash: set[str] + root_marker_override: dict[str, str] + + +# === registries of fsspec filesystems for uri parsing ================== + +pure_parser_registry: dict[str, ParserProtocol] = _flavour_registry # type: ignore[assignment] +concrete_fs_registry: dict[str, ParserProtocol] = _class_registry # type: ignore[assignment] + + +# === parser implementations =========================================== + + +class FSSpecParserDescriptor: + """Non-data descriptor for the `parser` attribute of a `UPath` subclass.""" + + def __init__(self) -> None: + self._owner: type[PureUPath] | None = None + + def __set_name__(self, owner: type[PureUPath], name: str) -> None: + self._owner = owner + + def __get__( + self, instance: PureUPath | None, owner: type[PureUPath] | None = None + ) -> FSSpecParser: + if instance is not None: + return FSSpecParser.from_protocol(instance.protocol) + elif owner is not None: + if not owner._supported_protocols: + raise AttributeError( + "Cannot access `parser` attribute on the generic UPath class." + ) + else: + return FSSpecParser.from_protocol(owner._supported_protocols[0]) + else: + return self # type: ignore + + def __repr__(self): + return f"<{self.__class__.__name__} of {self._owner!r}>" + + +class FSSpecParser(ParserBase): + """parser class for universal_pathlib + + **INTERNAL AND VERY MUCH EXPERIMENTAL** + + Implements the fsspec compatible low-level lexical operations on + PurePathBase-like objects. + + Note: + In case you find yourself in need of subclassing this class, + please open an issue in the universal_pathlib issue tracker: + https://github.com/fsspec/universal_pathlib/issues + Ideally we can find a way to make your use-case work by adding + more functionality to this class. 
+ + """ + + # Note: + # It would be ideal if there would be a way to avoid the need for + # indicating the following settings via the protocol. This is a + # workaround to be able to implement the parser correctly. Still + # have to wrap my head around how to do this in a better way. + # TODO: + # These settings should be configured on the UPath class?!? + protocol_config: ProtocolConfig = { + "netloc_is_anchor": { + "http", + "https", + "s3", + "s3a", + "sftp", + "ssh", + "smb", + "gs", + "gcs", + "az", + "adl", + "abfs", + "webdav+http", + "webdav+https", + }, + "supports_empty_parts": { + "http", + "https", + "s3", + "s3a", + "gs", + "gcs", + "az", + "adl", + "abfs", + }, + "meaningful_trailing_slash": { + "http", + "https", + }, + "root_marker_override": { + "ssh": "/", + "sftp": "/", + }, + } + + def __init__( + self, + spec: ParserProtocol, + *, + netloc_is_anchor: bool = False, + supports_empty_parts: bool = False, + meaningful_trailing_slash: bool = False, + root_marker_override: str | None = None, + ) -> None: + """initialize the parser with the given fsspec filesystem""" + self._spec = spec + + # netloc is considered an anchor, influences: + # - splitdrive + # - join + self.netloc_is_anchor = bool(netloc_is_anchor) + + # supports empty parts, influences: + # - join + self.supports_empty_parts = bool(supports_empty_parts) + + # meaningful trailing slash, influences: + # - join + self.has_meaningful_trailing_slash = bool(meaningful_trailing_slash) + + # some filesystems require UPath to enforce a specific root marker + if root_marker_override is None: + self.root_marker_override = None + else: + self.root_marker_override = str(root_marker_override) + + @classmethod + @lru_cache(maxsize=None) + def from_protocol(cls, protocol: str) -> Self: + """return the fsspec flavour for the given protocol""" + + _c = cls.protocol_config + config = { + "netloc_is_anchor": protocol in _c["netloc_is_anchor"], + "supports_empty_parts": protocol in 
_c["supports_empty_parts"], + "meaningful_trailing_slash": protocol in _c["meaningful_trailing_slash"], + "root_marker_override": _c["root_marker_override"].get(protocol), + } + + # first try to get an already imported fsspec filesystem class + try: + return cls(concrete_fs_registry[protocol], **config) + except KeyError: + pass + # next try to get the flavour from the generated flavour registry + # to avoid imports + try: + return cls(pure_parser_registry[protocol], **config) + except KeyError: + pass + # finally fallback to a default flavour for the protocol + if protocol in known_implementations: + warnings.warn( + f"Could not find default for known protocol {protocol!r}." + " Creating a default flavour for it. Please report this" + " to the universal_pathlib issue tracker.", + UserWarning, + stacklevel=2, + ) + spec: Any = type( + f"{protocol.title()}FileSystem", + (AbstractFileSystem,), + {"protocol": protocol}, + ) + return cls(spec, **config) + + def __repr__(self): + if isinstance(self._spec, type): + return f"<{type(self).__name__} wrapping class {self._spec.__name__}>" + else: + return f"<{type(self).__name__} wrapping instance {self._spec!r} of {self._spec.__class__.__name__}>" + + # === fsspec.AbstractFileSystem =================================== + + @property + def protocol(self) -> tuple[str, ...]: + if isinstance(self._spec.protocol, str): + return (self._spec.protocol,) + else: + return self._spec.protocol + + @property + def root_marker(self) -> str: + if self.root_marker_override is not None: + return self.root_marker_override + else: + return self._spec.root_marker + + @property + def local_file(self) -> bool: + return bool(getattr(self._spec, "local_file", False)) + + @staticmethod + def stringify_path(pth: UPathLike) -> str: + if isinstance(pth, str): + out = pth + elif hasattr(pth, "__fspath__") and pth.__fspath__ is not None: + out = pth.__fspath__() + elif isinstance(pth, os.PathLike): + out = str(pth) + elif hasattr(pth, "path"): # type: 
ignore[unreachable] + out = pth.path + else: + out = str(pth) + return normalize_empty_netloc(out) + + def strip_protocol(self, pth: UPathLike) -> str: + pth = self.stringify_path(pth) + return self._spec._strip_protocol(pth) + + def get_kwargs_from_url(self, url: UPathLike) -> dict[str, Any]: + # NOTE: the public variant is _from_url not _from_urls + if hasattr(url, "storage_options"): + return dict(url.storage_options) + url = self.stringify_path(url) + return self._spec._get_kwargs_from_urls(url) + + def parent(self, path: UPathLike) -> str: + path = self.stringify_path(path) + return self._spec._parent(path) + + # === pathlib_abc.ParserBase ===================================== + + @property + def sep(self) -> str: + return self._spec.sep + + def join(self, path: UPathLike, *paths: UPathLike) -> str: + if self.netloc_is_anchor: + drv, p0 = self.splitdrive(path) + pN = list(map(self.stringify_path, paths)) + if not drv and not p0: + path, *pN = pN + drv, p0 = self.splitdrive(path) + p0 = p0 or self.sep + else: + p0 = str(self.strip_protocol(path)) or self.root_marker + pN = list(map(self.stringify_path, paths)) + drv = "" + if self.supports_empty_parts: + return drv + self.sep.join([str_remove_suffix(p0, self.sep), *pN]) + else: + return drv + posixpath.join(p0, *pN) + + def split(self, path: UPathLike) -> tuple[str, str]: + stripped_path = self.strip_protocol(path) + head = self.parent(stripped_path) or self.root_marker + if head: + tail = stripped_path[len(head) + 1 :] + return head, tail + elif not head and self.netloc_is_anchor: + return stripped_path, "" + else: + return "", stripped_path + + def splitdrive(self, path: UPathLike) -> tuple[str, str]: + path = self.strip_protocol(path) + if self.netloc_is_anchor: + u = urlsplit(path) + if u.scheme: + # cases like: "http://example.com/foo/bar" + drive = u._replace(path="", query="", fragment="").geturl() + rest = u._replace(scheme="", netloc="").geturl() + if URLSPLIT_NORMALIZES_DOUBLE_SLASH and 
u.path.startswith("//"): + # see: fsspec/universal_pathlib#233 + rest = rest[2:] + return drive, rest or self.root_marker or self.sep + else: + # cases like: "bucket/some/special/key" + drive, root, tail = path.partition(self.sep) + return drive, root + tail + elif self.local_file: + return os.path.splitdrive(path) + else: + # all other cases don't have a drive + return "", path + + def splitext(self, path: UPathLike) -> tuple[str, str]: + path = self.stringify_path(path) + head, sep, tail = path.rpartition(self.sep) + name, dot, ext = tail.rpartition(".") + if name: + return head + sep + name, dot + ext + else: + return path, "" + + def normcase(self, path: UPathLike) -> str: + if self.local_file: + return os.path.normcase(self.stringify_path(path)) + else: + return self.stringify_path(path) + + def isabs(self, path: UPathLike) -> bool: + path = self.strip_protocol(path) + if self.local_file: + return os.path.isabs(path) + else: + return path.startswith(self.root_marker) diff --git a/upath/_stat.py b/upath/_stat.py index c2ef5a0a..da8bf29a 100644 --- a/upath/_stat.py +++ b/upath/_stat.py @@ -13,8 +13,48 @@ __all__ = [ "UPathStatResult", + "_StatResultType", ] +from typing import TYPE_CHECKING + +if TYPE_CHECKING: # noqa: C901 + + class _StatResultType: + @property + def st_mode(self) -> int: ... + @property + def st_ino(self) -> int: ... + @property + def st_dev(self) -> int: ... + @property + def st_nlink(self) -> int: ... + @property + def st_uid(self) -> int: ... + @property + def st_gid(self) -> int: ... + @property + def st_size(self) -> int: ... + @property + def st_atime(self) -> float: ... + @property + def st_mtime(self) -> float: ... + @property + def st_ctime(self) -> float: ... + @property + def st_atime_ns(self) -> int: ... + @property + def st_mtime_ns(self) -> int: ... + @property + def st_ctime_ns(self) -> int: ... + @property + def st_birthtime(self) -> float: ... + @property + def st_birthtime_ns(self) -> int: ... 
+ +else: + _StatResultType = object + def _convert_value_to_timestamp(value: Any) -> int | float: """Try to convert a datetime-like value to a timestamp.""" @@ -50,7 +90,7 @@ def _get_stat_result_extra_fields() -> tuple[str, ...]: return tuple(extra_fields) -class UPathStatResult: +class UPathStatResult(_StatResultType): """A stat_result compatible class wrapping fsspec info dicts. **Note**: It is unlikely that you will ever have to instantiate diff --git a/upath/_protocol.py b/upath/_uris.py similarity index 52% rename from upath/_protocol.py rename to upath/_uris.py index d333dd6a..8b038abc 100644 --- a/upath/_protocol.py +++ b/upath/_uris.py @@ -1,18 +1,19 @@ from __future__ import annotations -import os import re from pathlib import PurePath from typing import TYPE_CHECKING from typing import Any +from urllib.parse import urlsplit if TYPE_CHECKING: - from upath.core import UPath + from upath.core import UPathLike __all__ = [ "get_upath_protocol", "normalize_empty_netloc", "compatible_protocol", + "upath_urijoin", ] # Regular expression to match fsspec style protocols. 
@@ -34,7 +35,7 @@ def _match_protocol(pth: str) -> str: def get_upath_protocol( - pth: str | PurePath | os.PathLike, + pth: UPathLike, *, protocol: str | None = None, storage_options: dict[str, Any] | None = None, @@ -66,7 +67,7 @@ def normalize_empty_netloc(pth: str) -> str: return pth -def compatible_protocol(protocol: str, *args: str | os.PathLike[str] | UPath) -> bool: +def compatible_protocol(protocol: str, *args: UPathLike) -> bool: """check if UPath protocols are compatible""" for arg in args: other_protocol = get_upath_protocol(arg) @@ -76,3 +77,64 @@ def compatible_protocol(protocol: str, *args: str | os.PathLike[str] | UPath) -> if other_protocol and other_protocol != protocol: return False return True + + +def upath_urijoin(base: str, uri: str) -> str: + """Join a base URI and a possibly relative URI to form an absolute + interpretation of the latter.""" + # see: + # https://github.com/python/cpython/blob/ae6c01d9d2/Lib/urllib/parse.py#L539-L605 + # modifications: + # - removed allow_fragments parameter + # - all schemes are considered to allow relative paths + # - all schemes are considered to allow netloc (revisit this) + # - no bytes support (removes encoding and decoding) + if not base: + return uri + if not uri: + return base + + bs = urlsplit(base, scheme="") + us = urlsplit(uri, scheme=bs.scheme) + + if us.scheme != bs.scheme: # or us.scheme not in uses_relative: + return uri + # if us.scheme in uses_netloc: + if us.netloc: + return us.geturl() + else: + us = us._replace(netloc=bs.netloc) + # end if + if not us.path and not us.fragment: + us = us._replace(path=bs.path, fragment=bs.fragment) + if not us.query: + us = us._replace(query=bs.query) + return us.geturl() + + base_parts = bs.path.split("/") + if base_parts[-1] != "": + del base_parts[-1] + + if us.path[:1] == "/": + segments = us.path.split("/") + else: + segments = base_parts + us.path.split("/") + segments[1:-1] = filter(None, segments[1:-1]) + + resolved_path: list[str] = [] + + for 
seg in segments: + if seg == "..": + try: + resolved_path.pop() + except IndexError: + pass + elif seg == ".": + continue + else: + resolved_path.append(seg) + + if segments[-1] in (".", ".."): + resolved_path.append("") + + return us._replace(path="/".join(resolved_path) or "/").geturl() diff --git a/upath/core.py b/upath/core.py index 49d997ca..5f361f29 100644 --- a/upath/core.py +++ b/upath/core.py @@ -1,38 +1,39 @@ +"""UPath core module""" + from __future__ import annotations import os import sys import warnings from copy import copy -from pathlib import Path from types import MappingProxyType from typing import IO from typing import TYPE_CHECKING from typing import Any from typing import BinaryIO +from typing import Callable +from typing import Final from typing import Generator from typing import Literal from typing import Mapping -from typing import Sequence +from typing import NoReturn from typing import TextIO -from typing import TypeVar from typing import overload from urllib.parse import urlsplit from fsspec.registry import get_filesystem_class from fsspec.spec import AbstractFileSystem -from upath._compat import FSSpecAccessorShim -from upath._compat import PathlibPathShim -from upath._compat import method_and_classmethod -from upath._compat import str_remove_prefix -from upath._compat import str_remove_suffix -from upath._flavour import LazyFlavourDescriptor -from upath._flavour import upath_get_kwargs_from_url -from upath._flavour import upath_urijoin -from upath._protocol import compatible_protocol -from upath._protocol import get_upath_protocol +from upath._abc import PathBase +from upath._abc import PurePathBase +from upath._abc import UnsupportedOperation +from upath._compat import make_instance +from upath._parser import FSSpecParser +from upath._parser import FSSpecParserDescriptor from upath._stat import UPathStatResult +from upath._uris import compatible_protocol +from upath._uris import get_upath_protocol +from upath._uris import 
upath_urijoin from upath.registry import get_upath_class if TYPE_CHECKING: @@ -40,103 +41,259 @@ if sys.version_info >= (3, 11): from typing import Self + from typing import TypeAlias else: from typing_extensions import Self + from typing_extensions import TypeAlias -__all__ = ["UPath"] +__all__ = [ + "PureUPath", + "UPath", + "UPathLike", +] -def __getattr__(name): - if name == "_UriFlavour": - from upath._flavour import default_flavour - warnings.warn( - "upath.core._UriFlavour should not be used anymore." - " Please follow the universal_pathlib==0.2.0 migration guide at" - " https://github.com/fsspec/universal_pathlib for more" - " information.", - DeprecationWarning, - stacklevel=2, - ) - return default_flavour - elif name == "PT": - warnings.warn( - "upath.core.PT should not be used anymore." - " Please follow the universal_pathlib==0.2.0 migration guide at" - " https://github.com/fsspec/universal_pathlib for more" - " information.", - DeprecationWarning, - stacklevel=2, - ) - return TypeVar("PT", bound="UPath") +def __getattr__(name: str) -> NoReturn: + if name in {"_UriFlavour", "_FSSpecAccessor", "PT"}: + raise AttributeError(f"{name!r} was removed in universal_pathlib>=0.3.0") else: raise AttributeError(name) -_FSSPEC_HAS_WORKING_GLOB = None - - -def _check_fsspec_has_working_glob(): - global _FSSPEC_HAS_WORKING_GLOB - from fsspec.implementations.memory import MemoryFileSystem - - m = type("_M", (MemoryFileSystem,), {"store": {}, "pseudo_dirs": [""]})() - m.touch("a.txt") - m.touch("f/b.txt") - g = _FSSPEC_HAS_WORKING_GLOB = len(m.glob("**/*.txt")) == 2 - return g - +_UNSET: Final[Any] = object() -def _make_instance(cls, args, kwargs): - """helper for pickling UPath instances""" - return cls(*args, **kwargs) +# the os.PathLike[str] equivalent for UPath-like objects +UPathLike: TypeAlias = "str | os.PathLike[str] | PureUPath" -_unset: Any = object() +class PureUPath(PurePathBase): + """a pure version of UPath without the filesystem access -# accessors are 
deprecated -_FSSpecAccessor = FSSpecAccessorShim + Contrary to pathlib.PurePath, PureUPath is not a PathLike[AnyStr] subclass. + It does not ship with a __fspath__ method, and it does not support the + os.PathLike protocol. It also does not have a __bytes__ method. This means + that PureUPath subclasses do not represent local files unless they + explicitly implement the os.PathLike protocol. + """ -class UPath(PathlibPathShim, Path): __slots__ = ( "_protocol", "_storage_options", - "_fs_cached", - *PathlibPathShim.__missing_py312_slots__, - "__drv", - "__root", - "__parts", ) - if TYPE_CHECKING: - # public - anchor: str - drive: str - parent: Self - parents: Sequence[Self] - parts: tuple[str, ...] - root: str - stem: str - suffix: str - suffixes: list[str] - - def with_name(self, name: str) -> Self: ... - def with_stem(self, stem: str) -> Self: ... - def with_suffix(self, suffix: str) -> Self: ... - - # private attributes - _protocol: str - _storage_options: dict[str, Any] - _fs_cached: AbstractFileSystem - _tail: str + _protocol: str + _storage_options: dict[str, Any] + parser: FSSpecParser = FSSpecParserDescriptor() # type: ignore[assignment] + _supported_protocols: tuple[str, ...] 
= () - _protocol_dispatch: bool | None = None - _flavour = LazyFlavourDescriptor() + # === constructors ================================================ + + def __init__( + self, + path: UPathLike, + *paths: UPathLike, + protocol: str | None = None, + **storage_options: Any, + ) -> None: + # determine the protocol + parsed_protocol = get_upath_protocol( + path, + protocol=protocol, + storage_options=storage_options, + ) + # todo: + # support checking if there is a UPath subclass for the protocol + # and use its _transform_init_args method to parse the args + base_options = getattr(self, "_storage_options", {}) + _paths, protocol, storage_options = self._transform_init_args( + (path, *paths), + protocol or parsed_protocol, + {**base_options, **storage_options}, + ) + if self._protocol != protocol and protocol: + self._protocol = protocol + else: + self._protocol = parsed_protocol + + # check that UPath subclasses in args are compatible + # TODO: + # Future versions of UPath could verify that storage_options + # can be combined between UPath instances. Not sure if this + # is really necessary though. A warning might be enough... 
+ if not compatible_protocol(self._protocol, *_paths): + raise ValueError("can't combine incompatible UPath protocols") + + # set up the base class attributes + super().__init__(*map(str, _paths)) + + # retrieve storage_options + # todo: + # support checking if there is a UPath subclass for the protocol + # and use its _parse_storage_options method to parse the storage_options + self._storage_options = self._parse_storage_options( + str(_paths[0]), self._protocol, storage_options + ) + + @classmethod + def _transform_init_args( + cls, + args: tuple[UPathLike, ...], + protocol: str, + storage_options: dict[str, Any], + ) -> tuple[tuple[UPathLike, ...], str, dict[str, Any]]: + """allow customization of init args in subclasses""" + return args, protocol, storage_options + + @classmethod + def _parse_storage_options( + cls, urlpath: str, protocol: str, storage_options: Mapping[str, Any] + ) -> dict[str, Any]: + """Parse storage_options from the urlpath""" + pth_storage_options = FSSpecParser.from_protocol(protocol).get_kwargs_from_url( + urlpath + ) + return {**pth_storage_options, **storage_options} + + # === PureUPath custom API ======================================== + + @property + def protocol(self) -> str: + """The fsspec protocol for the path.""" + return self._protocol + + @property + def storage_options(self) -> Mapping[str, Any]: + """The fsspec storage options for the path.""" + return MappingProxyType(self._storage_options) + + @property + def path(self) -> str: + """The path that a fsspec filesystem can use.""" + return self.parser.strip_protocol(super().__str__()) + + def joinuri(self, uri: UPathLike) -> PureUPath: + """Join with urljoin behavior for UPath instances""" + # short circuit if the new uri uses a different protocol + other_protocol = get_upath_protocol(uri) + if other_protocol and other_protocol != self._protocol: + return PureUPath(uri) + else: + return PureUPath( + upath_urijoin(str(self), str(uri)), + protocol=other_protocol or 
self._protocol, + **self.storage_options, + ) + + @property + def _url(self) -> SplitResult: + # TODO: + # _url should be deprecated, but for now there is no good way of + # accessing query parameters from urlpaths... + return urlsplit(self.as_posix()) + + # === extra methods for pathlib.PurePath like interface =========== - if sys.version_info >= (3, 13): - parser = _flavour + def __reduce__(self) -> tuple[ + Callable[..., Self], + tuple[type, tuple[str, ...], dict], + ]: + """support pickling UPath instances""" + args = (self._raw_path,) + kwargs = { + "protocol": self._protocol, + **self._storage_options, + } + return make_instance, (type(self), args, kwargs) + + def __hash__(self) -> int: + """The returned hash is based on the protocol and path only. + + Note: in the future, if hash collisions become an issue, we + can add `fsspec.utils.tokenize(storage_options)` + """ + return hash((self.protocol, self.path)) + + def __eq__(self, other: object) -> bool: + """PureUPaths are considered equal if their protocol, path and + storage_options are equal.""" + if not isinstance(other, PureUPath): + return NotImplemented + return ( + self.path == other.path + and self.protocol == other.protocol + and self.storage_options == other.storage_options + ) + + def __lt__(self, other: object) -> bool: + raise NotImplementedError("todo") # fixme + + def __le__(self, other: object) -> bool: + raise NotImplementedError("todo") # fixme + + def __gt__(self, other: object) -> bool: + raise NotImplementedError("todo") # fixme + + def __ge__(self, other: object) -> bool: + raise NotImplementedError("todo") # fixme + + def __repr__(self): + return f"{type(self).__name__}({str(self)!r}, protocol={self._protocol!r})" + + # === customized PurePathBase methods ============================= + + def with_segments(self, *pathsegments: UPathLike) -> Self: + return type(self)( + *pathsegments, + protocol=self._protocol, + **self._storage_options, + ) + + def __str__(self) -> str: + if 
self._protocol: + return f"{self._protocol}://{self.path}" + else: + return self.path + + @property + def _stack(self): + """ + Split the path into a 2-tuple (anchor, parts), where *anchor* is the + uppermost parent of the path (equivalent to path.parents[-1]), and + *parts* is a reversed list of parts following the anchor. + """ + split = self.parser.split + path = self.parser.strip_protocol(self._raw_path) + parent, name = split(path) + names = [] + while path != parent: + names.append(name) + path = parent + parent, name = split(path) + return path, names + + @property + def name(self): + """The final path component, if any.""" + remainder, stack = self._stack + return next(filter(None, (*stack, *[remainder])), "") + + def with_name(self, name): + """Return a new path with the file name changed.""" + if self.parser.sep in name: + raise ValueError(f"Invalid name {name!r}") + return self.with_segments(self.parser.split(self._raw_path)[0], name) + + +class UPath(PathBase, PureUPath): + """a concrete version of UPath with filesystem access""" + + __slots__ = ("_fs_cached",) + + _fs_cached: AbstractFileSystem + _protocol_dispatch: bool | None = None # === upath.UPath constructor ===================================== @@ -181,10 +338,15 @@ def __new__( upath_cls = cls # create a new instance - if cls is UPath: + if upath_cls.__new__ != UPath.__new__: + # if the upath_cls has a custom __new__ method, we need to + # call it directly to avoid recursion + obj: UPath = upath_cls(*args, protocol=pth_protocol, **storage_options) + + elif cls is UPath: # we called UPath() directly, and want an instance based on the # provided or detected protocol (i.e. 
upath_cls) - obj: UPath = object.__new__(upath_cls) + obj = object.__new__(upath_cls) obj._protocol = pth_protocol elif issubclass(cls, upath_cls): @@ -230,54 +392,22 @@ def __new__( def __init__( self, *args, protocol: str | None = None, **storage_options: Any ) -> None: - # allow subclasses to customize __init__ arg parsing - base_options = getattr(self, "_storage_options", {}) - args, protocol, storage_options = type(self)._transform_init_args( - args, protocol or self._protocol, {**base_options, **storage_options} - ) - if self._protocol != protocol and protocol: - self._protocol = protocol - - # retrieve storage_options - if args: - args0 = args[0] - if isinstance(args0, UPath): - self._storage_options = {**args0.storage_options, **storage_options} - else: - if hasattr(args0, "__fspath__"): - _args0 = args0.__fspath__() - else: - _args0 = str(args0) - self._storage_options = type(self)._parse_storage_options( - _args0, protocol, storage_options - ) - else: - self._storage_options = storage_options.copy() - - # check that UPath subclasses in args are compatible - # TODO: - # Future versions of UPath could verify that storage_options - # can be combined between UPath instances. Not sure if this - # is really necessary though. A warning might be enough... 
- if not compatible_protocol(self._protocol, *args): - raise ValueError("can't combine incompatible UPath protocols") - - # fill ._raw_paths - if hasattr(self, "_raw_paths"): - return - super().__init__(*args) + super().__init__(*args, protocol=protocol, **storage_options) # === upath.UPath PUBLIC ADDITIONAL API =========================== - @property - def protocol(self) -> str: - """The fsspec protocol for the path.""" - return self._protocol - - @property - def storage_options(self) -> Mapping[str, Any]: - """The fsspec storage options for the path.""" - return MappingProxyType(self._storage_options) + def joinuri(self, uri: UPathLike) -> UPath: + """Join with urljoin behavior for UPath instances""" + # short circuit if the new uri uses a different protocol + other_protocol = get_upath_protocol(uri) + if other_protocol and other_protocol != self._protocol: + return UPath(uri) + else: + return UPath( + upath_urijoin(str(self), str(uri)), + protocol=other_protocol or self._protocol, + **self.storage_options, + ) @property def fs(self) -> AbstractFileSystem: @@ -290,43 +420,8 @@ def fs(self) -> AbstractFileSystem: ) return fs - @property - def path(self) -> str: - """The path that a fsspec filesystem can use.""" - return super().__str__() - - def joinuri(self, uri: str | os.PathLike[str]) -> UPath: - """Join with urljoin behavior for UPath instances""" - # short circuit if the new uri uses a different protocol - other_protocol = get_upath_protocol(uri) - if other_protocol and other_protocol != self._protocol: - return UPath(uri) - return UPath( - upath_urijoin(str(self), str(uri)), - protocol=other_protocol or self._protocol, - **self.storage_options, - ) - # === upath.UPath CUSTOMIZABLE API ================================ - @classmethod - def _transform_init_args( - cls, - args: tuple[str | os.PathLike, ...], - protocol: str, - storage_options: dict[str, Any], - ) -> tuple[tuple[str | os.PathLike, ...], str, dict[str, Any]]: - """allow customization of init args 
in subclasses""" - return args, protocol, storage_options - - @classmethod - def _parse_storage_options( - cls, urlpath: str, protocol: str, storage_options: Mapping[str, Any] - ) -> dict[str, Any]: - """Parse storage_options from the urlpath""" - pth_storage_options = upath_get_kwargs_from_url(urlpath) - return {**pth_storage_options, **storage_options} - @classmethod def _fs_factory( cls, urlpath: str, protocol: str, storage_options: Mapping[str, Any] @@ -337,414 +432,12 @@ def _fs_factory( so_dct.update(storage_options) return fs_cls(**storage_options) - # === upath.UPath COMPATIBILITY API =============================== - - def __init_subclass__(cls, **kwargs): - """provide a clean migration path for custom user subclasses""" - - # Check if the user subclass has a custom `__new__` method - has_custom_new_method = ( - cls.__new__ is not UPath.__new__ - and cls.__name__ not in {"PosixUPath", "WindowsUPath"} - ) - - if has_custom_new_method and cls._protocol_dispatch is None: - warnings.warn( - "Detected a customized `__new__` method in subclass" - f" {cls.__name__!r}. Protocol dispatch will be disabled" - " for this subclass. 
Please follow the" - " universal_pathlib==0.2.0 migration guide at" - " https://github.com/fsspec/universal_pathlib for more" - " information.", - DeprecationWarning, - stacklevel=2, - ) - cls._protocol_dispatch = False - - # Check if the user subclass has defined a custom accessor class - accessor_cls = getattr(cls, "_default_accessor", None) - - has_custom_legacy_accessor = ( - accessor_cls is not None - and issubclass(accessor_cls, FSSpecAccessorShim) - and accessor_cls is not FSSpecAccessorShim - ) - has_customized_fs_instantiation = ( - accessor_cls.__init__ is not FSSpecAccessorShim.__init__ - or hasattr(accessor_cls, "_fs") - ) - - if has_custom_legacy_accessor and has_customized_fs_instantiation: - warnings.warn( - "Detected a customized `__init__` method or `_fs` attribute" - f" in the provided `_FSSpecAccessor` subclass of {cls.__name__!r}." - " It is recommended to instead override the `UPath._fs_factory`" - " classmethod to customize filesystem instantiation. Please follow" - " the universal_pathlib==0.2.0 migration guide at" - " https://github.com/fsspec/universal_pathlib for more" - " information.", - DeprecationWarning, - stacklevel=2, - ) - - def _fs_factory( - cls_, urlpath: str, protocol: str, storage_options: Mapping[str, Any] - ) -> AbstractFileSystem: - url = urlsplit(urlpath) - if protocol: - url = url._replace(scheme=protocol) - inst = cls_._default_accessor(url, **storage_options) - return inst._fs - - def _parse_storage_options( - cls_, urlpath: str, protocol: str, storage_options: Mapping[str, Any] - ) -> dict[str, Any]: - url = urlsplit(urlpath) - if protocol: - url = url._replace(scheme=protocol) - inst = cls_._default_accessor(url, **storage_options) - return inst._fs.storage_options - - cls._fs_factory = classmethod(_fs_factory) - cls._parse_storage_options = classmethod(_parse_storage_options) - - @property - def _path(self): - warnings.warn( - "UPath._path is deprecated and should not be used." 
- " Please follow the universal_pathlib==0.2.0 migration guide at" - " https://github.com/fsspec/universal_pathlib for more" - " information.", - DeprecationWarning, - stacklevel=2, - ) - return self.path - - @property - def _kwargs(self): - warnings.warn( - "UPath._kwargs is deprecated. Please use" - " UPath.storage_options instead. Follow the" - " universal_pathlib==0.2.0 migration guide at" - " https://github.com/fsspec/universal_pathlib for more" - " information.", - DeprecationWarning, - stacklevel=2, - ) - return self.storage_options - - @property - def _url(self) -> SplitResult: - # TODO: - # _url should be deprecated, but for now there is no good way of - # accessing query parameters from urlpaths... - return urlsplit(self.as_posix()) - - if not TYPE_CHECKING: - # allow mypy to catch missing attributes - - def __getattr__(self, item): - if item == "_accessor": - warnings.warn( - "UPath._accessor is deprecated. Please use" - " UPath.fs instead. Follow the" - " universal_pathlib==0.2.0 migration guide at" - " https://github.com/fsspec/universal_pathlib for more" - " information.", - DeprecationWarning, - stacklevel=2, - ) - if hasattr(self, "_default_accessor"): - accessor_cls = self._default_accessor - else: - accessor_cls = FSSpecAccessorShim - return accessor_cls.from_path(self) - else: - raise AttributeError(item) - - @classmethod - def _from_parts(cls, parts, **kwargs): - warnings.warn( - "UPath._from_parts is deprecated and should not be used." 
- " Please follow the universal_pathlib==0.2.0 migration guide at" - " https://github.com/fsspec/universal_pathlib for more" - " information.", - DeprecationWarning, - stacklevel=2, - ) - parsed_url = kwargs.pop("url", None) - if parsed_url: - if protocol := parsed_url.scheme: - kwargs["protocol"] = protocol - if netloc := parsed_url.netloc: - kwargs["netloc"] = netloc - obj = UPath.__new__(cls, parts, **kwargs) - obj.__init__(*parts, **kwargs) - return obj - - @classmethod - def _parse_args(cls, args): - warnings.warn( - "UPath._parse_args is deprecated and should not be used." - " Please follow the universal_pathlib==0.2.0 migration guide at" - " https://github.com/fsspec/universal_pathlib for more" - " information.", - DeprecationWarning, - stacklevel=2, - ) - # TODO !!! - pth = cls._flavour.join(*args) - return cls._parse_path(pth) - - @property - def _drv(self): - # direct access to ._drv should emit a warning, - # but there is no good way of doing this for now... - try: - return self.__drv - except AttributeError: - self._load_parts() - return self.__drv - - @_drv.setter - def _drv(self, value): - self.__drv = value - - @property - def _root(self): - # direct access to ._root should emit a warning, - # but there is no good way of doing this for now... - try: - return self.__root - except AttributeError: - self._load_parts() - return self.__root - - @_root.setter - def _root(self, value): - self.__root = value - - @property - def _parts(self): - # UPath._parts is not used anymore, and not available - # in pathlib.Path for Python 3.12 and later. - # Direct access to ._parts should emit a deprecation warning, - # but there is no good way of doing this for now... 
- try: - return self.__parts - except AttributeError: - self._load_parts() - self.__parts = super().parts - return list(self.__parts) - - @_parts.setter - def _parts(self, value): - self.__parts = value - - @property - def _cparts(self): - # required for pathlib.Path.__eq__ compatibility on Python <3.12 - return self.parts - - # === pathlib.PurePath ============================================ - - def __reduce__(self): - args = tuple(self._raw_paths) - kwargs = { - "protocol": self._protocol, - **self._storage_options, - } - return _make_instance, (type(self), args, kwargs) - - def with_segments(self, *pathsegments: str | os.PathLike[str]) -> Self: - return type(self)( - *pathsegments, - protocol=self._protocol, - **self._storage_options, - ) - - def joinpath(self, *pathsegments: str | os.PathLike[str]) -> Self: - return self.with_segments(self, *pathsegments) - - def __truediv__(self, key: str | os.PathLike[str]) -> Self: - try: - return self.joinpath(key) - except TypeError: - return NotImplemented - - def __rtruediv__(self, key: str | os.PathLike[str]) -> Self: - try: - return self.with_segments(key, self) - except TypeError: - return NotImplemented - - # === upath.UPath non-standard changes ============================ - - # NOTE: - # this is a classmethod on the parent class, but we need to - # override it here to make it possible to provide the _flavour - # with the correct protocol... - # pathlib 3.12 never calls this on the class. Only on the instance. - @method_and_classmethod - def _parse_path(self_or_cls, path): # noqa: B902 - if isinstance(self_or_cls, type): - warnings.warn( - "UPath._parse_path should not be used as a classmethod." 
- " Please file an issue on the universal_pathlib issue tracker" - " and describe your use case.", - DeprecationWarning, - stacklevel=2, - ) - flavour = self_or_cls._flavour - - if flavour.supports_empty_parts: - drv, root, rel = flavour.splitroot(path) - if not root: - parsed = [] - else: - parsed = list(map(sys.intern, rel.split(flavour.sep))) - if parsed[-1] == ".": - parsed[-1] = "" - parsed = [x for x in parsed if x != "."] - if not flavour.has_meaningful_trailing_slash and parsed[-1] == "": - parsed.pop() - return drv, root, parsed - if not path: - return "", "", [] - sep = flavour.sep - altsep = flavour.altsep - if altsep: - path = path.replace(altsep, sep) - drv, root, rel = flavour.splitroot(path) - if not root and drv.startswith(sep) and not drv.endswith(sep): - drv_parts = drv.split(sep) - if len(drv_parts) == 4 and drv_parts[2] not in "?.": - # e.g. //server/share - root = sep - elif len(drv_parts) == 6: - # e.g. //?/unc/server/share - root = sep - parsed = [sys.intern(str(x)) for x in rel.split(sep) if x and x != "."] - return drv, root, parsed - - @method_and_classmethod - def _format_parsed_parts(self_or_cls, drv, root, tail, **kwargs): # noqa: B902 - if isinstance(self_or_cls, type): - warnings.warn( - "UPath._format_parsed_path should not be used as a classmethod." - " Please file an issue on the universal_pathlib issue tracker" - " and describe your use case.", - DeprecationWarning, - stacklevel=2, - ) - flavour = self_or_cls._flavour - - if kwargs: - warnings.warn( - "UPath._format_parsed_parts should not be used with" - " additional kwargs. 
Please follow the" - " universal_pathlib==0.2.0 migration guide at" - " https://github.com/fsspec/universal_pathlib for more" - " information.", - DeprecationWarning, - stacklevel=2, - ) - if "url" in kwargs and tail[:1] == [f"{drv}{root}"]: - # This was called from code that expected py38-py311 behavior - # of _format_parsed_parts, which takes drv, root and parts - tail = tail[1:] - - if drv or root: - return drv + root + flavour.sep.join(tail) - elif tail and flavour.splitdrive(tail[0])[0]: - tail = ["."] + tail - return flavour.sep.join(tail) - # === upath.UPath changes ========================================= - def __str__(self): - if self._protocol: - return f"{self._protocol}://{self.path}" - else: - return self.path - - def __fspath__(self): - msg = ( - "in a future version of UPath this will be set to None" - " unless the filesystem is local (or caches locally)" - ) - warnings.warn(msg, PendingDeprecationWarning, stacklevel=2) - return str(self) - - def __bytes__(self): - msg = ( - "in a future version of UPath this will be set to None" - " unless the filesystem is local (or caches locally)" - ) - warnings.warn(msg, PendingDeprecationWarning, stacklevel=2) - return os.fsencode(self) - - def as_uri(self) -> str: - return str(self) - - def is_reserved(self) -> bool: - return False - - def __eq__(self, other: object) -> bool: - """UPaths are considered equal if their protocol, path and - storage_options are equal.""" - if not isinstance(other, UPath): - return NotImplemented - return ( - self.path == other.path - and self.protocol == other.protocol - and self.storage_options == other.storage_options - ) - - def __hash__(self) -> int: - """The returned hash is based on the protocol and path only. 
- - Note: in the future, if hash collisions become an issue, we - can add `fsspec.utils.tokenize(storage_options)` - """ - return hash((self.protocol, self.path)) - - def relative_to( # type: ignore[override] - self, - other, - /, - *_deprecated, - walk_up=False, - ) -> Self: - if isinstance(other, UPath) and self.storage_options != other.storage_options: - raise ValueError( - "paths have different storage_options:" - f" {self.storage_options!r} != {other.storage_options!r}" - ) - return super().relative_to(other, *_deprecated, walk_up=walk_up) - - def is_relative_to(self, other, /, *_deprecated) -> bool: # type: ignore[override] - if isinstance(other, UPath) and self.storage_options != other.storage_options: - return False - return super().is_relative_to(other, *_deprecated) - - @property - def name(self) -> str: - tail = self._tail - if not tail: - return "" - name = tail[-1] - if not name and len(tail) >= 2: - return tail[-2] - else: - return name - - # === pathlib.Path ================================================ - - def stat( # type: ignore[override] + def stat( self, *, - follow_symlinks=True, + follow_symlinks: bool = True, ) -> UPathStatResult: if not follow_symlinks: warnings.warn( @@ -755,16 +448,37 @@ def stat( # type: ignore[override] ) return UPathStatResult.from_info(self.fs.stat(self.path)) - def lstat(self) -> UPathStatResult: # type: ignore[override] + def lstat(self) -> UPathStatResult: return self.stat(follow_symlinks=False) - def exists(self, *, follow_symlinks=True) -> bool: + def exists(self, *, follow_symlinks: bool = True) -> bool: + if not follow_symlinks: + warnings.warn( + "UPath.stat(follow_symlinks=False): follow_symlinks=False is" + " currently ignored.", + UserWarning, + stacklevel=2, + ) return self.fs.exists(self.path) - def is_dir(self) -> bool: + def is_dir(self, *, follow_symlinks: bool = True) -> bool: + if not follow_symlinks: + warnings.warn( + "UPath.stat(follow_symlinks=False): follow_symlinks=False is" + " currently 
ignored.", + UserWarning, + stacklevel=2, + ) return self.fs.isdir(self.path) - def is_file(self) -> bool: + def is_file(self, *, follow_symlinks: bool = True) -> bool: + if not follow_symlinks: + warnings.warn( + "UPath.stat(follow_symlinks=False): follow_symlinks=False is" + " currently ignored.", + UserWarning, + stacklevel=2, + ) return self.fs.isfile(self.path) def is_mount(self) -> bool: @@ -794,7 +508,7 @@ def is_fifo(self) -> bool: def is_socket(self) -> bool: return False - def samefile(self, other_path) -> bool: + def samefile(self, other_path: UPathLike) -> bool: st = self.stat() if isinstance(other_path, UPath): other_st = other_path.stat() @@ -802,32 +516,44 @@ def samefile(self, other_path) -> bool: other_st = self.with_segments(other_path).stat() return st == other_st - @overload # type: ignore[override] + @overload def open( self, - mode: Literal["r", "w", "a"] = "r", - buffering: int = ..., - encoding: str = ..., - errors: str = ..., - newline: str = ..., - **fsspec_kwargs: Any, + mode: Literal["rb", "ab", "wb"], + buffering: int = -1, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, + ) -> BinaryIO: ... + + @overload + def open( + self, + mode: Literal["r", "a", "w", "rt", "at", "wt"] = "r", + buffering: int = -1, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, ) -> TextIO: ... @overload - def open( # type: ignore[override] + def open( self, - mode: Literal["rb", "wb", "ab"], - buffering: int = ..., - encoding: str = ..., - errors: str = ..., - newline: str = ..., + mode: str = "r", + buffering: int = -1, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, **fsspec_kwargs: Any, - ) -> BinaryIO: ... + ) -> IO[Any]: ... 
def open( self, mode: str = "r", - *args: Any, + buffering: int = _UNSET, + encoding: str | None = _UNSET, + errors: str | None = _UNSET, + newline: str | None = _UNSET, **fsspec_kwargs: Any, ) -> IO[Any]: """ @@ -849,19 +575,50 @@ def open( **fsspec_kwargs: Additional options for the fsspec filesystem. """ - # match the signature of pathlib.Path.open() - for key, value in zip(["buffering", "encoding", "errors", "newline"], args): - if key in fsspec_kwargs: - raise TypeError( - f"{type(self).__name__}.open() got multiple values for '{key}'" - ) - fsspec_kwargs[key] = value # translate pathlib buffering to fs block_size - if "buffering" in fsspec_kwargs: - fsspec_kwargs.setdefault("block_size", fsspec_kwargs.pop("buffering")) + if buffering is not _UNSET: + fsspec_kwargs.setdefault("block_size", buffering) + for name, arg in zip( + ("encoding", "errors", "newline"), (encoding, errors, newline) + ): + if arg is not _UNSET: + warnings.warn( + f"UPath.open({name}=...) is currently ignored.", + UserWarning, + stacklevel=2, + ) return self.fs.open(self.path, mode=mode, **fsspec_kwargs) - def iterdir(self) -> Generator[UPath, None, None]: + def read_bytes( + self, + encoding: str | None = _UNSET, + errors: str | None = _UNSET, + newline: str | None = _UNSET, + ) -> bytes: + with self.open("rb", encoding=encoding, errors=errors, newline=newline) as f: + return f.read() + + def read_text( + self, + encoding: str | None = _UNSET, + errors: str | None = _UNSET, + newline: str | None = _UNSET, + ) -> str: + with self.open("r", encoding=encoding, errors=errors, newline=newline) as f: + return f.read() + + def write_text( + self, + data: str, + encoding: str | None = _UNSET, + errors: str | None = _UNSET, + newline: str | None = _UNSET, + ) -> int: + return super().write_text( + data, encoding=encoding, errors=errors, newline=newline + ) + + def iterdir(self) -> Generator[Self, None, None]: for name in self.fs.listdir(self.path): # fsspec returns dictionaries if isinstance(name, 
dict): @@ -870,75 +627,53 @@ def iterdir(self) -> Generator[UPath, None, None]: # Yielding a path object for these makes little sense continue # only want the path name with iterdir - _, _, name = str_remove_suffix(name, "/").rpartition(self._flavour.sep) - yield self.with_segments(*self.parts, name) - - def _scandir(self): - raise NotImplementedError # todo - - def _make_child_relpath(self, name): - path = super()._make_child_relpath(name) - del path._str # fix _str = str(self) assignment - return path + yield self.with_segments(name) def glob( - self, pattern: str, *, case_sensitive=None + self, + pattern: str, + *, + case_sensitive: bool | None = None, + recurse_symlinks: bool = _UNSET, ) -> Generator[UPath, None, None]: path_pattern = self.joinpath(pattern).path - sep = self._flavour.sep + sep = self.parser.sep base = self.fs._strip_protocol(self.path) for name in self.fs.glob(path_pattern): - name = str_remove_prefix(str_remove_prefix(name, base), sep) + name = name.removeprefix(base).removeprefix(sep) yield self.joinpath(name) def rglob( - self, pattern: str, *, case_sensitive=None + self, + pattern: str, + *, + case_sensitive: bool | None = None, + recurse_symlinks: bool = _UNSET, ) -> Generator[UPath, None, None]: - if _FSSPEC_HAS_WORKING_GLOB is None: - _check_fsspec_has_working_glob() - - if _FSSPEC_HAS_WORKING_GLOB: - r_path_pattern = self.joinpath("**", pattern).path - sep = self._flavour.sep - base = self.fs._strip_protocol(self.path) - for name in self.fs.glob(r_path_pattern): - name = str_remove_prefix(str_remove_prefix(name, base), sep) - yield self.joinpath(name) + path_pattern = self.joinpath(pattern).path + r_path_pattern = self.joinpath("**", pattern).path + sep = self.parser.sep + base = self.fs._strip_protocol(self.path) + seen = set() + for p in (path_pattern, r_path_pattern): + for name in self.fs.glob(p): + name = name.removeprefix(base).removeprefix(sep) + if name in seen: + continue + else: + seen.add(name) + yield self.joinpath(name) - 
else: - path_pattern = self.joinpath(pattern).path - r_path_pattern = self.joinpath("**", pattern).path - sep = self._flavour.sep - base = self.fs._strip_protocol(self.path) - seen = set() - for p in (path_pattern, r_path_pattern): - for name in self.fs.glob(p): - name = str_remove_prefix(str_remove_prefix(name, base), sep) - if name in seen: - continue - else: - seen.add(name) - yield self.joinpath(name) + def absolute(self) -> Self: + return self @classmethod def cwd(cls) -> UPath: - if cls is UPath: - return get_upath_class("").cwd() # type: ignore[union-attr] - else: - raise NotImplementedError + raise UnsupportedOperation(cls._unsupported_msg("cwd")) @classmethod def home(cls) -> UPath: - if cls is UPath: - return get_upath_class("").home() # type: ignore[union-attr] - else: - raise NotImplementedError - - def absolute(self) -> Self: - return self - - def is_absolute(self) -> bool: - return self._flavour.isabs(str(self)) + raise UnsupportedOperation(cls._unsupported_msg("home")) def resolve(self, strict: bool = False) -> Self: _parts = self.parts @@ -958,16 +693,11 @@ def resolve(self, strict: bool = False) -> Self: return self.with_segments(*_parts[:1], *resolved) - def owner(self) -> str: - raise NotImplementedError - - def group(self) -> str: - raise NotImplementedError - - def readlink(self) -> Self: - raise NotImplementedError - - def touch(self, mode=0o666, exist_ok=True) -> None: + def touch( + self, + mode: int = 0o666, + exist_ok: bool = True, + ) -> None: exists = self.fs.exists(self.path) if exists and not exist_ok: raise FileExistsError(str(self)) @@ -979,7 +709,12 @@ def touch(self, mode=0o666, exist_ok=True) -> None: except (NotImplementedError, ValueError): pass # unsupported by filesystem - def mkdir(self, mode=0o777, parents=False, exist_ok=False) -> None: + def mkdir( + self, + mode: int = 0o777, + parents: bool = False, + exist_ok: bool = False, + ) -> None: if parents and not exist_ok and self.exists(): raise FileExistsError(str(self)) try: 
@@ -994,32 +729,24 @@ def mkdir(self, mode=0o777, parents=False, exist_ok=False) -> None: if not self.is_dir(): raise FileExistsError(str(self)) - def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None: - raise NotImplementedError - - def lchmod(self, mode: int) -> None: - raise NotImplementedError - - def unlink(self, missing_ok: bool = False) -> None: - if not self.exists(): - if not missing_ok: - raise FileNotFoundError(str(self)) - return - self.fs.rm(self.path, recursive=False) - - def rmdir(self, recursive: bool = True) -> None: # fixme: non-standard - if not self.is_dir(): - raise NotADirectoryError(str(self)) - if not recursive and next(self.iterdir()): # type: ignore[arg-type] - raise OSError(f"Not recursive and directory not empty: {self}") - self.fs.rm(self.path, recursive=recursive) - def rename( self, - target: str | os.PathLike[str] | UPath, + target: UPathLike, + *, # note: non-standard compared to pathlib + recursive: bool = _UNSET, + maxdepth: int | None = _UNSET, + **kwargs: Any, + ) -> Self: + if os.name == "nt" and self.with_segments(self, target).exists(): + raise FileExistsError(str(target)) + return self.replace(target, recursive=recursive, maxdepth=maxdepth, **kwargs) + + def replace( + self, + target: UPathLike, *, # note: non-standard compared to pathlib - recursive: bool = _unset, - maxdepth: int | None = _unset, + recursive: bool = _UNSET, + maxdepth: int | None = _UNSET, **kwargs: Any, ) -> Self: if isinstance(target, str) and self.storage_options: @@ -1042,11 +769,11 @@ def rename( # avoid calling .resolve for subclasses of UPath if ".." in parent.parts or "." 
in parent.parts: parent = parent.resolve() - target_ = parent.joinpath(os.path.normpath(target)) + target_ = parent.joinpath(os.path.normpath(str(target))) assert isinstance(target_, type(self)), "identical protocols enforced above" - if recursive is not _unset: + if recursive is not _UNSET: kwargs["recursive"] = recursive - if maxdepth is not _unset: + if maxdepth is not _UNSET: kwargs["maxdepth"] = maxdepth self.fs.mv( self.path, @@ -1055,21 +782,32 @@ def rename( ) return target_ - def replace(self, target: str | os.PathLike[str] | UPath) -> UPath: - raise NotImplementedError # todo + def unlink(self, missing_ok: bool = False) -> None: + if not self.exists(): + if not missing_ok: + raise FileNotFoundError(str(self)) + return + self.fs.rm(self.path, recursive=False) - def symlink_to( # type: ignore[override] - self, - target: str | os.PathLike[str] | UPath, - target_is_directory: bool = False, - ) -> None: - raise NotImplementedError + def rmdir(self, recursive: bool = True) -> None: # fixme: non-standard + if not self.is_dir(): + raise NotADirectoryError(str(self)) + if not recursive and next(self.iterdir()): # type: ignore[arg-type] + raise OSError(f"Not recursive and directory not empty: {self}") + self.fs.rm(self.path, recursive=recursive) - def hardlink_to( # type: ignore[override] - self, - target: str | os.PathLike[str] | UPath, - ) -> None: - raise NotImplementedError + def as_uri(self) -> str: + return str(self) + + def is_reserved(self) -> bool: + return False def expanduser(self) -> Self: return self + + # === compatibility methods ======================================= + + if sys.version_info < (3, 12): + + def link_to(self, target: str | Self) -> NoReturn: + raise UnsupportedOperation(self._unsupported_msg("link_to")) diff --git a/upath/implementations/cloud.py b/upath/implementations/cloud.py index 36f4029f..9322ec0d 100644 --- a/upath/implementations/cloud.py +++ b/upath/implementations/cloud.py @@ -3,8 +3,6 @@ import os from typing import Any 
-from upath._compat import FSSpecAccessorShim as _FSSpecAccessorShim -from upath._flavour import upath_strip_protocol from upath.core import UPath __all__ = [ @@ -15,10 +13,6 @@ ] -# accessors are deprecated -_CloudAccessor = _FSSpecAccessorShim - - class CloudPath(UPath): __slots__ = () @@ -35,7 +29,7 @@ def _transform_init_args( if str(args[0]).startswith("/"): args = (f"{protocol}://{bucket}{args[0]}", *args[1:]) else: - args0 = upath_strip_protocol(args[0]) + args0 = cls.parser.strip_protocol(args[0]) args = (f"{protocol}://{bucket}/", args0, *args[1:]) break return super()._transform_init_args(args, protocol, storage_options) @@ -63,6 +57,7 @@ def relative_to(self, other, /, *_deprecated, walk_up=False): class GCSPath(CloudPath): __slots__ = () + _supported_protocols = ("gcs", "gs") def __init__( self, *args, protocol: str | None = None, **storage_options: Any @@ -83,6 +78,7 @@ def mkdir( class S3Path(CloudPath): __slots__ = () + _supported_protocols = ("s3", "s3a") def __init__( self, *args, protocol: str | None = None, **storage_options: Any @@ -94,6 +90,7 @@ def __init__( class AzurePath(CloudPath): __slots__ = () + _supported_protocols = ("abfs", "abfss", "adl", "az") def __init__( self, *args, protocol: str | None = None, **storage_options: Any diff --git a/upath/implementations/hdfs.py b/upath/implementations/hdfs.py index 55e553c8..dce18a33 100644 --- a/upath/implementations/hdfs.py +++ b/upath/implementations/hdfs.py @@ -1,13 +1,9 @@ from __future__ import annotations -from upath._compat import FSSpecAccessorShim as _FSSpecAccessorShim from upath.core import UPath __all__ = ["HDFSPath"] -# accessors are deprecated -_HDFSAccessor = _FSSpecAccessorShim - class HDFSPath(UPath): __slots__ = () diff --git a/upath/implementations/http.py b/upath/implementations/http.py index 28b532f3..44275471 100644 --- a/upath/implementations/http.py +++ b/upath/implementations/http.py @@ -7,15 +7,11 @@ from fsspec.asyn import sync -from upath._compat import 
FSSpecAccessorShim as _FSSpecAccessorShim from upath._stat import UPathStatResult from upath.core import UPath __all__ = ["HTTPPath"] -# accessors are deprecated -_HTTPAccessor = _FSSpecAccessorShim - class HTTPPath(UPath): diff --git a/upath/implementations/local.py b/upath/implementations/local.py index a0961cea..4ecca0b3 100644 --- a/upath/implementations/local.py +++ b/upath/implementations/local.py @@ -1,19 +1,19 @@ from __future__ import annotations -import os import sys -from inspect import ismemberdescriptor -from pathlib import Path from pathlib import PosixPath from pathlib import WindowsPath -from typing import IO from typing import Any -from typing import Collection -from typing import MutableMapping from urllib.parse import SplitResult -from upath._protocol import compatible_protocol +if sys.version_info >= (3, 11): + from typing import Self +else: + from typing_extensions import Self + +from upath._uris import compatible_protocol from upath.core import UPath +from upath.core import UPathLike __all__ = [ "LocalPath", @@ -42,17 +42,6 @@ def _check_listdir_works_on_files() -> bool: class LocalPath(UPath): __slots__ = () - @property - def path(self): - sep = self._flavour.sep - if self.drive: - return f"/{super().path}".replace(sep, "/") - return super().path.replace(sep, "/") - - @property - def _url(self): - return SplitResult(self.protocol, "", self.path, "", "") - class FilePath(LocalPath): __slots__ = () @@ -64,185 +53,107 @@ def iterdir(self): raise NotADirectoryError(f"{self}") return super().iterdir() + @property + def path(self): + sep = self.parser.sep + if self.drive: + return f"/{super().path}".replace(sep, "/") + return super().path.replace(sep, "/") + + @property + def _url(self): + return SplitResult(self.protocol, "", self.path, "", "") -_pathlib_py312_ignore = { - "__slots__", - "__module__", - "__new__", - "__init__", - "_from_parts", - "_from_parsed_parts", - "with_segments", -} - - -def _set_class_attributes( - type_dict: 
MutableMapping[str, Any], - src: type[Path], - *, - ignore: Collection[str] = frozenset(_pathlib_py312_ignore), -) -> None: - """helper function to assign all methods/attrs from src to a class dict""" - visited = set() - for cls in src.__mro__: - if cls is object: - continue - for attr, func_or_value in cls.__dict__.items(): - if ismemberdescriptor(func_or_value): - continue - if attr in ignore or attr in visited: - continue - else: - visited.add(attr) - - type_dict[attr] = func_or_value - - -def _upath_init(inst: PosixUPath | WindowsUPath) -> None: - """helper to initialize the PosixPath/WindowsPath instance with UPath attrs""" - inst._protocol = "" - inst._storage_options = {} - if sys.version_info < (3, 10) and hasattr(inst, "_init"): - inst._init() - - -class PosixUPath(PosixPath, LocalPath): # type: ignore[misc] - __slots__ = () - # assign all PosixPath methods/attrs to prevent multi inheritance issues - _set_class_attributes(locals(), src=PosixPath) - - def open( # type: ignore[override] - self, - mode="r", - buffering=-1, - encoding=None, - errors=None, - newline=None, - **fsspec_kwargs, - ) -> IO[Any]: - if fsspec_kwargs: - return super(LocalPath, self).open( - mode=mode, - buffering=buffering, - encoding=encoding, - errors=errors, - newline=newline, - **fsspec_kwargs, - ) - else: - return PosixPath.open(self, mode, buffering, encoding, errors, newline) - - if sys.version_info < (3, 12): - - def __new__( - cls, *args, protocol: str | None = None, **storage_options: Any - ) -> PosixUPath: - if os.name == "nt": - raise NotImplementedError( - f"cannot instantiate {cls.__name__} on your system" - ) - if not compatible_protocol("", *args): - raise ValueError("can't combine incompatible UPath protocols") - obj = super().__new__(cls, *args) - obj._protocol = "" - return obj # type: ignore[return-value] +class PosixUPath(PosixPath): + __slots__ = ( + "_protocol", + "_storage_options", + "_fs_cached", + ) + + def __new__( + cls, + path: UPathLike, + *paths: 
UPathLike, + protocol: str | None = None, + **storage_options: Any, + ) -> Self: + if not compatible_protocol("", path, *paths): + raise ValueError("can't combine incompatible UPath protocols") + obj = super().__new__(cls, str(path), *map(str, paths)) + obj._protocol = "" + obj._storage_options = {} + return obj + + if sys.version_info >= (3, 12): def __init__( - self, *args, protocol: str | None = None, **storage_options: Any + self, + path: UPathLike, + *paths: UPathLike, + protocol: str | None = None, + **storage_options: Any, ) -> None: - super(Path, self).__init__() - self._drv, self._root, self._parts = type(self)._parse_args(args) - _upath_init(self) - - def _make_child(self, args): - if not compatible_protocol(self._protocol, *args): - raise ValueError("can't combine incompatible UPath protocols") - return super()._make_child(args) - - @classmethod - def _from_parts(cls, *args, **kwargs): - obj = super(Path, cls)._from_parts(*args, **kwargs) - _upath_init(obj) - return obj - - @classmethod - def _from_parsed_parts(cls, drv, root, parts): - obj = super(Path, cls)._from_parsed_parts(drv, root, parts) - _upath_init(obj) - return obj + super().__init__(str(path), *map(str, paths)) - @property - def path(self) -> str: - return PosixPath.__str__(self) + protocol = UPath.protocol + storage_options = UPath.storage_options + joinuri = UPath.joinuri + fs = UPath.fs + _fs_factory = UPath._fs_factory + _url = UPath._url - -class WindowsUPath(WindowsPath, LocalPath): # type: ignore[misc] - __slots__ = () - - # assign all WindowsPath methods/attrs to prevent multi inheritance issues - _set_class_attributes(locals(), src=WindowsPath) - - def open( # type: ignore[override] - self, - mode="r", - buffering=-1, - encoding=None, - errors=None, - newline=None, - **fsspec_kwargs, - ) -> IO[Any]: - if fsspec_kwargs: - return super(LocalPath, self).open( - mode=mode, - buffering=buffering, - encoding=encoding, - errors=errors, - newline=newline, - **fsspec_kwargs, - ) - else: - 
return WindowsPath.open(self, mode, buffering, encoding, errors, newline) - - if sys.version_info < (3, 12): - - def __new__( - cls, *args, protocol: str | None = None, **storage_options: Any - ) -> WindowsUPath: - if os.name != "nt": - raise NotImplementedError( - f"cannot instantiate {cls.__name__} on your system" - ) - if not compatible_protocol("", *args): - raise ValueError("can't combine incompatible UPath protocols") - obj = super().__new__(cls, *args) - obj._protocol = "" - return obj # type: ignore[return-value] + @property + def path(self) -> str: + return PosixPath.__str__(self) + + +class WindowsUPath(WindowsPath): + __slots__ = ( + "_protocol", + "_storage_options", + "_fs_cached", + ) + + def __new__( + cls, + path: UPathLike, + *paths: UPathLike, + protocol: str | None = None, + **storage_options: Any, + ) -> Self: + if not compatible_protocol("", path, *paths): + raise ValueError("can't combine incompatible UPath protocols") + obj = super().__new__(cls, str(path), *map(str, paths)) + obj._protocol = "" + obj._storage_options = {} + return obj + + if sys.version_info >= (3, 12): def __init__( - self, *args, protocol: str | None = None, **storage_options: Any + self, + path: UPathLike, + *paths: UPathLike, + protocol: str | None = None, + **storage_options: Any, ) -> None: - super(Path, self).__init__() - self._drv, self._root, self._parts = self._parse_args(args) - _upath_init(self) - - def _make_child(self, args): - if not compatible_protocol(self._protocol, *args): - raise ValueError("can't combine incompatible UPath protocols") - return super()._make_child(args) - - @classmethod - def _from_parts(cls, *args, **kwargs): - obj = super(Path, cls)._from_parts(*args, **kwargs) - _upath_init(obj) - return obj - - @classmethod - def _from_parsed_parts(cls, drv, root, parts): - obj = super(Path, cls)._from_parsed_parts(drv, root, parts) - _upath_init(obj) - return obj + super().__init__(str(path), *map(str, paths)) + + protocol = UPath.protocol + 
storage_options = UPath.storage_options + joinuri = UPath.joinuri + fs = UPath.fs + _fs_factory = UPath._fs_factory + _url = UPath._url @property def path(self) -> str: - return WindowsPath.as_posix(self) + return WindowsPath.__str__(self) + + +UPath.register(PosixUPath) # type: ignore[attr-defined] +UPath.register(WindowsUPath) # type: ignore[attr-defined] +LocalPath.register(PosixUPath) # type: ignore[attr-defined] +LocalPath.register(WindowsUPath) # type: ignore[attr-defined] diff --git a/upath/implementations/local.pyi b/upath/implementations/local.pyi new file mode 100644 index 00000000..067afa53 --- /dev/null +++ b/upath/implementations/local.pyi @@ -0,0 +1,13 @@ +from pathlib import PosixPath +from pathlib import WindowsPath + +from upath.core import UPath + +class LocalPath(UPath): ... +class FilePath(LocalPath): ... + +class PosixUPath(PosixPath, LocalPath): # type: ignore[misc] + ... + +class WindowsUPath(WindowsPath, LocalPath): # type: ignore[misc] + ... diff --git a/upath/implementations/memory.py b/upath/implementations/memory.py index 7169cd42..7fb16006 100644 --- a/upath/implementations/memory.py +++ b/upath/implementations/memory.py @@ -1,13 +1,9 @@ from __future__ import annotations -from upath._compat import FSSpecAccessorShim as _FSSpecAccessorShim from upath.core import UPath __all__ = ["MemoryPath"] -# accessors are deprecated -_MemoryAccessor = _FSSpecAccessorShim - class MemoryPath(UPath): def iterdir(self): diff --git a/upath/implementations/webdav.py b/upath/implementations/webdav.py index e299788c..a5cde51d 100644 --- a/upath/implementations/webdav.py +++ b/upath/implementations/webdav.py @@ -8,7 +8,6 @@ from fsspec.registry import known_implementations from fsspec.registry import register_implementation -from upath._compat import FSSpecAccessorShim as _FSSpecAccessorShim from upath._compat import str_remove_prefix from upath._compat import str_remove_suffix from upath.core import UPath @@ -24,10 +23,6 @@ register_implementation("webdav", 
webdav4.fsspec.WebdavFileSystem) -# accessors are deprecated -_WebdavAccessor = _FSSpecAccessorShim - - class WebdavPath(UPath): __slots__ = () diff --git a/upath/tests/cases.py b/upath/tests/cases.py index e8353ef5..1bdb7dd0 100644 --- a/upath/tests/cases.py +++ b/upath/tests/cases.py @@ -4,6 +4,7 @@ import stat import sys import warnings +from contextlib import nullcontext from pathlib import Path import pytest @@ -221,23 +222,6 @@ def test_mkdir_parents_true_exists_ok_false(self): with pytest.raises(FileExistsError): new_dir.mkdir(parents=True, exist_ok=False) - @pytest.mark.skip(reason="_accessor is unsupported in universal_pathlib>0.1.4") - def test_makedirs_exist_ok_true(self): - new_dir = self.path.joinpath("parent", "child", "dir_may_not_exist") - new_dir._accessor.makedirs(new_dir, exist_ok=True) - if not self.SUPPORTS_EMPTY_DIRS: - new_dir.joinpath(".file").touch() - new_dir._accessor.makedirs(new_dir, exist_ok=True) - - @pytest.mark.skip(reason="_accessor is unsupported in universal_pathlib>0.1.4") - def test_makedirs_exist_ok_false(self): - new_dir = self.path.joinpath("parent", "child", "dir_may_exist") - new_dir._accessor.makedirs(new_dir, exist_ok=False) - if not self.SUPPORTS_EMPTY_DIRS: - new_dir.joinpath(".file").touch() - with pytest.raises(FileExistsError): - new_dir._accessor.makedirs(new_dir, exist_ok=False) - def test_open(self): p = self.path.joinpath("file1.txt") with p.open(mode="r") as f: @@ -254,10 +238,17 @@ def test_open_block_size(self): with p.open(mode="r", block_size=8192) as f: assert f.read() == "hello world" + def test_open_encoding(self): + p = self.path.joinpath("file1.txt") + with pytest.warns(UserWarning, match=r"UPath.open\(encoding=.*"): + with p.open(mode="r", encoding="ascii") as f: + assert f.read() == "hello world" + def test_open_errors(self): p = self.path.joinpath("file1.txt") - with p.open(mode="r", encoding="ascii", errors="strict") as f: - assert f.read() == "hello world" + with pytest.warns(UserWarning, 
match=r"UPath.open\(errors=.*"): + with p.open(mode="r", errors="strict") as f: + assert f.read() == "hello world" def test_owner(self): with pytest.raises(NotImplementedError): @@ -305,8 +296,28 @@ def test_rename2(self): assert not moved.exists() assert back.exists() + def test_rename_target_exists(self): + upath = self.path.joinpath("file1.txt") + target = self.path.joinpath("file2.txt") + assert upath.exists() + assert target.exists() + if os.name == "nt": + cm = pytest.raises(FileExistsError) + else: + cm = nullcontext() + with cm: + upath.rename(target) + def test_replace(self): - pass + upath = self.path.joinpath("file1.txt") + content = upath.read_text() + target = self.path.joinpath("file2.txt") + assert upath.exists() + assert target.exists() + x = upath.replace(target) + assert x == target + assert not upath.exists() + assert content == target.read_text() def test_resolve(self): pass @@ -531,18 +542,11 @@ def test_read_with_fsspec(self): with fs.open(path) as f: assert f.read() == b"hello world" - @pytest.mark.xfail( - sys.version_info >= (3, 13), - reason="no support for private `._drv`, `._root`, `._parts` in 3.13", - ) def test_access_to_private_api(self): - # DO NOT access these private attributes in your code - p = UPath(str(self.path), **self.path.storage_options) - assert isinstance(p._drv, str) - p = UPath(str(self.path), **self.path.storage_options) - assert isinstance(p._root, str) p = UPath(str(self.path), **self.path.storage_options) - assert isinstance(p._parts, (list, tuple)) + assert not hasattr(p, "_drv") + assert not hasattr(p, "_root") + assert not hasattr(p, "_parts") def test_hashable(self): assert hash(self.path) diff --git a/upath/tests/implementations/test_data.py b/upath/tests/implementations/test_data.py index 9ada0687..eab5af2d 100644 --- a/upath/tests/implementations/test_data.py +++ b/upath/tests/implementations/test_data.py @@ -109,8 +109,15 @@ def test_open_block_size(self): def test_open_errors(self): p = 
UPath("data:text/plain;base64,aGVsbG8gd29ybGQ=") - with p.open(mode="r", encoding="ascii", errors="strict") as f: - assert f.read() == "hello world" + with pytest.warns(UserWarning, match=r"UPath.open\(errors=.*"): + with p.open(mode="r", errors="strict") as f: + assert f.read() == "hello world" + + def test_open_encoding(self): + p = UPath("data:text/plain;base64,aGVsbG8gd29ybGQ=") + with pytest.warns(UserWarning, match=r"UPath.open\(encoding=.*"): + with p.open(mode="r", encoding="ascii") as f: + assert f.read() == "hello world" def test_read_bytes(self, pathlib_base): assert len(self.path.read_bytes()) == 69 @@ -127,7 +134,16 @@ def test_rename(self): self.path.rename("newname") def test_rename2(self): - self.path.rename(self.path) + with pytest.raises(NotImplementedError): + self.path.rename(UPath("data:base64,SGVsbG8gV29ybGQ=")) + + def test_rename_target_exists(self): + with pytest.raises(NotImplementedError): + self.path.rename("newname") + + def test_replace(self): + with pytest.raises(NotImplementedError): + self.path.replace("newname") def test_rglob(self, pathlib_base): with pytest.raises(NotImplementedError): diff --git a/upath/tests/implementations/test_github.py b/upath/tests/implementations/test_github.py index 81db8121..13932354 100644 --- a/upath/tests/implementations/test_github.py +++ b/upath/tests/implementations/test_github.py @@ -54,6 +54,14 @@ def test_rename(self): def test_rename2(self): pass + @pytest.mark.skip(reason="GitHub filesystem is read-only") + def test_rename_target_exists(self): + pass + + @pytest.mark.skip(reason="GitHub filesystem is read-only") + def test_replace(self): + pass + @pytest.mark.skip(reason="GitHub filesystem is read-only") def test_touch_unlink(self): pass diff --git a/upath/tests/implementations/test_http.py b/upath/tests/implementations/test_http.py index 126eec5c..06dfcd0d 100644 --- a/upath/tests/implementations/test_http.py +++ b/upath/tests/implementations/test_http.py @@ -1,4 +1,5 @@ import pytest # noqa: 
F401 +from aiohttp import ClientTimeout from fsspec import __version__ as fsspec_version from fsspec import get_filesystem_class from packaging.version import Version @@ -18,14 +19,18 @@ def test_httppath(): - path = UPath("http://example.com") + path = UPath( + "http://example.com", client_kwargs={"timeout": ClientTimeout(total=2)} + ) assert isinstance(path, HTTPPath) assert path.exists() @xfail_if_no_ssl_connection def test_httpspath(): - path = UPath("https://example.com") + path = UPath( + "https://example.com", client_kwargs={"timeout": ClientTimeout(total=2)} + ) assert isinstance(path, HTTPPath) assert path.exists() @@ -34,7 +39,9 @@ def test_httpspath(): class TestUPathHttp(BaseTests): @pytest.fixture(autouse=True, scope="function") def path(self, http_fixture): - self.path = UPath(http_fixture) + self.path = UPath( + http_fixture, client_kwargs={"timeout": ClientTimeout(total=2)} + ) def test_work_at_root(self): assert "folder" in (f.name for f in self.path.parent.iterdir()) diff --git a/upath/tests/implementations/test_local.py b/upath/tests/implementations/test_local.py index e3f59d48..1e29a871 100644 --- a/upath/tests/implementations/test_local.py +++ b/upath/tests/implementations/test_local.py @@ -1,8 +1,16 @@ +from pathlib import Path +from pathlib import PosixPath +from pathlib import WindowsPath + import pytest from upath import UPath from upath.implementations.local import LocalPath +from upath.implementations.local import PosixUPath +from upath.implementations.local import WindowsUPath from upath.tests.cases import BaseTests +from upath.tests.utils import only_on_windows +from upath.tests.utils import skip_on_windows from upath.tests.utils import xfail_if_version @@ -25,3 +33,23 @@ def path(self, local_testdir): def test_is_LocalPath(self): assert isinstance(self.path, LocalPath) + + +@skip_on_windows +def test_posix_upath(): + path = PosixUPath("/tmp") + assert isinstance(path, PosixUPath) + assert isinstance(path, LocalPath) + assert isinstance(path, UPath) + assert 
isinstance(path, PosixPath) + assert isinstance(path, Path) + + +@only_on_windows +def test_windows_upath(): + path = WindowsUPath("/tmp") + assert isinstance(path, WindowsUPath) + assert isinstance(path, LocalPath) + assert isinstance(path, UPath) + assert isinstance(path, WindowsPath) + assert isinstance(path, Path) diff --git a/upath/tests/utils.py b/upath/tests/utils.py index 4158738b..17ca8f55 100644 --- a/upath/tests/utils.py +++ b/upath/tests/utils.py @@ -42,8 +42,12 @@ def xfail_if_no_ssl_connection(func): except ImportError: return pytest.mark.skip(reason="requests not installed")(func) try: - requests.get("https://example.com") - except (requests.exceptions.ConnectionError, requests.exceptions.SSLError): + requests.get("https://example.com", timeout=2) + except ( + requests.exceptions.ConnectionError, + requests.exceptions.SSLError, + requests.exceptions.Timeout, + ): return pytest.mark.xfail(reason="No SSL connection")(func) else: return func