From 6a65363f06ae2bce588ccc92cda381e94b2f18ed Mon Sep 17 00:00:00 2001
From: Chris DeCarolis
Date: Tue, 12 Nov 2024 16:39:09 -0800
Subject: [PATCH] [dagster-airlift] Use declarative automation

---
 docs/content/_navigation.json                 |   4 +
 .../federated-execution.mdx                   | 166 ++++++++++++++++++
 .../airlift/federation-tutorial/observe.mdx   |   4 +
 .../airlift/federation-tutorial/overview.mdx  |   8 +
 .../airlift/federation-tutorial/setup.mdx     |   2 +-
 .../dagster_defs/stages/executable_and_da.py  |   2 +-
 .../{test_specs_stage.py => test_stage.py}    |  10 +-
 .../snippets/federated_execution.py           | 111 ++++++++++++
 8 files changed, 304 insertions(+), 3 deletions(-)
 create mode 100644 docs/content/integrations/airlift/federation-tutorial/federated-execution.mdx
 rename examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/{test_specs_stage.py => test_stage.py} (83%)
 create mode 100644 examples/airlift-federation-tutorial/snippets/federated_execution.py

diff --git a/docs/content/_navigation.json b/docs/content/_navigation.json
index 252e586b3aa90..97ecfc6bedbaf 100644
--- a/docs/content/_navigation.json
+++ b/docs/content/_navigation.json
@@ -953,6 +953,10 @@
           {
             "title": "Part 2: Observe dag lineage in Dagster",
             "path": "/integrations/airlift/federation-tutorial/observe"
+          },
+          {
+            "title": "Part 3: Federate across Airflow instances",
+            "path": "/integrations/airlift/federation-tutorial/federated-execution"
           }
         ]
       },
diff --git a/docs/content/integrations/airlift/federation-tutorial/federated-execution.mdx b/docs/content/integrations/airlift/federation-tutorial/federated-execution.mdx
new file mode 100644
index 0000000000000..8bb08785362d5
--- /dev/null
+++ b/docs/content/integrations/airlift/federation-tutorial/federated-execution.mdx
@@ -0,0 +1,166 @@

# Airlift Federation Tutorial: Federating Execution Across Airflow Instances

At this point, we should be [observing our DAGs within Dagster](/integrations/airlift/federation-tutorial/observe), with cross-instance lineage between them. Next, we'll federate the execution of our DAGs across both Airflow instances using Dagster's Declarative Automation system.

## Making `customer_metrics` executable

The `load_airflow_dag_asset_specs` function creates asset representations (`AssetSpec`s) of Airflow DAGs, but these assets are not executable on their own. To make them executable, we need to define an execution function in Dagster.

To federate execution of `customer_metrics`, we'll use the `@multi_asset` decorator to define how the `customer_metrics` asset should be executed. The function uses the `AirflowInstance` defined earlier to trigger a run of the `customer_metrics` DAG and then waits for the run to complete. If the run succeeds, we materialize the asset; if it fails, we raise an exception.
```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_multi_asset endbefore=end_multi_asset
@multi_asset(specs=[customer_metrics_dag_asset])
def run_customer_metrics() -> MaterializeResult:
    run_id = metrics_airflow_instance.trigger_dag("customer_metrics")
    metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
    if metrics_airflow_instance.get_run_state("customer_metrics", run_id) == "success":
        return MaterializeResult(asset_key=customer_metrics_dag_asset.key)
    else:
        raise Exception("Dag run failed.")
```

Now, we'll replace the `customer_metrics_dag_asset` in our `Definitions` object with the `run_customer_metrics` function:

```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_multi_asset_defs endbefore=end_multi_asset_defs
defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor],
)
```

In the Dagster UI, we should now see that the `customer_metrics` asset can be materialized.

## Federating execution

Ultimately, we would like to kick off a run of `customer_metrics` whenever `load_customers` completes successfully. The polling sensor from the previous section already records a materialization of the `load_customers` asset whenever that DAG completes, so we can use that materialization to trigger `customer_metrics` with Declarative Automation. First, we'll add an `AutomationCondition.eager()` to our `customer_metrics_dag_asset`. This tells Dagster to run the `run_customer_metrics` function whenever the upstream `load_customers` asset is materialized.

```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_eager endbefore=end_eager
from dagster import AutomationCondition

customer_metrics_dag_asset = replace_attributes(
    customer_metrics_dag_asset,
    automation_condition=AutomationCondition.eager(),
)
```

Next, we'll enable Declarative Automation by adding an `AutomationConditionSensorDefinition`. This sensor evaluates the automation conditions of the assets it targets and launches runs when those conditions are met.

```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_automation_sensor endbefore=end_automation_sensor
automation_sensor = AutomationConditionSensorDefinition(
    name="automation_sensor",
    target="*",
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=1,
)
```

We'll add this sensor to our `Definitions` object:

```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_complete_defs endbefore=end_complete_defs
defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
)
```

Now the `run_customer_metrics` function will be executed whenever the `load_customers` asset is materialized. Let's test this out by triggering a run of the `load_customers` DAG in Airflow. When the run completes, we should see a materialization of the `customer_metrics` asset kick off in the Dagster UI, and eventually a run of the `customer_metrics` DAG in the metrics Airflow instance.

## Complete code

When all the above steps are complete, your code should look something like this:
```python file=../../airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py
from dagster import (
    AutomationConditionSensorDefinition,
    DefaultSensorStatus,
    Definitions,
    MaterializeResult,
    multi_asset,
)
from dagster._core.definitions.asset_spec import replace_attributes
from dagster._core.definitions.declarative_automation.automation_condition import (
    AutomationCondition,
)
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_airflow_polling_sensor,
    load_airflow_dag_asset_specs,
)

warehouse_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="warehouse",
)

metrics_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8082",
        username="admin",
        password="admin",
    ),
    name="metrics",
)

load_customers_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=warehouse_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
)
customer_metrics_dag_asset = replace_attributes(
    next(
        iter(
            load_airflow_dag_asset_specs(
                airflow_instance=metrics_airflow_instance,
                dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
            )
        )
    ),
    # Add a dependency on the load_customers_dag_asset
    deps=[load_customers_dag_asset],
    automation_condition=AutomationCondition.eager(),
)


@multi_asset(specs=[customer_metrics_dag_asset])
def run_customer_metrics() -> MaterializeResult:
    run_id = metrics_airflow_instance.trigger_dag("customer_metrics")
    metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
    if metrics_airflow_instance.get_run_state("customer_metrics", run_id) == "success":
        return MaterializeResult(asset_key=customer_metrics_dag_asset.key)
    else:
        raise Exception("Dag run failed.")


warehouse_sensor = build_airflow_polling_sensor(
    mapped_assets=[load_customers_dag_asset],
    airflow_instance=warehouse_airflow_instance,
)
metrics_sensor = build_airflow_polling_sensor(
    mapped_assets=[customer_metrics_dag_asset],
    airflow_instance=metrics_airflow_instance,
)

automation_sensor = AutomationConditionSensorDefinition(
    name="automation_sensor",
    target="*",
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=1,
)

defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
)
```

## Conclusion

That concludes the tutorial! We've federated the execution of our DAGs across two Airflow instances using Dagster's Declarative Automation system, and with cross-instance lineage in place we can now observe both the lineage and the execution of these DAGs in the Dagster UI.
diff --git a/docs/content/integrations/airlift/federation-tutorial/observe.mdx b/docs/content/integrations/airlift/federation-tutorial/observe.mdx
index e0d156dc4b956..416d56093a32d 100644
--- a/docs/content/integrations/airlift/federation-tutorial/observe.mdx
+++ b/docs/content/integrations/airlift/federation-tutorial/observe.mdx
@@ -206,3 +206,7 @@ src="/images/integrations/airlift/dag_lineage.png"
 width={320}
 height={198}
 />
+
+## Next steps
+
+Next, we'll federate the execution of our DAGs across both Airflow instances. Follow along [here](/integrations/airlift/federation-tutorial/federated-execution).
diff --git a/docs/content/integrations/airlift/federation-tutorial/overview.mdx b/docs/content/integrations/airlift/federation-tutorial/overview.mdx
index 4c6080022eea4..b510d805536d4 100644
--- a/docs/content/integrations/airlift/federation-tutorial/overview.mdx
+++ b/docs/content/integrations/airlift/federation-tutorial/overview.mdx
@@ -26,4 +26,12 @@ Two DAGs have been causing a lot of pain lately for the team: `warehouse.load_cu
     title="Setup"
     href="/integrations/airlift/federation-tutorial/setup"
   >
+
+
diff --git a/docs/content/integrations/airlift/federation-tutorial/setup.mdx b/docs/content/integrations/airlift/federation-tutorial/setup.mdx
index 4d500712ff91e..320f640f89b13 100644
--- a/docs/content/integrations/airlift/federation-tutorial/setup.mdx
+++ b/docs/content/integrations/airlift/federation-tutorial/setup.mdx
@@ -77,4 +77,4 @@ height={300}
 
 ## Next steps
 
-In the next section, we'll add asset representations of our DAGs, and set up lineage across both Airflow instances. Follow along [here](/integrations/airlift/federation-tutorial/observe).
\ No newline at end of file
+In the next section, we'll add asset representations of our DAGs, and set up lineage across both Airflow instances. Follow along [here](/integrations/airlift/federation-tutorial/observe).
diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py
index f9518e55343d3..ff91d7544f4cf 100644
--- a/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py
+++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py
@@ -85,5 +85,5 @@ def run_customer_metrics() -> MaterializeResult:
 
 defs = Definitions(
     assets=[load_customers_dag_asset, run_customer_metrics],
-    sensors=[warehouse_sensor, metrics_sensor],
+    sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
 )
diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_specs_stage.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_stage.py
similarity index 83%
rename from examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_specs_stage.py
rename to examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_stage.py
index b5f05f123c95a..ba784476f9517 100644
--- a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_specs_stage.py
+++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_stage.py
@@ -16,6 +16,8 @@
 OBSERVE_COMPLETE_FILE = ORIG_DEFS_FILE.parent / "stages" / "observe_complete.py"
 OBSERVE_WITH_DEPS_FILE = ORIG_DEFS_FILE.parent / "stages" / "observe_with_deps.py"
 OBSERVE_SNIPPETS_FILE = SNIPPETS_DIR / "observe.py"
+EXECUTABLE_AND_DA_FILE = ORIG_DEFS_FILE.parent / "stages" / "executable_and_da.py"
+FEDERATED_EXECUTION_SNIPPETS_FILE = SNIPPETS_DIR / "federated_execution.py"
 
 
 @pytest.fixture
@@ -47,7 +49,13 @@ def dagster_fixture(
 
 @pytest.mark.parametrize(
     "stage_file",
-    [OBSERVE_COMPLETE_FILE, OBSERVE_WITH_DEPS_FILE, OBSERVE_SNIPPETS_FILE],
+    [
+        OBSERVE_COMPLETE_FILE,
+        OBSERVE_WITH_DEPS_FILE,
+        OBSERVE_SNIPPETS_FILE,
+        FEDERATED_EXECUTION_SNIPPETS_FILE,
+        EXECUTABLE_AND_DA_FILE,
+    ],
     indirect=True,
 )
 def test_stage(dagster_dev: subprocess.Popen, stage_file: Path) -> None:
diff --git
a/examples/airlift-federation-tutorial/snippets/federated_execution.py b/examples/airlift-federation-tutorial/snippets/federated_execution.py
new file mode 100644
index 0000000000000..d28e126679b94
--- /dev/null
+++ b/examples/airlift-federation-tutorial/snippets/federated_execution.py
@@ -0,0 +1,111 @@

from dagster import (
    AutomationConditionSensorDefinition,
    DefaultSensorStatus,
    Definitions,
    MaterializeResult,
    multi_asset,
)
from dagster._core.definitions.asset_spec import replace_attributes
from dagster._core.definitions.declarative_automation.automation_condition import (
    AutomationCondition,
)
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_airflow_polling_sensor,
    load_airflow_dag_asset_specs,
)

warehouse_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="warehouse",
)

metrics_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8082",
        username="admin",
        password="admin",
    ),
    name="metrics",
)

load_customers_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=warehouse_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
)
customer_metrics_dag_asset = replace_attributes(
    next(
        iter(
            load_airflow_dag_asset_specs(
                airflow_instance=metrics_airflow_instance,
                dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
            )
        )
    ),
    deps=[load_customers_dag_asset],
    automation_condition=AutomationCondition.eager(),
)

warehouse_sensor = build_airflow_polling_sensor(
    mapped_assets=[load_customers_dag_asset],
    airflow_instance=warehouse_airflow_instance,
)
metrics_sensor = build_airflow_polling_sensor(
    mapped_assets=[customer_metrics_dag_asset],
    airflow_instance=metrics_airflow_instance,
)


# start_multi_asset
@multi_asset(specs=[customer_metrics_dag_asset])
def run_customer_metrics() -> MaterializeResult:
    run_id = metrics_airflow_instance.trigger_dag("customer_metrics")
    metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
    if metrics_airflow_instance.get_run_state("customer_metrics", run_id) == "success":
        return MaterializeResult(asset_key=customer_metrics_dag_asset.key)
    else:
        raise Exception("Dag run failed.")


# end_multi_asset

# start_multi_asset_defs
defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor],
)
# end_multi_asset_defs

# start_eager
from dagster import AutomationCondition

customer_metrics_dag_asset = replace_attributes(
    customer_metrics_dag_asset,
    automation_condition=AutomationCondition.eager(),
)
# end_eager

# start_automation_sensor
automation_sensor = AutomationConditionSensorDefinition(
    name="automation_sensor",
    target="*",
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=1,
)
# end_automation_sensor

# start_complete_defs
defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
)
# end_complete_defs
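
One optional refinement, not included in the files in this change: since `run_customer_metrics` already holds the Airflow run ID, it can attach that ID to the materialization it returns, so the link back to the Airflow run is visible on the asset in the Dagster UI. The sketch below is a variation of the tutorial's `run_customer_metrics` asset; it assumes `metrics_airflow_instance` and `customer_metrics_dag_asset` are defined as in the code above, and the only addition is the standard `metadata` argument on `MaterializeResult`.

```python
# Variation of run_customer_metrics: same trigger-and-poll flow as the tutorial
# code above, but the returned MaterializeResult carries the Airflow run ID and
# final state as metadata so they appear on the materialization event in the
# Dagster UI. Assumes metrics_airflow_instance and customer_metrics_dag_asset
# are defined as in the tutorial code.
from dagster import MaterializeResult, multi_asset


@multi_asset(specs=[customer_metrics_dag_asset])
def run_customer_metrics() -> MaterializeResult:
    run_id = metrics_airflow_instance.trigger_dag("customer_metrics")
    metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
    state = metrics_airflow_instance.get_run_state("customer_metrics", run_id)
    if state == "success":
        return MaterializeResult(
            asset_key=customer_metrics_dag_asset.key,
            metadata={"airflow_run_id": run_id, "airflow_run_state": state},
        )
    raise Exception(f"customer_metrics run {run_id} finished in state {state!r}.")
```

Either version works with the `AutomationCondition.eager()` setup above; the metadata is purely for observability.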