[dagster-airlift] Observe section federation tutorial
dpeng817 committed Nov 14, 2024
1 parent 31e1b1e commit 7da4baa
Showing 13 changed files with 411 additions and 7 deletions.
4 changes: 4 additions & 0 deletions docs/content/_navigation.json
@@ -949,6 +949,10 @@
       {
         "title": "Part 1: Setup upstream and downstream Airflow instances",
         "path": "/integrations/airlift/federation-tutorial/setup"
+      },
+      {
+        "title": "Part 2: Observe dag lineage in Dagster",
+        "path": "/integrations/airlift/federation-tutorial/observe"
       }
     ]
   },
208 changes: 208 additions & 0 deletions docs/content/integrations/airlift/federation-tutorial/observe.mdx
@@ -0,0 +1,208 @@
# Airflow Federation Tutorial: Observing multiple Airflow instances

At this point, we've completed the [setup](/integrations/airlift/federation-tutorial/setup) step: the example code is set up in a fresh virtual environment, and our two Airflow instances are running locally. Now we can start writing Dagster code.

## Observing the Airflow instances

We'll start by creating asset representations of our DAGs in Dagster.

Create a new shell and navigate to the root of the tutorial directory. Then, install the `dagster-airlift` package in your Dagster environment:

```bash
source .venv/bin/activate
uv pip install 'dagster-airlift[core]' dagster-webserver dagster
```

### Observing the `warehouse` Airflow instance

Next, we'll declare a reference to our `warehouse` Airflow instance, which is running at `http://localhost:8081`.

```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_warehouse_instance endbefore=end_warehouse_instance
from dagster_airlift.core import AirflowBasicAuthBackend, AirflowInstance

warehouse_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="warehouse",
)
```

Now, we can use the `load_airflow_dag_asset_specs` function to create asset representations of the DAGs in the `warehouse` Airflow instance:

```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_load_all endbefore=end_load_all
from dagster_airlift.core import load_airflow_dag_asset_specs

assets = load_airflow_dag_asset_specs(
    airflow_instance=warehouse_airflow_instance,
)
```

Now, let's add these assets to a `Definitions` object:

```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_defs endbefore=end_defs
from dagster import Definitions

defs = Definitions(assets=assets)
```

Let's set up some environment variables, then run `dagster dev` pointed at our definitions file to see the assets created from our Airflow instance:

```bash
# Set up environment variables to point to the airlift-federation-tutorial directory on your machine
export TUTORIAL_EXAMPLE_DIR=$(pwd)
export DAGSTER_HOME="$TUTORIAL_EXAMPLE_DIR/.dagster_home"
dagster dev -f airlift_federation_tutorial/dagster_defs/definitions.py
```

If we navigate to the Dagster UI (running at `http://localhost:3000`), we should see the assets created from the `warehouse` Airflow instance.

<Image
alt="Assets from the warehouse Airflow instance in the Dagster UI"
src="/images/integrations/airlift/observe_warehouse.png"
width={320}
height={198}
/>

There are a lot of DAGs in this instance, and we only want to focus on the `load_customers` DAG. Let's filter the assets to only include the `load_customers` DAG:

```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_filter endbefore=end_filter
load_customers = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=warehouse_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
)
```

Let's add just this asset to our `Definitions` object instead:

```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_customers_defs endbefore=end_customers_defs
defs = Definitions(assets=[load_customers])
```

Now, our Dagster environment only includes the `load_customers` DAG from the `warehouse` Airflow instance.

<Image
alt="Assets from the warehouse Airflow instance in the Dagster UI"
src="/images/integrations/airlift/only_load_customers.png"
width={320}
height={198}
/>

Finally, we'll use a sensor to poll the `warehouse` Airflow instance for new runs. This way, whenever we get a successful run of the `load_customers` DAG, we'll see a materialization in the Dagster UI:

```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_sensor endbefore=end_sensor
from dagster_airlift.core import build_airflow_polling_sensor

warehouse_sensor = build_airflow_polling_sensor(
    mapped_assets=[load_customers],
    airflow_instance=warehouse_airflow_instance,
)
```

Now, we can add this sensor to our `Definitions` object:

```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_sensor_defs endbefore=end_sensor_defs
defs = Definitions(assets=[load_customers], sensors=[warehouse_sensor])
```

You can test this by navigating to the Airflow UI at `http://localhost:8081` and triggering a run of the `load_customers` DAG. When the run completes, you should see a materialization in the Dagster UI.

<Image
alt="Materialization of the load_customers DAG in the Dagster UI"
src="/images/integrations/airlift/load_customers_mat.png"
width={320}
height={198}
/>
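
If you'd rather trigger the run programmatically, here's a minimal sketch using Airflow's stable REST API. It assumes the API accepts the same basic-auth credentials (`admin`/`admin`) used elsewhere in this tutorial:

```python
import requests

# Trigger a run of load_customers via Airflow's stable REST API (Airflow 2.x).
# Assumes basic auth is enabled for the API, matching this tutorial's setup.
response = requests.post(
    "http://localhost:8081/api/v1/dags/load_customers/dagRuns",
    auth=("admin", "admin"),
    json={"conf": {}},
    timeout=10,
)
response.raise_for_status()
print(f"Triggered run: {response.json()['dag_run_id']}")
```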

### Observing the `metrics` Airflow instance

We can repeat the same process for the `customer_metrics` DAG in the `metrics` Airflow instance, which runs at `http://localhost:8082`. We'll leave this as an exercise to test your understanding.

When complete, your code should look like this:

```python file=../../airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/observe_complete.py
from dagster import Definitions
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_airflow_polling_sensor,
    load_airflow_dag_asset_specs,
)

warehouse_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="warehouse",
)

metrics_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8082",
        username="admin",
        password="admin",
    ),
    name="metrics",
)

load_customers_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=warehouse_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
)
customer_metrics_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=metrics_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
        )
    )
)

warehouse_sensor = build_airflow_polling_sensor(
    mapped_assets=[load_customers_dag_asset],
    airflow_instance=warehouse_airflow_instance,
)
metrics_sensor = build_airflow_polling_sensor(
    mapped_assets=[customer_metrics_dag_asset],
    airflow_instance=metrics_airflow_instance,
)

defs = Definitions(
    assets=[load_customers_dag_asset, customer_metrics_dag_asset],
    sensors=[warehouse_sensor, metrics_sensor],
)
```

### Adding lineage between `load_customers` and `customer_metrics`

Now that we have both DAGs loaded into Dagster, we can observe the cross-DAG lineage between them. To do this, we'll use the `replace_attributes` function to make the `customer_metrics` asset depend on the `load_customers` asset:

```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_lineage endbefore=end_lineage
from dagster._core.definitions.asset_spec import replace_attributes

customer_metrics_dag_asset = replace_attributes(
    customer_metrics_dag_asset,
    deps=[load_customers],
)
```
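
For reference, here's a sketch of the full set of definitions after this change, assuming the asset and sensor names from the snippets above (the completed code instead names the warehouse asset `load_customers_dag_asset`):

```python
# A sketch of the updated definitions: the reassigned customer_metrics_dag_asset
# (now depending on load_customers) replaces the original spec, and the two
# polling sensors are unchanged.
defs = Definitions(
    assets=[load_customers, customer_metrics_dag_asset],
    sensors=[warehouse_sensor, metrics_sensor],
)
```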

Now, after adding the updated `customer_metrics_dag_asset` to our `Definitions` object, we should see the lineage between the two DAGs in the Dagster UI.

<Image
alt="Lineage between load_customers and customer_metrics in the Dagster UI"
src="/images/integrations/airlift/dag_lineage.png"
width={320}
height={198}
/>
docs/content/integrations/airlift/federation-tutorial/setup.mdx
@@ -74,3 +74,7 @@ src="/images/integrations/airlift/customer_metrics.png"
 width={1484}
 height={300}
 />
+
+## Next steps
+
+In the next section, we'll add asset representations of our DAGs and set up lineage across both Airflow instances. Follow along [here](/integrations/airlift/federation-tutorial/observe).
(4 binary image files added; not displayed.)
airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/observe_complete.py
@@ -0,0 +1,56 @@
from dagster import Definitions
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_airflow_polling_sensor,
    load_airflow_dag_asset_specs,
)

warehouse_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="warehouse",
)

metrics_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8082",
        username="admin",
        password="admin",
    ),
    name="metrics",
)

load_customers_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=warehouse_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
)
customer_metrics_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=metrics_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
        )
    )
)

warehouse_sensor = build_airflow_polling_sensor(
    mapped_assets=[load_customers_dag_asset],
    airflow_instance=warehouse_airflow_instance,
)
metrics_sensor = build_airflow_polling_sensor(
    mapped_assets=[customer_metrics_dag_asset],
    airflow_instance=metrics_airflow_instance,
)

defs = Definitions(
    assets=[load_customers_dag_asset, customer_metrics_dag_asset],
    sensors=[warehouse_sensor, metrics_sensor],
)
airlift-federation-tutorial/airlift_federation_tutorial_tests/conftest.py
@@ -108,6 +108,7 @@ def replace_file(old_file: Path, new_file: Path) -> Generator[None, None, None]:


 ORIG_DEFS_FILE = makefile_dir() / "airlift_federation_tutorial" / "dagster_defs" / "definitions.py"
+SNIPPETS_DIR = makefile_dir() / "snippets"


 def assert_successful_dag_run(af_instance: AirflowInstance, dag_id: str) -> None:
@@ -0,0 +1,30 @@
import subprocess

import requests
from airlift_federation_tutorial_tests.conftest import (
    assert_successful_dag_run,
    downstream_instance,
    upstream_instance,
)
from dagster_airlift.in_airflow.gql_queries import VERIFICATION_QUERY


def test_load_upstream(upstream_airflow: subprocess.Popen) -> None:
    af_instance = upstream_instance()
    assert len(af_instance.list_dags()) == 11
    assert_successful_dag_run(af_instance, "load_customers")


def test_load_downstream(downstream_airflow: subprocess.Popen) -> None:
    assert len(downstream_instance().list_dags()) == 11
    assert_successful_dag_run(downstream_instance(), "customer_metrics")


def test_load_dagster(dagster_dev: subprocess.Popen) -> None:
    response = requests.post(
        "http://localhost:3000/graphql",
        json={"query": VERIFICATION_QUERY},
        # Timeout in seconds
        timeout=3,
    )
    assert response.status_code == 200
@@ -1,24 +1,37 @@
 import subprocess
+from pathlib import Path
 from typing import Generator

 import pytest
 import requests
-from airlift_federation_tutorial_tests.conftest import ORIG_DEFS_FILE, makefile_dir, replace_file
+from airlift_federation_tutorial_tests.conftest import (
+    ORIG_DEFS_FILE,
+    SNIPPETS_DIR,
+    makefile_dir,
+    replace_file,
+)
 from dagster_airlift.in_airflow.gql_queries import ASSET_NODES_QUERY
 from dagster_airlift.test.shared_fixtures import stand_up_dagster

-STAGE_FILE = ORIG_DEFS_FILE.parent / "stages" / "with_specs.py"
+OBSERVE_COMPLETE_FILE = ORIG_DEFS_FILE.parent / "stages" / "observe_complete.py"
+OBSERVE_WITH_DEPS_FILE = ORIG_DEFS_FILE.parent / "stages" / "observe_with_deps.py"
+OBSERVE_SNIPPETS_FILE = SNIPPETS_DIR / "observe.py"


 @pytest.fixture
-def completed_stage() -> Generator[None, None, None]:
-    with replace_file(ORIG_DEFS_FILE, STAGE_FILE):
+def stage_file(request: pytest.FixtureRequest) -> Path:
+    return request.param
+
+
+@pytest.fixture
+def simulate_stage(stage_file: Path) -> Generator[None, None, None]:
+    with replace_file(ORIG_DEFS_FILE, stage_file):
         yield


 @pytest.fixture(name="dagster_dev")
 def dagster_fixture(
-    warehouse_airflow: subprocess.Popen, metrics_airflow: subprocess.Popen, completed_stage: None
+    warehouse_airflow: subprocess.Popen, metrics_airflow: subprocess.Popen, simulate_stage: None
 ) -> Generator[subprocess.Popen, None, None]:
     process = None
     try:
@@ -32,12 +45,17 @@ def dagster_fixture(
         process.terminate()


-def test_with_specs(dagster_dev: subprocess.Popen) -> None:
+@pytest.mark.parametrize(
+    "stage_file",
+    [OBSERVE_COMPLETE_FILE, OBSERVE_WITH_DEPS_FILE, OBSERVE_SNIPPETS_FILE],
+    indirect=True,
+)
+def test_stage(dagster_dev: subprocess.Popen, stage_file: Path) -> None:
     response = requests.post(
         "http://localhost:3000/graphql",
         json={"query": ASSET_NODES_QUERY},
         # Timeout in seconds
         timeout=3,
     )
     assert response.status_code == 200
-    assert len(response.json()["data"]["assetNodes"]) == 2
+    assert len(response.json()["data"]["assetNodes"]) > 0