Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-airlift][federation-tutorial] Add initial stage to add specs to federation tutorial #25861

Merged
merged 1 commit into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/airlift-federation-tutorial/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ airflow_setup: wipe
mkdir -p $$DAGSTER_HOME
chmod +x $(MAKEFILE_DIR)/scripts/airflow_setup.sh
$(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/upstream_airflow_dags $(UPSTREAM_AIRFLOW_HOME) 8081
$(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/upstream_airflow_dags $(DOWNSTREAM_AIRFLOW_HOME) 8082
$(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/downstream_airflow_dags $(DOWNSTREAM_AIRFLOW_HOME) 8082

upstream_airflow_run:
AIRFLOW_HOME=$(UPSTREAM_AIRFLOW_HOME) airflow standalone
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from dagster import Definitions
from dagster._core.definitions.asset_spec import replace_attributes
from dagster_airlift.core import (
AirflowBasicAuthBackend,
AirflowInstance,
build_airflow_polling_sensor,
load_airflow_dag_asset_specs,
)

# Handle to the upstream Airflow deployment: basic auth (admin/admin) against the
# webserver on localhost port 8081, registered under the name "upstream".
upstream_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="upstream",
)

# Handle to the downstream Airflow deployment: same credentials, but the webserver
# is on localhost port 8082 and the instance is registered as "downstream".
downstream_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8082",
        username="admin",
        password="admin",
    ),
    name="downstream",
)

# Asset spec for the "load_customers" DAG on the upstream instance. The selector keeps
# only the DAG with that id, and the first spec returned by the load is used.
# NOTE: next(iter(...)) raises StopIteration at import time if the load yields no specs.
load_customers_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=upstream_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
)
# Asset spec for the "customer_metrics" DAG on the downstream instance, selected the
# same way and then rewritten via replace_attributes with an explicit upstream dependency.
customer_metrics_dag_asset = replace_attributes(
    next(
        iter(
            load_airflow_dag_asset_specs(
                airflow_instance=downstream_airflow_instance,
                dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
            )
        )
    ),
    # Add a dependency on the load_customers_dag_asset
    deps=[load_customers_dag_asset],
)

# One polling sensor per Airflow instance; each sensor is given only the DAG asset
# that was loaded from that same instance.
upstream_sensor = build_airflow_polling_sensor(
    mapped_assets=[load_customers_dag_asset],
    airflow_instance=upstream_airflow_instance,
)
downstream_sensor = build_airflow_polling_sensor(
    mapped_assets=[customer_metrics_dag_asset],
    airflow_instance=downstream_airflow_instance,
)

