diff --git a/docs/content/_navigation.json b/docs/content/_navigation.json
index 252e586b3aa90..97ecfc6bedbaf 100644
--- a/docs/content/_navigation.json
+++ b/docs/content/_navigation.json
@@ -953,6 +953,10 @@
{
"title": "Part 2: Observe dag lineage in Dagster",
"path": "/integrations/airlift/federation-tutorial/observe"
+ },
+ {
+ "title": "Part 3: Federate across Airflow instances",
+ "path": "/integrations/airlift/federation-tutorial/federated-execution"
}
]
},
diff --git a/docs/content/integrations/airlift/federation-tutorial/federated-execution.mdx b/docs/content/integrations/airlift/federation-tutorial/federated-execution.mdx
new file mode 100644
index 0000000000000..8bb08785362d5
--- /dev/null
+++ b/docs/content/integrations/airlift/federation-tutorial/federated-execution.mdx
@@ -0,0 +1,166 @@
+# Airlift Federation Tutorial: Federating Execution Across Airflow Instances
+
+At this point, we should be [observing our DAGs within Dagster](/integrations/airlift/federation-tutorial/observe), and now we have cross-instance lineage for our DAGs. Now, we'll federate the execution of our DAGs across both Airflow instances by using Dagster's Declarative Automation system.
+
+## Making `customer_metrics` executable.
+
+The `load_airflow_dag_asset_specs` function creates asset representations (called `AssetSpec`) of Airflow DAGs, but these assets are not executable. We need to define an execution function in Dagster in order to make them executable.
+
+In order to federate execution of `customer_metrics`, we first need to make it executable within Dagster. We can do this by using the `@multi_asset` decorator to define how the `customer_metrics` asset should be executed. We'll use the `AirflowInstance` defined earlier to trigger a run of the `customer_metrics` DAG. We then wait for the run to complete, and if it is successful, we'll successfully materialize the asset. If the run fails, we'll raise an exception.
+
+```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_multi_asset endbefore=end_multi_asset
+@multi_asset(specs=[customer_metrics_dag_asset])
+def run_customer_metrics() -> MaterializeResult:
+ run_id = metrics_airflow_instance.trigger_dag("customer_metrics")
+ metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
+ if metrics_airflow_instance.get_run_state("customer_metrics", run_id) == "success":
+ return MaterializeResult(asset_key=customer_metrics_dag_asset.key)
+ else:
+ raise Exception("Dag run failed.")
+```
+
+Now, we'll replace the `customer_metrics_dag_asset` in our `Definitions` object with the `run_customer_metrics` function:
+
+```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_multi_asset_defs endbefore=end_multi_asset_defs
+defs = Definitions(
+ assets=[load_customers_dag_asset, run_customer_metrics],
+ sensors=[warehouse_sensor, metrics_sensor],
+)
+```
+
+We should be able to go to the Dagster UI and see that the `customer_metrics` asset can now be materialized.
+
+## Federating execution
+
+Ultimately, we would like to kick off a run of `customer_metrics` whenever `load_customers` completes successfully. We're already retrieving a materialization when `load_customers` completes, so we can use this to trigger a run of `customer_metrics` by using Declarative Automation. First, we'll add an `AutomationCondition.eager()` to our `customer_metrics_dag_asset`. This will tell Dagster to run the `run_customer_metrics` function whenever the `load_customers` asset is materialized.
+
+```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_eager endbefore=end_eager
+from dagster import AutomationCondition
+
+customer_metrics_dag_asset = replace_attributes(
+ customer_metrics_dag_asset,
+ automation_condition=AutomationCondition.eager(),
+)
+```
+
+Now, we can set up Declarative Automation by adding an `AutomationConditionSensorDefinition`.
+
+```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_automation_sensor endbefore=end_automation_sensor
+automation_sensor = AutomationConditionSensorDefinition(
+ name="automation_sensor",
+ target="*",
+ default_status=DefaultSensorStatus.RUNNING,
+ minimum_interval_seconds=1,
+)
+```
+
+We'll add this sensor to our `Definitions` object.
+
+```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_complete_defs endbefore=end_complete_defs
+defs = Definitions(
+ assets=[load_customers_dag_asset, run_customer_metrics],
+ sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
+)
+```
+
+Now the `run_customer_metrics` function will be executed whenever the `load_customers` asset is materialized. Let's test this out by triggering a run of the `load_customers` DAG in Airflow. When the run completes, we should see a materialization of the `customer_metrics` asset kick off in the Dagster UI, and eventually a run of the `customer_metrics` DAG in the metrics Airflow instance.
+
+## Complete code
+
+When all the above steps are complete, your code should look something like this.
+
+```python file=../../airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py
+from dagster import (
+ AutomationConditionSensorDefinition,
+ DefaultSensorStatus,
+ Definitions,
+ MaterializeResult,
+ multi_asset,
+)
+from dagster._core.definitions.asset_spec import replace_attributes
+from dagster._core.definitions.declarative_automation.automation_condition import (
+ AutomationCondition,
+)
+from dagster_airlift.core import (
+ AirflowBasicAuthBackend,
+ AirflowInstance,
+ build_airflow_polling_sensor,
+ load_airflow_dag_asset_specs,
+)
+
+warehouse_airflow_instance = AirflowInstance(
+ auth_backend=AirflowBasicAuthBackend(
+ webserver_url="http://localhost:8081",
+ username="admin",
+ password="admin",
+ ),
+ name="warehouse",
+)
+
+metrics_airflow_instance = AirflowInstance(
+ auth_backend=AirflowBasicAuthBackend(
+ webserver_url="http://localhost:8082",
+ username="admin",
+ password="admin",
+ ),
+ name="metrics",
+)
+
+load_customers_dag_asset = next(
+ iter(
+ load_airflow_dag_asset_specs(
+ airflow_instance=warehouse_airflow_instance,
+ dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
+ )
+ )
+)
+customer_metrics_dag_asset = replace_attributes(
+ next(
+ iter(
+ load_airflow_dag_asset_specs(
+ airflow_instance=metrics_airflow_instance,
+ dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
+ )
+ )
+ # Add a dependency on the load_customers_dag_asset
+ ),
+ deps=[load_customers_dag_asset],
+ automation_condition=AutomationCondition.eager(),
+)
+
+
+@multi_asset(specs=[customer_metrics_dag_asset])
+def run_customer_metrics() -> MaterializeResult:
+ run_id = metrics_airflow_instance.trigger_dag("customer_metrics")
+ metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
+ if metrics_airflow_instance.get_run_state("customer_metrics", run_id) == "success":
+ return MaterializeResult(asset_key=customer_metrics_dag_asset.key)
+ else:
+ raise Exception("Dag run failed.")
+
+
+warehouse_sensor = build_airflow_polling_sensor(
+ mapped_assets=[load_customers_dag_asset],
+ airflow_instance=warehouse_airflow_instance,
+)
+metrics_sensor = build_airflow_polling_sensor(
+ mapped_assets=[customer_metrics_dag_asset],
+ airflow_instance=metrics_airflow_instance,
+)
+
+automation_sensor = AutomationConditionSensorDefinition(
+ name="automation_sensor",
+ target="*",
+ default_status=DefaultSensorStatus.RUNNING,
+ minimum_interval_seconds=1,
+)
+
+defs = Definitions(
+ assets=[load_customers_dag_asset, run_customer_metrics],
+ sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
+)
+```
+
+## Conclusion
+
+That concludes the tutorial! We've federated the execution of our DAGs across two Airflow instances using Dagster's Declarative Automation system. We've also set up cross-instance lineage for our DAGs, and can now observe the lineage and execution of our DAGs in the Dagster UI.
diff --git a/docs/content/integrations/airlift/federation-tutorial/observe.mdx b/docs/content/integrations/airlift/federation-tutorial/observe.mdx
index e0d156dc4b956..416d56093a32d 100644
--- a/docs/content/integrations/airlift/federation-tutorial/observe.mdx
+++ b/docs/content/integrations/airlift/federation-tutorial/observe.mdx
@@ -206,3 +206,7 @@ src="/images/integrations/airlift/dag_lineage.png"
width={320}
height={198}
/>
+
+## Next steps
+
+Next, we'll federate the execution of our DAGs across both Airflow instances. Follow along [here](/integrations/airlift/federation-tutorial/federated-execution).
diff --git a/docs/content/integrations/airlift/federation-tutorial/overview.mdx b/docs/content/integrations/airlift/federation-tutorial/overview.mdx
index 4c6080022eea4..b510d805536d4 100644
--- a/docs/content/integrations/airlift/federation-tutorial/overview.mdx
+++ b/docs/content/integrations/airlift/federation-tutorial/overview.mdx
@@ -26,4 +26,12 @@ Two DAGs have been causing a lot of pain lately for the team: `warehouse.load_cu
title="Setup"
href="/integrations/airlift/federation-tutorial/setup"
>
+
+
diff --git a/docs/content/integrations/airlift/federation-tutorial/setup.mdx b/docs/content/integrations/airlift/federation-tutorial/setup.mdx
index 4d500712ff91e..320f640f89b13 100644
--- a/docs/content/integrations/airlift/federation-tutorial/setup.mdx
+++ b/docs/content/integrations/airlift/federation-tutorial/setup.mdx
@@ -77,4 +77,4 @@ height={300}
## Next steps
-In the next section, we'll add asset representations of our DAGs, and set up lineage across both Airflow instances. Follow along [here](/integrations/airlift/federation-tutorial/observe).
\ No newline at end of file
+In the next section, we'll add asset representations of our DAGs, and set up lineage across both Airflow instances. Follow along [here](/integrations/airlift/federation-tutorial/observe).
diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py
index f9518e55343d3..ff91d7544f4cf 100644
--- a/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py
+++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py
@@ -85,5 +85,5 @@ def run_customer_metrics() -> MaterializeResult:
defs = Definitions(
assets=[load_customers_dag_asset, run_customer_metrics],
- sensors=[warehouse_sensor, metrics_sensor],
+ sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
)
diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_specs_stage.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_stage.py
similarity index 83%
rename from examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_specs_stage.py
rename to examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_stage.py
index b5f05f123c95a..ba784476f9517 100644
--- a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_specs_stage.py
+++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_stage.py
@@ -16,6 +16,8 @@
OBSERVE_COMPLETE_FILE = ORIG_DEFS_FILE.parent / "stages" / "observe_complete.py"
OBSERVE_WITH_DEPS_FILE = ORIG_DEFS_FILE.parent / "stages" / "observe_with_deps.py"
OBSERVE_SNIPPETS_FILE = SNIPPETS_DIR / "observe.py"
+EXECUTABLE_AND_DA_FILE = ORIG_DEFS_FILE.parent / "stages" / "executable_and_da.py"
+FEDERATED_EXECUTION_SNIPPETS_FILE = SNIPPETS_DIR / "federated_execution.py"
@pytest.fixture
@@ -47,7 +49,13 @@ def dagster_fixture(
@pytest.mark.parametrize(
"stage_file",
- [OBSERVE_COMPLETE_FILE, OBSERVE_WITH_DEPS_FILE, OBSERVE_SNIPPETS_FILE],
+ [
+ OBSERVE_COMPLETE_FILE,
+ OBSERVE_WITH_DEPS_FILE,
+ OBSERVE_SNIPPETS_FILE,
+ FEDERATED_EXECUTION_SNIPPETS_FILE,
+ EXECUTABLE_AND_DA_FILE,
+ ],
indirect=True,
)
def test_stage(dagster_dev: subprocess.Popen, stage_file: Path) -> None:
diff --git a/examples/airlift-federation-tutorial/snippets/federated_execution.py b/examples/airlift-federation-tutorial/snippets/federated_execution.py
new file mode 100644
index 0000000000000..d28e126679b94
--- /dev/null
+++ b/examples/airlift-federation-tutorial/snippets/federated_execution.py
@@ -0,0 +1,111 @@
+from dagster import (
+ AutomationConditionSensorDefinition,
+ DefaultSensorStatus,
+ Definitions,
+ MaterializeResult,
+ multi_asset,
+)
+from dagster._core.definitions.asset_spec import replace_attributes
+from dagster._core.definitions.declarative_automation.automation_condition import (
+ AutomationCondition,
+)
+from dagster_airlift.core import (
+ AirflowBasicAuthBackend,
+ AirflowInstance,
+ build_airflow_polling_sensor,
+ load_airflow_dag_asset_specs,
+)
+
+warehouse_airflow_instance = AirflowInstance(
+ auth_backend=AirflowBasicAuthBackend(
+ webserver_url="http://localhost:8081",
+ username="admin",
+ password="admin",
+ ),
+ name="warehouse",
+)
+
+metrics_airflow_instance = AirflowInstance(
+ auth_backend=AirflowBasicAuthBackend(
+ webserver_url="http://localhost:8082",
+ username="admin",
+ password="admin",
+ ),
+ name="metrics",
+)
+
+load_customers_dag_asset = next(
+ iter(
+ load_airflow_dag_asset_specs(
+ airflow_instance=warehouse_airflow_instance,
+ dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
+ )
+ )
+)
+customer_metrics_dag_asset = replace_attributes(
+ next(
+ iter(
+ load_airflow_dag_asset_specs(
+ airflow_instance=metrics_airflow_instance,
+ dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
+ )
+ )
+ ),
+ deps=[load_customers_dag_asset],
+ automation_condition=AutomationCondition.eager(),
+)
+
+warehouse_sensor = build_airflow_polling_sensor(
+ mapped_assets=[load_customers_dag_asset],
+ airflow_instance=warehouse_airflow_instance,
+)
+metrics_sensor = build_airflow_polling_sensor(
+ mapped_assets=[customer_metrics_dag_asset],
+ airflow_instance=metrics_airflow_instance,
+)
+
+
+# start_multi_asset
+@multi_asset(specs=[customer_metrics_dag_asset])
+def run_customer_metrics() -> MaterializeResult:
+ run_id = metrics_airflow_instance.trigger_dag("customer_metrics")
+ metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
+ if metrics_airflow_instance.get_run_state("customer_metrics", run_id) == "success":
+ return MaterializeResult(asset_key=customer_metrics_dag_asset.key)
+ else:
+ raise Exception("Dag run failed.")
+
+
+# end_multi_asset
+
+# start_multi_asset_defs
+defs = Definitions(
+ assets=[load_customers_dag_asset, run_customer_metrics],
+ sensors=[warehouse_sensor, metrics_sensor],
+)
+# end_multi_asset_defs
+
+# start_eager
+from dagster import AutomationCondition
+
+customer_metrics_dag_asset = replace_attributes(
+ customer_metrics_dag_asset,
+ automation_condition=AutomationCondition.eager(),
+)
+# end_eager
+
+# start_automation_sensor
+automation_sensor = AutomationConditionSensorDefinition(
+ name="automation_sensor",
+ target="*",
+ default_status=DefaultSensorStatus.RUNNING,
+ minimum_interval_seconds=1,
+)
+# end_automation_sensor
+
+# start_complete_defs
+defs = Definitions(
+ assets=[load_customers_dag_asset, run_customer_metrics],
+ sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
+)
+# end_complete_defs