From 9e1b2e69c9ef737abb9828e4225a7ec0956b181f Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Mon, 11 Nov 2024 15:56:22 -0800 Subject: [PATCH] [dagster-airlift] Add initial stage to add specs to federation tutorial --- examples/airlift-federation-tutorial/Makefile | 2 +- .../dagster_defs/stages/__init__.py | 0 .../dagster_defs/stages/with_specs.py | 61 +++++++++++++++++++ .../conftest.py | 22 +++++++ .../test_specs_stage.py | 43 +++++++++++++ .../dagster_airlift/core/__init__.py | 1 + .../dagster_airlift/core/load_defs.py | 5 +- 7 files changed, 132 insertions(+), 2 deletions(-) create mode 100644 examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/__init__.py create mode 100644 examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/with_specs.py create mode 100644 examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_specs_stage.py diff --git a/examples/airlift-federation-tutorial/Makefile b/examples/airlift-federation-tutorial/Makefile index 9aa195a2e49db..a75c1001dab7c 100644 --- a/examples/airlift-federation-tutorial/Makefile +++ b/examples/airlift-federation-tutorial/Makefile @@ -35,7 +35,7 @@ airflow_setup: wipe mkdir -p $$DAGSTER_HOME chmod +x $(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/upstream_airflow_dags $(UPSTREAM_AIRFLOW_HOME) 8081 - $(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/upstream_airflow_dags $(DOWNSTREAM_AIRFLOW_HOME) 8082 + $(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/downstream_airflow_dags $(DOWNSTREAM_AIRFLOW_HOME) 8082 upstream_airflow_run: AIRFLOW_HOME=$(UPSTREAM_AIRFLOW_HOME) airflow standalone diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/__init__.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/with_specs.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/with_specs.py new file mode 100644 index 0000000000000..d2b2a4e632c0d --- /dev/null +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/with_specs.py @@ -0,0 +1,61 @@ +from dagster import Definitions +from dagster._core.definitions.asset_spec import replace_attributes +from dagster_airlift.core import ( + AirflowBasicAuthBackend, + AirflowInstance, + build_airflow_polling_sensor, + load_airflow_dag_asset_specs, +) + +upstream_airflow_instance = AirflowInstance( + auth_backend=AirflowBasicAuthBackend( + webserver_url="http://localhost:8081", + username="admin", + password="admin", + ), + name="upstream", +) + +downstream_airflow_instance = AirflowInstance( + auth_backend=AirflowBasicAuthBackend( + webserver_url="http://localhost:8082", + username="admin", + password="admin", + ), + name="downstream", +) + +load_customers_dag_asset = next( + iter( + load_airflow_dag_asset_specs( + airflow_instance=upstream_airflow_instance, + dag_selector_fn=lambda dag: dag.dag_id == "load_customers", + ) + ) +) +customer_metrics_dag_asset = replace_attributes( + next( + iter( + load_airflow_dag_asset_specs( + airflow_instance=downstream_airflow_instance, + dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics", + ) + ) + # Add a dependency on the load_customers_dag_asset + ), + deps=[load_customers_dag_asset], +) + +upstream_sensor = build_airflow_polling_sensor( + mapped_assets=[load_customers_dag_asset], + airflow_instance=upstream_airflow_instance, +) +downstream_sensor = build_airflow_polling_sensor( + mapped_assets=[customer_metrics_dag_asset], + airflow_instance=downstream_airflow_instance, +) + +defs = Definitions( + assets=[load_customers_dag_asset, customer_metrics_dag_asset], + sensors=[upstream_sensor, downstream_sensor], +) diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/conftest.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/conftest.py index d19de28d07364..d55b440fdf68c 100644 --- a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/conftest.py +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/conftest.py @@ -1,5 +1,7 @@ import os +import shutil import subprocess +from contextlib import contextmanager from pathlib import Path from typing import Generator @@ -66,3 +68,23 @@ def dagster_fixture( finally: if process: process.terminate() + + +@contextmanager +def replace_file(old_file: Path, new_file: Path) -> Generator[None, None, None]: + backup_file = old_file.with_suffix(old_file.suffix + ".bak") + try: + if old_file.exists(): + shutil.copy2(old_file, backup_file) + if new_file.exists(): + shutil.copy2(new_file, old_file) + else: + raise FileNotFoundError(f"New file {new_file} not found") + yield + finally: + if backup_file.exists(): + shutil.copy2(backup_file, old_file) + backup_file.unlink() + + +ORIG_DEFS_FILE = makefile_dir() / "airlift_federation_tutorial" / "dagster_defs" / "definitions.py" diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_specs_stage.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_specs_stage.py new file mode 100644 index 0000000000000..7c96d838d5d56 --- /dev/null +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_specs_stage.py @@ -0,0 +1,43 @@ +import subprocess +from typing import Generator + +import pytest +import requests +from airlift_federation_tutorial_tests.conftest import ORIG_DEFS_FILE, makefile_dir, replace_file +from dagster_airlift.in_airflow.gql_queries import ASSET_NODES_QUERY +from dagster_airlift.test.shared_fixtures import stand_up_dagster + +STAGE_FILE = ORIG_DEFS_FILE.parent / "stages" / "with_specs.py" + + +@pytest.fixture +def completed_stage() -> Generator[None, None, None]: + with replace_file(ORIG_DEFS_FILE, STAGE_FILE): + yield + + +@pytest.fixture(name="dagster_dev") +def dagster_fixture( + upstream_airflow: subprocess.Popen, downstream_airflow: subprocess.Popen, completed_stage: None +) -> Generator[subprocess.Popen, None, None]: + process = None + try: + with stand_up_dagster( + dagster_dev_cmd=["make", "-C", str(makefile_dir()), "dagster_run"], + port=3000, + ) as process: + yield process + finally: + if process: + process.terminate() + + +def test_with_specs(dagster_dev: subprocess.Popen) -> None: + response = requests.post( + # Timeout in seconds + "http://localhost:3000/graphql", + json={"query": ASSET_NODES_QUERY}, + timeout=3, + ) + assert response.status_code == 200 + assert len(response.json()["data"]["assetNodes"]) == 2 diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py b/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py index d09a25a1136ce..8d6baccc3a9a8 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py @@ -9,6 +9,7 @@ AirflowInstance as AirflowInstance, DagSelectorFn as DagSelectorFn, build_defs_from_airflow_instance as build_defs_from_airflow_instance, + load_airflow_dag_asset_specs as load_airflow_dag_asset_specs, ) from .multiple_tasks import ( TaskHandleDict as TaskHandleDict, diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py b/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py index 0adf96ac35957..5be669d52cc12 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py @@ -352,9 +352,12 @@ def enrich_airflow_mapped_assets( def load_airflow_dag_asset_specs( airflow_instance: AirflowInstance, mapped_assets: Optional[Iterable[Union[AssetSpec, AssetsDefinition]]] = None, + dag_selector_fn: Optional[DagSelectorFn] = None, ) -> Sequence[AssetSpec]: """Load asset specs for Airflow DAGs from the provided :py:class:`AirflowInstance`, and link upstreams from mapped assets.""" serialized_data = AirflowInstanceDefsLoader( - airflow_instance=airflow_instance, mapped_assets=mapped_assets or [] + airflow_instance=airflow_instance, + mapped_assets=mapped_assets or [], + dag_selector_fn=dag_selector_fn, ).get_or_fetch_state() return list(spec_iterator(construct_dag_assets_defs(serialized_data)))