[dagster-airlift] Add check to observe step, ensure it's present at each stage. #25828

Merged
89 changes: 75 additions & 14 deletions docs/content/integrations/airlift/tutorial.mdx
@@ -202,7 +202,15 @@ Then, we will construct our assets:
 import os
 from pathlib import Path

-from dagster import AssetExecutionContext, AssetSpec, Definitions
+from dagster import (
+    AssetCheckResult,
+    AssetCheckSeverity,
+    AssetExecutionContext,
+    AssetKey,
+    AssetSpec,
+    Definitions,
+    asset_check,
+)
 from dagster_airlift.core import (
     AirflowBasicAuthBackend,
     AirflowInstance,
@@ -212,6 +220,28 @@ from dagster_airlift.core import (
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets


@asset_check(asset=AssetKey(["airflow_instance_one", "dag", "rebuild_customers_list"]))
def validate_exported_csv() -> AssetCheckResult:
    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"

    if not csv_path.exists():
        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")

    rows = len(csv_path.read_text().split("\n"))
    if rows < 2:
        return AssetCheckResult(
            passed=False,
            description=f"Export CSV {csv_path} is empty",
            severity=AssetCheckSeverity.WARN,
        )

    return AssetCheckResult(
        passed=True,
        description=f"Export CSV {csv_path} exists",
        metadata={"rows": rows},
    )


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
@@ -248,6 +278,7 @@ defs = build_defs_from_airflow_instance(
    defs=Definitions(
        assets=mapped_assets,
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
        asset_checks=[validate_exported_csv],
    ),
)
```
@@ -279,7 +310,16 @@ If your assets represent a time-partitioned data source, Airlift can automatically
 import os
 from pathlib import Path

-from dagster import AssetExecutionContext, AssetSpec, DailyPartitionsDefinition, Definitions
+from dagster import (
+    AssetCheckResult,
+    AssetCheckSeverity,
+    AssetExecutionContext,
+    AssetKey,
+    AssetSpec,
+    DailyPartitionsDefinition,
+    Definitions,
+    asset_check,
+)
 from dagster._time import get_current_datetime_midnight
 from dagster_airlift.core import (
     AirflowBasicAuthBackend,
@@ -292,6 +332,28 @@ from dagster_dbt import DbtCliResource, DbtProject, dbt_assets
PARTITIONS_DEF = DailyPartitionsDefinition(start_date=get_current_datetime_midnight())


@asset_check(asset=AssetKey(["customers_csv"]))
def validate_exported_csv() -> AssetCheckResult:
    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"

    if not csv_path.exists():
        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")

    rows = len(csv_path.read_text().split("\n"))
    if rows < 2:
        return AssetCheckResult(
            passed=False,
            description=f"Export CSV {csv_path} is empty",
            severity=AssetCheckSeverity.WARN,
        )

    return AssetCheckResult(
        passed=True,
        description=f"Export CSV {csv_path} exists",
        metadata={"rows": rows},
    )


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
@@ -333,6 +395,7 @@ defs = build_defs_from_airflow_instance(
    defs=Definitions(
        assets=mapped_assets,
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
        asset_checks=[validate_exported_csv],
    ),
)
```
@@ -768,19 +831,17 @@ def validate_exported_csv() -> AssetCheckResult:
     )


-defs = Definitions.merge(
-    build_defs_from_airflow_instance(
-        airflow_instance=AirflowInstance(
-            # other backends available (e.g. MwaaSessionAuthBackend)
-            auth_backend=AirflowBasicAuthBackend(
-                webserver_url="http://localhost:8080",
-                username="admin",
-                password="admin",
-            ),
-            name="airflow_instance_one",
-        )
+defs = build_defs_from_airflow_instance(
+    airflow_instance=AirflowInstance(
+        # other backends available (e.g. MwaaSessionAuthBackend)
+        auth_backend=AirflowBasicAuthBackend(
+            webserver_url="http://localhost:8080",
+            username="admin",
+            password="admin",
+        ),
+        name="airflow_instance_one",
     ),
-    Definitions(asset_checks=[validate_exported_csv]),
+    defs=Definitions(asset_checks=[validate_exported_csv]),
 )
```

158 changes: 156 additions & 2 deletions docs/content/integrations/airlift/tutorial/observe.mdx
@@ -30,7 +30,15 @@ Then, we will construct our assets:
 import os
 from pathlib import Path

-from dagster import AssetExecutionContext, AssetSpec, Definitions
+from dagster import (
+    AssetCheckResult,
+    AssetCheckSeverity,
+    AssetExecutionContext,
+    AssetKey,
+    AssetSpec,
+    Definitions,
+    asset_check,
+)
 from dagster_airlift.core import (
     AirflowBasicAuthBackend,
     AirflowInstance,
@@ -40,6 +48,28 @@ from dagster_airlift.core import (
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets


@asset_check(asset=AssetKey(["airflow_instance_one", "dag", "rebuild_customers_list"]))
def validate_exported_csv() -> AssetCheckResult:
    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"

    if not csv_path.exists():
        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")

    rows = len(csv_path.read_text().split("\n"))
    if rows < 2:
        return AssetCheckResult(
            passed=False,
            description=f"Export CSV {csv_path} is empty",
            severity=AssetCheckSeverity.WARN,
        )

    return AssetCheckResult(
        passed=True,
        description=f"Export CSV {csv_path} exists",
        metadata={"rows": rows},
    )


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
@@ -76,6 +106,7 @@ defs = build_defs_from_airflow_instance(
    defs=Definitions(
        assets=mapped_assets,
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
        asset_checks=[validate_exported_csv],
    ),
)
```
@@ -99,6 +130,97 @@ Kicking off a run of the DAG in Airflow, you should see the newly created assets

_Note: There will be some delay between task completion and assets materializing in Dagster, since materializations are recorded by a sensor. This sensor runs every 30 seconds by default; you can reduce the interval to as little as one second via the `minimum_interval_seconds` argument to `sensor`._
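For faster feedback while iterating locally, the polling interval can be tightened when building definitions. Below is a minimal sketch; it assumes your installed `dagster-airlift` version exposes a `sensor_minimum_interval_seconds` argument on `build_defs_from_airflow_instance` (verify against your release before relying on it):

```python
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_defs_from_airflow_instance,
)

defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    # Assumption: this kwarg is forwarded to the monitoring sensor's
    # `minimum_interval_seconds`, polling every second instead of every 30.
    sensor_minimum_interval_seconds=1,
)
```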

### Moving the asset check

Now that we've introduced an explicit asset for the `customers.csv` file output by the DAG, we should move the asset check constructed during the Peering step so that it targets the `customers_csv` asset instead. Simply change the `asset` targeted by the `@asset_check` decorator to `AssetKey(["customers_csv"])`, as shown in the snippet below. This ensures that the asset check lives on even after we delete the DAG.
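Concretely, only the decorator's target changes; the body of `validate_exported_csv` stays the same:

```python
# Peering step: the check was attached to the peered DAG asset.
@asset_check(asset=AssetKey(["airflow_instance_one", "dag", "rebuild_customers_list"]))
def validate_exported_csv() -> AssetCheckResult: ...


# Observe step: the same check now targets the explicit customers_csv asset.
@asset_check(asset=AssetKey(["customers_csv"]))
def validate_exported_csv() -> AssetCheckResult: ...
```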

When done, our code will look like this.

```python file=../../experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_check_on_asset.py
import os
from pathlib import Path

from dagster import (
    AssetCheckResult,
    AssetCheckSeverity,
    AssetExecutionContext,
    AssetKey,
    AssetSpec,
    Definitions,
    asset_check,
)
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    assets_with_task_mappings,
    build_defs_from_airflow_instance,
)
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets


@asset_check(asset=AssetKey(["customers_csv"]))
def validate_exported_csv() -> AssetCheckResult:
    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"

    if not csv_path.exists():
        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")

    rows = len(csv_path.read_text().split("\n"))
    if rows < 2:
        return AssetCheckResult(
            passed=False,
            description=f"Export CSV {csv_path} is empty",
            severity=AssetCheckSeverity.WARN,
        )

    return AssetCheckResult(
        passed=True,
        description=f"Export CSV {csv_path} exists",
        metadata={"rows": rows},
    )


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


@dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=DbtProject(dbt_project_path()),
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


mapped_assets = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        "load_raw_customers": [AssetSpec(key=["raw_data", "raw_customers"])],
        "build_dbt_models": [dbt_project_assets],
        "export_customers": [AssetSpec(key="customers_csv", deps=["customers"])],
    },
)


defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=Definitions(
        assets=mapped_assets,
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
        asset_checks=[validate_exported_csv],
    ),
)
```

### Adding partitions

If your Airflow tasks produce time-partitioned assets, Airlift can automatically associate your materializations with the relevant partitions. In the case of `rebuild_customers_list`, data is partitioned daily in each created table, and the Airflow DAG runs on a `@daily` cron schedule. We can likewise add a `DailyPartitionsDefinition` to each of our assets.
@@ -107,7 +229,16 @@ If your Airflow tasks produce time-partitioned assets, Airlift can automatically
 import os
 from pathlib import Path

-from dagster import AssetExecutionContext, AssetSpec, DailyPartitionsDefinition, Definitions
+from dagster import (
+    AssetCheckResult,
+    AssetCheckSeverity,
+    AssetExecutionContext,
+    AssetKey,
+    AssetSpec,
+    DailyPartitionsDefinition,
+    Definitions,
+    asset_check,
+)
 from dagster._time import get_current_datetime_midnight
 from dagster_airlift.core import (
     AirflowBasicAuthBackend,
@@ -120,6 +251,28 @@ from dagster_dbt import DbtCliResource, DbtProject, dbt_assets
PARTITIONS_DEF = DailyPartitionsDefinition(start_date=get_current_datetime_midnight())


@asset_check(asset=AssetKey(["customers_csv"]))
def validate_exported_csv() -> AssetCheckResult:
    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"

    if not csv_path.exists():
        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")

    rows = len(csv_path.read_text().split("\n"))
    if rows < 2:
        return AssetCheckResult(
            passed=False,
            description=f"Export CSV {csv_path} is empty",
            severity=AssetCheckSeverity.WARN,
        )

    return AssetCheckResult(
        passed=True,
        description=f"Export CSV {csv_path} exists",
        metadata={"rows": rows},
    )


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
@@ -161,6 +314,7 @@ defs = build_defs_from_airflow_instance(
    defs=Definitions(
        assets=mapped_assets,
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
        asset_checks=[validate_exported_csv],
    ),
)
```
22 changes: 10 additions & 12 deletions docs/content/integrations/airlift/tutorial/peer.mdx
@@ -130,19 +130,17 @@ def validate_exported_csv() -> AssetCheckResult:
     )


-defs = Definitions.merge(
-    build_defs_from_airflow_instance(
-        airflow_instance=AirflowInstance(
-            # other backends available (e.g. MwaaSessionAuthBackend)
-            auth_backend=AirflowBasicAuthBackend(
-                webserver_url="http://localhost:8080",
-                username="admin",
-                password="admin",
-            ),
-            name="airflow_instance_one",
-        )
+defs = build_defs_from_airflow_instance(
+    airflow_instance=AirflowInstance(
+        # other backends available (e.g. MwaaSessionAuthBackend)
+        auth_backend=AirflowBasicAuthBackend(
+            webserver_url="http://localhost:8080",
+            username="admin",
+            password="admin",
+        ),
+        name="airflow_instance_one",
     ),
-    Definitions(asset_checks=[validate_exported_csv]),
+    defs=Definitions(asset_checks=[validate_exported_csv]),
 )
```
