Skip to content

Commit

Permalink
[dagster-airlift] Load airflow dag asset specs
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Nov 13, 2024
1 parent 4224bcf commit 60edc68
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
DagInfo,
SerializedAirflowDefinitionsData,
)
from dagster_airlift.core.utils import get_metadata_key
from dagster_airlift.core.utils import get_metadata_key, spec_iterator


@dataclass
Expand Down Expand Up @@ -339,3 +339,14 @@ def enrich_airflow_mapped_assets(
airflow_instance=airflow_instance, mapped_assets=mapped_assets
).get_or_fetch_state()
return list(_apply_airflow_data_to_specs(mapped_assets, serialized_data))


def load_airflow_dag_asset_specs(
airflow_instance: AirflowInstance,
mapped_assets: Optional[Iterable[Union[AssetSpec, AssetsDefinition]]] = None,
) -> Sequence[AssetSpec]:
"""Load asset specs for Airflow DAGs from the provided :py:class:`AirflowInstance`, and link upstreams from mapped assets."""
serialized_data = AirflowInstanceDefsLoader(
airflow_instance=airflow_instance, mapped_assets=mapped_assets or []
).get_or_fetch_state()
return list(spec_iterator(construct_dag_assets_defs(serialized_data)))
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from dagster_airlift.core.load_defs import (
build_full_automapped_dags_from_airflow_instance,
enrich_airflow_mapped_assets,
load_airflow_dag_asset_specs,
)
from dagster_airlift.core.multiple_tasks import assets_with_multiple_task_mappings
from dagster_airlift.core.serialization.compute import (
Expand Down Expand Up @@ -779,3 +780,27 @@ def test_enrich() -> None:
# Asset metadata properties have been glommed onto the asset
spec = next(iter(assets_def.specs))
assert spec.metadata["Dag ID"] == "dag"


def test_load_dags() -> None:
dag_assets = load_airflow_dag_asset_specs(
airflow_instance=make_instance({"dag": ["task"]}),
)
assert len(dag_assets) == 1
dag_asset = next(iter(dag_assets))
assert dag_asset.key == make_default_dag_asset_key("test_instance", "dag")


def test_load_dags_upstream() -> None:
upstream_task_spec = AssetSpec(
key="a", metadata=metadata_for_task_mapping(task_id="task", dag_id="dag")
)
dag_assets = load_airflow_dag_asset_specs(
airflow_instance=make_instance({"dag": ["task"]}),
mapped_assets=[upstream_task_spec],
)
assert len(dag_assets) == 1
dag_asset_spec = next(iter(dag_assets))
assert dag_asset_spec.key == make_default_dag_asset_key("test_instance", "dag")
assert len(list(dag_asset_spec.deps)) == 1
assert next(iter(dag_asset_spec.deps)).asset_key == AssetKey("a")

0 comments on commit 60edc68

Please sign in to comment.