[Run Timeline] Don't drop job data if locations within the FutureTicksQuery fail (#27944)

## Summary & Motivation

Currently, if the FutureTicksQuery itself fails, we recover by returning the
ongoing-runs and completed-runs data that we already have.

However, if the query succeeds, we iterate over all of the locations within it
and construct a `jobs` array containing the rows of the timeline. The problem
is that this construction relies on the jobs returned by the FutureTicksQuery:
if the query doesn't return a particular job, we drop that job's data
completely. To fix this, we now track which job keys have been added from the
FutureTicksQuery results and then do a second pass that adds data for any jobs
that were not in the query response, as sketched below.
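
The merge works roughly as follows. This is a minimal sketch in TypeScript, not the actual dagster-ui code: the type shapes and names (`Run`, `FutureTick`, `TimelineRow`, `buildTimelineRows`) are hypothetical stand-ins for the real run-timeline data structures.

```typescript
// Hypothetical shapes standing in for the real run-timeline types.
type Run = {id: string; startTime: number; endTime: number | null};
type FutureTick = {jobKey: string; timestamp: number};
type TimelineRow = {jobKey: string; runs: Run[]; futureTicks: FutureTick[]};

function buildTimelineRows(
  runsByJob: Map<string, Run[]>,
  futureTicksByJob: Map<string, FutureTick[]>,
): TimelineRow[] {
  const rows: TimelineRow[] = [];
  const addedKeys = new Set<string>();

  // First pass: one row per job returned by the FutureTicksQuery,
  // merged with whatever run data we have for that job.
  for (const [jobKey, futureTicks] of futureTicksByJob) {
    rows.push({jobKey, runs: runsByJob.get(jobKey) ?? [], futureTicks});
    addedKeys.add(jobKey);
  }

  // Second pass: jobs that have run data but were missing from the
  // FutureTicksQuery response. Previously these were dropped entirely;
  // now they get a row with an empty list of future ticks.
  for (const [jobKey, runs] of runsByJob) {
    if (!addedKeys.has(jobKey)) {
      rows.push({jobKey, runs, futureTicks: []});
    }
  }

  return rows;
}
```

The invariant after the fix is that every job key with run data produces a timeline row whether or not the FutureTicksQuery returned that job; jobs missing from the query response simply render without future ticks.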

## How I Tested These Changes

Loaded the Run timeline for a customer with a failing location entry.


salazarm authored and cmpadden committed Feb 20, 2025
1 parent 3f396fa commit ef0f7a5
Showing 84 changed files with 1,104 additions and 1,725 deletions.
```diff
@@ -1,5 +1,4 @@
-import dagster as dg
-
+from dagster import Definitions
 from dagster_components import (
     Component,
     ComponentLoadContext,
@@ -26,6 +25,6 @@ def get_schema(cls):
     def get_scaffolder(cls) -> DefaultComponentScaffolder:
         return DefaultComponentScaffolder()
 
-    def build_defs(self, load_context: ComponentLoadContext) -> dg.Definitions:
+    def build_defs(self, load_context: ComponentLoadContext) -> Definitions:
         # Add definition construction logic here.
-        return dg.Definitions()
+        return Definitions()
```
```diff
@@ -1,28 +1,23 @@
 import pandas as pd
 
-from dagster import (
-    AssetCheckResult,
-    AssetCheckSpec,
-    AssetExecutionContext,
-    Definitions,
-    Output,
-    asset,
-)
+import dagster as dg
 
 
-@asset(check_specs=[AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders")])
-def orders(context: AssetExecutionContext):
+@dg.asset(
+    check_specs=[dg.AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders")]
+)
+def orders(context: dg.AssetExecutionContext):
     orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
 
     # save the output and indicate that it's been saved
     orders_df.to_csv("orders")
-    yield Output(value=None)
+    yield dg.Output(value=None)
 
     # check it
     num_null_order_ids = orders_df["order_id"].isna().sum()
-    yield AssetCheckResult(
+    yield dg.AssetCheckResult(
         passed=bool(num_null_order_ids == 0),
     )
 
 
-defs = Definitions(assets=[orders])
+defs = dg.Definitions(assets=[orders])
```
```diff
@@ -1,50 +1,46 @@
 from collections.abc import Mapping
 from unittest.mock import MagicMock
 
-from dagster import (
-    AssetCheckResult,
-    AssetChecksDefinition,
-    Definitions,
-    asset,
-    asset_check,
-)
+import dagster as dg
 
 
-@asset
+@dg.asset
 def orders(): ...
 
 
-@asset
+@dg.asset
 def items(): ...
 
 
-def make_check(check_blob: Mapping[str, str]) -> AssetChecksDefinition:
-    @asset_check(
+def make_check(check_blob: Mapping[str, str]) -> dg.AssetChecksDefinition:
+    @dg.asset_check(
         name=check_blob["name"],
         asset=check_blob["asset"],
         required_resource_keys={"db_connection"},
     )
     def _check(context):
         rows = context.resources.db_connection.execute(check_blob["sql"])
-        return AssetCheckResult(passed=len(rows) == 0, metadata={"num_rows": len(rows)})
+        return dg.AssetCheckResult(
+            passed=len(rows) == 0, metadata={"num_rows": len(rows)}
+        )
 
     return _check
 
 
 check_blobs = [
     {
         "name": "orders_id_has_no_nulls",
         "asset": "orders",
         "sql": "select * from orders where order_id is null",
     },
     {
         "name": "items_id_has_no_nulls",
         "asset": "items",
         "sql": "select * from items where item_id is null",
     },
 ]
 
-defs = Definitions(
+defs = dg.Definitions(
     assets=[orders, items],
     asset_checks=[make_check(check_blob) for check_blob in check_blobs],
     resources={"db_connection": MagicMock()},
```
```diff
@@ -1,47 +1,42 @@
-from dagster import (
-    AssetSelection,
-    Definitions,
-    ScheduleDefinition,
-    asset,
-    asset_check,
-    define_asset_job,
-)
+import dagster as dg
 
 
-@asset
+@dg.asset
 def my_asset(): ...
 
 
-@asset_check(asset=my_asset)
+@dg.asset_check(asset=my_asset)
 def check_1(): ...
 
 
-@asset_check(asset=my_asset)
+@dg.asset_check(asset=my_asset)
 def check_2(): ...
 
 
 # includes my_asset and both checks
-my_job = define_asset_job("my_job", selection=AssetSelection.assets(my_asset))
+my_job = dg.define_asset_job("my_job", selection=dg.AssetSelection.assets(my_asset))
 
 
 # includes only my_asset
-my_asset_only_job = define_asset_job(
+my_asset_only_job = dg.define_asset_job(
     "my_asset_only_job",
-    selection=AssetSelection.assets(my_asset).without_checks(),
+    selection=dg.AssetSelection.assets(my_asset).without_checks(),
 )
 
 # includes check_1 and check_2, but not my_asset
-checks_only_job = define_asset_job(
-    "checks_only_job", selection=AssetSelection.checks_for_assets(my_asset)
+checks_only_job = dg.define_asset_job(
+    "checks_only_job", selection=dg.AssetSelection.checks_for_assets(my_asset)
 )
 
 # includes only check_1
-check_1_job = define_asset_job("check_1_job", selection=AssetSelection.checks(check_1))
+check_1_job = dg.define_asset_job(
+    "check_1_job", selection=dg.AssetSelection.checks(check_1)
+)
 
 # schedule my_job to run every day at midnight
-basic_schedule = ScheduleDefinition(job=my_job, cron_schedule="0 0 * * *")
+basic_schedule = dg.ScheduleDefinition(job=my_job, cron_schedule="0 0 * * *")
 
-defs = Definitions(
+defs = dg.Definitions(
     assets=[my_asset],
     asset_checks=[check_1, check_2],
     jobs=[my_job, my_asset_only_job, checks_only_job, check_1_job],
```
```diff
@@ -1,24 +1,19 @@
 from datetime import timedelta
 
-from dagster import (
-    Definitions,
-    asset,
-    build_last_update_freshness_checks,
-    build_sensor_for_freshness_checks,
-)
+import dagster as dg
 
 
-@asset
+@dg.asset
 def my_asset(): ...
 
 
-asset1_freshness_checks = build_last_update_freshness_checks(
+asset1_freshness_checks = dg.build_last_update_freshness_checks(
     assets=[my_asset], lower_bound_delta=timedelta(hours=2)
 )
-freshness_checks_sensor = build_sensor_for_freshness_checks(
+freshness_checks_sensor = dg.build_sensor_for_freshness_checks(
     freshness_checks=asset1_freshness_checks
 )
-defs = Definitions(
+defs = dg.Definitions(
     assets=[my_asset],
     asset_checks=asset1_freshness_checks,
     sensors=[freshness_checks_sensor],
```
```diff
@@ -1,29 +1,23 @@
 from collections.abc import Iterable
 
-from dagster import (
-    AssetCheckExecutionContext,
-    AssetCheckResult,
-    AssetCheckSeverity,
-    AssetCheckSpec,
-    multi_asset_check,
-)
+import dagster as dg
 
 
-@multi_asset_check(
+@dg.multi_asset_check(
     specs=[
-        AssetCheckSpec(name="asset_check_one", asset="my_asset_one"),
-        AssetCheckSpec(name="asset_check_two", asset="my_asset_two"),
+        dg.AssetCheckSpec(name="asset_check_one", asset="my_asset_one"),
+        dg.AssetCheckSpec(name="asset_check_two", asset="my_asset_two"),
     ]
 )
-def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult]:
-    yield AssetCheckResult(
+def the_check(context: dg.AssetCheckExecutionContext) -> Iterable[dg.AssetCheckResult]:
+    yield dg.AssetCheckResult(
         passed=False,
-        severity=AssetCheckSeverity.WARN,
+        severity=dg.AssetCheckSeverity.WARN,
         description="The asset is over 0.5",
         asset_key="asset_check_one",
     )
 
-    yield AssetCheckResult(
+    yield dg.AssetCheckResult(
         passed=True,
         description="The asset is fresh.",
         asset_key="asset_check_two",
```
```diff
@@ -1,23 +1,19 @@
-from dagster import (
-    AssetCheckResult,
-    AssetCheckSeverity,
-    Definitions,
-    asset,
-    asset_check,
-)
+import dagster as dg
 
 
-@asset
+@dg.asset
 def my_asset(): ...
 
 
-@asset_check(asset=my_asset)
+@dg.asset_check(asset=my_asset)
 def my_check():
     is_serious = ...
-    return AssetCheckResult(
+    return dg.AssetCheckResult(
         passed=False,
-        severity=AssetCheckSeverity.ERROR if is_serious else AssetCheckSeverity.WARN,
+        severity=dg.AssetCheckSeverity.ERROR
+        if is_serious
+        else dg.AssetCheckSeverity.WARN,
     )
 
 
-defs = Definitions(assets=[my_asset], asset_checks=[my_check])
+defs = dg.Definitions(assets=[my_asset], asset_checks=[my_check])
```
```diff
@@ -2,25 +2,14 @@
 
 from dagster_snowflake import SnowflakeResource, fetch_last_updated_timestamps
 
-from dagster import (
-    AssetSelection,
-    AssetSpec,
-    Definitions,
-    EnvVar,
-    MetadataValue,
-    ObserveResult,
-    ScheduleDefinition,
-    build_last_update_freshness_checks,
-    define_asset_job,
-    multi_observable_source_asset,
-)
+import dagster as dg
 
 TABLE_SCHEMA = "PUBLIC"
 table_names = ["charges", "customers"]
-asset_specs = [AssetSpec(table_name) for table_name in table_names]
+asset_specs = [dg.AssetSpec(table_name) for table_name in table_names]
 
 
-@multi_observable_source_asset(specs=asset_specs)
+@dg.multi_observable_source_asset(specs=asset_specs)
 def source_tables(snowflake: SnowflakeResource):
     with snowflake.get_connection() as conn:
         freshness_results = fetch_last_updated_timestamps(
@@ -29,42 +18,42 @@ def source_tables(snowflake: SnowflakeResource):
             schema=TABLE_SCHEMA,
         )
         for table_name, last_updated in freshness_results.items():
-            yield ObserveResult(
+            yield dg.ObserveResult(
                 asset_key=table_name,
                 metadata={
-                    "dagster/last_updated_timestamp": MetadataValue.timestamp(
+                    "dagster/last_updated_timestamp": dg.MetadataValue.timestamp(
                         last_updated
                     )
                 },
             )
 
 
-source_tables_observation_schedule = ScheduleDefinition(
-    job=define_asset_job(
+source_tables_observation_schedule = dg.ScheduleDefinition(
+    job=dg.define_asset_job(
         "source_tables_observation_job",
-        selection=AssetSelection.assets(source_tables),
+        selection=dg.AssetSelection.assets(source_tables),
     ),
     # Runs every minute. Usually, a much less frequent cadence is necessary,
     # but a short cadence makes it easier to play around with this example.
     cron_schedule="* * * * *",
 )
 
 
-source_table_freshness_checks = build_last_update_freshness_checks(
+source_table_freshness_checks = dg.build_last_update_freshness_checks(
     assets=[source_tables],
     lower_bound_delta=timedelta(hours=2),
 )
 
 
-defs = Definitions(
+defs = dg.Definitions(
     assets=[source_tables],
     asset_checks=source_table_freshness_checks,
     schedules=[source_tables_observation_schedule],
     resources={
         "snowflake": SnowflakeResource(
-            user=EnvVar("SNOWFLAKE_USER"),
-            account=EnvVar("SNOWFLAKE_ACCOUNT"),
-            password=EnvVar("SNOWFLAKE_PASSWORD"),
+            user=dg.EnvVar("SNOWFLAKE_USER"),
+            account=dg.EnvVar("SNOWFLAKE_ACCOUNT"),
+            password=dg.EnvVar("SNOWFLAKE_PASSWORD"),
         )
     },
 )
```
```diff
@@ -1,19 +1,14 @@
 # start_multi_observable_marker
 from dagster_snowflake import SnowflakeResource, fetch_last_updated_timestamps
 
-from dagster import (
-    AssetSpec,
-    MetadataValue,
-    ObserveResult,
-    multi_observable_source_asset,
-)
+import dagster as dg
 
 TABLE_SCHEMA = "PUBLIC"
 table_names = ["charges", "customers"]
-asset_specs = [AssetSpec(table_name) for table_name in table_names]
+asset_specs = [dg.AssetSpec(table_name) for table_name in table_names]
 
 
-@multi_observable_source_asset(specs=asset_specs)
+@dg.multi_observable_source_asset(specs=asset_specs)
 def source_tables(snowflake: SnowflakeResource):
     with snowflake.get_connection() as conn:
         freshness_results = fetch_last_updated_timestamps(
@@ -22,10 +17,10 @@ def source_tables(snowflake: SnowflakeResource):
             schema=TABLE_SCHEMA,
         )
         for table_name, last_updated in freshness_results.items():
-            yield ObserveResult(
+            yield dg.ObserveResult(
                 asset_key=table_name,
                 metadata={
-                    "dagster/last_updated_timestamp": MetadataValue.timestamp(
+                    "dagster/last_updated_timestamp": dg.MetadataValue.timestamp(
                         last_updated
                     )
                 },
```