diff --git a/docs/content/_navigation.json b/docs/content/_navigation.json index f761f044b3e9d..252e586b3aa90 100644 --- a/docs/content/_navigation.json +++ b/docs/content/_navigation.json @@ -949,6 +949,10 @@ { "title": "Part 1: Setup upstream and downstream Airflow instances", "path": "/integrations/airlift/federation-tutorial/setup" + }, + { + "title": "Part 2: Observe dag lineage in Dagster", + "path": "/integrations/airlift/federation-tutorial/observe" } ] }, diff --git a/docs/content/integrations/airlift/federation-tutorial/observe.mdx b/docs/content/integrations/airlift/federation-tutorial/observe.mdx new file mode 100644 index 0000000000000..e0d156dc4b956 --- /dev/null +++ b/docs/content/integrations/airlift/federation-tutorial/observe.mdx @@ -0,0 +1,208 @@ +# Airflow Federation Tutorial: Observing multiple Airflow instances + +At this point, we should have finished the [setup](/integrations/airlift/federation-tutorial/setup) step, and now we have the example code setup with a fresh virtual environment, and our two Airflow instances running locally. Now, we can start writing Dagster code. + +## Observing the Airflow instances + +We'll start by creating asset representations of our DAGs in Dagster. + +Create a new shell and navigate to the root of the tutorial directory. You will need to set up the `dagster-airlift` package in your Dagster environment: + +```bash +source .venv/bin/activate +uv pip install 'dagster-airlift[core]' dagster-webserver dagster +``` + +### Observing the `warehouse` Airflow instance + +Next, we'll declare a reference to our `warehouse` Airflow instance, which is running at `http://localhost:8081`. + +```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_warehouse_instance endbefore=end_warehouse_instance +from dagster_airlift.core import AirflowBasicAuthBackend, AirflowInstance + +warehouse_airflow_instance = AirflowInstance( + auth_backend=AirflowBasicAuthBackend( + webserver_url="http://localhost:8081", + username="admin", + password="admin", + ), + name="warehouse", +) +``` + +Now, we can use the `load_airflow_dag_asset_specs` function to create asset representations of the DAGs in the `warehouse` Airflow instance: + +```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_load_all endbefore=end_load_all +from dagster_airlift.core import load_airflow_dag_asset_specs + +assets = load_airflow_dag_asset_specs( + airflow_instance=warehouse_airflow_instance, +) +``` + +Now, let's add these assets to a `Definitions` object: + +```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_defs endbefore=end_defs +from dagster import Definitions + +defs = Definitions(assets=assets) +``` + +Let's set up some environment variables, and then point Dagster to see the asset created from our Airflow instance: + +```bash +# Set up environment variables to point to the airlift-federation-tutorial directory on your machine +export TUTORIAL_EXAMPLE_DIR=$(pwd) +export DAGSTER_HOME="$TUTORIAL_EXAMPLE_DIR/.dagster_home" +dagster dev -f airlift_federation_tutorial/dagster_defs/definitions.py +``` + +If we navigate to the Dagster UI (running at `http://localhost:3000`), we should see the assets created from the `warehouse` Airflow instance. + + + +There's a lot of DAGs in this instance, and we only want to focus on the `load_customers` DAG. Let's filter the assets to only include the `load_customers` DAG: + +```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_filter endbefore=end_filter +load_customers = next( + iter( + load_airflow_dag_asset_specs( + airflow_instance=warehouse_airflow_instance, + dag_selector_fn=lambda dag: dag.dag_id == "load_customers", + ) + ) +) +``` + +Let's instead add this asset to our `Definitions` object: + +```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_customers_defs endbefore=end_customers_defs +defs = Definitions(assets=[load_customers]) +``` + +Now, our Dagster environment only includes the `load_customers` DAG from the `warehouse` Airflow instance. + + + +Finally, we'll use a sensor to poll the `warehouse` Airflow instance for new runs. This way, whenever we get a successful run of the `load_customers` DAG, we'll see a materialization in the Dagster UI: + +```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_sensor endbefore=end_sensor +from dagster_airlift.core import build_airflow_polling_sensor + +warehouse_sensor = build_airflow_polling_sensor( + mapped_assets=[load_customers], + airflow_instance=warehouse_airflow_instance, +) +``` + +Now, we can add this sensor to our `Definitions` object: + +```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_sensor_defs endbefore=end_sensor_defs +defs = Definitions(assets=[load_customers], sensors=[warehouse_sensor]) +``` + +You can test this by navigating to the airflow UI at localhost:8081, and triggering a run of the `load_customers` DAG. When the run completes, you should see a materialization in the Dagster UI. + + + +### Observing the `metrics` Airflow instance + +We can repeat the same process for the `customer_metrics` DAG in the `metrics` Airflow instance, which runs at `http://localhost:8082`. We'll leave this as an exercise to test your understanding. + +When complete, your code should look like this: + +```python file=../../airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/observe_complete.py +from dagster import Definitions +from dagster_airlift.core import ( + AirflowBasicAuthBackend, + AirflowInstance, + build_airflow_polling_sensor, + load_airflow_dag_asset_specs, +) + +warehouse_airflow_instance = AirflowInstance( + auth_backend=AirflowBasicAuthBackend( + webserver_url="http://localhost:8081", + username="admin", + password="admin", + ), + name="warehouse", +) + +metrics_airflow_instance = AirflowInstance( + auth_backend=AirflowBasicAuthBackend( + webserver_url="http://localhost:8082", + username="admin", + password="admin", + ), + name="metrics", +) + +load_customers_dag_asset = next( + iter( + load_airflow_dag_asset_specs( + airflow_instance=warehouse_airflow_instance, + dag_selector_fn=lambda dag: dag.dag_id == "load_customers", + ) + ) +) +customer_metrics_dag_asset = next( + iter( + load_airflow_dag_asset_specs( + airflow_instance=metrics_airflow_instance, + dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics", + ) + ) +) + +warehouse_sensor = build_airflow_polling_sensor( + mapped_assets=[load_customers_dag_asset], + airflow_instance=warehouse_airflow_instance, +) +metrics_sensor = build_airflow_polling_sensor( + mapped_assets=[customer_metrics_dag_asset], + airflow_instance=metrics_airflow_instance, +) + +defs = Definitions( + assets=[load_customers_dag_asset, customer_metrics_dag_asset], + sensors=[warehouse_sensor, metrics_sensor], +) +``` + +### Adding lineage between `load_customers` and `customer_metrics` + +Now that we have both DAGs loaded into Dagster, we can observe the cross-dag lineage between them. To do this, we'll use the `replace_attributes` function to add a dependency from the `load_customers` asset to the `customer_metrics` asset: + +```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_lineage endbefore=end_lineage +from dagster._core.definitions.asset_spec import replace_attributes + +customer_metrics_dag_asset = replace_attributes( + customer_metrics_dag_asset, + deps=[load_customers], +) +``` + +Now, after adding the updated `customer_metrics_dag_asset` to our `Definitions` object, we should see the lineage between the two DAGs in the Dagster UI. + + diff --git a/docs/content/integrations/airlift/federation-tutorial/setup.mdx b/docs/content/integrations/airlift/federation-tutorial/setup.mdx index 38febe0c0df64..4d500712ff91e 100644 --- a/docs/content/integrations/airlift/federation-tutorial/setup.mdx +++ b/docs/content/integrations/airlift/federation-tutorial/setup.mdx @@ -74,3 +74,7 @@ src="/images/integrations/airlift/customer_metrics.png" width={1484} height={300} /> + +## Next steps + +In the next section, we'll add asset representations of our DAGs, and set up lineage across both Airflow instances. Follow along [here](/integrations/airlift/federation-tutorial/observe). \ No newline at end of file diff --git a/docs/next/public/images/integrations/airlift/dag_lineage.png b/docs/next/public/images/integrations/airlift/dag_lineage.png new file mode 100644 index 0000000000000..e44bb1073b9ff Binary files /dev/null and b/docs/next/public/images/integrations/airlift/dag_lineage.png differ diff --git a/docs/next/public/images/integrations/airlift/load_customers_mat.png b/docs/next/public/images/integrations/airlift/load_customers_mat.png new file mode 100644 index 0000000000000..659e89ac3fb0d Binary files /dev/null and b/docs/next/public/images/integrations/airlift/load_customers_mat.png differ diff --git a/docs/next/public/images/integrations/airlift/observe_warehouse.png b/docs/next/public/images/integrations/airlift/observe_warehouse.png new file mode 100644 index 0000000000000..ccb0eaf931c07 Binary files /dev/null and b/docs/next/public/images/integrations/airlift/observe_warehouse.png differ diff --git a/docs/next/public/images/integrations/airlift/only_load_customers.png b/docs/next/public/images/integrations/airlift/only_load_customers.png new file mode 100644 index 0000000000000..0f4a5e260fb6e Binary files /dev/null and b/docs/next/public/images/integrations/airlift/only_load_customers.png differ diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/observe_complete.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/observe_complete.py new file mode 100644 index 0000000000000..1a5562f12135f --- /dev/null +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/observe_complete.py @@ -0,0 +1,56 @@ +from dagster import Definitions +from dagster_airlift.core import ( + AirflowBasicAuthBackend, + AirflowInstance, + build_airflow_polling_sensor, + load_airflow_dag_asset_specs, +) + +warehouse_airflow_instance = AirflowInstance( + auth_backend=AirflowBasicAuthBackend( + webserver_url="http://localhost:8081", + username="admin", + password="admin", + ), + name="warehouse", +) + +metrics_airflow_instance = AirflowInstance( + auth_backend=AirflowBasicAuthBackend( + webserver_url="http://localhost:8082", + username="admin", + password="admin", + ), + name="metrics", +) + +load_customers_dag_asset = next( + iter( + load_airflow_dag_asset_specs( + airflow_instance=warehouse_airflow_instance, + dag_selector_fn=lambda dag: dag.dag_id == "load_customers", + ) + ) +) +customer_metrics_dag_asset = next( + iter( + load_airflow_dag_asset_specs( + airflow_instance=metrics_airflow_instance, + dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics", + ) + ) +) + +warehouse_sensor = build_airflow_polling_sensor( + mapped_assets=[load_customers_dag_asset], + airflow_instance=warehouse_airflow_instance, +) +metrics_sensor = build_airflow_polling_sensor( + mapped_assets=[customer_metrics_dag_asset], + airflow_instance=metrics_airflow_instance, +) + +defs = Definitions( + assets=[load_customers_dag_asset, customer_metrics_dag_asset], + sensors=[warehouse_sensor, metrics_sensor], +) diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/with_specs.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/observe_with_deps.py similarity index 100% rename from examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/with_specs.py rename to examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/observe_with_deps.py diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/conftest.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/conftest.py index 37443b64565a3..63f0785c969ec 100644 --- a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/conftest.py +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/conftest.py @@ -108,6 +108,7 @@ def replace_file(old_file: Path, new_file: Path) -> Generator[None, None, None]: ORIG_DEFS_FILE = makefile_dir() / "airlift_federation_tutorial" / "dagster_defs" / "definitions.py" +SNIPPETS_DIR = makefile_dir() / "snippets" def assert_successful_dag_run(af_instance: AirflowInstance, dag_id: str) -> None: diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_load.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_load.py new file mode 100644 index 0000000000000..15356c3bd7e5d --- /dev/null +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_load.py @@ -0,0 +1,30 @@ +import subprocess + +import requests +from airlift_federation_tutorial_tests.conftest import ( + assert_successful_dag_run, + downstream_instance, + upstream_instance, +) +from dagster_airlift.in_airflow.gql_queries import VERIFICATION_QUERY + + +def test_load_upstream(upstream_airflow: subprocess.Popen) -> None: + af_instance = upstream_instance() + assert len(af_instance.list_dags()) == 11 + assert_successful_dag_run(af_instance, "load_customers") + + +def test_load_downstream(downstream_airflow: subprocess.Popen) -> None: + assert len(downstream_instance().list_dags()) == 11 + assert_successful_dag_run(downstream_instance(), "customer_metrics") + + +def test_load_dagster(dagster_dev: subprocess.Popen) -> None: + response = requests.post( + # Timeout in seconds + "http://localhost:3000/graphql", + json={"query": VERIFICATION_QUERY}, + timeout=3, + ) + assert response.status_code == 200 diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_specs_stage.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_specs_stage.py index c827ebef9b555..b5f05f123c95a 100644 --- a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_specs_stage.py +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_specs_stage.py @@ -1,24 +1,37 @@ import subprocess +from pathlib import Path from typing import Generator import pytest import requests -from airlift_federation_tutorial_tests.conftest import ORIG_DEFS_FILE, makefile_dir, replace_file +from airlift_federation_tutorial_tests.conftest import ( + ORIG_DEFS_FILE, + SNIPPETS_DIR, + makefile_dir, + replace_file, +) from dagster_airlift.in_airflow.gql_queries import ASSET_NODES_QUERY from dagster_airlift.test.shared_fixtures import stand_up_dagster -STAGE_FILE = ORIG_DEFS_FILE.parent / "stages" / "with_specs.py" +OBSERVE_COMPLETE_FILE = ORIG_DEFS_FILE.parent / "stages" / "observe_complete.py" +OBSERVE_WITH_DEPS_FILE = ORIG_DEFS_FILE.parent / "stages" / "observe_with_deps.py" +OBSERVE_SNIPPETS_FILE = SNIPPETS_DIR / "observe.py" @pytest.fixture -def completed_stage() -> Generator[None, None, None]: - with replace_file(ORIG_DEFS_FILE, STAGE_FILE): +def stage_file(request: pytest.FixtureRequest) -> Path: + return request.param + + +@pytest.fixture +def simulate_stage(stage_file: Path) -> Generator[None, None, None]: + with replace_file(ORIG_DEFS_FILE, stage_file): yield @pytest.fixture(name="dagster_dev") def dagster_fixture( - warehouse_airflow: subprocess.Popen, metrics_airflow: subprocess.Popen, completed_stage: None + warehouse_airflow: subprocess.Popen, metrics_airflow: subprocess.Popen, simulate_stage: None ) -> Generator[subprocess.Popen, None, None]: process = None try: @@ -32,7 +45,12 @@ def dagster_fixture( process.terminate() -def test_with_specs(dagster_dev: subprocess.Popen) -> None: +@pytest.mark.parametrize( + "stage_file", + [OBSERVE_COMPLETE_FILE, OBSERVE_WITH_DEPS_FILE, OBSERVE_SNIPPETS_FILE], + indirect=True, +) +def test_stage(dagster_dev: subprocess.Popen, stage_file: Path) -> None: response = requests.post( # Timeout in seconds "http://localhost:3000/graphql", @@ -40,4 +58,4 @@ def test_with_specs(dagster_dev: subprocess.Popen) -> None: timeout=3, ) assert response.status_code == 200 - assert len(response.json()["data"]["assetNodes"]) == 2 + assert len(response.json()["data"]["assetNodes"]) > 0 diff --git a/examples/airlift-federation-tutorial/snippets/observe.py b/examples/airlift-federation-tutorial/snippets/observe.py new file mode 100644 index 0000000000000..8af35b8350b2d --- /dev/null +++ b/examples/airlift-federation-tutorial/snippets/observe.py @@ -0,0 +1,83 @@ +# start_warehouse_instance +from dagster_airlift.core import AirflowBasicAuthBackend, AirflowInstance + +warehouse_airflow_instance = AirflowInstance( + auth_backend=AirflowBasicAuthBackend( + webserver_url="http://localhost:8081", + username="admin", + password="admin", + ), + name="warehouse", +) +# end_warehouse_instance + +# start_load_all +from dagster_airlift.core import load_airflow_dag_asset_specs + +assets = load_airflow_dag_asset_specs( + airflow_instance=warehouse_airflow_instance, +) +# end_load_all + +# start_defs +from dagster import Definitions + +defs = Definitions(assets=assets) +# end_defs + +# start_filter +load_customers = next( + iter( + load_airflow_dag_asset_specs( + airflow_instance=warehouse_airflow_instance, + dag_selector_fn=lambda dag: dag.dag_id == "load_customers", + ) + ) +) +# end_filter + +# start_customers_defs +defs = Definitions(assets=[load_customers]) +# end_customers_defs + +# start_sensor +from dagster_airlift.core import build_airflow_polling_sensor + +warehouse_sensor = build_airflow_polling_sensor( + mapped_assets=[load_customers], + airflow_instance=warehouse_airflow_instance, +) +# end_sensor + +# start_sensor_defs +defs = Definitions(assets=[load_customers], sensors=[warehouse_sensor]) +# end_sensor_defs + +metrics_airflow_instance = AirflowInstance( + auth_backend=AirflowBasicAuthBackend( + webserver_url="http://localhost:8082", + username="admin", + password="admin", + ), + name="metrics", +) + +customer_metrics_dag_asset = next( + iter( + load_airflow_dag_asset_specs( + airflow_instance=metrics_airflow_instance, + dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics", + ) + ) +) + +# start_lineage +from dagster._core.definitions.asset_spec import replace_attributes + +customer_metrics_dag_asset = replace_attributes( + customer_metrics_dag_asset, + deps=[load_customers], +) +# end_lineage + +defs = Definitions(assets=[load_customers, customer_metrics_dag_asset]) diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_defs_data.py b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_defs_data.py index 09b5adc21d0f3..2f2366773a2b3 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_defs_data.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_defs_data.py @@ -45,7 +45,7 @@ def instance_name(self) -> str: @cached_property def spec_iterator(self) -> Iterable[AssetSpec]: - return spec_iterator(self.mapped_assets) + return list(spec_iterator(self.mapped_assets)) @cached_property def mapping_info(self) -> AirliftMetadataMappingInfo: