Skip to content

Commit

Permalink
[dagster-airlift] Standalone sensor def creation fxn
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Nov 13, 2024
1 parent 8196ec1 commit a68a877
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
AssetEvent as AssetEvent,
DagsterEventTransformerFn as DagsterEventTransformerFn,
)
from .sensor.sensor_builder import (
build_airflow_polling_sensor_defs as build_airflow_polling_sensor_defs,
)
from .sensor.sensor_builder import build_airflow_polling_sensor as build_airflow_polling_sensor
from .top_level_dag_def_api import (
assets_with_dag_mappings as assets_with_dag_mappings,
assets_with_task_mappings as assets_with_task_mappings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
)
from dagster_airlift.core.sensor.sensor_builder import (
DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS,
build_airflow_polling_sensor_defs,
build_airflow_polling_sensor,
)
from dagster_airlift.core.serialization.compute import DagSelectorFn, compute_serialized_data
from dagster_airlift.core.serialization.defs_construction import (
Expand Down Expand Up @@ -223,11 +223,15 @@ def only_include_dag(dag_info: DagInfo) -> bool:

return Definitions.merge(
defs_with_airflow_assets,
build_airflow_polling_sensor_defs(
mapped_assets=mapped_and_constructed_assets,
airflow_instance=airflow_instance,
minimum_interval_seconds=sensor_minimum_interval_seconds,
event_transformer_fn=event_transformer_fn,
Definitions(
sensors=[
build_airflow_polling_sensor(
mapped_assets=mapped_and_constructed_assets,
airflow_instance=airflow_instance,
minimum_interval_seconds=sensor_minimum_interval_seconds,
event_transformer_fn=event_transformer_fn,
)
]
),
)

Expand Down Expand Up @@ -280,10 +284,14 @@ def build_full_automapped_dags_from_airflow_instance(
resolved_defs = replace_assets_in_defs(defs=defs, assets=airflow_assets)
return Definitions.merge(
resolved_defs,
build_airflow_polling_sensor_defs(
minimum_interval_seconds=sensor_minimum_interval_seconds,
mapped_assets=mapped_assets,
airflow_instance=airflow_instance,
Definitions(
sensors=[
build_airflow_polling_sensor(
minimum_interval_seconds=sensor_minimum_interval_seconds,
mapped_assets=mapped_assets,
airflow_instance=airflow_instance,
)
]
),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
AssetMaterialization,
DefaultSensorStatus,
RunRequest,
SensorDefinition,
SensorEvaluationContext,
SensorResult,
_check as check,
sensor,
)
from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluation
from dagster._core.definitions.asset_selection import AssetSelection
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.events import AssetObservation
from dagster._core.definitions.repository_definition.repository_definition import (
RepositoryDefinition,
Expand Down Expand Up @@ -78,13 +78,13 @@ def check_keys_for_asset_keys(
yield check_spec.key


def build_airflow_polling_sensor_defs(
def build_airflow_polling_sensor(
*,
mapped_assets: Iterable[MappedAsset],
airflow_instance: AirflowInstance,
event_transformer_fn: DagsterEventTransformerFn = default_event_transformer,
minimum_interval_seconds: int = DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS,
) -> Definitions:
) -> SensorDefinition:
"""The constructed sensor polls the Airflow instance for activity, and inserts asset events into Dagster's event log.
The sensor decides which Airflow dags and tasks to monitor by inspecting the metadata of the passed-in Definitions object `mapped_defs`.
Expand Down Expand Up @@ -190,7 +190,7 @@ def airflow_dag_sensor(context: SensorEvaluationContext) -> SensorResult:
else None,
)

return Definitions(sensors=[airflow_dag_sensor])
return airflow_dag_sensor


def sorted_asset_events(
Expand Down

0 comments on commit a68a877

Please sign in to comment.