-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[dagster-airlift] Observe section federation tutorial
- Loading branch information
Showing
14 changed files
with
412 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
208 changes: 208 additions & 0 deletions
208
docs/content/integrations/airlift/federation-tutorial/observe.mdx
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,208 @@ | ||
# Airflow Federation Tutorial: Observing multiple Airflow instances | ||
|
||
At this point, we should have finished the [setup](/integrations/airlift/federation-tutorial/setup) step, and now we have the example code setup with a fresh virtual environment, and our two Airflow instances running locally. Now, we can start writing Dagster code. | ||
|
||
## Observing the Airflow instances | ||
|
||
We'll start by creating asset representations of our DAGs in Dagster. | ||
|
||
Create a new shell and navigate to the root of the tutorial directory. You will need to set up the `dagster-airlift` package in your Dagster environment: | ||
|
||
```bash | ||
source .venv/bin/activate | ||
uv pip install 'dagster-airlift[core]' dagster-webserver dagster | ||
``` | ||
|
||
### Observing the `warehouse` Airflow instance | ||
|
||
Next, we'll declare a reference to our `warehouse` Airflow instance, which is running at `http://localhost:8081`. | ||
|
||
```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_warehouse_instance endbefore=end_warehouse_instance | ||
from dagster_airlift.core import AirflowBasicAuthBackend, AirflowInstance | ||
|
||
warehouse_airflow_instance = AirflowInstance( | ||
auth_backend=AirflowBasicAuthBackend( | ||
webserver_url="http://localhost:8081", | ||
username="admin", | ||
password="admin", | ||
), | ||
name="warehouse", | ||
) | ||
``` | ||
|
||
Now, we can use the `load_airflow_dag_asset_specs` function to create asset representations of the DAGs in the `warehouse` Airflow instance: | ||
|
||
```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_load_all endbefore=end_load_all | ||
from dagster_airlift.core import load_airflow_dag_asset_specs | ||
|
||
assets = load_airflow_dag_asset_specs( | ||
airflow_instance=warehouse_airflow_instance, | ||
) | ||
``` | ||
|
||
Now, let's add these assets to a `Definitions` object: | ||
|
||
```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_defs endbefore=end_defs | ||
from dagster import Definitions | ||
|
||
defs = Definitions(assets=assets) | ||
``` | ||
|
||
Let's set up some environment variables, and then point Dagster to see the asset created from our Airflow instance: | ||
|
||
```bash | ||
# Set up environment variables to point to the airlift-federation-tutorial directory on your machine | ||
export TUTORIAL_EXAMPLE_DIR=$(pwd) | ||
export DAGSTER_HOME="$TUTORIAL_EXAMPLE_DIR/.dagster_home" | ||
dagster dev -f airlift_federation_tutorial/dagster_defs/definitions.py | ||
``` | ||
|
||
If we navigate to the Dagster UI (running at `http://localhost:3000`), we should see the assets created from the `warehouse` Airflow instance. | ||
|
||
<Image | ||
alt="Assets from the warehouse Airflow instance in the Dagster UI" | ||
src="/images/integrations/airlift/observe_warehouse.png" | ||
width={320} | ||
height={198} | ||
/> | ||
|
||
There's a lot of DAGs in this instance, and we only want to focus on the `load_customers` DAG. Let's filter the assets to only include the `load_customers` DAG: | ||
|
||
```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_filter endbefore=end_filter | ||
load_customers = next( | ||
iter( | ||
load_airflow_dag_asset_specs( | ||
airflow_instance=warehouse_airflow_instance, | ||
dag_selector_fn=lambda dag: dag.dag_id == "load_customers", | ||
) | ||
) | ||
) | ||
``` | ||
|
||
Let's instead add this asset to our `Definitions` object: | ||
|
||
```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_customers_defs endbefore=end_customers_defs | ||
defs = Definitions(assets=[load_customers]) | ||
``` | ||
|
||
Now, our Dagster environment only includes the `load_customers` DAG from the `warehouse` Airflow instance. | ||
|
||
<Image | ||
alt="Assets from the warehouse Airflow instance in the Dagster UI" | ||
src="/images/integrations/airlift/only_load_customers.png" | ||
width={320} | ||
height={198} | ||
/> | ||
|
||
Finally, we'll use a sensor to poll the `warehouse` Airflow instance for new runs. This way, whenever we get a successful run of the `load_customers` DAG, we'll see a materialization in the Dagster UI: | ||
|
||
```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_sensor endbefore=end_sensor | ||
from dagster_airlift.core import build_airflow_polling_sensor | ||
|
||
warehouse_sensor = build_airflow_polling_sensor( | ||
mapped_assets=[load_customers], | ||
airflow_instance=warehouse_airflow_instance, | ||
) | ||
``` | ||
|
||
Now, we can add this sensor to our `Definitions` object: | ||
|
||
```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_sensor_defs endbefore=end_sensor_defs | ||
defs = Definitions(assets=[load_customers], sensors=[warehouse_sensor]) | ||
``` | ||
|
||
You can test this by navigating to the airflow UI at localhost:8081, and triggering a run of the `load_customers` DAG. When the run completes, you should see a materialization in the Dagster UI. | ||
|
||
<Image | ||
alt="Materialization of the load_customers DAG in the Dagster UI" | ||
src="/images/integrations/airlift/load_customers_mat.png" | ||
width={320} | ||
height={198} | ||
/> | ||
|
||
### Observing the `metrics` Airflow instance | ||
|
||
We can repeat the same process for the `customer_metrics` DAG in the `metrics` Airflow instance, which runs at `http://localhost:8082`. We'll leave this as an exercise to test your understanding. | ||
|
||
When complete, your code should look like this: | ||
|
||
```python file=../../airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/observe_complete.py | ||
from dagster import Definitions | ||
from dagster_airlift.core import ( | ||
AirflowBasicAuthBackend, | ||
AirflowInstance, | ||
build_airflow_polling_sensor, | ||
load_airflow_dag_asset_specs, | ||
) | ||
|
||
warehouse_airflow_instance = AirflowInstance( | ||
auth_backend=AirflowBasicAuthBackend( | ||
webserver_url="http://localhost:8081", | ||
username="admin", | ||
password="admin", | ||
), | ||
name="warehouse", | ||
) | ||
|
||
metrics_airflow_instance = AirflowInstance( | ||
auth_backend=AirflowBasicAuthBackend( | ||
webserver_url="http://localhost:8082", | ||
username="admin", | ||
password="admin", | ||
), | ||
name="metrics", | ||
) | ||
|
||
load_customers_dag_asset = next( | ||
iter( | ||
load_airflow_dag_asset_specs( | ||
airflow_instance=warehouse_airflow_instance, | ||
dag_selector_fn=lambda dag: dag.dag_id == "load_customers", | ||
) | ||
) | ||
) | ||
customer_metrics_dag_asset = next( | ||
iter( | ||
load_airflow_dag_asset_specs( | ||
airflow_instance=metrics_airflow_instance, | ||
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics", | ||
) | ||
) | ||
) | ||
|
||
warehouse_sensor = build_airflow_polling_sensor( | ||
mapped_assets=[load_customers_dag_asset], | ||
airflow_instance=warehouse_airflow_instance, | ||
) | ||
metrics_sensor = build_airflow_polling_sensor( | ||
mapped_assets=[customer_metrics_dag_asset], | ||
airflow_instance=metrics_airflow_instance, | ||
) | ||
|
||
defs = Definitions( | ||
assets=[load_customers_dag_asset, customer_metrics_dag_asset], | ||
sensors=[warehouse_sensor, metrics_sensor], | ||
) | ||
``` | ||
|
||
### Adding lineage between `load_customers` and `customer_metrics` | ||
|
||
Now that we have both DAGs loaded into Dagster, we can observe the cross-dag lineage between them. To do this, we'll use the `replace_attributes` function to add a dependency from the `load_customers` asset to the `customer_metrics` asset: | ||
|
||
```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_lineage endbefore=end_lineage | ||
from dagster._core.definitions.asset_spec import replace_attributes | ||
|
||
customer_metrics_dag_asset = replace_attributes( | ||
customer_metrics_dag_asset, | ||
deps=[load_customers], | ||
) | ||
``` | ||
|
||
Now, after adding the updated `customer_metrics_dag_asset` to our `Definitions` object, we should see the lineage between the two DAGs in the Dagster UI. | ||
|
||
<Image | ||
alt="Lineage between load_customers and customer_metrics in the Dagster UI" | ||
src="/images/integrations/airlift/dag_lineage.png" | ||
width={320} | ||
height={198} | ||
/> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
56 changes: 56 additions & 0 deletions
56
...t-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/observe_complete.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
from dagster import Definitions | ||
from dagster_airlift.core import ( | ||
AirflowBasicAuthBackend, | ||
AirflowInstance, | ||
build_airflow_polling_sensor, | ||
load_airflow_dag_asset_specs, | ||
) | ||
|
||
warehouse_airflow_instance = AirflowInstance( | ||
auth_backend=AirflowBasicAuthBackend( | ||
webserver_url="http://localhost:8081", | ||
username="admin", | ||
password="admin", | ||
), | ||
name="warehouse", | ||
) | ||
|
||
metrics_airflow_instance = AirflowInstance( | ||
auth_backend=AirflowBasicAuthBackend( | ||
webserver_url="http://localhost:8082", | ||
username="admin", | ||
password="admin", | ||
), | ||
name="metrics", | ||
) | ||
|
||
load_customers_dag_asset = next( | ||
iter( | ||
load_airflow_dag_asset_specs( | ||
airflow_instance=warehouse_airflow_instance, | ||
dag_selector_fn=lambda dag: dag.dag_id == "load_customers", | ||
) | ||
) | ||
) | ||
customer_metrics_dag_asset = next( | ||
iter( | ||
load_airflow_dag_asset_specs( | ||
airflow_instance=metrics_airflow_instance, | ||
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics", | ||
) | ||
) | ||
) | ||
|
||
warehouse_sensor = build_airflow_polling_sensor( | ||
mapped_assets=[load_customers_dag_asset], | ||
airflow_instance=warehouse_airflow_instance, | ||
) | ||
metrics_sensor = build_airflow_polling_sensor( | ||
mapped_assets=[customer_metrics_dag_asset], | ||
airflow_instance=metrics_airflow_instance, | ||
) | ||
|
||
defs = Definitions( | ||
assets=[load_customers_dag_asset, customer_metrics_dag_asset], | ||
sensors=[warehouse_sensor, metrics_sensor], | ||
) |
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
30 changes: 30 additions & 0 deletions
30
examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_load.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
import subprocess | ||
|
||
import requests | ||
from airlift_federation_tutorial_tests.conftest import ( | ||
assert_successful_dag_run, | ||
downstream_instance, | ||
upstream_instance, | ||
) | ||
from dagster_airlift.in_airflow.gql_queries import VERIFICATION_QUERY | ||
|
||
|
||
def test_load_upstream(upstream_airflow: subprocess.Popen) -> None: | ||
af_instance = upstream_instance() | ||
assert len(af_instance.list_dags()) == 11 | ||
assert_successful_dag_run(af_instance, "load_customers") | ||
|
||
|
||
def test_load_downstream(downstream_airflow: subprocess.Popen) -> None: | ||
assert len(downstream_instance().list_dags()) == 11 | ||
assert_successful_dag_run(downstream_instance(), "customer_metrics") | ||
|
||
|
||
def test_load_dagster(dagster_dev: subprocess.Popen) -> None: | ||
response = requests.post( | ||
# Timeout in seconds | ||
"http://localhost:3000/graphql", | ||
json={"query": VERIFICATION_QUERY}, | ||
timeout=3, | ||
) | ||
assert response.status_code == 200 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.