Skip to content

Commit

Permalink
[dagster-components] Introduce resolved OpSpec, backfill_policy (#28071)
Browse files Browse the repository at this point in the history
## Summary

Introduces a new `backfill_policy` schema entry to `OpSpecSchema`, which
is resolved to an actual `BackfillPolicy` during its conversion to an
`OpSpec`:

```yaml

type: dbt_project@dagster_components

attributes:
  op:
    backfill_policy:
      type: multi_run
      max_partitions_per_run: 5
```


## How I Tested These Changes

New unit tests.
  • Loading branch information
benpankow authored Feb 27, 2025
1 parent f8ffc52 commit cbbaac4
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
from dagster._core.definitions.asset_selection import AssetSelection
from dagster._core.definitions.asset_spec import AssetSpec, map_asset_specs
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.backfill_policy import BackfillPolicy
from dagster._core.definitions.definitions_class import Definitions
from dagster._record import replace
from pydantic import BaseModel, Field
from pydantic.dataclasses import dataclass
from typing_extensions import TypeAlias

from dagster_components.core.schema.base import FieldResolver, ResolvableSchema
Expand All @@ -26,11 +28,50 @@ def _resolve_asset_key(key: str, context: ResolutionContext) -> AssetKey:
PostProcessorFn: TypeAlias = Callable[[Definitions], Definitions]


class OpSpecSchema(ResolvableSchema):
@dataclass
class SingleRunBackfillPolicySchema:
    """Schema entry that selects the single-run backfill policy.

    The only field is the ``type`` discriminator, fixed to ``"single_run"``.
    """

    # Discriminator value; also the default, so the field may be omitted.
    type: Literal["single_run"] = "single_run"


@dataclass
class MultiRunBackfillPolicySchema:
    """Schema entry that selects the multi-run backfill policy.

    ``type`` is the discriminator, fixed to ``"multi_run"``;
    ``max_partitions_per_run`` defaults to 1 when it is not supplied.
    """

    # Discriminator value; also the default, so the field may be omitted.
    type: Literal["multi_run"] = "multi_run"
    # Forwarded unchanged as the keyword argument of the same name when the
    # policy is built.
    max_partitions_per_run: int = 1


def resolve_backfill_policy(
    context: ResolutionContext, schema: "OpSpecSchema"
) -> Optional[BackfillPolicy]:
    """Build a ``BackfillPolicy`` from the ``backfill_policy`` entry of ``schema``.

    Args:
        context: Resolution context passed in by the caller; this function
            does not read it.
        schema: Object whose ``backfill_policy`` attribute is inspected.

    Returns:
        ``None`` when ``schema.backfill_policy`` is ``None``; otherwise the
        result of ``BackfillPolicy.single_run()`` or
        ``BackfillPolicy.multi_run(...)``, chosen by the entry's ``type``.

    Raises:
        ValueError: If the entry's ``type`` is neither ``"single_run"`` nor
            ``"multi_run"``.
    """
    policy_schema = schema.backfill_policy
    if policy_schema is None:
        return None

    kind = policy_schema.type
    if kind == "multi_run":
        return BackfillPolicy.multi_run(
            max_partitions_per_run=policy_schema.max_partitions_per_run
        )
    if kind == "single_run":
        return BackfillPolicy.single_run()

    # Defensive fallthrough for a discriminator value outside the two above.
    raise ValueError(f"Invalid backfill policy: {policy_schema}")


@dataclass
class OpSpec:
    """Resolved op customizations; every field is optional and defaults to ``None``.

    ``backfill_policy`` carries a ``FieldResolver`` annotation pointing at
    ``resolve_backfill_policy``, which produces the ``BackfillPolicy`` value
    from the schema entry.
    """

    name: Optional[str] = None
    tags: Optional[dict[str, str]] = None
    backfill_policy: Annotated[
        Optional[BackfillPolicy],
        FieldResolver(resolve_backfill_policy),
    ] = None


class OpSpecSchema(ResolvableSchema[OpSpec]):
    """Schema for op customizations, declared as resolving to ``OpSpec``.

    All three fields are optional and default to ``None``. ``backfill_policy``
    accepts either of the two backfill policy schema dataclasses.
    """

    name: Optional[str] = Field(
        default=None,
        description="The name of the op.",
    )
    tags: Optional[dict[str, str]] = Field(
        default=None,
        description="Arbitrary metadata for the op.",
    )
    backfill_policy: Optional[
        Union[SingleRunBackfillPolicySchema, MultiRunBackfillPolicySchema]
    ] = Field(
        default=None,
        description="The backfill policy to use for the assets.",
    )


class AssetDepSchema(ResolvableSchema[AssetDep]):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from dagster_components.core.schema.objects import (
AssetAttributesSchema,
AssetPostProcessorSchema,
OpSpec,
OpSpecSchema,
PostProcessorFn,
ResolutionContext,
Expand Down Expand Up @@ -61,7 +62,7 @@ class DbtProjectComponent(Component):
"""Expose a DBT project to Dagster as a set of assets."""

dbt: Annotated[DbtCliResource, FieldResolver(resolve_dbt)]
op: Optional[OpSpecSchema] = Field(
op: Optional[OpSpec] = Field(
None, description="Customizations to the op underlying the dbt run."
)
translator: Annotated[DagsterDbtTranslator, FieldResolver(resolve_translator)] = Field(
Expand Down Expand Up @@ -111,6 +112,7 @@ def build_defs(self, context: ComponentLoadContext) -> Definitions:
dagster_dbt_translator=self.translator,
select=self.select,
exclude=self.exclude,
backfill_policy=self.op.backfill_policy if self.op else None,
)
def _fn(context: AssetExecutionContext):
yield from self.execute(context=context, dbt=self.dbt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from dagster_components.core.schema.objects import (
AssetAttributesSchema,
AssetPostProcessorSchema,
OpSpec,
OpSpecSchema,
PostProcessorFn,
)
Expand Down Expand Up @@ -48,7 +49,7 @@ def resolve_translator(

class SlingReplicationSpec(BaseModel):
path: str
op: Optional[OpSpecSchema]
op: Optional[OpSpec]
translator: Annotated[Optional[DagsterSlingTranslator], FieldResolver(resolve_translator)]
include_metadata: list[SlingMetadataAddons]

Expand Down Expand Up @@ -121,13 +122,14 @@ def get_schema(cls) -> type[SlingReplicationCollectionSchema]:
def build_asset(
self, context: ComponentLoadContext, replication_spec: SlingReplicationSpec
) -> AssetsDefinition:
op_spec = replication_spec.op or OpSpecSchema()
op_spec = replication_spec.op or OpSpec()

@sling_assets(
name=op_spec.name or Path(replication_spec.path).stem,
op_tags=op_spec.tags,
replication_config=context.path / replication_spec.path,
dagster_sling_translator=replication_spec.translator,
backfill_policy=op_spec.backfill_policy,
)
def _asset(context: AssetExecutionContext):
yield from self.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dagster import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.backfill_policy import BackfillPolicy, BackfillPolicyType
from dagster_components.core.component_decl_builder import ComponentFileModel
from dagster_components.core.component_defs_builder import (
YamlComponentDecl,
Expand All @@ -22,7 +23,7 @@
from dagster_components_tests.utils import assert_assets, get_asset_keys, script_load_context

if TYPE_CHECKING:
from dagster._core.definitions.assets import AssetsDefinition
from dagster import AssetsDefinition

STUB_LOCATION_PATH = Path(__file__).parent.parent / "code_locations" / "dbt_project_location"
COMPONENT_RELPATH = "components/jaffle_shop_dbt"
Expand Down Expand Up @@ -50,14 +51,25 @@ def dbt_path() -> Iterator[Path]:
yield Path(temp_dir)


def test_python_params(dbt_path: Path) -> None:
@pytest.mark.parametrize(
"backfill_policy", [None, "single_run", "multi_run", "multi_run_with_max_partitions"]
)
def test_python_params(dbt_path: Path, backfill_policy: Optional[str]) -> None:
backfill_policy_arg = {}
if backfill_policy == "single_run":
backfill_policy_arg["backfill_policy"] = {"type": "single_run"}
elif backfill_policy == "multi_run":
backfill_policy_arg["backfill_policy"] = {"type": "multi_run"}
elif backfill_policy == "multi_run_with_max_partitions":
backfill_policy_arg["backfill_policy"] = {"type": "multi_run", "max_partitions_per_run": 3}

decl_node = YamlComponentDecl(
path=dbt_path / COMPONENT_RELPATH,
component_file_model=ComponentFileModel(
type="dbt_project",
attributes={
"dbt": {"project_dir": "jaffle_shop"},
"op": {"name": "some_op", "tags": {"tag1": "value"}},
"op": {"name": "some_op", "tags": {"tag1": "value"}, **backfill_policy_arg},
},
),
)
Expand All @@ -67,8 +79,23 @@ def test_python_params(dbt_path: Path) -> None:
)
assert get_asset_keys(component) == JAFFLE_SHOP_KEYS
defs = component.build_defs(script_load_context())
assert defs.get_assets_def("stg_customers").op.name == "some_op"
assert defs.get_assets_def("stg_customers").op.tags["tag1"] == "value"
assets_def: AssetsDefinition = defs.get_assets_def("stg_customers")
assert assets_def.op.name == "some_op"
assert assets_def.op.tags["tag1"] == "value"

if backfill_policy is None:
assert assets_def.backfill_policy is None
elif backfill_policy == "single_run":
assert isinstance(assets_def.backfill_policy, BackfillPolicy)
assert assets_def.backfill_policy.policy_type == BackfillPolicyType.SINGLE_RUN
elif backfill_policy == "multi_run":
assert isinstance(assets_def.backfill_policy, BackfillPolicy)
assert assets_def.backfill_policy.policy_type == BackfillPolicyType.MULTI_RUN
assert assets_def.backfill_policy.max_partitions_per_run == 1
elif backfill_policy == "multi_run_with_max_partitions":
assert isinstance(assets_def.backfill_policy, BackfillPolicy)
assert assets_def.backfill_policy.policy_type == BackfillPolicyType.MULTI_RUN
assert assets_def.backfill_policy.max_partitions_per_run == 3


def test_load_from_path(dbt_path: Path) -> None:
Expand Down

0 comments on commit cbbaac4

Please sign in to comment.