Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executable and da strawman #25931

Draft
wants to merge 1 commit into
base: dpeng817/federation-tutorial
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from typing import AbstractSet

from dagster import (
AutomationConditionSensorDefinition,
DefaultSensorStatus,
Definitions,
MaterializeResult,
multi_asset,
)
from dagster._core.definitions.asset_spec import replace_attributes
from dagster._core.definitions.asset_spec import merge_attributes
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
Expand Down Expand Up @@ -34,45 +36,50 @@
name="metrics",
)

load_customers_dag_asset = next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=warehouse_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
)

def dag_id_matches(spec, dag_ids: AbstractSet[str]) -> bool:
return spec.metadata.get("Dag ID") in dag_ids


warehouse_specs = load_airflow_dag_asset_specs(airflow_instance=warehouse_airflow_instance)

load_customers_dag_specs = [
spec for spec in warehouse_specs if dag_id_matches(spec, {"load_customers"})
]

is_customer_dag = lambda spec: dag_id_matches(spec, {"customer_metrics"})

metrics_specs = [
merge_attributes(
spec, deps=load_customers_dag_specs, automation_condition=AutomationCondition.eager()
)
)
customer_metrics_dag_asset = replace_attributes(
next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=metrics_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
)
)
# Add a dependency on the load_customers_dag_asset
),
deps=[load_customers_dag_asset],
automation_condition=AutomationCondition.eager(),
if not is_customer_dag(spec)
else spec
for spec in load_airflow_dag_asset_specs(airflow_instance=metrics_airflow_instance)
]

customer_metrics, remaining_metrics_specs = (
[spec for spec in metrics_specs if is_customer_dag(spec)],
[spec for spec in metrics_specs if not is_customer_dag(spec)],
)


@multi_asset(specs=[customer_metrics_dag_asset])
@multi_asset(specs=customer_metrics)
def run_customer_metrics() -> MaterializeResult:
run_id = metrics_airflow_instance.trigger_dag("customer_metrics")
metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
if metrics_airflow_instance.get_run_state("customer_metrics", run_id) == "success":
return MaterializeResult(asset_key=customer_metrics_dag_asset.key)
return MaterializeResult(asset_key=customer_metrics[0].key)
else:
raise Exception("Dag run failed.")


warehouse_sensor = build_airflow_polling_sensor(
mapped_assets=[load_customers_dag_asset],
mapped_assets=warehouse_specs,
airflow_instance=warehouse_airflow_instance,
)
metrics_sensor = build_airflow_polling_sensor(
mapped_assets=[customer_metrics_dag_asset],
mapped_assets=metrics_specs,
airflow_instance=metrics_airflow_instance,
)

Expand All @@ -84,6 +91,6 @@ def run_customer_metrics() -> MaterializeResult:
)

defs = Definitions(
assets=[load_customers_dag_asset, run_customer_metrics],
assets=[run_customer_metrics, *remaining_metrics_specs, *warehouse_specs],
sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
)
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ def merge_attributes(
owners: Sequence[str] = ...,
tags: Mapping[str, str] = ...,
kinds: Set[str] = ...,
automation_condition: AutomationCondition = ...,
) -> "AssetSpec":
"""Returns a new AssetSpec with the specified attributes merged with the current attributes."""
current_tags_without_kinds = {
Expand Down