Skip to content

Commit

Permalink
[dagster-airlift] Federation tutorial
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Nov 13, 2024
1 parent 645cd02 commit 7b1f43c
Show file tree
Hide file tree
Showing 15 changed files with 476 additions and 6 deletions.
183 changes: 183 additions & 0 deletions examples/airlift-federation-tutorial/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# Local state directories created by the tutorial Makefile
# (.upstream_airflow_home, .downstream_airflow_home, .dagster_home).
*.**airflow_home*
*.dagster_home*
# NOTE(review): presumably a data file produced while running the tutorial — confirm.
customers.csv

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
mlruns/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py

# Flask stuff:
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# pyenv
.python-version

# celery beat schedule file
celerybeat-schedule

# SageMath parsed files
*.sage.py

# dotenv
.env
.envrc

# virtualenv
.direnv/
.venv
venv/
ENV/
Pipfile
Pipfile.lock

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# ruff
.ruff_cache/

# mypy
.mypy_cache/

tags
!python_modules/dagster/dagster/_core/definitions/tags

.pytest_cache
.DS_Store

docs/_build
python_modules/dagster/docs/_build

dagit_run_logs

python_modules/libraries/dagster-aws/dagster_aws/ecs/config.yaml

python_modules/dagster-webserver/node_modules/
python_modules/dagster-webserver/yarn.lock

# old dagit stuff
python_modules/dagit/node_modules/
python_modules/dagit/yarn.lock
js_modules/dagit

# Gatsby stuff
docs/gatsby/**/node_modules/
docs/gatsby/**/_build
docs/gatsby/**/public
# Next stuff
docs/next/.mdx-data
docs/next/public/sitemap.xml
# Data
data
# Don't ignore data folders in examples
!examples/*/data
!examples/**/**/data

# Dask
dask-worker-space

# PyCharm IDE Config files
.idea/

# Codemod bookmarks
.codemod.bookmark

# Examples outputs
examples/docs_snippets/docs_snippets/**/**/output/
examples/**/**/example.db

# Telemetry instance id
.telemetry

test_results.xml

# GitHub Codespaces
pythonenv*/

# Vim project-local settings
.vim

# DuckDB
*.duckdb

# PyRight config
pyrightconfig*

# Scripts working directory
scripts/.build

# dbt .user files
.user.yml
58 changes: 58 additions & 0 deletions examples/airlift-federation-tutorial/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Every target in this Makefile is a command, not a file to build. Declaring
# them all phony keeps a stray file named e.g. `clean` or `wipe` from making
# the target look "up to date" and silently skipping its recipe.
.PHONY: help airflow_install airflow_setup upstream_airflow_run \
	downstream_airflow_run dagster_run clean update_readme_snippets wipe

# Absolute, symlink-resolved directory containing this Makefile, without a
# trailing slash. Computed with Make built-ins only (no shell fork).
# It relies on the last entry of MAKEFILE_LIST being this file, so it must be
# expanded before any `include` directive.
define GET_MAKEFILE_DIR
$(patsubst %/,%,$(dir $(realpath $(lastword $(MAKEFILE_LIST)))))
endef

# Expanded once, here, so later includes cannot change the result.
MAKEFILE_DIR := $(GET_MAKEFILE_DIR)

# Exported so that recipes (and the processes they start) see these values as
# environment variables.
export TUTORIAL_EXAMPLE_DIR := $(MAKEFILE_DIR)
export DAGSTER_HOME := $(MAKEFILE_DIR)/.dagster_home
export UPSTREAM_AIRFLOW_HOME := $(MAKEFILE_DIR)/.upstream_airflow_home
export DOWNSTREAM_AIRFLOW_HOME := $(MAKEFILE_DIR)/.downstream_airflow_home
export DAGSTER_URL := http://localhost:3000

# Tomorrow's date as YYYY-MM-DD, used by the `clean` target.
# `date` takes different "add one day" arguments depending on the platform:
# `-v+1d` on Darwin (macOS), `-d "+1 day"` everywhere else.
UNAME_S := $(shell uname -s)
DATE_TOMORROW_ARGS := $(if $(filter Darwin,$(UNAME_S)),-v+1d,-d "+1 day")
TOMORROW_DATE := $(shell date $(DATE_TOMORROW_ARGS) +"%Y-%m-%d")

# Self-documenting help: lists every line of the parsed makefiles that carries
# a ` ## ` description, printing the target name (text before the first colon)
# in cyan followed by the description.
# `grep -E` replaces the obsolescent `egrep` alias.
help: ## Show this help
	@grep -E -h '\s##\s' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'


### TUTORIAL COMMANDS ###
# The extras specifier is quoted because `[...]` is a glob character class to
# the shell; unquoted it could be expanded against files in the current
# directory (or rejected outright by shells with strict globbing).
airflow_install: ## Install uv, dagster-airlift with the tutorial extras, and this package (editable)
	pip install uv && \
	uv pip install 'dagster-airlift[in-airflow,dbt,tutorial]' && \
	uv pip install -e $(MAKEFILE_DIR)