# Definitions object for this stage: both DAG-level assets plus the two polling sensors.
defs = Definitions(
    assets=[load_customers_dag_asset, customer_metrics_dag_asset],
    sensors=[upstream_sensor, downstream_sensor],
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os
import shutil
import subprocess
from contextlib import contextmanager
from pathlib import Path
from typing import Generator

Expand Down Expand Up @@ -66,3 +68,23 @@ def dagster_fixture(
finally:
if process:
process.terminate()


@contextmanager
def replace_file(old_file: Path, new_file: Path) -> Generator[None, None, None]:
    """Temporarily overwrite ``old_file`` with the contents of ``new_file``.

    While the context is active, ``old_file`` holds a copy of ``new_file``. On exit
    (normal or via exception) the previous state is put back: the original contents
    are restored from a ``<name><suffix>.bak`` backup, or, when ``old_file`` did not
    exist beforehand, the copied-in file is removed again.

    Args:
        old_file: Path whose contents are swapped out for the duration of the context.
        new_file: Path of the file to copy into place; must exist.

    Raises:
        FileNotFoundError: If ``new_file`` does not exist. Raised before anything on
            disk is modified.
    """
    # Validate first so a bad path never leaves a backup or a half-swapped file behind.
    if not new_file.exists():
        raise FileNotFoundError(f"New file {new_file} not found")

    backup_file = old_file.with_suffix(old_file.suffix + ".bak")
    had_original = old_file.exists()
    try:
        if had_original:
            shutil.copy2(old_file, backup_file)
        shutil.copy2(new_file, old_file)
        yield
    finally:
        if backup_file.exists():
            # Move the backup over the target: restores contents and removes the backup.
            backup_file.replace(old_file)
        elif not had_original and old_file.exists():
            # Nothing existed here before, so restoring means deleting our copy.
            old_file.unlink()


# Path to airlift_federation_tutorial/dagster_defs/definitions.py, resolved relative to
# the directory returned by makefile_dir().
ORIG_DEFS_FILE = makefile_dir() / "airlift_federation_tutorial" / "dagster_defs" / "definitions.py"
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import subprocess
from typing import Generator

import pytest
import requests
from airlift_federation_tutorial_tests.conftest import ORIG_DEFS_FILE, makefile_dir, replace_file
from dagster_airlift.in_airflow.gql_queries import ASSET_NODES_QUERY
from dagster_airlift.test.shared_fixtures import stand_up_dagster

# The "with_specs" stage file, found in the stages/ directory that sits next to the
# original definitions file.
STAGE_FILE = ORIG_DEFS_FILE.parent / "stages" / "with_specs.py"


@pytest.fixture
def completed_stage() -> Generator[None, None, None]:
    """Keep the stage file swapped in for the definitions file while a test runs."""
    stage_swap = replace_file(ORIG_DEFS_FILE, STAGE_FILE)
    with stage_swap:
        yield


@pytest.fixture(name="dagster_dev")
def dagster_fixture(
    upstream_airflow: subprocess.Popen, downstream_airflow: subprocess.Popen, completed_stage: None
) -> Generator[subprocess.Popen, None, None]:
    """Run ``make dagster_run`` on port 3000 and yield the resulting process.

    Requesting the two Airflow fixtures and ``completed_stage`` makes pytest set those
    up before this fixture starts Dagster.
    """
    run_dagster_cmd = ["make", "-C", str(makefile_dir()), "dagster_run"]
    dagster_process = None
    try:
        with stand_up_dagster(dagster_dev_cmd=run_dagster_cmd, port=3000) as dagster_process:
            yield dagster_process
    finally:
        # Only terminate when stand_up_dagster actually handed back a process.
        if dagster_process:
            dagster_process.terminate()


def test_with_specs(dagster_dev: subprocess.Popen) -> None:
    """Query the GraphQL endpoint on port 3000 and expect exactly two asset nodes."""
    graphql_response = requests.post(
        "http://localhost:3000/graphql",
        json={"query": ASSET_NODES_QUERY},
        # Timeout in seconds
        timeout=3,
    )
    assert graphql_response.status_code == 200
    asset_nodes = graphql_response.json()["data"]["assetNodes"]
    assert len(asset_nodes) == 2
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
AirflowInstance as AirflowInstance,
DagSelectorFn as DagSelectorFn,
build_defs_from_airflow_instance as build_defs_from_airflow_instance,
load_airflow_dag_asset_specs as load_airflow_dag_asset_specs,
)
from .multiple_tasks import (
TaskHandleDict as TaskHandleDict,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,12 @@ def enrich_airflow_mapped_assets(
def load_airflow_dag_asset_specs(
    airflow_instance: AirflowInstance,
    mapped_assets: Optional[Iterable[Union[AssetSpec, AssetsDefinition]]] = None,
    dag_selector_fn: Optional[DagSelectorFn] = None,
) -> Sequence[AssetSpec]:
    """Load asset specs for Airflow DAGs from the provided :py:class:`AirflowInstance`, and link upstreams from mapped assets.

    Args:
        airflow_instance: The Airflow instance handed to the loader.
        mapped_assets: Optional asset specs or assets definitions handed to the
            loader; ``None`` is treated as an empty list.
        dag_selector_fn: Optional selector forwarded unchanged to the loader.
            NOTE(review): presumably a predicate over a DAG that limits which DAGs
            are loaded -- confirm against ``DagSelectorFn``.

    Returns:
        A list of the specs produced by ``spec_iterator`` over the DAG assets
        definitions constructed from the loaded state.
    """
    serialized_data = AirflowInstanceDefsLoader(
        airflow_instance=airflow_instance,
        mapped_assets=mapped_assets or [],
        dag_selector_fn=dag_selector_fn,
    ).get_or_fetch_state()
    return list(spec_iterator(construct_dag_assets_defs(serialized_data)))