[dagster-airlift][federation-tutorial] Implement dags #25885

Merged · 1 commit · Nov 14, 2024
examples/airlift-federation-tutorial/README.md
@@ -44,16 +44,16 @@ Finally, let's run the two Airflow instances with environment variables set:
In one shell run:

```bash
-make upstream_airflow_run
+make warehouse_airflow_run
```

In a separate shell, run:

```bash
-make downstream_airflow_run
+make metrics_airflow_run
```

-This will run two Airflow Web UIs, one for each Airflow instance. You should now be able to access the upstream Airflow UI at `http://localhost:8081`, with the default username and password set to `admin`.
+This will run two Airflow Web UIs, one for each Airflow instance. You should now be able to access the warehouse Airflow UI at `http://localhost:8081`, with the default username and password set to `admin`.

You should be able to see the `load_customers` DAG in the Airflow UI.

@@ -64,7 +64,7 @@ width={1484}
height={300}
/>

-Similarly, you should be able to access the downstream Airflow UI at `http://localhost:8082`, with the default username and password set to `admin`.
+Similarly, you should be able to access the metrics Airflow UI at `http://localhost:8082`, with the default username and password set to `admin`.

You should be able to see the `customer_metrics` DAG in the Airflow UI.

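As a quick sanity check that both webservers are up, you can poll each instance's unauthenticated `/health` endpoint. A minimal sketch (not part of this PR), assuming the default ports configured in the Makefile below (8081 for warehouse, 8082 for metrics):

```python
# Poll both Airflow webservers' /health endpoints (no auth required).
# Ports 8081/8082 are assumptions taken from the Makefile in this PR.
import requests

for name, port in [("warehouse", 8081), ("metrics", 8082)]:
    resp = requests.get(f"http://localhost:{port}/health", timeout=5)
    resp.raise_for_status()
    print(f"{name}: metadatabase is {resp.json()['metadatabase']['status']}")
```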
1 change: 1 addition & 0 deletions examples/airlift-federation-tutorial/.gitignore
@@ -1,6 +1,7 @@
*.**airflow_home*
*.dagster_home*
customers.csv
+*.db

# Byte-compiled / optimized / DLL files
__pycache__/
22 changes: 11 additions & 11 deletions examples/airlift-federation-tutorial/Makefile
@@ -7,8 +7,8 @@ endef
MAKEFILE_DIR := $(GET_MAKEFILE_DIR)
export TUTORIAL_EXAMPLE_DIR := $(MAKEFILE_DIR)
export DAGSTER_HOME := $(MAKEFILE_DIR)/.dagster_home
-export UPSTREAM_AIRFLOW_HOME := $(MAKEFILE_DIR)/.upstream_airflow_home
-export DOWNSTREAM_AIRFLOW_HOME := $(MAKEFILE_DIR)/.downstream_airflow_home
+export WAREHOUSE_AIRFLOW_HOME := $(MAKEFILE_DIR)/.warehouse_airflow_home
+export METRICS_AIRFLOW_HOME := $(MAKEFILE_DIR)/.metrics_airflow_home
export DAGSTER_URL := http://localhost:3000

# Detect OS and use appropriate date command
@@ -30,18 +30,18 @@ airflow_install:
	uv pip install -e $(MAKEFILE_DIR)

airflow_setup: wipe
-	mkdir -p $$UPSTREAM_AIRFLOW_HOME
-	mkdir -p $$DOWNSTREAM_AIRFLOW_HOME
+	mkdir -p $$WAREHOUSE_AIRFLOW_HOME
+	mkdir -p $$METRICS_AIRFLOW_HOME
	mkdir -p $$DAGSTER_HOME
	chmod +x $(MAKEFILE_DIR)/scripts/airflow_setup.sh
-	$(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/upstream_airflow_dags $(UPSTREAM_AIRFLOW_HOME) 8081
-	$(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/downstream_airflow_dags $(DOWNSTREAM_AIRFLOW_HOME) 8082
+	$(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/warehouse_airflow_dags $(WAREHOUSE_AIRFLOW_HOME) 8081
+	$(MAKEFILE_DIR)/scripts/airflow_setup.sh $(MAKEFILE_DIR)/airlift_federation_tutorial/metrics_airflow_dags $(METRICS_AIRFLOW_HOME) 8082

-upstream_airflow_run:
-	AIRFLOW_HOME=$(UPSTREAM_AIRFLOW_HOME) airflow standalone
+warehouse_airflow_run:
+	AIRFLOW_HOME=$(WAREHOUSE_AIRFLOW_HOME) airflow standalone

-downstream_airflow_run:
-	AIRFLOW_HOME=$(DOWNSTREAM_AIRFLOW_HOME) airflow standalone
+metrics_airflow_run:
+	AIRFLOW_HOME=$(METRICS_AIRFLOW_HOME) airflow standalone

dagster_run:
	dagster dev -m airlift_federation_tutorial.dagster_defs.definitions -p 3000
@@ -54,5 +54,5 @@ update_readme_snippets:
	python ../../scripts/update_readme_snippets.py $(MAKEFILE_DIR)/README.md

wipe:
-	rm -rf $$UPSTREAM_AIRFLOW_HOME $$DOWNSTREAM_AIRFLOW_HOME $$DAGSTER_HOME
+	rm -rf $$WAREHOUSE_AIRFLOW_HOME $$METRICS_AIRFLOW_HOME $$DAGSTER_HOME

examples/airlift-federation-tutorial/airlift_federation_tutorial/constants.py (new file)
@@ -0,0 +1,16 @@
from pathlib import Path

FEDERATION_TUTORIAL_ROOT_DIR = Path(__file__).parent.parent
DUCKDB_PATH = FEDERATION_TUTORIAL_ROOT_DIR / "federation_tutorial.db"

CUSTOMERS_CSV_PATH = FEDERATION_TUTORIAL_ROOT_DIR / "raw_customers.csv"
CUSTOMERS_COLS = [
    "id",
    "first_name",
    "last_name",
]
CUSTOMERS_SCHEMA = "raw_data"
CUSTOMERS_DB_NAME = "federation_tutorial"
CUSTOMERS_TABLE_NAME = "raw_customers"
METRICS_SCHEMA = "metrics"
METRICS_TABLE_NAME = "customer_count"
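These constants pin both Airflow instances and the Dagster code to a single DuckDB file. As a hedged illustration (not part of this PR), once the `load_customers` and `customer_metrics` DAGs have run, the tables they write can be inspected directly with the constants above:

```python
# Hypothetical inspection snippet: query the tables the two DAGs write.
# Run only after both DAGs have completed at least once.
import duckdb
from airlift_federation_tutorial.constants import (
    CUSTOMERS_DB_NAME,
    CUSTOMERS_SCHEMA,
    CUSTOMERS_TABLE_NAME,
    DUCKDB_PATH,
    METRICS_SCHEMA,
    METRICS_TABLE_NAME,
)

con = duckdb.connect(str(DUCKDB_PATH))
customers = con.execute(
    f"SELECT COUNT(*) FROM {CUSTOMERS_DB_NAME}.{CUSTOMERS_SCHEMA}.{CUSTOMERS_TABLE_NAME}"
).fetchone()
metrics = con.execute(
    f"SELECT * FROM {CUSTOMERS_DB_NAME}.{METRICS_SCHEMA}.{METRICS_TABLE_NAME}"
).fetchdf()
con.close()
print(customers)
print(metrics)
```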
@@ -16,28 +16,28 @@
    load_airflow_dag_asset_specs,
)

-upstream_airflow_instance = AirflowInstance(
+warehouse_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
-    name="upstream",
+    name="warehouse",
)

-downstream_airflow_instance = AirflowInstance(
+metrics_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8082",
        username="admin",
        password="admin",
    ),
-    name="downstream",
+    name="metrics",
)

load_customers_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
-            airflow_instance=upstream_airflow_instance,
+            airflow_instance=warehouse_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
@@ -46,7 +46,7 @@
    next(
        iter(
            load_airflow_dag_asset_specs(
-                airflow_instance=downstream_airflow_instance,
+                airflow_instance=metrics_airflow_instance,
                dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
            )
        )
@@ -59,21 +59,21 @@

@multi_asset(specs=[customer_metrics_dag_asset])
def run_customer_metrics() -> MaterializeResult:
-    run_id = downstream_airflow_instance.trigger_dag("customer_metrics")
-    downstream_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
-    if downstream_airflow_instance.get_run_state("customer_metrics", run_id) == "success":
+    run_id = metrics_airflow_instance.trigger_dag("customer_metrics")
+    metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
+    if metrics_airflow_instance.get_run_state("customer_metrics", run_id) == "success":
        return MaterializeResult(asset_key=customer_metrics_dag_asset.key)
    else:
        raise Exception("Dag run failed.")


-upstream_sensor = build_airflow_polling_sensor(
+warehouse_sensor = build_airflow_polling_sensor(
    mapped_assets=[load_customers_dag_asset],
-    airflow_instance=upstream_airflow_instance,
+    airflow_instance=warehouse_airflow_instance,
)
-downstream_sensor = build_airflow_polling_sensor(
+metrics_sensor = build_airflow_polling_sensor(
    mapped_assets=[customer_metrics_dag_asset],
-    airflow_instance=downstream_airflow_instance,
+    airflow_instance=metrics_airflow_instance,
)

automation_sensor = AutomationConditionSensorDefinition(
@@ -85,5 +85,5 @@ def run_customer_metrics() -> MaterializeResult:

defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
-    sensors=[upstream_sensor, downstream_sensor],
+    sensors=[warehouse_sensor, metrics_sensor],
)
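Because `run_customer_metrics` is an ordinary Dagster asset, the trigger/wait/check round trip above can be exercised in process. A minimal sketch, assuming both Airflow instances are running; the module path is an assumption, since this diff does not show the file's location:

```python
# Hedged sketch: materialize the federated asset in process.
from dagster import materialize

# Assumed module path (not shown in this diff); adjust to the actual file.
from airlift_federation_tutorial.dagster_defs.definitions import run_customer_metrics

result = materialize([run_customer_metrics])  # triggers the Airflow DAG and waits
assert result.success
```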
@@ -7,28 +7,28 @@
    load_airflow_dag_asset_specs,
)

-upstream_airflow_instance = AirflowInstance(
+warehouse_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
-    name="upstream",
+    name="warehouse",
)

-downstream_airflow_instance = AirflowInstance(
+metrics_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8082",
        username="admin",
        password="admin",
    ),
-    name="downstream",
+    name="metrics",
)

load_customers_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
-            airflow_instance=upstream_airflow_instance,
+            airflow_instance=warehouse_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
@@ -37,7 +37,7 @@
    next(
        iter(
            load_airflow_dag_asset_specs(
-                airflow_instance=downstream_airflow_instance,
+                airflow_instance=metrics_airflow_instance,
                dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
            )
        )
@@ -46,16 +46,16 @@
    deps=[load_customers_dag_asset],
)

-upstream_sensor = build_airflow_polling_sensor(
+warehouse_sensor = build_airflow_polling_sensor(
    mapped_assets=[load_customers_dag_asset],
-    airflow_instance=upstream_airflow_instance,
+    airflow_instance=warehouse_airflow_instance,
)
-downstream_sensor = build_airflow_polling_sensor(
+metrics_sensor = build_airflow_polling_sensor(
    mapped_assets=[customer_metrics_dag_asset],
-    airflow_instance=downstream_airflow_instance,
+    airflow_instance=metrics_airflow_instance,
)

defs = Definitions(
    assets=[load_customers_dag_asset, customer_metrics_dag_asset],
-    sensors=[upstream_sensor, downstream_sensor],
+    sensors=[warehouse_sensor, metrics_sensor],
)
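This observation-only variant loads both DAGs as external asset specs and attaches polling sensors without triggering anything. One way to sanity-check that it loads, sketched under the assumption of a hypothetical module name (the diff does not show where this file lives):

```python
# Hedged sketch: list the asset specs this Definitions object exposes.
# "observe" is a placeholder module name, not confirmed by this diff.
from airlift_federation_tutorial.dagster_defs.observe import defs

for spec in defs.get_all_asset_specs():
    print(spec.key)
```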

This file was deleted.

New file in airlift_federation_tutorial/metrics_airflow_dags/:
@@ -0,0 +1,67 @@
import os
from datetime import datetime
from typing import Union

import duckdb
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from airlift_federation_tutorial.constants import (
    CUSTOMERS_DB_NAME,
    CUSTOMERS_SCHEMA,
    CUSTOMERS_TABLE_NAME,
    DUCKDB_PATH,
    METRICS_SCHEMA,
    METRICS_TABLE_NAME,
)


def calculate_customer_count() -> None:
    os.environ["NO_PROXY"] = "*"

    con = duckdb.connect(str(DUCKDB_PATH))

    result: Union[tuple, None] = con.execute(
        f"SELECT COUNT(*) FROM {CUSTOMERS_DB_NAME}.{CUSTOMERS_SCHEMA}.{CUSTOMERS_TABLE_NAME}"
    ).fetchone()
    if not result:
        raise ValueError("No customers found")
    count_df = pd.DataFrame([{"date": datetime.now().date(), "total_customers": result[0]}])  # noqa: F841 # used by duckdb

    con.execute(f"CREATE SCHEMA IF NOT EXISTS {METRICS_SCHEMA}").fetchall()

    con.execute(f"""
        CREATE TABLE IF NOT EXISTS {CUSTOMERS_DB_NAME}.{METRICS_SCHEMA}.{METRICS_TABLE_NAME} (
            date DATE,
            total_customers INTEGER
        )
    """).fetchall()

    con.execute(
        f"INSERT INTO {CUSTOMERS_DB_NAME}.{METRICS_SCHEMA}.{METRICS_TABLE_NAME} SELECT * FROM count_df"
    ).fetchall()

    con.close()


with DAG(
    dag_id="customer_metrics",
    is_paused_upon_creation=False,
) as dag:
    count_task = PythonOperator(
        task_id="calculate_customer_count",
        python_callable=calculate_customer_count,
        dag=dag,
    )

for dag_id in ["orders_metrics", "products_metrics", "payments_metrics", "sales_metrics"]:
    with DAG(
        dag_id=dag_id,
        is_paused_upon_creation=False,
    ) as dag:
        PythonOperator(
            task_id="task",
            python_callable=lambda: None,
            dag=dag,
        )
    globals()[dag_id] = dag
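A subtlety in `calculate_customer_count`: `count_df` looks unused (hence the `noqa: F841`), but DuckDB's replacement scans let SQL reference an in-scope pandas DataFrame by its Python variable name, which is how the `INSERT ... SELECT * FROM count_df` works. A standalone sketch of that behavior:

```python
# DuckDB replacement scan: SQL can name a local pandas DataFrame directly.
import duckdb
import pandas as pd

df = pd.DataFrame({"x": [1, 2, 3]})
con = duckdb.connect()
print(con.execute("SELECT SUM(x) FROM df").fetchone())  # (6,)
```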

This file was deleted.

New file in airlift_federation_tutorial/warehouse_airflow_dags/:
@@ -0,0 +1,55 @@
import os

import duckdb
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from airlift_federation_tutorial.constants import (
    CUSTOMERS_COLS,
    CUSTOMERS_CSV_PATH,
    CUSTOMERS_DB_NAME,
    CUSTOMERS_SCHEMA,
    CUSTOMERS_TABLE_NAME,
    DUCKDB_PATH,
)


def load_customers() -> None:
    # https://github.com/apache/airflow/discussions/24463
    os.environ["NO_PROXY"] = "*"
    df = pd.read_csv(  # noqa: F841 # used by duckdb
        CUSTOMERS_CSV_PATH,
        names=CUSTOMERS_COLS,
    )

    # Connect to DuckDB and create a new table
    con = duckdb.connect(str(DUCKDB_PATH))
    con.execute(f"CREATE SCHEMA IF NOT EXISTS {CUSTOMERS_SCHEMA}").fetchall()
    con.execute(
        f"CREATE TABLE IF NOT EXISTS {CUSTOMERS_DB_NAME}.{CUSTOMERS_SCHEMA}.{CUSTOMERS_TABLE_NAME} AS SELECT * FROM df"
    ).fetchall()
    con.close()


with DAG(
    dag_id="load_customers",
    is_paused_upon_creation=False,
) as dag:
    PythonOperator(
        task_id="load_customers_to_warehouse",
        python_callable=load_customers,
        dag=dag,
    )

# Define some dummy DAGs to simulate a big Airflow instance
for dag_id in ["load_orders", "load_products", "load_payments", "load_sales"]:
    with DAG(
        dag_id=dag_id,
        is_paused_upon_creation=False,
    ) as dag:
        PythonOperator(
            task_id="task",
            python_callable=lambda: None,
            dag=dag,
        )
    globals()[dag_id] = dag
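To smoke-test the warehouse instance end to end, the same `AirflowInstance` client used in the Dagster definitions can trigger `load_customers` and wait on it. A sketch reusing only calls that appear elsewhere in this PR, assuming the `dagster_airlift.core` import path used by the tutorial:

```python
# Hedged smoke test: trigger load_customers on the warehouse instance.
from dagster_airlift.core import AirflowBasicAuthBackend, AirflowInstance

warehouse = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="warehouse",
)

run_id = warehouse.trigger_dag("load_customers")
warehouse.wait_for_run_completion("load_customers", run_id)
print(warehouse.get_run_state("load_customers", run_id))  # "success" on a good run
```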