[dagster-airlift] Add check to observe step, ensure it's present at each stage.
dpeng817 committed Nov 8, 2024
1 parent 43350f5 commit b10ff6a
Showing 7 changed files with 398 additions and 42 deletions.
89 changes: 75 additions & 14 deletions docs/content/integrations/airlift/tutorial.mdx
@@ -202,7 +202,15 @@ Then, we will construct our assets:
import os
from pathlib import Path

from dagster import AssetExecutionContext, AssetSpec, Definitions
from dagster import (
AssetCheckResult,
AssetCheckSeverity,
AssetExecutionContext,
AssetKey,
AssetSpec,
Definitions,
asset_check,
)
from dagster_airlift.core import (
AirflowBasicAuthBackend,
AirflowInstance,
@@ -212,6 +220,28 @@ from dagster_airlift.core import (
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets


@asset_check(asset=AssetKey(["airflow_instance_one", "dag", "rebuild_customers_list"]))
def validate_exported_csv() -> AssetCheckResult:
csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"

if not csv_path.exists():
return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")

rows = len(csv_path.read_text().split("\n"))
if rows < 2:
return AssetCheckResult(
passed=False,
description=f"Export CSV {csv_path} is empty",
severity=AssetCheckSeverity.WARN,
)

return AssetCheckResult(
passed=True,
description=f"Export CSV {csv_path} exists",
metadata={"rows": rows},
)


def dbt_project_path() -> Path:
env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
@@ -248,6 +278,7 @@ defs = build_defs_from_airflow_instance(
defs=Definitions(
assets=mapped_assets,
resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
asset_checks=[validate_exported_csv],
),
)
```
@@ -279,7 +310,16 @@ If your assets represent a time-partitioned data source, Airlift can automatical
import os
from pathlib import Path

from dagster import AssetExecutionContext, AssetSpec, DailyPartitionsDefinition, Definitions
from dagster import (
AssetCheckResult,
AssetCheckSeverity,
AssetExecutionContext,
AssetKey,
AssetSpec,
DailyPartitionsDefinition,
Definitions,
asset_check,
)
from dagster._time import get_current_datetime_midnight
from dagster_airlift.core import (
AirflowBasicAuthBackend,
@@ -292,6 +332,28 @@ from dagster_dbt import DbtCliResource, DbtProject, dbt_assets
PARTITIONS_DEF = DailyPartitionsDefinition(start_date=get_current_datetime_midnight())


@asset_check(asset=AssetKey(["customers_csv"]))
def validate_exported_csv() -> AssetCheckResult:
csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"

if not csv_path.exists():
return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")

rows = len(csv_path.read_text().split("\n"))
if rows < 2:
return AssetCheckResult(
passed=False,
description=f"Export CSV {csv_path} is empty",
severity=AssetCheckSeverity.WARN,
)

return AssetCheckResult(
passed=True,
description=f"Export CSV {csv_path} exists",
metadata={"rows": rows},
)


def dbt_project_path() -> Path:
env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
@@ -333,6 +395,7 @@ defs = build_defs_from_airflow_instance(
defs=Definitions(
assets=mapped_assets,
resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
asset_checks=[validate_exported_csv],
),
)
```
@@ -768,19 +831,17 @@ def validate_exported_csv() -> AssetCheckResult:
)
-defs = Definitions.merge(
-    build_defs_from_airflow_instance(
-        airflow_instance=AirflowInstance(
-            # other backends available (e.g. MwaaSessionAuthBackend)
-            auth_backend=AirflowBasicAuthBackend(
-                webserver_url="http://localhost:8080",
-                username="admin",
-                password="admin",
-            ),
-            name="airflow_instance_one",
-        )
-    ),
-    Definitions(asset_checks=[validate_exported_csv]),
-)
+defs = build_defs_from_airflow_instance(
+    airflow_instance=AirflowInstance(
+        # other backends available (e.g. MwaaSessionAuthBackend)
+        auth_backend=AirflowBasicAuthBackend(
+            webserver_url="http://localhost:8080",
+            username="admin",
+            password="admin",
+        ),
+        name="airflow_instance_one",
+    ),
+    defs=Definitions(asset_checks=[validate_exported_csv]),
+)
```

158 changes: 156 additions & 2 deletions docs/content/integrations/airlift/tutorial/observe.mdx
@@ -30,7 +30,15 @@ Then, we will construct our assets:
import os
from pathlib import Path

from dagster import AssetExecutionContext, AssetSpec, Definitions
from dagster import (
AssetCheckResult,
AssetCheckSeverity,
AssetExecutionContext,
AssetKey,
AssetSpec,
Definitions,
asset_check,
)
from dagster_airlift.core import (
AirflowBasicAuthBackend,
AirflowInstance,
@@ -40,6 +48,28 @@ from dagster_airlift.core import (
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets


@asset_check(asset=AssetKey(["airflow_instance_one", "dag", "rebuild_customers_list"]))
def validate_exported_csv() -> AssetCheckResult:
csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"

if not csv_path.exists():
return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")

rows = len(csv_path.read_text().split("\n"))
if rows < 2:
return AssetCheckResult(
passed=False,
description=f"Export CSV {csv_path} is empty",
severity=AssetCheckSeverity.WARN,
)

return AssetCheckResult(
passed=True,
description=f"Export CSV {csv_path} exists",
metadata={"rows": rows},
)


def dbt_project_path() -> Path:
env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
@@ -76,6 +106,7 @@ defs = build_defs_from_airflow_instance(
defs=Definitions(
assets=mapped_assets,
resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
asset_checks=[validate_exported_csv],
),
)
```
@@ -99,6 +130,97 @@ Kicking off a run of the DAG in Airflow, you should see the newly created assets

_Note: There will be some delay between task completion in Airflow and the corresponding assets materializing in Dagster, because materializations are reported by a polling sensor. This sensor runs every 30 seconds by default; you can reduce the interval to as little as one second via the `minimum_interval_seconds` argument to `sensor`._
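
If the default cadence is too slow for local iteration, the polling interval can be tightened when building the Airflow-backed definitions. Below is a minimal sketch; it assumes `build_defs_from_airflow_instance` exposes a `sensor_minimum_interval_seconds` parameter, so check the dagster-airlift API reference for the exact name in your installed version.

```python
# Sketch only: shorten the Airflow polling sensor's interval.
# Assumption: `sensor_minimum_interval_seconds` is the parameter name exposed by
# build_defs_from_airflow_instance in your dagster-airlift version.
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_defs_from_airflow_instance,
)

defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    sensor_minimum_interval_seconds=1,  # poll every second instead of every 30
)
```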

### Moving the asset check

Now that we've explicitly modeled the `customers.csv` file output by the DAG as an asset, we should move the asset check constructed during the Peering step so that it targets the `customers_csv` asset instead. Simply change the `asset` argument of the `@asset_check` decorator to `AssetKey(["customers_csv"])`. This ensures that the asset check lives on even after we delete the DAG.
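
In isolation, the change is just the decorator's target key; here is a minimal sketch with the check body elided, since it is unchanged from the peering step.

```python
# Sketch only: the check body is unchanged; only the targeted asset key moves
# from the peered DAG asset to the explicitly modeled customers_csv asset.
from dagster import AssetCheckResult, AssetKey, asset_check


# Previously: @asset_check(asset=AssetKey(["airflow_instance_one", "dag", "rebuild_customers_list"]))
@asset_check(asset=AssetKey(["customers_csv"]))
def validate_exported_csv() -> AssetCheckResult:
    ...  # same CSV validation logic shown in the full file below
```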

When done, our code will look like this.

```python file=../../experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_check_on_asset.py
import os
from pathlib import Path

from dagster import (
AssetCheckResult,
AssetCheckSeverity,
AssetExecutionContext,
AssetKey,
AssetSpec,
Definitions,
asset_check,
)
from dagster_airlift.core import (
AirflowBasicAuthBackend,
AirflowInstance,
assets_with_task_mappings,
build_defs_from_airflow_instance,
)
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets


@asset_check(asset=AssetKey(["customers_csv"]))
def validate_exported_csv() -> AssetCheckResult:
csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"

if not csv_path.exists():
return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")

rows = len(csv_path.read_text().split("\n"))
if rows < 2:
return AssetCheckResult(
passed=False,
description=f"Export CSV {csv_path} is empty",
severity=AssetCheckSeverity.WARN,
)

return AssetCheckResult(
passed=True,
description=f"Export CSV {csv_path} exists",
metadata={"rows": rows},
)


def dbt_project_path() -> Path:
env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
return Path(env_val)


@dbt_assets(
manifest=dbt_project_path() / "target" / "manifest.json",
project=DbtProject(dbt_project_path()),
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()


mapped_assets = assets_with_task_mappings(
dag_id="rebuild_customers_list",
task_mappings={
"load_raw_customers": [AssetSpec(key=["raw_data", "raw_customers"])],
"build_dbt_models": [dbt_project_assets],
"export_customers": [AssetSpec(key="customers_csv", deps=["customers"])],
},
)


defs = build_defs_from_airflow_instance(
airflow_instance=AirflowInstance(
auth_backend=AirflowBasicAuthBackend(
webserver_url="http://localhost:8080",
username="admin",
password="admin",
),
name="airflow_instance_one",
),
defs=Definitions(
assets=mapped_assets,
resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
asset_checks=[validate_exported_csv],
),
)
```

### Adding partitions

If your assets represent a time-partitioned data source, Airlift can automatically associate your materializations with the relevant partitions. In the case of `rebuild_customers_list`, the data in each created table is partitioned daily, so we've added a `@daily` cron schedule to the DAG to ensure it runs once per day. We can likewise add a `DailyPartitionsDefinition` to each of our assets.
Expand All @@ -107,7 +229,16 @@ If your assets represent a time-partitioned data source, Airlift can automatical
import os
from pathlib import Path

from dagster import AssetExecutionContext, AssetSpec, DailyPartitionsDefinition, Definitions
from dagster import (
AssetCheckResult,
AssetCheckSeverity,
AssetExecutionContext,
AssetKey,
AssetSpec,
DailyPartitionsDefinition,
Definitions,
asset_check,
)
from dagster._time import get_current_datetime_midnight
from dagster_airlift.core import (
AirflowBasicAuthBackend,
@@ -120,6 +251,28 @@ from dagster_dbt import DbtCliResource, DbtProject, dbt_assets
PARTITIONS_DEF = DailyPartitionsDefinition(start_date=get_current_datetime_midnight())


@asset_check(asset=AssetKey(["customers_csv"]))
def validate_exported_csv() -> AssetCheckResult:
csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"

if not csv_path.exists():
return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")

rows = len(csv_path.read_text().split("\n"))
if rows < 2:
return AssetCheckResult(
passed=False,
description=f"Export CSV {csv_path} is empty",
severity=AssetCheckSeverity.WARN,
)

return AssetCheckResult(
passed=True,
description=f"Export CSV {csv_path} exists",
metadata={"rows": rows},
)


def dbt_project_path() -> Path:
env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
@@ -161,6 +314,7 @@ defs = build_defs_from_airflow_instance(
defs=Definitions(
assets=mapped_assets,
resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
asset_checks=[validate_exported_csv],
),
)
```
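
The collapsed hunk above hides where `PARTITIONS_DEF` is actually attached to the asset definitions. Here is a minimal sketch of that wiring, reusing `dbt_project_path` from the snippet above; it assumes `AssetSpec` and `@dbt_assets` accept a `partitions_def` argument, as in recent Dagster releases, so verify against your installed version.

```python
# Sketch only: attach the shared daily partitions definition to each mapped asset.
# Assumptions: AssetSpec and @dbt_assets accept `partitions_def`; dbt_project_path
# is the helper defined in the snippet above.
from dagster import AssetExecutionContext, AssetSpec, DailyPartitionsDefinition
from dagster._time import get_current_datetime_midnight
from dagster_airlift.core import assets_with_task_mappings
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets

PARTITIONS_DEF = DailyPartitionsDefinition(start_date=get_current_datetime_midnight())


@dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=DbtProject(dbt_project_path()),
    partitions_def=PARTITIONS_DEF,
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


mapped_assets = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        "load_raw_customers": [
            AssetSpec(key=["raw_data", "raw_customers"], partitions_def=PARTITIONS_DEF)
        ],
        "build_dbt_models": [dbt_project_assets],
        "export_customers": [
            AssetSpec(key="customers_csv", deps=["customers"], partitions_def=PARTITIONS_DEF)
        ],
    },
)
```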
22 changes: 10 additions & 12 deletions docs/content/integrations/airlift/tutorial/peer.mdx
@@ -130,19 +130,17 @@ def validate_exported_csv() -> AssetCheckResult:
)


-defs = Definitions.merge(
-    build_defs_from_airflow_instance(
-        airflow_instance=AirflowInstance(
-            # other backends available (e.g. MwaaSessionAuthBackend)
-            auth_backend=AirflowBasicAuthBackend(
-                webserver_url="http://localhost:8080",
-                username="admin",
-                password="admin",
-            ),
-            name="airflow_instance_one",
-        )
-    ),
-    Definitions(asset_checks=[validate_exported_csv]),
-)
+defs = build_defs_from_airflow_instance(
+    airflow_instance=AirflowInstance(
+        # other backends available (e.g. MwaaSessionAuthBackend)
+        auth_backend=AirflowBasicAuthBackend(
+            webserver_url="http://localhost:8080",
+            username="admin",
+            password="admin",
+        ),
+        name="airflow_instance_one",
+    ),
+    defs=Definitions(asset_checks=[validate_exported_csv]),
+)
```
