From 44390d7cc3d555fd5b3769842a6a6ec3d7c8b6cb Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Mon, 11 Nov 2024 15:14:42 -0800 Subject: [PATCH] [dagster-airlift] Federation tutorial --- .../airlift-federation-tutorial/.gitignore | 183 ++++++++++++++++++ examples/airlift-federation-tutorial/Makefile | 58 ++++++ .../airlift-federation-tutorial/README.md | 3 + .../dagster_defs/__init__.py | 0 .../dagster_defs/definitions.py | 5 + .../downstream_airflow_dags/dags.py | 6 + .../upstream_airflow_dags/dags.py | 6 + .../conftest.py | 68 +++++++ .../test_load.py | 39 ++++ .../airlift-federation-tutorial/conftest.py | 1 + .../pyproject.toml | 6 + .../scripts/airflow_setup.sh | 42 ++++ examples/airlift-federation-tutorial/setup.py | 12 ++ examples/airlift-federation-tutorial/tox.ini | 29 +++ .../dagster_airlift/test/shared_fixtures.py | 24 ++- 15 files changed, 476 insertions(+), 6 deletions(-) create mode 100644 examples/airlift-federation-tutorial/.gitignore create mode 100644 examples/airlift-federation-tutorial/Makefile create mode 100644 examples/airlift-federation-tutorial/README.md create mode 100644 examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/__init__.py create mode 100644 examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/definitions.py create mode 100644 examples/airlift-federation-tutorial/airlift_federation_tutorial/downstream_airflow_dags/dags.py create mode 100644 examples/airlift-federation-tutorial/airlift_federation_tutorial/upstream_airflow_dags/dags.py create mode 100644 examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/conftest.py create mode 100644 examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_load.py create mode 100644 examples/airlift-federation-tutorial/conftest.py create mode 100644 examples/airlift-federation-tutorial/pyproject.toml create mode 100755 examples/airlift-federation-tutorial/scripts/airflow_setup.sh create mode 100644 
examples/airlift-federation-tutorial/setup.py create mode 100644 examples/airlift-federation-tutorial/tox.ini diff --git a/examples/airlift-federation-tutorial/.gitignore b/examples/airlift-federation-tutorial/.gitignore new file mode 100644 index 0000000000000..a1ff4089436e8 --- /dev/null +++ b/examples/airlift-federation-tutorial/.gitignore @@ -0,0 +1,183 @@ +*.**airflow_home* +*.dagster_home* +customers.csv + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +mlruns/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py + +# Flask stuff: +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# dotenv +.env +.envrc + +# virtualenv +.direnv/ +.venv +venv/ +ENV/ +Pipfile +Pipfile.lock + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# ruff +.ruff_cache/ + +# mypy +.mypy_cache/ + +tags +!python_modules/dagster/dagster/_core/definitions/tags + +.pytest_cache +.DS_Store + +docs/_build +python_modules/dagster/docs/_build + +dagit_run_logs + +python_modules/libraries/dagster-aws/dagster_aws/ecs/config.yaml + 
+python_modules/dagster-webserver/node_modules/ +python_modules/dagster-webserver/yarn.lock + +# old dagit stuff +python_modules/dagit/node_modules/ +python_modules/dagit/yarn.lock +js_modules/dagit + +# Gatsby stuff +docs/gatsby/**/node_modules/ +docs/gatsby/**/_build +docs/gatsby/**/public +# Next stuff +docs/next/.mdx-data +docs/next/public/sitemap.xml +# Data +data +# Don't ignore data folders in examples +!examples/*/data +!examples/**/**/data + +# Dask +dask-worker-space + +# PyCharm IDE Config files +.idea/ + +# Codemod bookmarks +.codemod.bookmark + +# Examples outputs +examples/docs_snippets/docs_snippets/**/**/output/ +examples/docs_snippets/docs_snippets/**/**/output/ +examples/**/**/example.db + +# Telemetry instance id +.telemetry + +test_results.xml + +# GitHub Codespaces +pythonenv*/ + +# Vim project-local settings +.vim + +# DuckDB +*.duckdb + +# PyRight config +pyrightconfig* + +# Scripts working directory +scripts/.build + +# dbt .user files +.user.yml diff --git a/examples/airlift-federation-tutorial/Makefile b/examples/airlift-federation-tutorial/Makefile new file mode 100644 index 0000000000000..9aa195a2e49db --- /dev/null +++ b/examples/airlift-federation-tutorial/Makefile @@ -0,0 +1,58 @@ +.PHONY: help + +define GET_MAKEFILE_DIR +$(shell dirname $(realpath $(lastword $(MAKEFILE_LIST))) | sed 's:/*$$::') +endef + +MAKEFILE_DIR := $(GET_MAKEFILE_DIR) +export TUTORIAL_EXAMPLE_DIR := $(MAKEFILE_DIR) +export DAGSTER_HOME := $(MAKEFILE_DIR)/.dagster_home +export UPSTREAM_AIRFLOW_HOME := $(MAKEFILE_DIR)/.upstream_airflow_home +export DOWNSTREAM_AIRFLOW_HOME := $(MAKEFILE_DIR)/.downstream_airflow_home +export DAGSTER_URL := http://localhost:3000 + +# Detect OS and use appropriate date command +UNAME_S := $(shell uname -s) +ifeq ($(UNAME_S),Darwin) + TOMORROW_DATE := $(shell date -v+1d +"%Y-%m-%d") +else + TOMORROW_DATE := $(shell date -d "+1 day" +"%Y-%m-%d") +endif + +help: + @egrep -h '\s##\s' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; 
{printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}' + + +### TUTORIAL COMMANDS ### +airflow_install: + pip install uv && \ + uv pip install dagster-airlift[in-airflow,dbt,tutorial] && \ + uv pip install -e $(MAKEFILE_DIR) + +airflow_setup: wipe + mkdir -p $$UPSTREAM_AIRFLOW_HOME + mkdir -p $$DOWNSTREAM_AIRFLOW_HOME + mkdir -p $$DAGSTER_HOME + chmod +x $(MAKEFILE_DIR)/scripts/airflow_setup.sh + $(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/upstream_airflow_dags $(UPSTREAM_AIRFLOW_HOME) 8081 + $(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/downstream_airflow_dags $(DOWNSTREAM_AIRFLOW_HOME) 8082 + +upstream_airflow_run: + AIRFLOW_HOME=$(UPSTREAM_AIRFLOW_HOME) airflow standalone + +downstream_airflow_run: + AIRFLOW_HOME=$(DOWNSTREAM_AIRFLOW_HOME) airflow standalone + +dagster_run: + dagster dev -m airlift_federation_tutorial.dagster_defs.definitions -p 3000 + +clean: + airflow db clean --yes --clean-before-timestamp $(TOMORROW_DATE) + dagster asset wipe --all --noprompt + +update_readme_snippets: + python ../../scripts/update_readme_snippets.py $(MAKEFILE_DIR)/README.md + +wipe: + rm -rf $$UPSTREAM_AIRFLOW_HOME $$DOWNSTREAM_AIRFLOW_HOME $$DAGSTER_HOME + diff --git a/examples/airlift-federation-tutorial/README.md b/examples/airlift-federation-tutorial/README.md new file mode 100644 index 0000000000000..271eb1b96533c --- /dev/null +++ b/examples/airlift-federation-tutorial/README.md @@ -0,0 +1,3 @@ +## Airlift Federation Tutorial Code + +Work in progress. 
\ No newline at end of file diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/__init__.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/definitions.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/definitions.py new file mode 100644 index 0000000000000..1ec727f921b69 --- /dev/null +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/definitions.py @@ -0,0 +1,5 @@ +from dagster import Definitions + +# Use this empty file to follow along with the tutorial + +defs = Definitions() diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/downstream_airflow_dags/dags.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/downstream_airflow_dags/dags.py new file mode 100644 index 0000000000000..61beff7adbdd8 --- /dev/null +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial/downstream_airflow_dags/dags.py @@ -0,0 +1,6 @@ +from airflow import DAG + +with DAG( + dag_id="customer_metrics", +) as dag: + pass diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/upstream_airflow_dags/dags.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/upstream_airflow_dags/dags.py new file mode 100644 index 0000000000000..2df9b05c87edc --- /dev/null +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial/upstream_airflow_dags/dags.py @@ -0,0 +1,6 @@ +from airflow import DAG + +with DAG( + dag_id="load_customers", +) as dag: + pass diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/conftest.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/conftest.py new file mode 100644 index 0000000000000..d19de28d07364 --- /dev/null +++ 
b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/conftest.py @@ -0,0 +1,68 @@ +import os +import subprocess +from pathlib import Path +from typing import Generator + +import pytest +from dagster_airlift.test.shared_fixtures import stand_up_airflow, stand_up_dagster + + +def makefile_dir() -> Path: + return Path(__file__).parent.parent + + +@pytest.fixture(name="local_env") +def local_env_fixture() -> Generator[None, None, None]: + try: + subprocess.run(["make", "airflow_setup"], cwd=makefile_dir(), check=True) + yield + finally: + subprocess.run(["make", "wipe"], cwd=makefile_dir(), check=True) + + +@pytest.fixture(name="upstream_airflow") +def upstream_airflow_fixture(local_env: None) -> Generator[subprocess.Popen, None, None]: + process = None + try: + with stand_up_airflow( + airflow_cmd=["make", "upstream_airflow_run"], + env=os.environ, + cwd=makefile_dir(), + port=8081, + ) as process: + yield process + finally: + if process: + process.terminate() + + +@pytest.fixture(name="downstream_airflow") +def downstream_airflow_fixture(local_env: None) -> Generator[subprocess.Popen, None, None]: + process = None + try: + with stand_up_airflow( + airflow_cmd=["make", "downstream_airflow_run"], + env=os.environ, + cwd=makefile_dir(), + port=8082, + ) as process: + yield process + finally: + if process: + process.terminate() + + +@pytest.fixture(name="dagster_dev") +def dagster_fixture( + upstream_airflow: subprocess.Popen, downstream_airflow: subprocess.Popen +) -> Generator[subprocess.Popen, None, None]: + process = None + try: + with stand_up_dagster( + dagster_dev_cmd=["make", "-C", str(makefile_dir()), "dagster_run"], + port=3000, + ) as process: + yield process + finally: + if process: + process.terminate() diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_load.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_load.py new file mode 100644 index 0000000000000..1b88cb8184fa0 
--- /dev/null +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_load.py @@ -0,0 +1,39 @@ +import subprocess + +import requests +from dagster_airlift.core import AirflowBasicAuthBackend, AirflowInstance +from dagster_airlift.in_airflow.gql_queries import VERIFICATION_QUERY + + +def test_load_upstream(upstream_airflow: subprocess.Popen) -> None: + af_instance = AirflowInstance( + auth_backend=AirflowBasicAuthBackend( + webserver_url="http://localhost:8081", + username="admin", + password="admin", + ), + name="test", + ) + assert len(af_instance.list_dags()) == 1 + + +def test_load_downstream(downstream_airflow: subprocess.Popen) -> None: + af_instance = AirflowInstance( + auth_backend=AirflowBasicAuthBackend( + webserver_url="http://localhost:8082", + username="admin", + password="admin", + ), + name="test", + ) + assert len(af_instance.list_dags()) == 1 + + +def test_load_dagster(dagster_dev: subprocess.Popen) -> None: + response = requests.post( + # Timeout in seconds + "http://localhost:3000/graphql", + json={"query": VERIFICATION_QUERY}, + timeout=3, + ) + assert response.status_code == 200 diff --git a/examples/airlift-federation-tutorial/conftest.py b/examples/airlift-federation-tutorial/conftest.py new file mode 100644 index 0000000000000..15102201ce4c1 --- /dev/null +++ b/examples/airlift-federation-tutorial/conftest.py @@ -0,0 +1 @@ +pytest_plugins = ["dagster_airlift.test.shared_fixtures"] diff --git a/examples/airlift-federation-tutorial/pyproject.toml b/examples/airlift-federation-tutorial/pyproject.toml new file mode 100644 index 0000000000000..6bbc17fc42277 --- /dev/null +++ b/examples/airlift-federation-tutorial/pyproject.toml @@ -0,0 +1,6 @@ +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" + +[tool.dagster] +module_name = "airlift_federation_tutorial.dagster_defs.definitions" diff --git a/examples/airlift-federation-tutorial/scripts/airflow_setup.sh 
b/examples/airlift-federation-tutorial/scripts/airflow_setup.sh new file mode 100755 index 0000000000000..bb61e59c82bf9 --- /dev/null +++ b/examples/airlift-federation-tutorial/scripts/airflow_setup.sh @@ -0,0 +1,42 @@ +#!/bin/bash + +# Check if the required arguments are provided +if [ -z "$1" ] || [ -z "$2" ]; then + echo "Usage: $0 <dags_folder> <airflow_home_dir> [port]" + exit 1 +fi + +DAGS_FOLDER=$1 +AIRFLOW_HOME_DIR=$2 +# Set default port to 8080 if not provided as third argument +PORT=${3:-8080} + +# Validate that the provided paths are absolute paths +if [[ "$DAGS_FOLDER" != /* ]] || [[ "$AIRFLOW_HOME_DIR" != /* ]]; then + echo "Error: Both paths must be absolute paths." + exit 1 +fi + +# Create the airflow.cfg file in the specified AIRFLOW_HOME_DIR +cat <<EOL > $AIRFLOW_HOME_DIR/airflow.cfg +[core] +dags_folder = $DAGS_FOLDER +dagbag_import_timeout = 30 +load_examples = False +[api] +auth_backend = airflow.api.auth.backend.basic_auth +[webserver] +expose_config = True +web_server_port = $PORT + +EOL + +# call airflow command to create the default user +AIRFLOW_HOME=$AIRFLOW_HOME_DIR airflow db migrate && \ +AIRFLOW_HOME=$AIRFLOW_HOME_DIR airflow users create \ + --username admin \ + --password admin \ + --firstname Peter \ + --lastname Parker \ + --role Admin \ + --email spiderman@superhero.org \ No newline at end of file diff --git a/examples/airlift-federation-tutorial/setup.py b/examples/airlift-federation-tutorial/setup.py new file mode 100644 index 0000000000000..2e3c5854a808f --- /dev/null +++ b/examples/airlift-federation-tutorial/setup.py @@ -0,0 +1,12 @@ +from setuptools import find_packages, setup + +setup( + name="airlift-federation-tutorial", + packages=find_packages(), + install_requires=[ + "dagster", + "dagster-webserver", + "dagster-airlift[dbt,core]", + ], + extras_require={"test": ["pytest"]}, +) diff --git a/examples/airlift-federation-tutorial/tox.ini b/examples/airlift-federation-tutorial/tox.ini new file mode 100644 index 0000000000000..54747f04d594b --- /dev/null +++ 
b/examples/airlift-federation-tutorial/tox.ini @@ -0,0 +1,29 @@ +[tox] +skipsdist = true + +[testenv] +download = True +passenv = + CI_* + COVERALLS_REPO_TOKEN + BUILDKITE* +install_command = uv pip install {opts} {packages} +deps = + -e ../../python_modules/dagster[test] + -e ../../python_modules/dagster-webserver + -e ../../python_modules/dagster-test + -e ../../python_modules/dagster-pipes + -e ../../python_modules/dagster-graphql + -e ../../python_modules/libraries/dagster-dbt + -e ../experimental/dagster-airlift[core,dbt,test,in-airflow] + -e . + pandas +allowlist_externals = + /bin/bash + uv + make +commands = + # We need to rebuild the UI to ensure that the dagster-webserver can run + make -C ../.. rebuild_ui + !windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster' + pytest -c ../../pyproject.toml ./airlift_federation_tutorial_tests --snapshot-warn-unused -vv {posargs} diff --git a/examples/experimental/dagster-airlift/dagster_airlift/test/shared_fixtures.py b/examples/experimental/dagster-airlift/dagster_airlift/test/shared_fixtures.py index 21d89d28491dc..a5f57e59cf66d 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/test/shared_fixtures.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/test/shared_fixtures.py @@ -13,6 +13,9 @@ from dagster._core.test_utils import environ from dagster._time import get_current_timestamp +from dagster_airlift.core.airflow_instance import AirflowInstance +from dagster_airlift.core.basic_auth import AirflowBasicAuthBackend + #################################################################################################### # AIRFLOW SETUP FIXTURES @@ -21,8 +24,15 @@ #################################################################################################### def _airflow_is_ready(port) -> bool: try: - response = requests.get(f"http://localhost:{port}") - return response.status_code == 200 + af_instance = AirflowInstance( + 
auth_backend=AirflowBasicAuthBackend( + webserver_url=f"http://localhost:{port}", + username="admin", + password="admin", + ), + name="test", + ) + return len(af_instance.list_dags()) > 0 except: return False @@ -105,9 +115,9 @@ def airflow_instance_fixture(setup: None) -> Generator[subprocess.Popen, None, N # Sets up the dagster environment for testing. Running at localhost:3333. # Callsites are expected to provide implementations for dagster_defs_path fixture. #################################################################################################### -def _dagster_is_ready() -> bool: +def _dagster_is_ready(port: int) -> bool: try: - response = requests.get("http://localhost:3333") + response = requests.get(f"http://localhost:{port}") return response.status_code == 200 except: return False @@ -141,7 +151,9 @@ def setup_dagster( @contextmanager -def stand_up_dagster(dagster_dev_cmd: List[str]) -> Generator[subprocess.Popen, None, None]: +def stand_up_dagster( + dagster_dev_cmd: List[str], port: int = 3333 +) -> Generator[subprocess.Popen, None, None]: """Stands up a dagster instance using the dagster dev CLI. dagster_defs_path must be provided by a fixture included in the callsite. """ @@ -158,7 +170,7 @@ def stand_up_dagster(dagster_dev_cmd: List[str]) -> Generator[subprocess.Popen, dagster_ready = False initial_time = get_current_timestamp() while get_current_timestamp() - initial_time < 60: - if _dagster_is_ready(): + if _dagster_is_ready(port): dagster_ready = True break time.sleep(1)