From dce37a236af82b6cc95272649212dbd5971424e5 Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Tue, 12 Nov 2024 06:06:07 -0500 Subject: [PATCH] Bulk asset editting --- .../dagster_defs/dbt_cloud_assets.py | 8 ++- .../dagster_defs/federated_airflow_1.py | 14 ++-- .../dagster_defs/federated_airflow_2.py | 17 ++--- .../dbt_example/dagster_defs/jaffle_shop.py | 20 ++++-- .../dbt_example/dagster_defs/utils.py | 41 +---------- .../dagster/_core/definitions/asset_spec.py | 72 +++++++++++++++++++ 6 files changed, 109 insertions(+), 63 deletions(-) diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/dbt_cloud_assets.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/dbt_cloud_assets.py index ec513acd72b6f..c1339824433eb 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/dbt_cloud_assets.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/dbt_cloud_assets.py @@ -1,4 +1,5 @@ -from dagster import AssetsDefinition, multi_asset +from dagster import AssetsDefinition, AutomationCondition, multi_asset +from dagster._core.definitions.asset_spec import replace_asset_attributes from .constants import DBT_UPSTREAMS from .dbt_cloud_utils import ( @@ -8,7 +9,6 @@ get_project, relevant_check_specs, ) -from .utils import eager def get_dbt_cloud_assets() -> AssetsDefinition: @@ -17,7 +17,9 @@ def get_dbt_cloud_assets() -> AssetsDefinition: filtered_specs = filter_specs_by_tag(dbt_cloud_project.get_asset_specs(), EXPECTED_TAG) specs_with_dbt_core_deps = add_deps(DBT_UPSTREAMS, filtered_specs) # Dbt cloud assets will run every time the upstream dags run. - specs_with_eager_automation = eager(specs_with_dbt_core_deps) + specs_with_eager_automation = replace_asset_attributes( + specs_with_dbt_core_deps, automation_condition=AutomationCondition.eager() + ) @multi_asset( specs=specs_with_eager_automation, diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/federated_airflow_1.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/federated_airflow_1.py index 03316a77a9f6a..464042feb099d 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/federated_airflow_1.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/federated_airflow_1.py @@ -1,6 +1,7 @@ -from typing import Sequence +from typing import Iterable from dagster import AssetsDefinition, Definitions, SensorDefinition +from dagster._core.definitions.asset_spec import replace_asset_attributes from dagster_airlift.core import ( AirflowBasicAuthBackend, AirflowInstance, @@ -13,7 +14,7 @@ PASSWORD, USERNAME, ) -from .utils import with_group +from .utils import all_assets_defs airflow_instance = AirflowInstance( auth_backend=AirflowBasicAuthBackend( @@ -33,9 +34,6 @@ def get_other_team_airflow_sensor() -> SensorDefinition: return next(iter(defs.sensors)) -def get_other_team_airflow_assets() -> Sequence[AssetsDefinition]: - defs = get_federated_airflow_defs() - return [ - with_group(assets_def, "upstream_team_airflow") - for assets_def in defs.get_repository_def().assets_defs_by_key.values() - ] +def get_other_team_airflow_assets() -> Iterable[AssetsDefinition]: + assets_defs = all_assets_defs(get_federated_airflow_defs()) + return replace_asset_attributes(assets=assets_defs, group_name="upstream_team_airflow") diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/federated_airflow_2.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/federated_airflow_2.py index 3a4ba06408c5d..80dc1440652e1 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/federated_airflow_2.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/federated_airflow_2.py @@ -1,6 +1,7 @@ -from typing import Sequence +from typing import Iterable, Sequence from dagster import AssetsDefinition, Definitions, SensorDefinition +from dagster._core.definitions.asset_spec import replace_asset_attributes from dagster_airlift.core import ( AirflowBasicAuthBackend, AirflowInstance, @@ -8,7 +9,7 @@ ) from .constants import LEGACY_FEDERATED_BASE_URL, LEGACY_FEDERATED_INSTANCE_NAME, PASSWORD, USERNAME -from .utils import with_group +from .utils import all_assets_defs airflow_instance = AirflowInstance( auth_backend=AirflowBasicAuthBackend( @@ -22,15 +23,15 @@ def get_federated_airflow_defs() -> Definitions: return build_defs_from_airflow_instance(airflow_instance=airflow_instance) +def federated_airflow_assets() -> Sequence[AssetsDefinition]: + return all_assets_defs(get_federated_airflow_defs()) + + def get_legacy_instance_airflow_sensor() -> SensorDefinition: defs = get_federated_airflow_defs() assert defs.sensors return next(iter(defs.sensors)) -def get_legacy_instance_airflow_assets() -> Sequence[AssetsDefinition]: - defs = get_federated_airflow_defs() - return [ - with_group(assets_def, "legacy_airflow") - for assets_def in defs.get_repository_def().assets_defs_by_key.values() - ] +def get_legacy_instance_airflow_assets() -> Iterable[AssetsDefinition]: + return replace_asset_attributes(federated_airflow_assets(), group_name="legacy_airflow") diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/jaffle_shop.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/jaffle_shop.py index 479a43cc0c7aa..d288208001e81 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/jaffle_shop.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/jaffle_shop.py @@ -1,4 +1,5 @@ -from dagster import AssetExecutionContext +from dagster import AssetExecutionContext, AutomationCondition +from dagster._core.definitions.asset_spec import replace_asset_attributes from dagster_dbt import ( DagsterDbtTranslator, DagsterDbtTranslatorSettings, @@ -8,7 +9,7 @@ ) from .constants import DBT_SOURCE_TO_DAG, dbt_manifest_path, dbt_project_path -from .utils import eager_asset, with_deps +from .utils import with_deps @dbt_assets( @@ -23,11 +24,18 @@ def jaffle_shop_assets(context: AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["build"], context=context).stream() -jaffle_shop_external_assets = [ - spec._replace(code_version=None, skippable=False) for spec in jaffle_shop_assets.specs -] +# jaffle_shop_external_assets = [ +# spec._replace(code_version=None, skippable=False) for spec in jaffle_shop_assets.specs +# ] -jaffle_shop_with_upstream = eager_asset(with_deps(DBT_SOURCE_TO_DAG, jaffle_shop_assets)) +# jaffle_shop_with_upstream = eager_asset(with_deps(DBT_SOURCE_TO_DAG, jaffle_shop_assets)) + +jaffle_shop_with_upstream = replace_asset_attributes( + [with_deps(DBT_SOURCE_TO_DAG, jaffle_shop_assets)], + code_version=None, + skippable=False, + automation_condition=AutomationCondition.eager(), +) def jaffle_shop_resource() -> DbtCliResource: diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/utils.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/utils.py index 00c5f51fee1c9..6a6bfdc970a7e 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/utils.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/utils.py @@ -1,6 +1,6 @@ from typing import Mapping, Sequence -from dagster import AssetsDefinition, AssetSpec, AutomationCondition, Definitions, Nothing +from dagster import AssetsDefinition, Definitions, Nothing from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.decorators.decorator_assets_definition_builder import ( stringify_asset_key_to_input_name, @@ -8,39 +8,8 @@ from dagster._core.definitions.input import In -def eager_asset(assets_def: AssetsDefinition) -> AssetsDefinition: - return assets_def.map_asset_specs( - lambda spec: spec._replace(automation_condition=AutomationCondition.eager()) - if spec.automation_condition is None - else spec - ) - - -def apply_eager_automation(defs: Definitions) -> Definitions: - assets = [] - for asset in defs.assets or []: - if not isinstance(asset, AssetsDefinition): - continue - if not asset.keys: - continue - assets.append( - asset.map_asset_specs( - lambda spec: spec._replace(automation_condition=AutomationCondition.eager()) - if spec.automation_condition is None - else spec - ) - ) - return Definitions( - assets=assets, - asset_checks=defs.asset_checks, - sensors=defs.sensors, - schedules=defs.schedules, - resources=defs.resources, - ) - - -def with_group(assets_def: AssetsDefinition, group_name: str) -> AssetsDefinition: - return assets_def.map_asset_specs(lambda spec: spec._replace(group_name=group_name)) +def all_assets_defs(defs: Definitions) -> Sequence[AssetsDefinition]: + return list(defs.get_repository_def().assets_defs_by_key.values()) def with_deps( @@ -90,7 +59,3 @@ def with_deps( execution_type=assets_def.execution_type, auto_materialize_policies_by_key=assets_def.auto_materialize_policies_by_key, ) - - -def eager(specs: Sequence[AssetSpec]) -> Sequence[AssetSpec]: - return [spec._replace(automation_condition=AutomationCondition.eager()) for spec in specs] diff --git a/python_modules/dagster/dagster/_core/definitions/asset_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_spec.py index 7b6bde4f1ac48..4610954e3649d 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_spec.py @@ -10,6 +10,7 @@ Optional, Sequence, Set, + Union, ) import dagster._check as check @@ -20,6 +21,7 @@ only_allow_hidden_params_in_kwargs, public, ) +from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.declarative_automation.automation_condition import ( AutomationCondition, @@ -294,6 +296,76 @@ def with_io_manager_key(self, io_manager_key: str) -> "AssetSpec": ) +def to_assets_defs( + assets: Iterable[Union[AssetSpec, AssetsDefinition]], +) -> Iterable[AssetsDefinition]: + for asset in assets: + if isinstance(asset, AssetSpec): + yield AssetsDefinition(specs=[asset]) + else: + yield asset + + +def replace_asset_attributes( + assets: Iterable[Union[AssetSpec, AssetsDefinition]], + *, + key: CoercibleToAssetKey = ..., + deps: Optional[Iterable["CoercibleToAssetDep"]] = ..., + description: Optional[str] = ..., + metadata: Optional[Mapping[str, Any]] = ..., + skippable: bool = ..., + group_name: Optional[str] = ..., + code_version: Optional[str] = ..., + freshness_policy: Optional[FreshnessPolicy] = ..., + automation_condition: Optional[AutomationCondition] = ..., + owners: Optional[Sequence[str]] = ..., + tags: Optional[Mapping[str, str]] = ..., + kinds: Optional[Set[str]] = ..., + partitions_def: Optional[PartitionsDefinition] = ..., +) -> Iterable[AssetsDefinition]: + for asset in assets: + if isinstance(asset, AssetSpec): + yield AssetsDefinition( + specs=[ + replace_attributes( + asset, + key=key, + deps=deps, + description=description, + metadata=metadata, + skippable=skippable, + group_name=group_name, + code_version=code_version, + freshness_policy=freshness_policy, + automation_condition=automation_condition, + owners=owners, + tags=tags, + kinds=kinds, + partitions_def=partitions_def, + ) + ] + ) + else: + yield asset.map_asset_specs( + lambda spec: replace_attributes( + spec, + key=key, + deps=deps, + description=description, + metadata=metadata, + skippable=skippable, + group_name=group_name, + code_version=code_version, + freshness_policy=freshness_policy, + automation_condition=automation_condition, + owners=owners, + tags=tags, + kinds=kinds, + partitions_def=partitions_def, + ) + ) + + def replace_attributes( spec: AssetSpec, *,