From e23ec9ccdabdbeaf80741f1125d6e8c345c96b67 Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Mon, 11 Nov 2024 16:34:46 -0800 Subject: [PATCH] [dagster-airlift] Standalone sensor def creation fxn --- .../dagster_airlift/core/__init__.py | 4 +-- .../dagster_airlift/core/load_defs.py | 28 ++++++++++++------- .../core/sensor/sensor_builder.py | 8 +++--- 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py b/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py index bae86bc9602d1..d09a25a1136ce 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py @@ -18,9 +18,7 @@ AssetEvent as AssetEvent, DagsterEventTransformerFn as DagsterEventTransformerFn, ) -from .sensor.sensor_builder import ( - build_airflow_polling_sensor_defs as build_airflow_polling_sensor_defs, -) +from .sensor.sensor_builder import build_airflow_polling_sensor as build_airflow_polling_sensor from .top_level_dag_def_api import ( assets_with_dag_mappings as assets_with_dag_mappings, assets_with_task_mappings as assets_with_task_mappings, diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py b/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py index be9dd6791c519..3e2044a2a8619 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py @@ -19,7 +19,7 @@ ) from dagster_airlift.core.sensor.sensor_builder import ( DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS, - build_airflow_polling_sensor_defs, + build_airflow_polling_sensor, ) from dagster_airlift.core.serialization.compute import DagSelectorFn, compute_serialized_data from dagster_airlift.core.serialization.defs_construction import ( @@ -223,11 +223,15 @@ def only_include_dag(dag_info: DagInfo) -> bool: return Definitions.merge( defs_with_airflow_assets, - build_airflow_polling_sensor_defs( - mapped_assets=mapped_and_constructed_assets, - airflow_instance=airflow_instance, - minimum_interval_seconds=sensor_minimum_interval_seconds, - event_transformer_fn=event_transformer_fn, + Definitions( + sensors=[ + build_airflow_polling_sensor( + mapped_assets=mapped_and_constructed_assets, + airflow_instance=airflow_instance, + minimum_interval_seconds=sensor_minimum_interval_seconds, + event_transformer_fn=event_transformer_fn, + ) + ] ), ) @@ -280,10 +284,14 @@ def build_full_automapped_dags_from_airflow_instance( resolved_defs = replace_assets_in_defs(defs=defs, assets=airflow_assets) return Definitions.merge( resolved_defs, - build_airflow_polling_sensor_defs( - minimum_interval_seconds=sensor_minimum_interval_seconds, - mapped_assets=mapped_assets, - airflow_instance=airflow_instance, + Definitions( + sensors=[ + build_airflow_polling_sensor( + minimum_interval_seconds=sensor_minimum_interval_seconds, + mapped_assets=mapped_assets, + airflow_instance=airflow_instance, + ) + ] ), ) diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/sensor_builder.py b/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/sensor_builder.py index 4b92b262e45b4..b56ae6e4781e8 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/sensor_builder.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/sensor_builder.py @@ -7,6 +7,7 @@ AssetMaterialization, DefaultSensorStatus, RunRequest, + SensorDefinition, SensorEvaluationContext, SensorResult, _check as check, @@ -14,7 +15,6 @@ ) from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluation from dagster._core.definitions.asset_selection import AssetSelection -from dagster._core.definitions.definitions_class import Definitions from dagster._core.definitions.events import AssetObservation from dagster._core.definitions.repository_definition.repository_definition import ( RepositoryDefinition, @@ -78,13 +78,13 @@ def check_keys_for_asset_keys( yield check_spec.key -def build_airflow_polling_sensor_defs( +def build_airflow_polling_sensor( *, mapped_assets: Iterable[MappedAsset], airflow_instance: AirflowInstance, event_transformer_fn: DagsterEventTransformerFn = default_event_transformer, minimum_interval_seconds: int = DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS, -) -> Definitions: +) -> SensorDefinition: """The constructed sensor polls the Airflow instance for activity, and inserts asset events into Dagster's event log. The sensor decides which Airflow dags and tasks to monitor by inspecting the metadata of the passed-in Definitions object `mapped_defs`. @@ -190,7 +190,7 @@ def airflow_dag_sensor(context: SensorEvaluationContext) -> SensorResult: else None, ) - return Definitions(sensors=[airflow_dag_sensor]) + return airflow_dag_sensor def sorted_asset_events(