diff --git a/docs/content/integrations/airlift/tutorial.mdx b/docs/content/integrations/airlift/tutorial.mdx
index 8dcf6d2bd924d..cab04083e663f 100644
--- a/docs/content/integrations/airlift/tutorial.mdx
+++ b/docs/content/integrations/airlift/tutorial.mdx
@@ -202,7 +202,15 @@ Then, we will construct our assets:
 import os
 from pathlib import Path
 
-from dagster import AssetExecutionContext, AssetSpec, Definitions
+from dagster import (
+    AssetCheckResult,
+    AssetCheckSeverity,
+    AssetExecutionContext,
+    AssetKey,
+    AssetSpec,
+    Definitions,
+    asset_check,
+)
 from dagster_airlift.core import (
     AirflowBasicAuthBackend,
     AirflowInstance,
@@ -212,6 +220,28 @@ from dagster_airlift.core import (
 from dagster_dbt import DbtCliResource, DbtProject, dbt_assets
 
 
+@asset_check(asset=AssetKey(["airflow_instance_one", "dag", "rebuild_customers_list"]))
+def validate_exported_csv() -> AssetCheckResult:
+    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"
+
+    if not csv_path.exists():
+        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")
+
+    rows = len(csv_path.read_text().split("\n"))
+    if rows < 2:
+        return AssetCheckResult(
+            passed=False,
+            description=f"Export CSV {csv_path} is empty",
+            severity=AssetCheckSeverity.WARN,
+        )
+
+    return AssetCheckResult(
+        passed=True,
+        description=f"Export CSV {csv_path} exists",
+        metadata={"rows": rows},
+    )
+
+
 def dbt_project_path() -> Path:
     env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
     assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
@@ -248,6 +278,7 @@ defs = build_defs_from_airflow_instance(
     defs=Definitions(
         assets=mapped_assets,
         resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
+        asset_checks=[validate_exported_csv],
     ),
 )
 ```
@@ -279,7 +310,16 @@ If your assets represent a time-partitioned data source, Airlift can automatical
 import os
 from pathlib import Path
 
-from dagster import AssetExecutionContext, AssetSpec, DailyPartitionsDefinition, Definitions
+from dagster import (
+    AssetCheckResult,
+    AssetCheckSeverity,
+    AssetExecutionContext,
+    AssetKey,
+    AssetSpec,
+    DailyPartitionsDefinition,
+    Definitions,
+    asset_check,
+)
 from dagster._time import get_current_datetime_midnight
 from dagster_airlift.core import (
     AirflowBasicAuthBackend,
@@ -292,6 +332,28 @@ from dagster_dbt import DbtCliResource, DbtProject, dbt_assets
 PARTITIONS_DEF = DailyPartitionsDefinition(start_date=get_current_datetime_midnight())
 
 
+@asset_check(asset=AssetKey(["customers_csv"]))
+def validate_exported_csv() -> AssetCheckResult:
+    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"
+
+    if not csv_path.exists():
+        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")
+
+    rows = len(csv_path.read_text().split("\n"))
+    if rows < 2:
+        return AssetCheckResult(
+            passed=False,
+            description=f"Export CSV {csv_path} is empty",
+            severity=AssetCheckSeverity.WARN,
+        )
+
+    return AssetCheckResult(
+        passed=True,
+        description=f"Export CSV {csv_path} exists",
+        metadata={"rows": rows},
+    )
+
+
 def dbt_project_path() -> Path:
     env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
     assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
@@ -333,6 +395,7 @@ defs = build_defs_from_airflow_instance(
     defs=Definitions(
         assets=mapped_assets,
         resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
+        asset_checks=[validate_exported_csv],
     ),
 )
 ```
@@ -768,19 +831,17 @@ def validate_exported_csv() -> AssetCheckResult:
     )
 
 
-defs = Definitions.merge(
-    build_defs_from_airflow_instance(
-        airflow_instance=AirflowInstance(
-            # other backends available (e.g. MwaaSessionAuthBackend)
-            auth_backend=AirflowBasicAuthBackend(
-                webserver_url="http://localhost:8080",
-                username="admin",
-                password="admin",
-            ),
-            name="airflow_instance_one",
-        )
+defs = build_defs_from_airflow_instance(
+    airflow_instance=AirflowInstance(
+        # other backends available (e.g. MwaaSessionAuthBackend)
+        auth_backend=AirflowBasicAuthBackend(
+            webserver_url="http://localhost:8080",
+            username="admin",
+            password="admin",
+        ),
+        name="airflow_instance_one",
     ),
-    Definitions(asset_checks=[validate_exported_csv]),
+    defs=Definitions(asset_checks=[validate_exported_csv]),
 )
 ```

diff --git a/docs/content/integrations/airlift/tutorial/observe.mdx b/docs/content/integrations/airlift/tutorial/observe.mdx
index 92802551a40a5..89352d695d7b8 100644
--- a/docs/content/integrations/airlift/tutorial/observe.mdx
+++ b/docs/content/integrations/airlift/tutorial/observe.mdx
@@ -30,7 +30,15 @@ Then, we will construct our assets:
 import os
 from pathlib import Path
 
-from dagster import AssetExecutionContext, AssetSpec, Definitions
+from dagster import (
+    AssetCheckResult,
+    AssetCheckSeverity,
+    AssetExecutionContext,
+    AssetKey,
+    AssetSpec,
+    Definitions,
+    asset_check,
+)
 from dagster_airlift.core import (
     AirflowBasicAuthBackend,
     AirflowInstance,
@@ -40,6 +48,28 @@ from dagster_airlift.core import (
 from dagster_dbt import DbtCliResource, DbtProject, dbt_assets
 
 
+@asset_check(asset=AssetKey(["airflow_instance_one", "dag", "rebuild_customers_list"]))
+def validate_exported_csv() -> AssetCheckResult:
+    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"
+
+    if not csv_path.exists():
+        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")
+
+    rows = len(csv_path.read_text().split("\n"))
+    if rows < 2:
+        return AssetCheckResult(
+            passed=False,
+            description=f"Export CSV {csv_path} is empty",
+            severity=AssetCheckSeverity.WARN,
+        )
+
+    return AssetCheckResult(
+        passed=True,
+        description=f"Export CSV {csv_path} exists",
+        metadata={"rows": rows},
+    )
+
+
 def dbt_project_path() -> Path:
     env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
     assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
@@ -76,6 +106,7 @@ defs = build_defs_from_airflow_instance(
     defs=Definitions(
         assets=mapped_assets,
         resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
+        asset_checks=[validate_exported_csv],
     ),
 )
 ```
@@ -99,6 +130,97 @@ Kicking off a run of the DAG in Airflow, you should see the newly created assets
 
 _Note: There will be some delay between task completion and assets materializing in Dagster, managed by the sensor. This sensor runs every 30 seconds by default (you can reduce down to one second via the `minimum_interval_seconds` argument to `sensor`), so there will be some delay._
 
+### Moving the asset check
+
+Now that we've introduced an asset explicitly for the `customers.csv` file output by the DAG, we should move the asset check constructed during the Peering step to instead be on the `customers_csv` asset. Simply change the `asset` targeted by the `@asset_check` decorator to be `AssetKey(["customers_csv"])`. Doing this ensures that even when we delete the DAG, the asset check will live on.
+
+When done, our code will look like this.
+
+```python file=../../experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_check_on_asset.py
+import os
+from pathlib import Path
+
+from dagster import (
+    AssetCheckResult,
+    AssetCheckSeverity,
+    AssetExecutionContext,
+    AssetKey,
+    AssetSpec,
+    Definitions,
+    asset_check,
+)
+from dagster_airlift.core import (
+    AirflowBasicAuthBackend,
+    AirflowInstance,
+    assets_with_task_mappings,
+    build_defs_from_airflow_instance,
+)
+from dagster_dbt import DbtCliResource, DbtProject, dbt_assets
+
+
+@asset_check(asset=AssetKey(["customers_csv"]))
+def validate_exported_csv() -> AssetCheckResult:
+    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"
+
+    if not csv_path.exists():
+        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")
+
+    rows = len(csv_path.read_text().split("\n"))
+    if rows < 2:
+        return AssetCheckResult(
+            passed=False,
+            description=f"Export CSV {csv_path} is empty",
+            severity=AssetCheckSeverity.WARN,
+        )
+
+    return AssetCheckResult(
+        passed=True,
+        description=f"Export CSV {csv_path} exists",
+        metadata={"rows": rows},
+    )
+
+
+def dbt_project_path() -> Path:
+    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
+    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
+    return Path(env_val)
+
+
+@dbt_assets(
+    manifest=dbt_project_path() / "target" / "manifest.json",
+    project=DbtProject(dbt_project_path()),
+)
+def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
+    yield from dbt.cli(["build"], context=context).stream()
+
+
+mapped_assets = assets_with_task_mappings(
+    dag_id="rebuild_customers_list",
+    task_mappings={
+        "load_raw_customers": [AssetSpec(key=["raw_data", "raw_customers"])],
+        "build_dbt_models": [dbt_project_assets],
+        "export_customers": [AssetSpec(key="customers_csv", deps=["customers"])],
+    },
+)
+
+
+defs = build_defs_from_airflow_instance(
+    airflow_instance=AirflowInstance(
+        auth_backend=AirflowBasicAuthBackend(
+            webserver_url="http://localhost:8080",
+            username="admin",
+            password="admin",
+        ),
+        name="airflow_instance_one",
+    ),
+    defs=Definitions(
+        assets=mapped_assets,
+        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
+        asset_checks=[validate_exported_csv],
+    ),
+)
+```
+
 ### Adding partitions
 
 If your assets represent a time-partitioned data source, Airlift can automatically associate your materializations to the relevant partitions. In the case of `rebuild_customers_list`, data is daily partitioned in each created table, and as a result we've added a `@daily` cron schedule to the DAG to make sure it runs every day. We can likewise add a `DailyPartitionsDefinition` to each of our assets.
@@ -107,7 +229,16 @@ If your assets represent a time-partitioned data source, Airlift can automatical
 import os
 from pathlib import Path
 
-from dagster import AssetExecutionContext, AssetSpec, DailyPartitionsDefinition, Definitions
+from dagster import (
+    AssetCheckResult,
+    AssetCheckSeverity,
+    AssetExecutionContext,
+    AssetKey,
+    AssetSpec,
+    DailyPartitionsDefinition,
+    Definitions,
+    asset_check,
+)
 from dagster._time import get_current_datetime_midnight
 from dagster_airlift.core import (
     AirflowBasicAuthBackend,
@@ -120,6 +251,28 @@
 PARTITIONS_DEF = DailyPartitionsDefinition(start_date=get_current_datetime_midnight())
 
 
+@asset_check(asset=AssetKey(["customers_csv"]))
+def validate_exported_csv() -> AssetCheckResult:
+    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"
+
+    if not csv_path.exists():
+        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")
+
+    rows = len(csv_path.read_text().split("\n"))
+    if rows < 2:
+        return AssetCheckResult(
+            passed=False,
+            description=f"Export CSV {csv_path} is empty",
+            severity=AssetCheckSeverity.WARN,
+        )
+
+    return AssetCheckResult(
+        passed=True,
+        description=f"Export CSV {csv_path} exists",
+        metadata={"rows": rows},
+    )
+
+
 def dbt_project_path() -> Path:
     env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
     assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
@@ -161,6 +314,7 @@ defs = build_defs_from_airflow_instance(
     defs=Definitions(
         assets=mapped_assets,
         resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
+        asset_checks=[validate_exported_csv],
     ),
 )
 ```
diff --git a/docs/content/integrations/airlift/tutorial/peer.mdx b/docs/content/integrations/airlift/tutorial/peer.mdx
index e679cd0853555..23b1a22777376 100644
--- a/docs/content/integrations/airlift/tutorial/peer.mdx
+++ b/docs/content/integrations/airlift/tutorial/peer.mdx
@@ -130,19 +130,17 @@ def validate_exported_csv() -> AssetCheckResult:
     )
 
 
-defs = Definitions.merge(
-    build_defs_from_airflow_instance(
-        airflow_instance=AirflowInstance(
-            # other backends available (e.g. MwaaSessionAuthBackend)
-            auth_backend=AirflowBasicAuthBackend(
-                webserver_url="http://localhost:8080",
-                username="admin",
-                password="admin",
-            ),
-            name="airflow_instance_one",
-        )
+defs = build_defs_from_airflow_instance(
+    airflow_instance=AirflowInstance(
+        # other backends available (e.g. MwaaSessionAuthBackend)
+        auth_backend=AirflowBasicAuthBackend(
+            webserver_url="http://localhost:8080",
+            username="admin",
+            password="admin",
+        ),
+        name="airflow_instance_one",
     ),
-    Definitions(asset_checks=[validate_exported_csv]),
+    defs=Definitions(asset_checks=[validate_exported_csv]),
 )
 ```

diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe.py
index ea1e8e65413ba..73af301732ee8 100644
--- a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe.py
+++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe.py
@@ -1,7 +1,15 @@
 import os
 from pathlib import Path
 
-from dagster import AssetExecutionContext, AssetSpec, Definitions
+from dagster import (
+    AssetCheckResult,
+    AssetCheckSeverity,
+    AssetExecutionContext,
+    AssetKey,
+    AssetSpec,
+    Definitions,
+    asset_check,
+)
 from dagster_airlift.core import (
     AirflowBasicAuthBackend,
     AirflowInstance,
@@ -11,6 +19,28 @@ from dagster_airlift.core import (
 from dagster_dbt import DbtCliResource, DbtProject, dbt_assets
 
 
+@asset_check(asset=AssetKey(["airflow_instance_one", "dag", "rebuild_customers_list"]))
+def validate_exported_csv() -> AssetCheckResult:
+    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"
+
+    if not csv_path.exists():
+        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")
+
+    rows = len(csv_path.read_text().split("\n"))
+    if rows < 2:
+        return AssetCheckResult(
+            passed=False,
+            description=f"Export CSV {csv_path} is empty",
+            severity=AssetCheckSeverity.WARN,
+        )
+
+    return AssetCheckResult(
+        passed=True,
+        description=f"Export CSV {csv_path} exists",
+        metadata={"rows": rows},
+    )
+
+
 def dbt_project_path() -> Path:
     env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
     assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
@@ -47,5 +77,6 @@ def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
     defs=Definitions(
         assets=mapped_assets,
         resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
+        asset_checks=[validate_exported_csv],
     ),
 )
diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_check_on_asset.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_check_on_asset.py
new file mode 100644
index 0000000000000..360950591cf82
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_check_on_asset.py
@@ -0,0 +1,82 @@
+import os
+from pathlib import Path
+
+from dagster import (
+    AssetCheckResult,
+    AssetCheckSeverity,
+    AssetExecutionContext,
+    AssetKey,
+    AssetSpec,
+    Definitions,
+    asset_check,
+)
+from dagster_airlift.core import (
+    AirflowBasicAuthBackend,
+    AirflowInstance,
+    assets_with_task_mappings,
+    build_defs_from_airflow_instance,
+)
+from dagster_dbt import DbtCliResource, DbtProject, dbt_assets
+
+
+@asset_check(asset=AssetKey(["customers_csv"]))
+def validate_exported_csv() -> AssetCheckResult:
+    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"
+
+    if not csv_path.exists():
+        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")
+
+    rows = len(csv_path.read_text().split("\n"))
+    if rows < 2:
+        return AssetCheckResult(
+            passed=False,
+            description=f"Export CSV {csv_path} is empty",
+            severity=AssetCheckSeverity.WARN,
+        )
+
+    return AssetCheckResult(
+        passed=True,
+        description=f"Export CSV {csv_path} exists",
+        metadata={"rows": rows},
+    )
+
+
+def dbt_project_path() -> Path:
+    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
+    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
+    return Path(env_val)
+
+
+@dbt_assets(
+    manifest=dbt_project_path() / "target" / "manifest.json",
+    project=DbtProject(dbt_project_path()),
+)
+def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
+    yield from dbt.cli(["build"], context=context).stream()
+
+
+mapped_assets = assets_with_task_mappings(
+    dag_id="rebuild_customers_list",
+    task_mappings={
+        "load_raw_customers": [AssetSpec(key=["raw_data", "raw_customers"])],
+        "build_dbt_models": [dbt_project_assets],
+        "export_customers": [AssetSpec(key="customers_csv", deps=["customers"])],
+    },
+)
+
+
+defs = build_defs_from_airflow_instance(
+    airflow_instance=AirflowInstance(
+        auth_backend=AirflowBasicAuthBackend(
+            webserver_url="http://localhost:8080",
+            username="admin",
+            password="admin",
+        ),
+        name="airflow_instance_one",
+    ),
+    defs=Definitions(
+        assets=mapped_assets,
+        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
+        asset_checks=[validate_exported_csv],
+    ),
+)
diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_with_partitions.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_with_partitions.py
index 264b9db1adefe..a9d88160b09a1 100644
--- a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_with_partitions.py
+++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_with_partitions.py
@@ -1,7 +1,16 @@
 import os
 from pathlib import Path
 
-from dagster import AssetExecutionContext, AssetSpec, DailyPartitionsDefinition, Definitions
+from dagster import (
+    AssetCheckResult,
+    AssetCheckSeverity,
+    AssetExecutionContext,
+    AssetKey,
+    AssetSpec,
+    DailyPartitionsDefinition,
+    Definitions,
+    asset_check,
+)
 from dagster._time import get_current_datetime_midnight
 from dagster_airlift.core import (
     AirflowBasicAuthBackend,
@@ -14,6 +23,28 @@
 PARTITIONS_DEF = DailyPartitionsDefinition(start_date=get_current_datetime_midnight())
 
 
+@asset_check(asset=AssetKey(["customers_csv"]))
+def validate_exported_csv() -> AssetCheckResult:
+    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"
+
+    if not csv_path.exists():
+        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")
+
+    rows = len(csv_path.read_text().split("\n"))
+    if rows < 2:
+        return AssetCheckResult(
+            passed=False,
+            description=f"Export CSV {csv_path} is empty",
+            severity=AssetCheckSeverity.WARN,
+        )
+
+    return AssetCheckResult(
+        passed=True,
+        description=f"Export CSV {csv_path} exists",
+        metadata={"rows": rows},
+    )
+
+
 def dbt_project_path() -> Path:
     env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
     assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
@@ -55,5 +86,6 @@ def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
     defs=Definitions(
         assets=mapped_assets,
         resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
+        asset_checks=[validate_exported_csv],
     ),
 )
diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/peer_with_check.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/peer_with_check.py
index 60f5a53b60f70..6abcef989641d 100644
--- a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/peer_with_check.py
+++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/peer_with_check.py
@@ -33,17 +33,15 @@ def validate_exported_csv() -> AssetCheckResult:
     )
 
 
-defs = Definitions.merge(
-    build_defs_from_airflow_instance(
-        airflow_instance=AirflowInstance(
-            # other backends available (e.g. MwaaSessionAuthBackend)
-            auth_backend=AirflowBasicAuthBackend(
-                webserver_url="http://localhost:8080",
-                username="admin",
-                password="admin",
-            ),
-            name="airflow_instance_one",
-        )
+defs = build_defs_from_airflow_instance(
+    airflow_instance=AirflowInstance(
+        # other backends available (e.g. MwaaSessionAuthBackend)
+        auth_backend=AirflowBasicAuthBackend(
+            webserver_url="http://localhost:8080",
+            username="admin",
+            password="admin",
+        ),
+        name="airflow_instance_one",
     ),
-    Definitions(asset_checks=[validate_exported_csv]),
+    defs=Definitions(asset_checks=[validate_exported_csv]),
 )