Skip to content

Commit

Permalink
[components] Remove @registered_component_type
Browse files Browse the repository at this point in the history
[INTERNAL_BRANCH=sean/components/rm-registered-component-type]
  • Loading branch information
smackesey committed Feb 27, 2025
1 parent f334f7d commit 25abe4f
Show file tree
Hide file tree
Showing 31 changed files with 14 additions and 156 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
from dagster_components import registered_component_type
from dagster_components.lib import SlingReplicationCollection


@registered_component_type(name="custom_subclass")
class CustomSubclass(SlingReplicationCollection): ...
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from collections.abc import Mapping
from typing import Any

from dagster_components import registered_component_type
from dagster_components.lib import SlingReplicationCollection

import dagster as dg


@registered_component_type(name="custom_subclass")
class SubclassWithScope(SlingReplicationCollection):
def get_additional_scope(self) -> Mapping[str, Any]:
def _custom_cron(cron_schedule: str) -> dg.AutomationCondition:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from collections.abc import Iterator

from dagster_components import registered_component_type
from dagster_components.lib import SlingReplicationCollection
from dagster_sling import SlingResource

import dagster as dg


@registered_component_type(name="debug_sling_replication")
class DebugSlingReplicationComponent(SlingReplicationCollection):
def execute(
self, context: dg.AssetExecutionContext, sling: SlingResource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@
ComponentLoadContext,
DefaultComponentScaffolder,
ResolvableSchema,
registered_component_type,
)

class ShellCommandSchema(ResolvableSchema):
...

@registered_component_type
class ShellCommand(Component):
"""COMPONENT SUMMARY HERE.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
FieldResolver,
ResolutionContext,
ResolvableSchema,
registered_component_type,
)

import dagster as dg
Expand All @@ -27,7 +26,6 @@ def resolve_api_key(
return MyApiClient(api_key=schema.api_key)


@registered_component_type(name="my_component")
@dataclass
class MyComponent(Component):
# FieldResolver specifies a function used to map input matching the schema
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
from dagster_components import (
Component,
ComponentLoadContext,
ResolvableSchema,
registered_component_type,
)
from dagster_components import Component, ComponentLoadContext, ResolvableSchema
from pydantic import BaseModel

import dagster as dg


@registered_component_type(name="shell_command")
class ShellCommand(Component):
@classmethod
def get_schema(cls) -> type[ResolvableSchema]: ...
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
from collections.abc import Sequence

from dagster_components import (
AssetSpecSchema,
Component,
ResolvableSchema,
registered_component_type,
)
from dagster_components import AssetSpecSchema, Component, ResolvableSchema


class ShellCommandParams(ResolvableSchema):
path: str
asset_specs: Sequence[AssetSpecSchema]


@registered_component_type(name="shell_command")
class ShellCommand(Component): ...
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
ComponentLoadContext,
ResolutionContext,
ResolvableSchema,
registered_component_type,
)

import dagster as dg
Expand All @@ -26,7 +25,6 @@ def resolve_asset_specs(
return context.resolve_value(schema.asset_specs)


@registered_component_type(name="shell_command")
@dataclass
class ShellCommand(Component):
"""Models a shell script as a Dagster asset."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
Component,
ComponentLoadContext,
ResolvableSchema,
registered_component_type,
)

import dagster as dg
Expand All @@ -17,7 +16,6 @@ class ShellScriptSchema(ResolvableSchema):
asset_specs: Sequence[AssetSpecSchema]


@registered_component_type(name="shell_command")
@dataclass
class ShellCommand(Component):
script_path: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
Component,
ComponentLoadContext,
ResolvableSchema,
registered_component_type,
)

import dagster as dg
Expand All @@ -16,7 +15,6 @@ class ShellScriptSchema(ResolvableSchema):
asset_specs: Sequence[AssetSpecSchema]


