Skip to content

Commit

Permalink
Implement KeySubstringAssetSelection (#25800)
Browse files Browse the repository at this point in the history
## Summary & Motivation
We want to support subset matching asset keys in the [new asset selection syntax](https://www.notion.so/dagster/New-asset-selection-syntax-12e18b92e4628027ae42ed52139b4b97?showMoveTo=true&saveParent=true). Thus, we need to define a new `KeySubstringAssetSelection ` class to support subset matching asset keys given a list of strings.

## How I Tested These Changes
```
pytest python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_selection.py::test_asset_selection_key_substring
```
  • Loading branch information
briantu authored Nov 14, 2024
1 parent 44aa9d9 commit 31cae09
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ def __getitem__(self, _):

CoercibleToAssetKey = Union[AssetKey, str, Sequence[str]]
CoercibleToAssetKeyPrefix = Union[str, Sequence[str]]
CoercibleToAssetKeySubset = Union[str, Sequence[str]]


def check_opt_coercible_to_asset_key_prefix_param(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,31 @@ def key_prefixes(
selected_key_prefixes=_asset_key_prefixes, include_sources=include_sources
)

@staticmethod
def key_substring(
key_substring: str, include_sources: bool = False
) -> "KeySubstringAssetSelection":
"""Returns a selection that includes assets whose string representation contains the provided substring and all the asset checks that target it.
Args:
include_sources (bool): If True, then include source assets matching the substring
in the selection.
Examples:
.. code-block:: python
# match any asset key containing "bc"
# e.g. AssetKey(["a", "bcd"]) would match, but not AssetKey(["ab", "cd"]).
AssetSelection.key_substring("bc")
# match any asset key containing "b/c"
# e.g. AssetKey(["ab", "cd"]) would match.
AssetSelection.key_substring("b/c")
"""
return KeySubstringAssetSelection(
selected_key_substring=key_substring, include_sources=include_sources
)

@public
@staticmethod
def groups(*group_strs, include_sources: bool = False) -> "GroupsAssetSelection":
Expand Down Expand Up @@ -1018,6 +1043,29 @@ def __str__(self) -> str:
return f"key_prefix:({' or '.join(key_prefix_strs)})"


@whitelist_for_serdes
@record
class KeySubstringAssetSelection(AssetSelection):
selected_key_substring: str
include_sources: bool

def resolve_inner(
self, asset_graph: BaseAssetGraph, allow_missing: bool
) -> AbstractSet[AssetKey]:
base_set = (
asset_graph.get_all_asset_keys()
if self.include_sources
else asset_graph.materializable_asset_keys
)
return {key for key in base_set if self.selected_key_substring in key.to_user_string()}

def to_serializable_asset_selection(self, asset_graph: BaseAssetGraph) -> "AssetSelection":
return self

def __str__(self) -> str:
return f"key_substring:{self.selected_key_substring}"


def _fetch_all_upstream(
selection: AbstractSet[AssetKey],
asset_graph: BaseAssetGraph,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,22 @@ def test_asset_selection_key_prefixes(all_assets: _AssetList):
assert sel.resolve(all_assets) == {earth.key}


def test_asset_selection_key_substring(all_assets: _AssetList):
sel = AssetSelection.key_substring("alice")
assert sel.resolve(all_assets) == _asset_keys_of({alice})

sel = AssetSelection.key_substring("ls/ze")
assert sel.resolve(all_assets) == _asset_keys_of({zebra})

# does not include source assets by default
sel = AssetSelection.key_substring("celestial")
assert sel.resolve(all_assets) == set()

# includes source assets if flag set
sel = AssetSelection.key_substring("celestial/e", include_sources=True)
assert sel.resolve(all_assets) == {earth.key}


def test_select_source_asset_keys():
a = SourceAsset("a")
selection = AssetSelection.keys(a.key)
Expand Down

0 comments on commit 31cae09

Please sign in to comment.