Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk asset editting #25865

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dagster import AssetsDefinition, multi_asset
from dagster import AssetsDefinition, AutomationCondition, multi_asset
from dagster._core.definitions.asset_spec import replace_asset_attributes

from .constants import DBT_UPSTREAMS
from .dbt_cloud_utils import (
Expand All @@ -8,7 +9,6 @@
get_project,
relevant_check_specs,
)
from .utils import eager


def get_dbt_cloud_assets() -> AssetsDefinition:
Expand All @@ -17,7 +17,9 @@ def get_dbt_cloud_assets() -> AssetsDefinition:
filtered_specs = filter_specs_by_tag(dbt_cloud_project.get_asset_specs(), EXPECTED_TAG)
specs_with_dbt_core_deps = add_deps(DBT_UPSTREAMS, filtered_specs)
# Dbt cloud assets will run every time the upstream dags run.
specs_with_eager_automation = eager(specs_with_dbt_core_deps)
specs_with_eager_automation = replace_asset_attributes(
specs_with_dbt_core_deps, automation_condition=AutomationCondition.eager()
)

@multi_asset(
specs=specs_with_eager_automation,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Sequence
from typing import Iterable

from dagster import AssetsDefinition, Definitions, SensorDefinition
from dagster._core.definitions.asset_spec import replace_asset_attributes
from dagster_airlift.core import (
AirflowBasicAuthBackend,
AirflowInstance,
Expand All @@ -13,7 +14,7 @@
PASSWORD,
USERNAME,
)
from .utils import with_group
from .utils import all_assets_defs

airflow_instance = AirflowInstance(
auth_backend=AirflowBasicAuthBackend(
Expand All @@ -33,9 +34,6 @@ def get_other_team_airflow_sensor() -> SensorDefinition:
return next(iter(defs.sensors))


def get_other_team_airflow_assets() -> Sequence[AssetsDefinition]:
defs = get_federated_airflow_defs()
return [
with_group(assets_def, "upstream_team_airflow")
for assets_def in defs.get_repository_def().assets_defs_by_key.values()
]
def get_other_team_airflow_assets() -> Iterable[AssetsDefinition]:
assets_defs = all_assets_defs(get_federated_airflow_defs())
return replace_asset_attributes(assets=assets_defs, group_name="upstream_team_airflow")
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from typing import Sequence
from typing import Iterable, Sequence

from dagster import AssetsDefinition, Definitions, SensorDefinition
from dagster._core.definitions.asset_spec import replace_asset_attributes
from dagster_airlift.core import (
AirflowBasicAuthBackend,
AirflowInstance,
build_defs_from_airflow_instance,
)

from .constants import LEGACY_FEDERATED_BASE_URL, LEGACY_FEDERATED_INSTANCE_NAME, PASSWORD, USERNAME
from .utils import with_group
from .utils import all_assets_defs

airflow_instance = AirflowInstance(
auth_backend=AirflowBasicAuthBackend(
Expand All @@ -22,15 +23,15 @@ def get_federated_airflow_defs() -> Definitions:
return build_defs_from_airflow_instance(airflow_instance=airflow_instance)


def federated_airflow_assets() -> Sequence[AssetsDefinition]:
return all_assets_defs(get_federated_airflow_defs())


def get_legacy_instance_airflow_sensor() -> SensorDefinition:
defs = get_federated_airflow_defs()
assert defs.sensors
return next(iter(defs.sensors))


def get_legacy_instance_airflow_assets() -> Sequence[AssetsDefinition]:
defs = get_federated_airflow_defs()
return [
with_group(assets_def, "legacy_airflow")
for assets_def in defs.get_repository_def().assets_defs_by_key.values()
]
def get_legacy_instance_airflow_assets() -> Iterable[AssetsDefinition]:
return replace_asset_attributes(federated_airflow_assets(), group_name="legacy_airflow")
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dagster import AssetExecutionContext
from dagster import AssetExecutionContext, AutomationCondition
from dagster._core.definitions.asset_spec import replace_asset_attributes
from dagster_dbt import (
DagsterDbtTranslator,
DagsterDbtTranslatorSettings,
Expand All @@ -8,7 +9,7 @@
)

from .constants import DBT_SOURCE_TO_DAG, dbt_manifest_path, dbt_project_path
from .utils import eager_asset, with_deps
from .utils import with_deps


@dbt_assets(
Expand All @@ -23,11 +24,18 @@ def jaffle_shop_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()


jaffle_shop_external_assets = [
spec._replace(code_version=None, skippable=False) for spec in jaffle_shop_assets.specs
]
# jaffle_shop_external_assets = [
# spec._replace(code_version=None, skippable=False) for spec in jaffle_shop_assets.specs
# ]

jaffle_shop_with_upstream = eager_asset(with_deps(DBT_SOURCE_TO_DAG, jaffle_shop_assets))
# jaffle_shop_with_upstream = eager_asset(with_deps(DBT_SOURCE_TO_DAG, jaffle_shop_assets))

jaffle_shop_with_upstream = replace_asset_attributes(
[with_deps(DBT_SOURCE_TO_DAG, jaffle_shop_assets)],
code_version=None,
skippable=False,
automation_condition=AutomationCondition.eager(),
)


def jaffle_shop_resource() -> DbtCliResource:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,46 +1,15 @@
from typing import Mapping, Sequence

from dagster import AssetsDefinition, AssetSpec, AutomationCondition, Definitions, Nothing
from dagster import AssetsDefinition, Definitions, Nothing
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.decorators.decorator_assets_definition_builder import (
stringify_asset_key_to_input_name,
)
from dagster._core.definitions.input import In


def eager_asset(assets_def: AssetsDefinition) -> AssetsDefinition:
return assets_def.map_asset_specs(
lambda spec: spec._replace(automation_condition=AutomationCondition.eager())
if spec.automation_condition is None
else spec
)


def apply_eager_automation(defs: Definitions) -> Definitions:
assets = []
for asset in defs.assets or []:
if not isinstance(asset, AssetsDefinition):
continue
if not asset.keys:
continue
assets.append(
asset.map_asset_specs(
lambda spec: spec._replace(automation_condition=AutomationCondition.eager())
if spec.automation_condition is None
else spec
)
)
return Definitions(
assets=assets,
asset_checks=defs.asset_checks,
sensors=defs.sensors,
schedules=defs.schedules,
resources=defs.resources,
)


def with_group(assets_def: AssetsDefinition, group_name: str) -> AssetsDefinition:
return assets_def.map_asset_specs(lambda spec: spec._replace(group_name=group_name))
def all_assets_defs(defs: Definitions) -> Sequence[AssetsDefinition]:
return list(defs.get_repository_def().assets_defs_by_key.values())


def with_deps(
Expand Down Expand Up @@ -90,7 +59,3 @@ def with_deps(
execution_type=assets_def.execution_type,
auto_materialize_policies_by_key=assets_def.auto_materialize_policies_by_key,
)


def eager(specs: Sequence[AssetSpec]) -> Sequence[AssetSpec]:
return [spec._replace(automation_condition=AutomationCondition.eager()) for spec in specs]
72 changes: 72 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Optional,
Sequence,
Set,
Union,
)

import dagster._check as check
Expand All @@ -20,6 +21,7 @@
only_allow_hidden_params_in_kwargs,
public,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
Expand Down Expand Up @@ -294,6 +296,76 @@ def with_io_manager_key(self, io_manager_key: str) -> "AssetSpec":
)


def to_assets_defs(
assets: Iterable[Union[AssetSpec, AssetsDefinition]],
) -> Iterable[AssetsDefinition]:
for asset in assets:
if isinstance(asset, AssetSpec):
yield AssetsDefinition(specs=[asset])
else:
yield asset


def replace_asset_attributes(
assets: Iterable[Union[AssetSpec, AssetsDefinition]],
*,
key: CoercibleToAssetKey = ...,
deps: Optional[Iterable["CoercibleToAssetDep"]] = ...,
description: Optional[str] = ...,
metadata: Optional[Mapping[str, Any]] = ...,
skippable: bool = ...,
group_name: Optional[str] = ...,
code_version: Optional[str] = ...,
freshness_policy: Optional[FreshnessPolicy] = ...,
automation_condition: Optional[AutomationCondition] = ...,
owners: Optional[Sequence[str]] = ...,
tags: Optional[Mapping[str, str]] = ...,
kinds: Optional[Set[str]] = ...,
partitions_def: Optional[PartitionsDefinition] = ...,
) -> Iterable[AssetsDefinition]:
for asset in assets:
if isinstance(asset, AssetSpec):
yield AssetsDefinition(
specs=[
replace_attributes(
asset,
key=key,
deps=deps,
description=description,
metadata=metadata,
skippable=skippable,
group_name=group_name,
code_version=code_version,
freshness_policy=freshness_policy,
automation_condition=automation_condition,
owners=owners,
tags=tags,
kinds=kinds,
partitions_def=partitions_def,
)
]
)
else:
yield asset.map_asset_specs(
lambda spec: replace_attributes(
spec,
key=key,
deps=deps,
description=description,
metadata=metadata,
skippable=skippable,
group_name=group_name,
code_version=code_version,
freshness_policy=freshness_policy,
automation_condition=automation_condition,
owners=owners,
tags=tags,
kinds=kinds,
partitions_def=partitions_def,
)
)


def replace_attributes(
spec: AssetSpec,
*,
Expand Down