From 8196ec1346406ffee83e6b1b10a21b03cd3ffc47 Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Mon, 11 Nov 2024 11:53:46 -0800 Subject: [PATCH] [dagster-airlift] Load airflow dag asset specs --- .../dagster_airlift/core/load_defs.py | 13 +++++++++- .../unit_tests/core_tests/test_load_defs.py | 25 +++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py b/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py index a9e3e03630e7a..be9dd6791c519 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py @@ -31,7 +31,7 @@ DagInfo, SerializedAirflowDefinitionsData, ) -from dagster_airlift.core.utils import get_metadata_key +from dagster_airlift.core.utils import get_metadata_key, spec_iterator @dataclass @@ -339,3 +339,14 @@ def enrich_airflow_mapped_assets( airflow_instance=airflow_instance, mapped_assets=mapped_assets ).get_or_fetch_state() return list(_apply_airflow_data_to_specs(mapped_assets, serialized_data)) + + +def load_airflow_dag_asset_specs( + airflow_instance: AirflowInstance, + mapped_assets: Optional[Iterable[Union[AssetSpec, AssetsDefinition]]] = None, +) -> Sequence[AssetSpec]: + """Load asset specs for Airflow DAGs from the provided :py:class:`AirflowInstance`, and link upstreams from mapped assets.""" + serialized_data = AirflowInstanceDefsLoader( + airflow_instance=airflow_instance, mapped_assets=mapped_assets or [] + ).get_or_fetch_state() + return list(spec_iterator(construct_dag_assets_defs(serialized_data))) diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_load_defs.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_load_defs.py index 943ee53e5b8f8..7223c2e741614 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_load_defs.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_load_defs.py @@ -34,6 +34,7 @@ from dagster_airlift.core.load_defs import ( build_full_automapped_dags_from_airflow_instance, enrich_airflow_mapped_assets, + load_airflow_dag_asset_specs, ) from dagster_airlift.core.multiple_tasks import assets_with_multiple_task_mappings from dagster_airlift.core.serialization.compute import ( @@ -779,3 +780,27 @@ def test_enrich() -> None: # Asset metadata properties have been glommed onto the asset spec = next(iter(assets_def.specs)) assert spec.metadata["Dag ID"] == "dag" + + +def test_load_dags() -> None: + dag_assets = load_airflow_dag_asset_specs( + airflow_instance=make_instance({"dag": ["task"]}), + ) + assert len(dag_assets) == 1 + dag_asset = next(iter(dag_assets)) + assert dag_asset.key == make_default_dag_asset_key("test_instance", "dag") + + +def test_load_dags_upstream() -> None: + upstream_task_spec = AssetSpec( + key="a", metadata=metadata_for_task_mapping(task_id="task", dag_id="dag") + ) + dag_assets = load_airflow_dag_asset_specs( + airflow_instance=make_instance({"dag": ["task"]}), + mapped_assets=[upstream_task_spec], + ) + assert len(dag_assets) == 1 + dag_asset_spec = next(iter(dag_assets)) + assert dag_asset_spec.key == make_default_dag_asset_key("test_instance", "dag") + assert len(list(dag_asset_spec.deps)) == 1 + assert next(iter(dag_asset_spec.deps)).asset_key == AssetKey("a")