From 5e67354c8bf3d4fab89ee8f95298ace31a26b928 Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Tue, 12 Nov 2024 13:30:06 -0800 Subject: [PATCH] [dagster-airlift] Implement dags --- .../airlift/federation-tutorial/setup.mdx | 8 +- .../airlift-federation-tutorial/.gitignore | 1 + examples/airlift-federation-tutorial/Makefile | 22 ++-- .../airlift_federation_tutorial/__init__.py | 0 .../airlift_federation_tutorial/constants.py | 16 +++ .../dagster_defs/stages/executable_and_da.py | 28 ++--- .../dagster_defs/stages/with_specs.py | 22 ++-- .../downstream_airflow_dags/dags.py | 6 -- .../metrics_airflow_dags/dags.py | 67 ++++++++++++ .../upstream_airflow_dags/dags.py | 6 -- .../warehouse_airflow_dags/dags.py | 55 ++++++++++ .../conftest.py | 41 +++++-- .../test_executable_stage.py | 2 +- .../test_instances.py | 30 ++++++ .../test_load.py | 39 ------- .../test_specs_stage.py | 2 +- .../raw_customers.csv | 100 ++++++++++++++++++ 17 files changed, 345 insertions(+), 100 deletions(-) create mode 100644 examples/airlift-federation-tutorial/airlift_federation_tutorial/__init__.py create mode 100644 examples/airlift-federation-tutorial/airlift_federation_tutorial/constants.py delete mode 100644 examples/airlift-federation-tutorial/airlift_federation_tutorial/downstream_airflow_dags/dags.py create mode 100644 examples/airlift-federation-tutorial/airlift_federation_tutorial/metrics_airflow_dags/dags.py delete mode 100644 examples/airlift-federation-tutorial/airlift_federation_tutorial/upstream_airflow_dags/dags.py create mode 100644 examples/airlift-federation-tutorial/airlift_federation_tutorial/warehouse_airflow_dags/dags.py create mode 100644 examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_instances.py delete mode 100644 examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_load.py create mode 100644 examples/airlift-federation-tutorial/raw_customers.csv diff --git a/docs/content/integrations/airlift/federation-tutorial/setup.mdx b/docs/content/integrations/airlift/federation-tutorial/setup.mdx index 29bbc87380c78..38febe0c0df64 100644 --- a/docs/content/integrations/airlift/federation-tutorial/setup.mdx +++ b/docs/content/integrations/airlift/federation-tutorial/setup.mdx @@ -44,16 +44,16 @@ Finally, let's run the two Airflow instances with environment variables set: In one shell run: ```bash -make upstream_airflow_run +make warehouse_airflow_run ``` In a separate shell, run: ```bash -make downstream_airflow_run +make metrics_airflow_run ``` -This will run two Airflow Web UIs, one for each Airflow instance. You should now be able to access the upstream Airflow UI at `http://localhost:8081`, with the default username and password set to `admin`. +This will run two Airflow Web UIs, one for each Airflow instance. You should now be able to access the warehouse Airflow UI at `http://localhost:8081`, with the default username and password set to `admin`. You should be able to see the `load_customers` DAG in the Airflow UI. @@ -64,7 +64,7 @@ width={1484} height={300} /> -Similarly, you should be able to access the downstream Airflow UI at `http://localhost:8082`, with the default username and password set to `admin`. +Similarly, you should be able to access the metrics Airflow UI at `http://localhost:8082`, with the default username and password set to `admin`. You should be able to see the `customer_metrics` DAG in the Airflow UI. 
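
Editor's note: as a quick sanity check after both `make warehouse_airflow_run` and `make metrics_airflow_run` are up, the two webservers can be queried with the same `AirflowInstance` API that the Dagster definitions in this patch use. A minimal sketch, mirroring the instance setup below (the check itself is not part of the tutorial):

```python
from dagster_airlift.core import AirflowBasicAuthBackend, AirflowInstance

# Point at the two standalone Airflow webservers started by the Makefile targets.
warehouse = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081", username="admin", password="admin"
    ),
    name="warehouse",
)
metrics = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8082", username="admin", password="admin"
    ),
    name="metrics",
)

# Each instance should report its DAGs once the webserver has finished parsing them.
print([dag.dag_id for dag in warehouse.list_dags()])  # expect load_customers among the DAGs
print([dag.dag_id for dag in metrics.list_dags()])  # expect customer_metrics among the DAGs
```
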
diff --git a/examples/airlift-federation-tutorial/.gitignore b/examples/airlift-federation-tutorial/.gitignore index a1ff4089436e8..f9f21e96a888e 100644 --- a/examples/airlift-federation-tutorial/.gitignore +++ b/examples/airlift-federation-tutorial/.gitignore @@ -1,6 +1,7 @@ *.**airflow_home* *.dagster_home* customers.csv +*.db # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/examples/airlift-federation-tutorial/Makefile b/examples/airlift-federation-tutorial/Makefile index a87409fafd207..e5c9048a1d566 100644 --- a/examples/airlift-federation-tutorial/Makefile +++ b/examples/airlift-federation-tutorial/Makefile @@ -7,8 +7,8 @@ endef MAKEFILE_DIR := $(GET_MAKEFILE_DIR) export TUTORIAL_EXAMPLE_DIR := $(MAKEFILE_DIR) export DAGSTER_HOME := $(MAKEFILE_DIR)/.dagster_home -export UPSTREAM_AIRFLOW_HOME := $(MAKEFILE_DIR)/.upstream_airflow_home -export DOWNSTREAM_AIRFLOW_HOME := $(MAKEFILE_DIR)/.downstream_airflow_home +export WAREHOUSE_AIRFLOW_HOME := $(MAKEFILE_DIR)/.warehouse_airflow_home +export METRICS_AIRFLOW_HOME := $(MAKEFILE_DIR)/.metrics_airflow_home export DAGSTER_URL := http://localhost:3000 # Detect OS and use appropriate date command @@ -30,18 +30,18 @@ airflow_install: uv pip install -e $(MAKEFILE_DIR) airflow_setup: wipe - mkdir -p $$UPSTREAM_AIRFLOW_HOME - mkdir -p $$DOWNSTREAM_AIRFLOW_HOME + mkdir -p $$WAREHOUSE_AIRFLOW_HOME + mkdir -p $$METRICS_AIRFLOW_HOME mkdir -p $$DAGSTER_HOME chmod +x $(MAKEFILE_DIR)/scripts/airflow_setup.sh - $(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/upstream_airflow_dags $(UPSTREAM_AIRFLOW_HOME) 8081 - $(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/downstream_airflow_dags $(DOWNSTREAM_AIRFLOW_HOME) 8082 + $(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/warehouse_airflow_dags $(WAREHOUSE_AIRFLOW_HOME) 8081 + $(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/metrics_airflow_dags $(METRICS_AIRFLOW_HOME) 8082 -upstream_airflow_run: - AIRFLOW_HOME=$(UPSTREAM_AIRFLOW_HOME) airflow standalone +warehouse_airflow_run: + AIRFLOW_HOME=$(WAREHOUSE_AIRFLOW_HOME) airflow standalone -downstream_airflow_run: - AIRFLOW_HOME=$(DOWNSTREAM_AIRFLOW_HOME) airflow standalone +metrics_airflow_run: + AIRFLOW_HOME=$(METRICS_AIRFLOW_HOME) airflow standalone dagster_run: dagster dev -m airlift_federation_tutorial.dagster_defs.definitions -p 3000 @@ -54,5 +54,5 @@ update_readme_snippets: python ../../scripts/update_readme_snippets.py $(MAKEFILE_DIR)/README.md wipe: - rm -rf $$UPSTREAM_AIRFLOW_HOME $$DOWNSTREAM_AIRFLOW_HOME $$DAGSTER_HOME + rm -rf $$WAREHOUSE_AIRFLOW_HOME $$METRICS_AIRFLOW_HOME $$DAGSTER_HOME diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/__init__.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/constants.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/constants.py new file mode 100644 index 0000000000000..309399d2dc5e0 --- /dev/null +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial/constants.py @@ -0,0 +1,16 @@ +from pathlib import Path + +FEDERATION_TUTORIAL_ROOT_DIR = Path(__file__).parent.parent +DUCKDB_PATH = FEDERATION_TUTORIAL_ROOT_DIR / "federation_tutorial.db" + +CUSTOMERS_CSV_PATH = FEDERATION_TUTORIAL_ROOT_DIR / "raw_customers.csv" +CUSTOMERS_COLS = [ + 
"id", + "first_name", + "last_name", +] +CUSTOMERS_SCHEMA = "raw_data" +CUSTOMERS_DB_NAME = "federation_tutorial" +CUSTOMERS_TABLE_NAME = "raw_customers" +METRICS_SCHEMA = "metrics" +METRICS_TABLE_NAME = "customer_count" diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py index e6f6b044ab1bb..f9518e55343d3 100644 --- a/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py @@ -16,28 +16,28 @@ load_airflow_dag_asset_specs, ) -upstream_airflow_instance = AirflowInstance( +warehouse_airflow_instance = AirflowInstance( auth_backend=AirflowBasicAuthBackend( webserver_url="http://localhost:8081", username="admin", password="admin", ), - name="upstream", + name="warehouse", ) -downstream_airflow_instance = AirflowInstance( +metrics_airflow_instance = AirflowInstance( auth_backend=AirflowBasicAuthBackend( webserver_url="http://localhost:8082", username="admin", password="admin", ), - name="downstream", + name="metrics", ) load_customers_dag_asset = next( iter( load_airflow_dag_asset_specs( - airflow_instance=upstream_airflow_instance, + airflow_instance=warehouse_airflow_instance, dag_selector_fn=lambda dag: dag.dag_id == "load_customers", ) ) @@ -46,7 +46,7 @@ next( iter( load_airflow_dag_asset_specs( - airflow_instance=downstream_airflow_instance, + airflow_instance=metrics_airflow_instance, dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics", ) ) @@ -59,21 +59,21 @@ @multi_asset(specs=[customer_metrics_dag_asset]) def run_customer_metrics() -> MaterializeResult: - run_id = downstream_airflow_instance.trigger_dag("customer_metrics") - downstream_airflow_instance.wait_for_run_completion("customer_metrics", run_id) - if downstream_airflow_instance.get_run_state("customer_metrics", run_id) == "success": + run_id = metrics_airflow_instance.trigger_dag("customer_metrics") + metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id) + if metrics_airflow_instance.get_run_state("customer_metrics", run_id) == "success": return MaterializeResult(asset_key=customer_metrics_dag_asset.key) else: raise Exception("Dag run failed.") -upstream_sensor = build_airflow_polling_sensor( +warehouse_sensor = build_airflow_polling_sensor( mapped_assets=[load_customers_dag_asset], - airflow_instance=upstream_airflow_instance, + airflow_instance=warehouse_airflow_instance, ) -downstream_sensor = build_airflow_polling_sensor( +metrics_sensor = build_airflow_polling_sensor( mapped_assets=[customer_metrics_dag_asset], - airflow_instance=downstream_airflow_instance, + airflow_instance=metrics_airflow_instance, ) automation_sensor = AutomationConditionSensorDefinition( @@ -85,5 +85,5 @@ def run_customer_metrics() -> MaterializeResult: defs = Definitions( assets=[load_customers_dag_asset, run_customer_metrics], - sensors=[upstream_sensor, downstream_sensor], + sensors=[warehouse_sensor, metrics_sensor], ) diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/with_specs.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/with_specs.py index d2b2a4e632c0d..69a2888046618 100644 --- a/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/with_specs.py +++ 
b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/with_specs.py @@ -7,28 +7,28 @@ load_airflow_dag_asset_specs, ) -upstream_airflow_instance = AirflowInstance( +warehouse_airflow_instance = AirflowInstance( auth_backend=AirflowBasicAuthBackend( webserver_url="http://localhost:8081", username="admin", password="admin", ), - name="upstream", + name="warehouse", ) -downstream_airflow_instance = AirflowInstance( +metrics_airflow_instance = AirflowInstance( auth_backend=AirflowBasicAuthBackend( webserver_url="http://localhost:8082", username="admin", password="admin", ), - name="downstream", + name="metrics", ) load_customers_dag_asset = next( iter( load_airflow_dag_asset_specs( - airflow_instance=upstream_airflow_instance, + airflow_instance=warehouse_airflow_instance, dag_selector_fn=lambda dag: dag.dag_id == "load_customers", ) ) @@ -37,7 +37,7 @@ next( iter( load_airflow_dag_asset_specs( - airflow_instance=downstream_airflow_instance, + airflow_instance=metrics_airflow_instance, dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics", ) ) @@ -46,16 +46,16 @@ deps=[load_customers_dag_asset], ) -upstream_sensor = build_airflow_polling_sensor( +warehouse_sensor = build_airflow_polling_sensor( mapped_assets=[load_customers_dag_asset], - airflow_instance=upstream_airflow_instance, + airflow_instance=warehouse_airflow_instance, ) -downstream_sensor = build_airflow_polling_sensor( +metrics_sensor = build_airflow_polling_sensor( mapped_assets=[customer_metrics_dag_asset], - airflow_instance=downstream_airflow_instance, + airflow_instance=metrics_airflow_instance, ) defs = Definitions( assets=[load_customers_dag_asset, customer_metrics_dag_asset], - sensors=[upstream_sensor, downstream_sensor], + sensors=[warehouse_sensor, metrics_sensor], ) diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/downstream_airflow_dags/dags.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/downstream_airflow_dags/dags.py deleted file mode 100644 index 61beff7adbdd8..0000000000000 --- a/examples/airlift-federation-tutorial/airlift_federation_tutorial/downstream_airflow_dags/dags.py +++ /dev/null @@ -1,6 +0,0 @@ -from airflow import DAG - -with DAG( - dag_id="customer_metrics", -) as dag: - pass diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/metrics_airflow_dags/dags.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/metrics_airflow_dags/dags.py new file mode 100644 index 0000000000000..7574d9e5cf756 --- /dev/null +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial/metrics_airflow_dags/dags.py @@ -0,0 +1,67 @@ +import os +from datetime import datetime +from typing import Union + +import duckdb +import pandas as pd +from airflow import DAG +from airflow.operators.python import PythonOperator +from airlift_federation_tutorial.constants import ( + CUSTOMERS_DB_NAME, + CUSTOMERS_SCHEMA, + CUSTOMERS_TABLE_NAME, + DUCKDB_PATH, + METRICS_SCHEMA, + METRICS_TABLE_NAME, +) + + +def calculate_customer_count() -> None: + os.environ["NO_PROXY"] = "*" + + con = duckdb.connect(str(DUCKDB_PATH)) + + result: Union[tuple, None] = con.execute( + f"SELECT COUNT(*) FROM {CUSTOMERS_DB_NAME}.{CUSTOMERS_SCHEMA}.{CUSTOMERS_TABLE_NAME}" + ).fetchone() + if not result: + raise ValueError("No customers found") + count_df = pd.DataFrame([{"date": datetime.now().date(), "total_customers": result[0]}]) # noqa: F841 # used by duckdb + + con.execute(f"CREATE SCHEMA IF NOT EXISTS 
{METRICS_SCHEMA}").fetchall() + + con.execute(f""" + CREATE TABLE IF NOT EXISTS {CUSTOMERS_DB_NAME}.{METRICS_SCHEMA}.{METRICS_TABLE_NAME} ( + date DATE, + total_customers INTEGER + ) + """).fetchall() + + con.execute( + f"INSERT INTO {CUSTOMERS_DB_NAME}.{METRICS_SCHEMA}.{METRICS_TABLE_NAME} SELECT * FROM count_df" + ).fetchall() + + con.close() + + +with DAG( + dag_id="customer_metrics", + is_paused_upon_creation=False, +) as dag: + count_task = PythonOperator( + task_id="calculate_customer_count", + python_callable=calculate_customer_count, + dag=dag, + ) + +for dag_id in ["orders_metrics", "products_metrics", "payments_metrics", "sales_metrics"]: + with DAG( + dag_id=dag_id, + is_paused_upon_creation=False, + ) as dag: + PythonOperator( + task_id="task", + python_callable=lambda: None, + dag=dag, + ) + globals()[dag_id] = dag diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/upstream_airflow_dags/dags.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/upstream_airflow_dags/dags.py deleted file mode 100644 index 2df9b05c87edc..0000000000000 --- a/examples/airlift-federation-tutorial/airlift_federation_tutorial/upstream_airflow_dags/dags.py +++ /dev/null @@ -1,6 +0,0 @@ -from airflow import DAG - -with DAG( - dag_id="load_customers", -) as dag: - pass diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/warehouse_airflow_dags/dags.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/warehouse_airflow_dags/dags.py new file mode 100644 index 0000000000000..b42ff8937a030 --- /dev/null +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial/warehouse_airflow_dags/dags.py @@ -0,0 +1,55 @@ +import os + +import duckdb +import pandas as pd +from airflow import DAG +from airflow.operators.python import PythonOperator +from airlift_federation_tutorial.constants import ( + CUSTOMERS_COLS, + CUSTOMERS_CSV_PATH, + CUSTOMERS_DB_NAME, + CUSTOMERS_SCHEMA, + CUSTOMERS_TABLE_NAME, + DUCKDB_PATH, +) + + +def load_customers() -> None: + # https://github.com/apache/airflow/discussions/24463 + os.environ["NO_PROXY"] = "*" + df = pd.read_csv( # noqa: F841 # used by duckdb + CUSTOMERS_CSV_PATH, + names=CUSTOMERS_COLS, + ) + + # Connect to DuckDB and create a new table + con = duckdb.connect(str(DUCKDB_PATH)) + con.execute(f"CREATE SCHEMA IF NOT EXISTS {CUSTOMERS_SCHEMA}").fetchall() + con.execute( + f"CREATE TABLE IF NOT EXISTS {CUSTOMERS_DB_NAME}.{CUSTOMERS_SCHEMA}.{CUSTOMERS_TABLE_NAME} AS SELECT * FROM df" + ).fetchall() + con.close() + + +with DAG( + dag_id="load_customers", + is_paused_upon_creation=False, +) as dag: + PythonOperator( + task_id="load_customers_to_warehouse", + python_callable=load_customers, + dag=dag, + ) + +# Define some dummy DAGs to simulate a big Airflow instance +for dag_id in ["load_orders", "load_products", "load_payments", "load_sales"]: + with DAG( + dag_id=dag_id, + is_paused_upon_creation=False, + ) as dag: + PythonOperator( + task_id="task", + python_callable=lambda: None, + dag=dag, + ) + globals()[dag_id] = dag diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/conftest.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/conftest.py index d55b440fdf68c..37443b64565a3 100644 --- a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/conftest.py +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/conftest.py @@ -6,6 +6,7 @@ from typing import Generator import pytest +from 
dagster_airlift.core import AirflowBasicAuthBackend, AirflowInstance from dagster_airlift.test.shared_fixtures import stand_up_airflow, stand_up_dagster @@ -13,6 +14,25 @@ def makefile_dir() -> Path: return Path(__file__).parent.parent +def airflow_test_instance(port: int) -> AirflowInstance: + return AirflowInstance( + auth_backend=AirflowBasicAuthBackend( + webserver_url=f"http://localhost:{port}", + username="admin", + password="admin", + ), + name="test", + ) + + +def warehouse_instance() -> AirflowInstance: + return airflow_test_instance(8081) + + +def metrics_instance() -> AirflowInstance: + return airflow_test_instance(8082) + + @pytest.fixture(name="local_env") def local_env_fixture() -> Generator[None, None, None]: try: @@ -22,12 +42,12 @@ def local_env_fixture() -> Generator[None, None, None]: subprocess.run(["make", "wipe"], cwd=makefile_dir(), check=True) -@pytest.fixture(name="upstream_airflow") -def upstream_airflow_fixture(local_env: None) -> Generator[subprocess.Popen, None, None]: +@pytest.fixture(name="warehouse_airflow") +def warehouse_airflow_fixture(local_env: None) -> Generator[subprocess.Popen, None, None]: process = None try: with stand_up_airflow( - airflow_cmd=["make", "upstream_airflow_run"], + airflow_cmd=["make", "warehouse_airflow_run"], env=os.environ, cwd=makefile_dir(), port=8081, @@ -38,12 +58,12 @@ def upstream_airflow_fixture(local_env: None) -> Generator[subprocess.Popen, Non process.terminate() -@pytest.fixture(name="downstream_airflow") -def downstream_airflow_fixture(local_env: None) -> Generator[subprocess.Popen, None, None]: +@pytest.fixture(name="metrics_airflow") +def metrics_airflow_fixture(local_env: None) -> Generator[subprocess.Popen, None, None]: process = None try: with stand_up_airflow( - airflow_cmd=["make", "downstream_airflow_run"], + airflow_cmd=["make", "metrics_airflow_run"], env=os.environ, cwd=makefile_dir(), port=8082, @@ -56,7 +76,7 @@ def downstream_airflow_fixture(local_env: None) -> Generator[subprocess.Popen, N @pytest.fixture(name="dagster_dev") def dagster_fixture( - upstream_airflow: subprocess.Popen, downstream_airflow: subprocess.Popen + warehouse_airflow: subprocess.Popen, metrics_airflow: subprocess.Popen ) -> Generator[subprocess.Popen, None, None]: process = None try: @@ -88,3 +108,10 @@ def replace_file(old_file: Path, new_file: Path) -> Generator[None, None, None]: ORIG_DEFS_FILE = makefile_dir() / "airlift_federation_tutorial" / "dagster_defs" / "definitions.py" + + +def assert_successful_dag_run(af_instance: AirflowInstance, dag_id: str) -> None: + run_id = af_instance.trigger_dag(dag_id) + assert run_id is not None + af_instance.wait_for_run_completion(dag_id=dag_id, run_id=run_id) + assert af_instance.get_run_state(dag_id=dag_id, run_id=run_id) == "success" diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_executable_stage.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_executable_stage.py index 8b682d3c9404c..644449b1e19b9 100644 --- a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_executable_stage.py +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_executable_stage.py @@ -18,7 +18,7 @@ def completed_stage() -> Generator[None, None, None]: @pytest.fixture(name="dagster_dev") def dagster_fixture( - upstream_airflow: subprocess.Popen, downstream_airflow: subprocess.Popen, completed_stage: None + warehouse_airflow: subprocess.Popen, metrics_airflow: subprocess.Popen, 
completed_stage: None ) -> Generator[subprocess.Popen, None, None]: process = None try: diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_instances.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_instances.py new file mode 100644 index 0000000000000..c379d029f7972 --- /dev/null +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_instances.py @@ -0,0 +1,30 @@ +import subprocess + +import requests +from airlift_federation_tutorial_tests.conftest import ( + assert_successful_dag_run, + metrics_instance, + warehouse_instance, +) +from dagster_airlift.in_airflow.gql_queries import VERIFICATION_QUERY + + +def test_load_warehouse(warehouse_airflow: subprocess.Popen) -> None: + af_instance = warehouse_instance() + assert len(af_instance.list_dags()) == 5 + assert_successful_dag_run(af_instance, "load_customers") + + +def test_load_metrics(metrics_airflow: subprocess.Popen) -> None: + assert len(metrics_instance().list_dags()) == 5 + assert_successful_dag_run(metrics_instance(), "customer_metrics") + + +def test_load_dagster(dagster_dev: subprocess.Popen) -> None: + response = requests.post( + # Timeout in seconds + "http://localhost:3000/graphql", + json={"query": VERIFICATION_QUERY}, + timeout=3, + ) + assert response.status_code == 200 diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_load.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_load.py deleted file mode 100644 index 1b88cb8184fa0..0000000000000 --- a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_load.py +++ /dev/null @@ -1,39 +0,0 @@ -import subprocess - -import requests -from dagster_airlift.core import AirflowBasicAuthBackend, AirflowInstance -from dagster_airlift.in_airflow.gql_queries import VERIFICATION_QUERY - - -def test_load_upstream(upstream_airflow: subprocess.Popen) -> None: - af_instance = AirflowInstance( - auth_backend=AirflowBasicAuthBackend( - webserver_url="http://localhost:8081", - username="admin", - password="admin", - ), - name="test", - ) - assert len(af_instance.list_dags()) == 1 - - -def test_load_downstream(downstream_airflow: subprocess.Popen) -> None: - af_instance = AirflowInstance( - auth_backend=AirflowBasicAuthBackend( - webserver_url="http://localhost:8082", - username="admin", - password="admin", - ), - name="test", - ) - assert len(af_instance.list_dags()) == 1 - - -def test_load_dagster(dagster_dev: subprocess.Popen) -> None: - response = requests.post( - # Timeout in seconds - "http://localhost:3000/graphql", - json={"query": VERIFICATION_QUERY}, - timeout=3, - ) - assert response.status_code == 200 diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_specs_stage.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_specs_stage.py index 7c96d838d5d56..c827ebef9b555 100644 --- a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_specs_stage.py +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_specs_stage.py @@ -18,7 +18,7 @@ def completed_stage() -> Generator[None, None, None]: @pytest.fixture(name="dagster_dev") def dagster_fixture( - upstream_airflow: subprocess.Popen, downstream_airflow: subprocess.Popen, completed_stage: None + warehouse_airflow: subprocess.Popen, metrics_airflow: subprocess.Popen, completed_stage: None ) -> Generator[subprocess.Popen, None, None]: process = None 
try: diff --git a/examples/airlift-federation-tutorial/raw_customers.csv b/examples/airlift-federation-tutorial/raw_customers.csv new file mode 100644 index 0000000000000..46fbdc093fb75 --- /dev/null +++ b/examples/airlift-federation-tutorial/raw_customers.csv @@ -0,0 +1,100 @@ +1,Michael,P. +2,Shawn,M. +3,Kathleen,P. +4,Jimmy,C. +5,Katherine,R. +6,Sarah,R. +7,Martin,M. +8,Frank,R. +9,Jennifer,F. +10,Henry,W. +11,Fred,S. +12,Amy,D. +13,Kathleen,M. +14,Steve,F. +15,Teresa,H. +16,Amanda,H. +17,Kimberly,R. +18,Johnny,K. +19,Virginia,F. +20,Anna,A. +21,Willie,H. +22,Sean,H. +23,Mildred,A. +24,David,G. +25,Victor,H. +26,Aaron,R. +27,Benjamin,B. +28,Lisa,W. +29,Benjamin,K. +30,Christina,W. +31,Jane,G. +32,Thomas,O. +33,Katherine,M. +34,Jennifer,S. +35,Sara,T. +36,Harold,O. +37,Shirley,J. +38,Dennis,J. +39,Louise,W. +40,Maria,A. +41,Gloria,C. +42,Diana,S. +43,Kelly,N. +44,Jane,R. +45,Scott,B. +46,Norma,C. +47,Marie,P. +48,Lillian,C. +49,Judy,N. +50,Billy,L. +51,Howard,R. +52,Laura,F. +53,Anne,B. +54,Rose,M. +55,Nicholas,R. +56,Joshua,K. +57,Paul,W. +58,Kathryn,K. +59,Adam,A. +60,Norma,W. +61,Timothy,R. +62,Elizabeth,P. +63,Edward,G. +64,David,C. +65,Brenda,W. +66,Adam,W. +67,Michael,H. +68,Jesse,E. +69,Janet,P. +70,Helen,F. +71,Gerald,C. +72,Kathryn,O. +73,Alan,B. +74,Harry,A. +75,Andrea,H. +76,Barbara,W. +77,Anne,W. +78,Harry,H. +79,Jack,R. +80,Phillip,H. +81,Shirley,H. +82,Arthur,D. +83,Virginia,R. +84,Christina,R. +85,Theresa,M. +86,Jason,C. +87,Phillip,B. +88,Adam,T. +89,Margaret,J. +90,Paul,P. +91,Todd,W. +92,Willie,O. +93,Frances,R. +94,Gregory,H. +95,Lisa,P. +96,Jacqueline,A. +97,Shirley,D. +98,Nicole,M. +99,Mary,G. +100,Jean,M.
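
Editor's note: once `load_customers` and then `customer_metrics` have both run successfully, the resulting DuckDB file can be inspected directly. A minimal sketch using the constants introduced in this patch, assuming the tutorial package is installed as in `make airflow_install` (the printed values are what the included `raw_customers.csv` and the metrics DAG would produce, not output from this change):

```python
import duckdb
from airlift_federation_tutorial.constants import (
    CUSTOMERS_DB_NAME,
    CUSTOMERS_SCHEMA,
    CUSTOMERS_TABLE_NAME,
    DUCKDB_PATH,
    METRICS_SCHEMA,
    METRICS_TABLE_NAME,
)

con = duckdb.connect(str(DUCKDB_PATH))

# raw_data.raw_customers is loaded by the warehouse instance's load_customers DAG.
customer_count = con.execute(
    f"SELECT COUNT(*) FROM {CUSTOMERS_DB_NAME}.{CUSTOMERS_SCHEMA}.{CUSTOMERS_TABLE_NAME}"
).fetchone()

# metrics.customer_count is appended to by the metrics instance's customer_metrics DAG.
metric_rows = con.execute(
    f"SELECT * FROM {CUSTOMERS_DB_NAME}.{METRICS_SCHEMA}.{METRICS_TABLE_NAME}"
).fetchall()
con.close()

print(customer_count)  # e.g. (100,) given the 100-row raw_customers.csv
print(metric_rows)  # one (date, total_customers) row per customer_metrics run
```
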