# Starts from a clean slate (`wipe` prerequisite), recreates the three home
# directories, then runs scripts/airflow_setup.sh once per Airflow instance.
# The script receives a DAGs directory, an Airflow home and a port; what it
# does with them is defined in the script itself (not shown here).
# Each instance gets its own DAGs directory: the downstream instance previously
# reused upstream_airflow_dags, so both instances served the same DAGs.
# NOTE(review): assumes airlift_federation_tutorial/downstream_airflow_dags
# exists alongside upstream_airflow_dags — confirm.
airflow_setup: wipe ## Recreate the Airflow/Dagster home directories and initialise both Airflow instances
	mkdir -p $$UPSTREAM_AIRFLOW_HOME
	mkdir -p $$DOWNSTREAM_AIRFLOW_HOME
	mkdir -p $$DAGSTER_HOME
	chmod +x $(MAKEFILE_DIR)/scripts/airflow_setup.sh
	$(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/upstream_airflow_dags $(UPSTREAM_AIRFLOW_HOME) 8081
	$(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/downstream_airflow_dags $(DOWNSTREAM_AIRFLOW_HOME) 8082

# AIRFLOW_HOME is set only for this one command, so the upstream and
# downstream instances never share a home directory.
upstream_airflow_run: ## Run the upstream Airflow instance (airflow standalone)
	AIRFLOW_HOME=$(UPSTREAM_AIRFLOW_HOME) airflow standalone

# AIRFLOW_HOME is set only for this one command, so the upstream and
# downstream instances never share a home directory.
downstream_airflow_run: ## Run the downstream Airflow instance (airflow standalone)
	AIRFLOW_HOME=$(DOWNSTREAM_AIRFLOW_HOME) airflow standalone

# Port 3000 matches the DAGSTER_URL exported at the top of this Makefile.
dagster_run: ## Run `dagster dev` for the tutorial definitions on port 3000
	dagster dev -m airlift_federation_tutorial.dagster_defs.definitions -p 3000

# `airflow db clean` acts on whichever instance AIRFLOW_HOME points at. This
# Makefile never exports AIRFLOW_HOME, so it is set explicitly for each
# tutorial instance; without it the command would target Airflow's default
# home instead of either tutorial database.
# TOMORROW_DATE is used as the cutoff so that everything recorded up to now
# is removed.
clean: ## Clean both Airflow metadata databases and wipe all Dagster assets
	AIRFLOW_HOME=$(UPSTREAM_AIRFLOW_HOME) airflow db clean --yes --clean-before-timestamp $(TOMORROW_DATE)
	AIRFLOW_HOME=$(DOWNSTREAM_AIRFLOW_HOME) airflow db clean --yes --clean-before-timestamp $(TOMORROW_DATE)
	dagster asset wipe --all --noprompt

# The script path is anchored on MAKEFILE_DIR (like every other path in this
# Makefile) so the target also works when make is invoked from another
# directory, e.g. `make -f path/to/Makefile update_readme_snippets`.
update_readme_snippets: ## Regenerate the code snippets embedded in README.md
	python $(MAKEFILE_DIR)/../../scripts/update_readme_snippets.py $(MAKEFILE_DIR)/README.md

# Delete the three local state directories of the tutorial (both Airflow homes
# and the Dagster home). `-f` makes this a no-op when they do not exist.
wipe:
	rm -rf $(UPSTREAM_AIRFLOW_HOME) $(DOWNSTREAM_AIRFLOW_HOME) $(DAGSTER_HOME)

3 changes: 3 additions & 0 deletions examples/airlift-federation-tutorial/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## Airlift Federation Tutorial Code

Work in progress.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from dagster import Definitions

# Use this empty file to follow along with the tutorial.
# `defs` starts out as a Definitions object with nothing registered in it.

defs = Definitions()
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from airflow import DAG

# Placeholder DAG: registered under the id "customer_metrics" with no tasks.
with DAG(dag_id="customer_metrics") as dag:
    pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from airflow import DAG

# Placeholder DAG: registered under the id "load_customers" with no tasks.
with DAG(dag_id="load_customers") as dag:
    pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import os
import subprocess
from pathlib import Path
from typing import Generator

import pytest
from dagster_airlift.test.shared_fixtures import stand_up_airflow, stand_up_dagster


def makefile_dir() -> Path:
    """Return the directory two levels above this file.

    The fixtures in this module use it as the working directory for their
    ``make`` invocations.
    """
    this_file = Path(__file__)
    return this_file.parents[1]


@pytest.fixture(name="local_env")
def local_env_fixture() -> Generator[None, None, None]:
    """Prepare the local tutorial environment for a test and tear it down afterwards.

    Runs ``make airflow_setup`` in the Makefile directory before the test and
    ``make wipe`` afterwards. The wipe runs from a ``finally`` clause, so it is
    attempted even if setup or the test fails. Both invocations use
    ``check=True`` and therefore raise ``subprocess.CalledProcessError`` on a
    non-zero exit status.
    """
    tutorial_root = makefile_dir()
    try:
        subprocess.run(["make", "airflow_setup"], cwd=tutorial_root, check=True)
        yield
    finally:
        subprocess.run(["make", "wipe"], cwd=tutorial_root, check=True)


@pytest.fixture(name="upstream_airflow")
def upstream_airflow_fixture(local_env: None) -> Generator[subprocess.Popen, None, None]:
    """Run the upstream Airflow instance for the duration of a test.

    Requires the ``local_env`` fixture. Launches ``make upstream_airflow_run``
    through ``stand_up_airflow`` (port 8081, current process environment,
    Makefile directory as cwd) and yields what that context manager provides.
    On the way out, ``terminate()`` is called on that object if one was
    obtained.
    """
    airflow_proc = None
    launch_kwargs = dict(
        airflow_cmd=["make", "upstream_airflow_run"],
        env=os.environ,
        cwd=makefile_dir(),
        port=8081,
    )
    try:
        with stand_up_airflow(**launch_kwargs) as airflow_proc:
            yield airflow_proc
    finally:
        if airflow_proc:
            airflow_proc.terminate()


@pytest.fixture(name="downstream_airflow")
def downstream_airflow_fixture(local_env: None) -> Generator[subprocess.Popen, None, None]:
    """Run the downstream Airflow instance for the duration of a test.

    Requires the ``local_env`` fixture. Launches ``make downstream_airflow_run``
    through ``stand_up_airflow`` (port 8082, current process environment,
    Makefile directory as cwd) and yields what that context manager provides.
    On the way out, ``terminate()`` is called on that object if one was
    obtained.
    """
    airflow_proc = None
    launch_kwargs = dict(
        airflow_cmd=["make", "downstream_airflow_run"],
        env=os.environ,
        cwd=makefile_dir(),
        port=8082,
    )
    try:
        with stand_up_airflow(**launch_kwargs) as airflow_proc:
            yield airflow_proc
    finally:
        if airflow_proc:
            airflow_proc.terminate()


@pytest.fixture(name="dagster_dev")
def dagster_fixture(
    upstream_airflow: subprocess.Popen, downstream_airflow: subprocess.Popen
) -> Generator[subprocess.Popen, None, None]:
    """Run ``dagster dev`` for the duration of a test.

    Requires both Airflow fixtures, so both instances are started first.
    Launches ``make -C <makefile dir> dagster_run`` through
    ``stand_up_dagster`` with port 3000 and yields what that context manager
    provides. On the way out, ``terminate()`` is called on that object if one
    was obtained.
    """
    dagster_proc = None
    make_cmd = ["make", "-C", str(makefile_dir()), "dagster_run"]
    try:
        with stand_up_dagster(dagster_dev_cmd=make_cmd, port=3000) as dagster_proc:
            yield dagster_proc
    finally:
        if dagster_proc:
            dagster_proc.terminate()
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import subprocess

import requests
from dagster_airlift.core import AirflowBasicAuthBackend, AirflowInstance
from dagster_airlift.in_airflow.gql_queries import VERIFICATION_QUERY


def test_load_upstream(upstream_airflow: subprocess.Popen) -> None:
    """The Airflow instance served on port 8081 lists exactly one DAG."""
    basic_auth = AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    )
    upstream_instance = AirflowInstance(auth_backend=basic_auth, name="test")
    listed_dags = upstream_instance.list_dags()
    assert len(listed_dags) == 1


def test_load_downstream(downstream_airflow: subprocess.Popen) -> None:
    """The Airflow instance served on port 8082 lists exactly one DAG."""
    basic_auth = AirflowBasicAuthBackend(
        webserver_url="http://localhost:8082",
        username="admin",
        password="admin",
    )
    downstream_instance = AirflowInstance(auth_backend=basic_auth, name="test")
    listed_dags = downstream_instance.list_dags()
    assert len(listed_dags) == 1


def test_load_dagster(dagster_dev: subprocess.Popen) -> None:
    """The Dagster GraphQL endpoint on port 3000 answers the verification query with HTTP 200."""
    graphql_url = "http://localhost:3000/graphql"
    payload = {"query": VERIFICATION_QUERY}
    # Timeout in seconds
    response = requests.post(graphql_url, json=payload, timeout=3)
    assert response.status_code == 200
1 change: 1 addition & 0 deletions examples/airlift-federation-tutorial/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Load the dagster-airlift shared fixtures module as a pytest plugin for the
# tests collected under this directory.
pytest_plugins = ["dagster_airlift.test.shared_fixtures"]
6 changes: 6 additions & 0 deletions examples/airlift-federation-tutorial/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

# Module that Dagster loads by default for this project. Kept identical to the
# module the Makefile's `dagster_run` target passes to `dagster dev -m`.
# NOTE(review): the previous value, "airlift_federation_tutorial.definitions",
# did not match the Makefile — confirm no such module is intended.
[tool.dagster]
module_name = "airlift_federation_tutorial.dagster_defs.definitions"
Loading

0 comments on commit 7b1f43c

Please sign in to comment.