-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[dagster-airlift] Federation tutorial
- Loading branch information
Showing
15 changed files
with
476 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
*.**airflow_home* | ||
*.dagster_home* | ||
customers.csv | ||
|
||
# Byte-compiled / optimized / DLL files | ||
__pycache__/ | ||
*.py[cod] | ||
*$py.class | ||
|
||
# C extensions | ||
*.so | ||
|
||
# Distribution / packaging | ||
.Python | ||
env/ | ||
build/ | ||
develop-eggs/ | ||
dist/ | ||
downloads/ | ||
eggs/ | ||
.eggs/ | ||
lib/ | ||
lib64/ | ||
parts/ | ||
sdist/ | ||
var/ | ||
wheels/ | ||
*.egg-info/ | ||
.installed.cfg | ||
*.egg | ||
|
||
# PyInstaller | ||
# Usually these files are written by a python script from a template | ||
# before PyInstaller builds the exe, so as to inject date/other infos into it. | ||
*.manifest | ||
*.spec | ||
|
||
# Installer logs | ||
pip-log.txt | ||
pip-delete-this-directory.txt | ||
|
||
# Unit test / coverage reports | ||
htmlcov/ | ||
.tox/ | ||
.coverage | ||
.coverage.* | ||
.cache | ||
nosetests.xml | ||
coverage.xml | ||
*.cover | ||
.hypothesis/ | ||
mlruns/ | ||
|
||
# Translations | ||
*.mo | ||
*.pot | ||
|
||
# Django stuff: | ||
*.log | ||
local_settings.py | ||
|
||
# Flask stuff: | ||
.webassets-cache | ||
|
||
# Scrapy stuff: | ||
.scrapy | ||
|
||
# Sphinx documentation | ||
docs/_build/ | ||
|
||
# PyBuilder | ||
target/ | ||
|
||
# Jupyter Notebook | ||
.ipynb_checkpoints | ||
|
||
# pyenv | ||
.python-version | ||
|
||
# celery beat schedule file | ||
celerybeat-schedule | ||
|
||
# SageMath parsed files | ||
*.sage.py | ||
|
||
# dotenv | ||
.env | ||
.envrc | ||
|
||
# virtualenv | ||
.direnv/ | ||
.venv | ||
venv/ | ||
ENV/ | ||
Pipfile | ||
Pipfile.lock | ||
|
||
# Spyder project settings | ||
.spyderproject | ||
.spyproject | ||
|
||
# Rope project settings | ||
.ropeproject | ||
|
||
# mkdocs documentation | ||
/site | ||
|
||
# ruff | ||
.ruff_cache/ | ||
|
||
# mypy | ||
.mypy_cache/ | ||
|
||
tags | ||
!python_modules/dagster/dagster/_core/definitions/tags | ||
|
||
.pytest_cache | ||
.DS_Store | ||
|
||
docs/_build | ||
python_modules/dagster/docs/_build | ||
|
||
dagit_run_logs | ||
|
||
python_modules/libraries/dagster-aws/dagster_aws/ecs/config.yaml | ||
|
||
python_modules/dagster-webserver/node_modules/ | ||
python_modules/dagster-webserver/yarn.lock | ||
|
||
# old dagit stuff | ||
python_modules/dagit/node_modules/ | ||
python_modules/dagit/yarn.lock | ||
js_modules/dagit | ||
|
||
# Gatsby stuff | ||
docs/gatsby/**/node_modules/ | ||
docs/gatsby/**/_build | ||
docs/gatsby/**/public | ||
# Next stuff | ||
docs/next/.mdx-data | ||
docs/next/public/sitemap.xml | ||
# Data | ||
data | ||
# Don't ignore data folders in examples | ||
!examples/*/data | ||
!examples/**/**/data | ||
|
||
# Dask | ||
dask-worker-space | ||
|
||
# PyCharm IDE Config files | ||
.idea/ | ||
|
||
# Codemod bookmarks | ||
.codemod.bookmark | ||
|
||
# Examples outputs | ||
examples/docs_snippets/docs_snippets/**/**/output/ | ||
examples/docs_snippets/docs_snippets/**/**/output/ | ||
examples/**/**/example.db | ||
|
||
# Telemetry instance id | ||
.telemetry | ||
|
||
test_results.xml | ||
|
||
# GitHub Codespaces | ||
pythonenv*/ | ||
|
||
# Vim project-local settings | ||
.vim | ||
|
||
# DuckDB | ||
*.duckdb | ||
|
||
# PyRight config | ||
pyrightconfig* | ||
|
||
# Scripts working directory | ||
scripts/.build | ||
|
||
# dbt .user files | ||
.user.yml |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
.PHONY: help | ||
|
||
define GET_MAKEFILE_DIR | ||
$(shell dirname $(realpath $(lastword $(MAKEFILE_LIST))) | sed 's:/*$$::') | ||
endef | ||
|
||
MAKEFILE_DIR := $(GET_MAKEFILE_DIR) | ||
export TUTORIAL_EXAMPLE_DIR := $(MAKEFILE_DIR) | ||
export DAGSTER_HOME := $(MAKEFILE_DIR)/.dagster_home | ||
export UPSTREAM_AIRFLOW_HOME := $(MAKEFILE_DIR)/.upstream_airflow_home | ||
export DOWNSTREAM_AIRFLOW_HOME := $(MAKEFILE_DIR)/.downstream_airflow_home | ||
export DAGSTER_URL := http://localhost:3000 | ||
|
||
# Detect OS and use appropriate date command | ||
UNAME_S := $(shell uname -s) | ||
ifeq ($(UNAME_S),Darwin) | ||
TOMORROW_DATE := $(shell date -v+1d +"%Y-%m-%d") | ||
else | ||
TOMORROW_DATE := $(shell date -d "+1 day" +"%Y-%m-%d") | ||
endif | ||
|
||
help: | ||
@egrep -h '\s##\s' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}' | ||
|
||
|
||
### TUTORIAL COMMANDS ### | ||
airflow_install: | ||
pip install uv && \ | ||
uv pip install dagster-airlift[in-airflow,dbt,tutorial] && \ | ||
uv pip install -e $(MAKEFILE_DIR) | ||
|
||
airflow_setup: wipe | ||
mkdir -p $$UPSTREAM_AIRFLOW_HOME | ||
mkdir -p $$DOWNSTREAM_AIRFLOW_HOME | ||
mkdir -p $$DAGSTER_HOME | ||
chmod +x $(MAKEFILE_DIR)/scripts/airflow_setup.sh | ||
$(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/upstream_airflow_dags $(UPSTREAM_AIRFLOW_HOME) 8081 | ||
$(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/upstream_airflow_dags $(DOWNSTREAM_AIRFLOW_HOME) 8082 | ||
|
||
upstream_airflow_run: | ||
AIRFLOW_HOME=$(UPSTREAM_AIRFLOW_HOME) airflow standalone | ||
|
||
downstream_airflow_run: | ||
AIRFLOW_HOME=$(DOWNSTREAM_AIRFLOW_HOME) airflow standalone | ||
|
||
dagster_run: | ||
dagster dev -m airlift_federation_tutorial.dagster_defs.definitions -p 3000 | ||
|
||
clean: | ||
airflow db clean --yes --clean-before-timestamp $(TOMORROW_DATE) | ||
dagster asset wipe --all --noprompt | ||
|
||
update_readme_snippets: | ||
python ../../scripts/update_readme_snippets.py $(MAKEFILE_DIR)/README.md | ||
|
||
wipe: | ||
rm -rf $$UPSTREAM_AIRFLOW_HOME $$DOWNSTREAM_AIRFLOW_HOME $$DAGSTER_HOME | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
## Airlift Federation Tutorial Code | ||
|
||
Work in progress. |
Empty file.
5 changes: 5 additions & 0 deletions
5
examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/definitions.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from dagster import Definitions | ||
|
||
# Use this empty file to follow along with the tutorial | ||
|
||
defs = Definitions() |
6 changes: 6 additions & 0 deletions
6
...s/airlift-federation-tutorial/airlift_federation_tutorial/downstream_airflow_dags/dags.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from airflow import DAG | ||
|
||
with DAG( | ||
dag_id="customer_metrics", | ||
) as dag: | ||
pass |
6 changes: 6 additions & 0 deletions
6
...les/airlift-federation-tutorial/airlift_federation_tutorial/upstream_airflow_dags/dags.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from airflow import DAG | ||
|
||
with DAG( | ||
dag_id="load_customers", | ||
) as dag: | ||
pass |
68 changes: 68 additions & 0 deletions
68
examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/conftest.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
import os | ||
import subprocess | ||
from pathlib import Path | ||
from typing import Generator | ||
|
||
import pytest | ||
from dagster_airlift.test.shared_fixtures import stand_up_airflow, stand_up_dagster | ||
|
||
|
||
def makefile_dir() -> Path: | ||
return Path(__file__).parent.parent | ||
|
||
|
||
@pytest.fixture(name="local_env") | ||
def local_env_fixture() -> Generator[None, None, None]: | ||
try: | ||
subprocess.run(["make", "airflow_setup"], cwd=makefile_dir(), check=True) | ||
yield | ||
finally: | ||
subprocess.run(["make", "wipe"], cwd=makefile_dir(), check=True) | ||
|
||
|
||
@pytest.fixture(name="upstream_airflow") | ||
def upstream_airflow_fixture(local_env: None) -> Generator[subprocess.Popen, None, None]: | ||
process = None | ||
try: | ||
with stand_up_airflow( | ||
airflow_cmd=["make", "upstream_airflow_run"], | ||
env=os.environ, | ||
cwd=makefile_dir(), | ||
port=8081, | ||
) as process: | ||
yield process | ||
finally: | ||
if process: | ||
process.terminate() | ||
|
||
|
||
@pytest.fixture(name="downstream_airflow") | ||
def downstream_airflow_fixture(local_env: None) -> Generator[subprocess.Popen, None, None]: | ||
process = None | ||
try: | ||
with stand_up_airflow( | ||
airflow_cmd=["make", "downstream_airflow_run"], | ||
env=os.environ, | ||
cwd=makefile_dir(), | ||
port=8082, | ||
) as process: | ||
yield process | ||
finally: | ||
if process: | ||
process.terminate() | ||
|
||
|
||
@pytest.fixture(name="dagster_dev") | ||
def dagster_fixture( | ||
upstream_airflow: subprocess.Popen, downstream_airflow: subprocess.Popen | ||
) -> Generator[subprocess.Popen, None, None]: | ||
process = None | ||
try: | ||
with stand_up_dagster( | ||
dagster_dev_cmd=["make", "-C", str(makefile_dir()), "dagster_run"], | ||
port=3000, | ||
) as process: | ||
yield process | ||
finally: | ||
if process: | ||
process.terminate() |
39 changes: 39 additions & 0 deletions
39
examples/airlift-federation-tutorial/airlift_federation_tutorial_tests/test_load.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
import subprocess | ||
|
||
import requests | ||
from dagster_airlift.core import AirflowBasicAuthBackend, AirflowInstance | ||
from dagster_airlift.in_airflow.gql_queries import VERIFICATION_QUERY | ||
|
||
|
||
def test_load_upstream(upstream_airflow: subprocess.Popen) -> None: | ||
af_instance = AirflowInstance( | ||
auth_backend=AirflowBasicAuthBackend( | ||
webserver_url="http://localhost:8081", | ||
username="admin", | ||
password="admin", | ||
), | ||
name="test", | ||
) | ||
assert len(af_instance.list_dags()) == 1 | ||
|
||
|
||
def test_load_downstream(downstream_airflow: subprocess.Popen) -> None: | ||
af_instance = AirflowInstance( | ||
auth_backend=AirflowBasicAuthBackend( | ||
webserver_url="http://localhost:8082", | ||
username="admin", | ||
password="admin", | ||
), | ||
name="test", | ||
) | ||
assert len(af_instance.list_dags()) == 1 | ||
|
||
|
||
def test_load_dagster(dagster_dev: subprocess.Popen) -> None: | ||
response = requests.post( | ||
# Timeout in seconds | ||
"http://localhost:3000/graphql", | ||
json={"query": VERIFICATION_QUERY}, | ||
timeout=3, | ||
) | ||
assert response.status_code == 200 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
pytest_plugins = ["dagster_airlift.test.shared_fixtures"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
[build-system] | ||
requires = ["setuptools"] | ||
build-backend = "setuptools.build_meta" | ||
|
||
[tool.dagster] | ||
module_name = "airlift_federation_tutorial.definitions" |
Oops, something went wrong.