Skip to content

Commit

Permalink
[dagster-airlift] Add initial stage to add specs to federation tutorial
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Nov 12, 2024
1 parent a42d89e commit 32e86ba
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 2 deletions.
2 changes: 1 addition & 1 deletion examples/airlift-federation-tutorial/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ airflow_setup: wipe
mkdir -p $$DAGSTER_HOME
chmod +x $(MAKEFILE_DIR)/scripts/airflow_setup.sh
$(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/upstream_airflow_dags $(UPSTREAM_AIRFLOW_HOME) 8081
$(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/upstream_airflow_dags $(DOWNSTREAM_AIRFLOW_HOME) 8082
$(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/downstream_airflow_dags $(DOWNSTREAM_AIRFLOW_HOME) 8082

upstream_airflow_run:
AIRFLOW_HOME=$(UPSTREAM_AIRFLOW_HOME) airflow standalone
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from dagster import Definitions
from dagster._core.definitions.asset_spec import replace_attributes
from dagster_airlift.core import (
AirflowBasicAuthBackend,
AirflowInstance,
build_airflow_polling_sensor,
load_airflow_dag_asset_specs,
)

upstream_airflow_instance = AirflowInstance(
auth_backend=AirflowBasicAuthBackend(
webserver_url="http://localhost:8081",
username="admin",
password="admin",
),
name="upstream",
)

downstream_airflow_instance = AirflowInstance(
auth_backend=AirflowBasicAuthBackend(
webserver_url="http://localhost:8082",
username="admin",
password="admin",
),
name="downstream",
)

load_customers_dag_asset = next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=upstream_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
)
)
)
customer_metrics_dag_asset = replace_attributes(
next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=downstream_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
)
)
# Add a dependency on the load_customers_dag_asset
),
deps=[load_customers_dag_asset],
)

upstream_sensor = build_airflow_polling_sensor(
mapped_assets=[load_customers_dag_asset],
airflow_instance=upstream_airflow_instance,
)
downstream_sensor = build_airflow_polling_sensor(
mapped_assets=[customer_metrics_dag_asset],
airflow_instance=downstream_airflow_instance,
)

defs = Definitions(
assets=[load_customers_dag_asset, customer_metrics_dag_asset],
sensors=[upstream_sensor, downstream_sensor],
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os
import shutil
import subprocess
from contextlib import contextmanager
from pathlib import Path
from typing import Generator

Expand Down Expand Up @@ -66,3 +68,23 @@ def dagster_fixture(
finally:
if process:
process.terminate()


@contextmanager
def replace_file(old_file: Path, new_file: Path) -> Generator[None, None, None]:
backup_file = old_file.with_suffix(old_file.suffix + ".bak")
try:
if old_file.exists():
shutil.copy2(old_file, backup_file)
if new_file.exists():
shutil.copy2(new_file, old_file)
else:
raise FileNotFoundError(f"New file {new_file} not found")
yield
finally:
if backup_file.exists():
shutil.copy2(backup_file, old_file)
backup_file.unlink()


ORIG_DEFS_FILE = makefile_dir() / "airlift_federation_tutorial" / "dagster_defs" / "definitions.py"
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import subprocess
from typing import Generator

import pytest
import requests
from airlift_federation_tutorial_tests.conftest import ORIG_DEFS_FILE, makefile_dir, replace_file
from dagster_airlift.in_airflow.gql_queries import ASSET_NODES_QUERY
from dagster_airlift.test.shared_fixtures import stand_up_dagster

STAGE_FILE = ORIG_DEFS_FILE.parent / "stages" / "with_specs.py"


@pytest.fixture
def completed_stage() -> Generator[None, None, None]:
with replace_file(ORIG_DEFS_FILE, STAGE_FILE):
yield


@pytest.fixture(name="dagster_dev")
def dagster_fixture(
upstream_airflow: subprocess.Popen, downstream_airflow: subprocess.Popen, completed_stage: None
) -> Generator[subprocess.Popen, None, None]:
process = None
try:
with stand_up_dagster(
dagster_dev_cmd=["make", "-C", str(makefile_dir()), "dagster_run"],
port=3000,
) as process:
yield process
finally:
if process:
process.terminate()


def test_with_specs(dagster_dev: subprocess.Popen) -> None:
response = requests.post(
# Timeout in seconds
"http://localhost:3000/graphql",
json={"query": ASSET_NODES_QUERY},
timeout=3,
)
assert response.status_code == 200
assert len(response.json()["data"]["assetNodes"]) == 2
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
AirflowInstance as AirflowInstance,
DagSelectorFn as DagSelectorFn,
build_defs_from_airflow_instance as build_defs_from_airflow_instance,
load_airflow_dag_asset_specs as load_airflow_dag_asset_specs,
)
from .multiple_tasks import (
TaskHandleDict as TaskHandleDict,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,12 @@ def enrich_airflow_mapped_assets(
def load_airflow_dag_asset_specs(
airflow_instance: AirflowInstance,
mapped_assets: Optional[Iterable[Union[AssetSpec, AssetsDefinition]]] = None,
dag_selector_fn: Optional[DagSelectorFn] = None,
) -> Sequence[AssetSpec]:
"""Load asset specs for Airflow DAGs from the provided :py:class:`AirflowInstance`, and link upstreams from mapped assets."""
serialized_data = AirflowInstanceDefsLoader(
airflow_instance=airflow_instance, mapped_assets=mapped_assets or []
airflow_instance=airflow_instance,
mapped_assets=mapped_assets or [],
dag_selector_fn=dag_selector_fn,
).get_or_fetch_state()
return list(spec_iterator(construct_dag_assets_defs(serialized_data)))

0 comments on commit 32e86ba

Please sign in to comment.