[dagster-airlift] Use declarative automation

dpeng817 committed Nov 13, 2024
1 parent 3940172 commit 9fd24a1
Showing 8 changed files with 304 additions and 3 deletions.
4 changes: 4 additions & 0 deletions docs/content/_navigation.json
@@ -953,6 +953,10 @@
    {
      "title": "Part 2: Observe dag lineage in Dagster",
      "path": "/integrations/airlift/federation-tutorial/observe"
    },
    {
      "title": "Part 3: Federate across Airflow instances",
      "path": "/integrations/airlift/federation-tutorial/federated-execution"
    }
  ]
},
@@ -0,0 +1,166 @@
# Airlift Federation Tutorial: Federating Execution Across Airflow Instances

At this point, we should be [observing our DAGs within Dagster](/integrations/airlift/federation-tutorial/observe), with cross-instance lineage between them. Next, we'll federate the execution of our DAGs across both Airflow instances using Dagster's Declarative Automation system.

## Making `customer_metrics` executable

The `load_airflow_dag_asset_specs` function creates asset representations (`AssetSpec` objects) of Airflow DAGs, but these assets are not executable. To make them executable, we need to define an execution function in Dagster.
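
For reference, the spec we loaded in the previous step looks roughly like this (the full version appears in the complete code at the end of this page). Note that nothing here tells Dagster how to actually run the underlying DAG:

```python
# An AssetSpec for the customer_metrics DAG, loaded from the metrics Airflow
# instance. It describes the asset but carries no execution logic of its own.
customer_metrics_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=metrics_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
        )
    )
)
```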

To federate execution of `customer_metrics`, we first need to make it executable within Dagster. We can do this with the `@multi_asset` decorator, which lets us define how the `customer_metrics` asset should be executed. We'll use the `AirflowInstance` defined earlier to trigger a run of the `customer_metrics` DAG, then wait for the run to complete. If the run succeeds, we materialize the asset; if it fails, we raise an exception.

```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_multi_asset endbefore=end_multi_asset
@multi_asset(specs=[customer_metrics_dag_asset])
def run_customer_metrics() -> MaterializeResult:
    run_id = metrics_airflow_instance.trigger_dag("customer_metrics")
    metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
    if metrics_airflow_instance.get_run_state("customer_metrics", run_id) == "success":
        return MaterializeResult(asset_key=customer_metrics_dag_asset.key)
    else:
        raise Exception("Dag run failed.")
```

Now, we'll replace the `customer_metrics_dag_asset` in our `Definitions` object with the `run_customer_metrics` function:

```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_multi_asset_defs endbefore=end_multi_asset_defs
defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor],
)
```

We should be able to go to the Dagster UI and see that the `customer_metrics` asset can now be materialized.
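
If you'd rather verify this from code than from the UI, a quick in-process check with Dagster's `materialize` helper might look like the sketch below. This is optional, assumes both tutorial Airflow instances are running locally, and will actually trigger the `customer_metrics` DAG.

```python
from dagster import materialize

# Launches an in-process run of run_customer_metrics, which triggers the
# customer_metrics DAG on the metrics Airflow instance and waits for it.
result = materialize([run_customer_metrics])
assert result.success
```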

## Federating execution

Ultimately, we would like to kick off a run of `customer_metrics` whenever `load_customers` completes successfully. Our polling sensor already records a materialization of `load_customers` when its DAG completes, so we can use Declarative Automation to trigger a run of `customer_metrics` from that event. First, we'll add an `AutomationCondition.eager()` to our `customer_metrics_dag_asset`. This tells Dagster to run the `run_customer_metrics` function whenever the `load_customers` asset is materialized.

```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_eager endbefore=end_eager
from dagster import AutomationCondition

customer_metrics_dag_asset = replace_attributes(
    customer_metrics_dag_asset,
    automation_condition=AutomationCondition.eager(),
)
```

Now, we can set up Declarative Automation by adding an `AutomationConditionSensorDefinition`.

```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_automation_sensor endbefore=end_automation_sensor
automation_sensor = AutomationConditionSensorDefinition(
    name="automation_sensor",
    target="*",
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=1,
)
```

We'll add this sensor to our `Definitions` object.

```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_complete_defs endbefore=end_complete_defs
defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
)
```

Now the `run_customer_metrics` function will be executed whenever the `load_customers` asset is materialized. Let's test this out by triggering a run of the `load_customers` DAG in Airflow. When the run completes, we should see a materialization of the `customer_metrics` asset kick off in the Dagster UI, and eventually a run of the `customer_metrics` DAG in the metrics Airflow instance.
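
You can trigger that run from the warehouse Airflow UI, or from a Python shell by reusing the `AirflowInstance` client methods we already used above (a minimal sketch):

```python
# Kick off the upstream DAG on the warehouse instance and wait for it to finish.
run_id = warehouse_airflow_instance.trigger_dag("load_customers")
warehouse_airflow_instance.wait_for_run_completion("load_customers", run_id)

# Once the warehouse polling sensor records the resulting materialization of
# load_customers, the eager automation condition launches run_customer_metrics.
```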

## Complete code

When all the above steps are complete, your code should look something like this.

```python file=../../airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py
from dagster import (
    AutomationConditionSensorDefinition,
    DefaultSensorStatus,
    Definitions,
    MaterializeResult,
    multi_asset,
)
from dagster._core.definitions.asset_spec import replace_attributes
from dagster._core.definitions.declarative_automation.automation_condition import (
    AutomationCondition,
)
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_airflow_polling_sensor,
    load_airflow_dag_asset_specs,
)

warehouse_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="warehouse",
)

metrics_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8082",
        username="admin",
        password="admin",
    ),
    name="metrics",
)

load_customers_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=warehouse_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
)
customer_metrics_dag_asset = replace_attributes(
    next(
        iter(
            load_airflow_dag_asset_specs(
                airflow_instance=metrics_airflow_instance,
                dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
            )
        )
    ),
    # Add a dependency on the load_customers_dag_asset
    deps=[load_customers_dag_asset],
    automation_condition=AutomationCondition.eager(),
)


@multi_asset(specs=[customer_metrics_dag_asset])
def run_customer_metrics() -> MaterializeResult:
    run_id = metrics_airflow_instance.trigger_dag("customer_metrics")
    metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
    if metrics_airflow_instance.get_run_state("customer_metrics", run_id) == "success":
        return MaterializeResult(asset_key=customer_metrics_dag_asset.key)
    else:
        raise Exception("Dag run failed.")


warehouse_sensor = build_airflow_polling_sensor(
    mapped_assets=[load_customers_dag_asset],
    airflow_instance=warehouse_airflow_instance,
)
metrics_sensor = build_airflow_polling_sensor(
    mapped_assets=[customer_metrics_dag_asset],
    airflow_instance=metrics_airflow_instance,
)

automation_sensor = AutomationConditionSensorDefinition(
    name="automation_sensor",
    target="*",
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=1,
)

defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
)
```

## Conclusion

That concludes the tutorial! We've federated the execution of our DAGs across two Airflow instances using Dagster's Declarative Automation system. We've also set up cross-instance lineage for our DAGs, and can now observe the lineage and execution of our DAGs in the Dagster UI.
@@ -206,3 +206,7 @@ src="/images/integrations/airlift/dag_lineage.png"
width={320}
height={198}
/>

## Next steps

Next, we'll federate the execution of our DAGs across both Airflow instances. Follow along [here](/integrations/airlift/federation-tutorial/federated-execution).
@@ -26,4 +26,12 @@ Two DAGs have been causing a lot of pain lately for the team: `warehouse.load_cu
title="Setup"
href="/integrations/airlift/federation-tutorial/setup"
></ArticleListItem>
<ArticleListItem
title="Observing DAGs"
href="/integrations/airlift/federation-tutorial/observe"
></ArticleListItem>
<ArticleListItem
title="Federating Execution"
href="/integrations/airlift/federation-tutorial/federated-execution"
></ArticleListItem>
</ArticleList>
@@ -77,4 +77,4 @@ height={300}

## Next steps

In the next section, we'll add asset representations of our DAGs, and set up lineage across both Airflow instances. Follow along [here](/integrations/airlift/federation-tutorial/observe).
@@ -85,5 +85,5 @@ def run_customer_metrics() -> MaterializeResult:

defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor],
    sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
)
@@ -16,6 +16,8 @@
OBSERVE_COMPLETE_FILE = ORIG_DEFS_FILE.parent / "stages" / "observe_complete.py"
OBSERVE_WITH_DEPS_FILE = ORIG_DEFS_FILE.parent / "stages" / "observe_with_deps.py"
OBSERVE_SNIPPETS_FILE = SNIPPETS_DIR / "observe.py"
EXECUTABLE_AND_DA_FILE = ORIG_DEFS_FILE.parent / "stages" / "executable_and_da.py"
FEDERATED_EXECUTION_SNIPPETS_FILE = SNIPPETS_DIR / "federated_execution.py"


@pytest.fixture
@@ -47,7 +49,13 @@ def dagster_fixture(

@pytest.mark.parametrize(
    "stage_file",
    [OBSERVE_COMPLETE_FILE, OBSERVE_WITH_DEPS_FILE, OBSERVE_SNIPPETS_FILE],
    [
        OBSERVE_COMPLETE_FILE,
        OBSERVE_WITH_DEPS_FILE,
        OBSERVE_SNIPPETS_FILE,
        FEDERATED_EXECUTION_SNIPPETS_FILE,
        EXECUTABLE_AND_DA_FILE,
    ],
    indirect=True,
)
def test_stage(dagster_dev: subprocess.Popen, stage_file: Path) -> None:
111 changes: 111 additions & 0 deletions examples/airlift-federation-tutorial/snippets/federated_execution.py
@@ -0,0 +1,111 @@
from dagster import (
    AutomationConditionSensorDefinition,
    DefaultSensorStatus,
    Definitions,
    MaterializeResult,
    multi_asset,
)
from dagster._core.definitions.asset_spec import replace_attributes
from dagster._core.definitions.declarative_automation.automation_condition import (
    AutomationCondition,
)
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_airflow_polling_sensor,
    load_airflow_dag_asset_specs,
)

warehouse_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="warehouse",
)

metrics_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8082",
        username="admin",
        password="admin",
    ),
    name="metrics",
)

load_customers_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=warehouse_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
)
customer_metrics_dag_asset = replace_attributes(
    next(
        iter(
            load_airflow_dag_asset_specs(
                airflow_instance=metrics_airflow_instance,
                dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
            )
        )
    ),
    deps=[load_customers_dag_asset],
    automation_condition=AutomationCondition.eager(),
)

warehouse_sensor = build_airflow_polling_sensor(
    mapped_assets=[load_customers_dag_asset],
    airflow_instance=warehouse_airflow_instance,
)
metrics_sensor = build_airflow_polling_sensor(
    mapped_assets=[customer_metrics_dag_asset],
    airflow_instance=metrics_airflow_instance,
)


# start_multi_asset
@multi_asset(specs=[customer_metrics_dag_asset])
def run_customer_metrics() -> MaterializeResult:
    run_id = metrics_airflow_instance.trigger_dag("customer_metrics")
    metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
    if metrics_airflow_instance.get_run_state("customer_metrics", run_id) == "success":
        return MaterializeResult(asset_key=customer_metrics_dag_asset.key)
    else:
        raise Exception("Dag run failed.")


# end_multi_asset

# start_multi_asset_defs
defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor],
)
# end_multi_asset_defs

# start_eager
from dagster import AutomationCondition

customer_metrics_dag_asset = replace_attributes(
    customer_metrics_dag_asset,
    automation_condition=AutomationCondition.eager(),
)
# end_eager

# start_automation_sensor
automation_sensor = AutomationConditionSensorDefinition(
    name="automation_sensor",
    target="*",
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=1,
)
# end_automation_sensor

# start_complete_defs
defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
)
# end_complete_defs