@registered_component_type(name="shell_command")
class ShellCommand(Component):
"""Models a shell script as a Dagster asset."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
ComponentLoadContext,
ResolutionContext,
ResolvableSchema,
registered_component_type,
)

import dagster as dg
Expand All @@ -27,7 +26,6 @@ def resolve_asset_specs(
return context.resolve_value(schema.asset_specs)


@registered_component_type(name="shell_command")
@dataclass
class ShellCommand(Component):
script_path: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
ComponentScaffolder,
ComponentScaffoldRequest,
ResolvableSchema,
registered_component_type,
scaffold_component_yaml,
)

Expand Down Expand Up @@ -46,7 +45,6 @@ class ShellScriptSchema(ResolvableSchema):
asset_specs: Sequence[AssetSpecSchema]


@registered_component_type(name="shell_command")
@dataclass
class ShellCommand(Component):
"""Models a shell script as a Dagster asset."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
Component as Component,
ComponentLoadContext as ComponentLoadContext,
component as component,
registered_component_type as registered_component_type,
)
from dagster_components.core.component_defs_builder import (
build_component_defs as build_component_defs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from dagster import _check as check
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.errors import DagsterError
from dagster._utils import pushd, snakecase
from dagster._utils import pushd
from typing_extensions import Self

from dagster_components.core.component_key import ComponentKey
Expand Down Expand Up @@ -310,55 +310,9 @@ def build_defs(self, context: ComponentLoadContext) -> Definitions:
return importlib.import_module(component_module_name)


COMPONENT_REGISTRY_KEY_ATTR = "__dagster_component_registry_key"
COMPONENT_LOADER_FN_ATTR = "__dagster_component_loader_fn"


def registered_component_type(
cls: Optional[type[Component]] = None, *, name: Optional[str] = None
) -> Any:
"""Decorator for registering a component type. You must annotate a component
type with this decorator in order for it to be inspectable and loaded by tools.
Args:
cls (Optional[Type[Component]]): The target of the decorator: the component class
to register. The class must inherit from Component.
name (Optional[str]): The name to register the component type under. If not
provided, the name will be the snake-cased version of the class name. The
name is used as a key in operations like scaffolding and loading.
"""
if cls is None:

def wrapper(actual_cls: type[Component]) -> type[Component]:
check.inst_param(actual_cls, "actual_cls", type)
setattr(
actual_cls,
COMPONENT_REGISTRY_KEY_ATTR,
name or snakecase(actual_cls.__name__),
)
return actual_cls

return wrapper
else:
# called without params
check.inst_param(cls, "cls", type)
setattr(cls, COMPONENT_REGISTRY_KEY_ATTR, name or snakecase(cls.__name__))
return cls


def is_registered_component_type(cls: type) -> bool:
return hasattr(cls, COMPONENT_REGISTRY_KEY_ATTR)


def get_component_type_name(component_type: type[Component]) -> str:
check.param_invariant(
is_registered_component_type(component_type),
"component_type",
"Expected a registered component. Use @component to register a component.",
)
return getattr(component_type, COMPONENT_REGISTRY_KEY_ATTR)


T_Component = TypeVar("T_Component", bound=Component)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from pydantic.dataclasses import dataclass

from dagster_components import Component, ComponentLoadContext, FieldResolver
from dagster_components.core.component import registered_component_type
from dagster_components.core.schema.base import ResolvableSchema
from dagster_components.core.schema.metadata import ResolvableFieldInfo
from dagster_components.core.schema.objects import (
Expand Down Expand Up @@ -52,7 +51,6 @@ def resolve_translator(
)


@registered_component_type(name="dbt_project")
@dataclass(config=ConfigDict(arbitrary_types_allowed=True)) # omits translator prop from schema
class DbtProjectComponent(Component):
"""Expose a DBT project to Dagster as a set of assets."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,14 @@
from pydantic import Field
from pydantic.dataclasses import dataclass

from dagster_components import (
Component,
ComponentLoadContext,
ResolvableSchema,
registered_component_type,
)
from dagster_components import Component, ComponentLoadContext, ResolvableSchema
from dagster_components.lib.definitions_component.scaffolder import DefinitionsComponentScaffolder


class DefinitionsParamSchema(ResolvableSchema):
definitions_path: Optional[str] = None


@registered_component_type(name="definitions")
@dataclass
class DefinitionsComponent(Component):
"""Wraps an arbitrary set of Dagster definitions."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@
from pydantic.dataclasses import dataclass

from dagster_components import FieldResolver
from dagster_components.core.component import (
Component,
ComponentLoadContext,
registered_component_type,
)
from dagster_components.core.component import Component, ComponentLoadContext
from dagster_components.core.schema.base import ResolvableSchema
from dagster_components.core.schema.context import ResolutionContext
from dagster_components.core.schema.objects import AssetSpecSchema
Expand Down Expand Up @@ -47,7 +43,6 @@ def resolve_specs_by_path(
return {spec.path: spec.assets for spec in context.resolve_value(schema.scripts)}


@registered_component_type(name="pipes_subprocess_script_collection")
@dataclass
class PipesSubprocessScriptCollection(Component):
"""Assets that wrap Python scripts executed with Dagster's PipesSubprocessClient."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from typing_extensions import TypeAlias

from dagster_components import Component, ComponentLoadContext, FieldResolver
from dagster_components.core.component import registered_component_type
from dagster_components.core.component_scaffolder import ComponentScaffolder
from dagster_components.core.schema.base import ResolvableSchema
from dagster_components.core.schema.context import ResolutionContext
Expand Down Expand Up @@ -87,7 +86,6 @@ def resolve_resource(
)


@registered_component_type
@dataclass
class SlingReplicationCollection(Component):
"""Expose one or more Sling replications to Dagster as assets."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext

from dagster_components import Component, ComponentLoadContext, registered_component_type
from dagster_components import Component, ComponentLoadContext
from dagster_components.core.component_scaffolder import DefaultComponentScaffolder


@registered_component_type(name="all_metadata_empty_asset")
class AllMetadataEmptyAsset(Component):
@classmethod
def get_scaffolder(cls) -> DefaultComponentScaffolder:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
from pydantic import Field

from dagster_components import Component, ComponentLoadContext, registered_component_type
from dagster_components import Component, ComponentLoadContext
from dagster_components.core.component_scaffolder import DefaultComponentScaffolder
from dagster_components.core.schema.base import ResolvableSchema
from dagster_components.core.schema.metadata import ResolvableFieldInfo
Expand All @@ -31,7 +31,6 @@ class ComplexAssetSchema(ResolvableSchema["ComplexSchemaAsset"]):
asset_post_processors: Optional[Sequence[AssetPostProcessorSchema]] = None


@registered_component_type(name="complex_schema_asset")
class ComplexSchemaAsset(Component):
"""An asset that has a complex schema."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
from pydantic import BaseModel

from dagster_components import Component, ComponentLoadContext, registered_component_type
from dagster_components import Component, ComponentLoadContext
from dagster_components.core.component_scaffolder import (
ComponentScaffolder,
DefaultComponentScaffolder,
Expand All @@ -16,7 +16,6 @@ class SimpleAssetSchema(BaseModel):
value: str


@registered_component_type(name="simple_asset")
class SimpleAsset(Component):
"""A simple asset that returns a constant string value."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dagster._core.pipes.subprocess import PipesSubprocessClient
from pydantic import BaseModel

from dagster_components import Component, ComponentLoadContext, registered_component_type
from dagster_components import Component, ComponentLoadContext
from dagster_components.core.component_scaffolder import (
ComponentScaffolder,
ComponentScaffoldRequest,
Expand Down Expand Up @@ -46,7 +46,6 @@ def scaffold(
"""


@registered_component_type(name="simple_pipes_script_asset")
class SimplePipesScriptAsset(Component):
"""A simple asset that runs a Python script with the Pipes subprocess client.
Expand Down
Loading

0 comments on commit 25abe4f

Please sign in to comment.