-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
17 changed files
with
345 additions
and
100 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
*.**airflow_home* | ||
*.dagster_home* | ||
customers.csv | ||
*.db | ||
|
||
# Byte-compiled / optimized / DLL files | ||
__pycache__/ | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
16 changes: 16 additions & 0 deletions
16
examples/airlift-federation-tutorial/airlift_federation_tutorial/constants.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
from pathlib import Path | ||
|
||
# Directory two levels above this module's file; the tutorial's data files are
# addressed relative to it.
FEDERATION_TUTORIAL_ROOT_DIR = Path(__file__).parent.parent

# DuckDB database file shared by the tutorial's Airflow DAGs.
DUCKDB_PATH = FEDERATION_TUTORIAL_ROOT_DIR.joinpath("federation_tutorial.db")

# Raw customers CSV and the column names applied when it is read.
CUSTOMERS_CSV_PATH = FEDERATION_TUTORIAL_ROOT_DIR.joinpath("raw_customers.csv")
CUSTOMERS_COLS = ["id", "first_name", "last_name"]

# Fully qualified location of the raw customers table: <db>.<schema>.<table>.
CUSTOMERS_DB_NAME = "federation_tutorial"
CUSTOMERS_SCHEMA = "raw_data"
CUSTOMERS_TABLE_NAME = "raw_customers"

# Schema and table that receive the computed customer-count metric.
METRICS_SCHEMA = "metrics"
METRICS_TABLE_NAME = "customer_count"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
6 changes: 0 additions & 6 deletions
6
...s/airlift-federation-tutorial/airlift_federation_tutorial/downstream_airflow_dags/dags.py
This file was deleted.
Oops, something went wrong.
67 changes: 67 additions & 0 deletions
67
...ples/airlift-federation-tutorial/airlift_federation_tutorial/metrics_airflow_dags/dags.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
import os | ||
from datetime import datetime | ||
from typing import Union | ||
|
||
import duckdb | ||
import pandas as pd | ||
from airflow import DAG | ||
from airflow.operators.python import PythonOperator | ||
from airlift_federation_tutorial.constants import ( | ||
CUSTOMERS_DB_NAME, | ||
CUSTOMERS_SCHEMA, | ||
CUSTOMERS_TABLE_NAME, | ||
DUCKDB_PATH, | ||
METRICS_SCHEMA, | ||
METRICS_TABLE_NAME, | ||
) | ||
|
||
|
||
def calculate_customer_count() -> None:
    """Count the raw customers rows and append today's total to the metrics table.

    Opens the DuckDB database at ``DUCKDB_PATH``, counts the rows of
    ``CUSTOMERS_DB_NAME.CUSTOMERS_SCHEMA.CUSTOMERS_TABLE_NAME``, makes sure the
    metrics schema and table exist, and inserts one ``(date, total_customers)``
    row. The connection is closed even when a statement fails.

    Raises:
        ValueError: If the count query returns no row.
    """
    # Proxy workaround; see https://github.com/apache/airflow/discussions/24463
    os.environ["NO_PROXY"] = "*"

    con = duckdb.connect(str(DUCKDB_PATH))
    try:
        result: Union[tuple, None] = con.execute(
            f"SELECT COUNT(*) FROM {CUSTOMERS_DB_NAME}.{CUSTOMERS_SCHEMA}.{CUSTOMERS_TABLE_NAME}"
        ).fetchone()
        if not result:
            raise ValueError("No customers found")

        # Must stay a local of this function: the INSERT below refers to the
        # DataFrame by its variable name.
        count_df = pd.DataFrame([{"date": datetime.now().date(), "total_customers": result[0]}])  # noqa: F841 # used by duckdb

        con.execute(f"CREATE SCHEMA IF NOT EXISTS {METRICS_SCHEMA}").fetchall()

        con.execute(f"""
            CREATE TABLE IF NOT EXISTS {CUSTOMERS_DB_NAME}.{METRICS_SCHEMA}.{METRICS_TABLE_NAME} (
                date DATE,
                total_customers INTEGER
            )
        """).fetchall()

        con.execute(
            f"INSERT INTO {CUSTOMERS_DB_NAME}.{METRICS_SCHEMA}.{METRICS_TABLE_NAME} SELECT * FROM count_df"
        ).fetchall()
    finally:
        # Release the database file handle on both the success and error paths.
        con.close()
|
||
|
||
# DAG that runs the customer-count calculation as a single Python task.
with DAG(
    is_paused_upon_creation=False,
    dag_id="customer_metrics",
) as dag:
    count_task = PythonOperator(
        dag=dag,
        python_callable=calculate_customer_count,
        task_id="calculate_customer_count",
    )

# Additional DAGs, each holding one task whose callable does nothing.
for dag_id in ("orders_metrics", "products_metrics", "payments_metrics", "sales_metrics"):
    with DAG(
        is_paused_upon_creation=False,
        dag_id=dag_id,
    ) as dag:
        PythonOperator(
            dag=dag,
            python_callable=lambda: None,
            task_id="task",
        )
    # Bind each DAG to a module-level name equal to its dag_id.
    globals()[dag_id] = dag
6 changes: 0 additions & 6 deletions
6
...les/airlift-federation-tutorial/airlift_federation_tutorial/upstream_airflow_dags/dags.py
This file was deleted.
Oops, something went wrong.
55 changes: 55 additions & 0 deletions
55
...es/airlift-federation-tutorial/airlift_federation_tutorial/warehouse_airflow_dags/dags.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
import os | ||
|
||
import duckdb | ||
import pandas as pd | ||
from airflow import DAG | ||
from airflow.operators.python import PythonOperator | ||
from airlift_federation_tutorial.constants import ( | ||
CUSTOMERS_COLS, | ||
CUSTOMERS_CSV_PATH, | ||
CUSTOMERS_DB_NAME, | ||
CUSTOMERS_SCHEMA, | ||
CUSTOMERS_TABLE_NAME, | ||
DUCKDB_PATH, | ||
) | ||
|
||
|
||
def load_customers() -> None:
    """Load the raw customers CSV into a DuckDB table.

    Reads ``CUSTOMERS_CSV_PATH`` with ``CUSTOMERS_COLS`` as column names, then
    creates the customers schema and table in the database at ``DUCKDB_PATH``
    if they do not already exist. The connection is closed even when a
    statement fails.
    """
    # https://github.com/apache/airflow/discussions/24463
    os.environ["NO_PROXY"] = "*"

    # Must stay a local of this function: the CREATE TABLE statement below
    # refers to the DataFrame by its variable name.
    df = pd.read_csv(  # noqa: F841 # used by duckdb
        CUSTOMERS_CSV_PATH,
        names=CUSTOMERS_COLS,
    )

    # Connect to DuckDB and create a new table
    con = duckdb.connect(str(DUCKDB_PATH))
    try:
        con.execute(f"CREATE SCHEMA IF NOT EXISTS {CUSTOMERS_SCHEMA}").fetchall()
        con.execute(
            f"CREATE TABLE IF NOT EXISTS {CUSTOMERS_DB_NAME}.{CUSTOMERS_SCHEMA}.{CUSTOMERS_TABLE_NAME} AS SELECT * FROM df"
        ).fetchall()
    finally:
        # Release the database file handle on both the success and error paths.
        con.close()
|
||
|
||
# DAG that runs the customers load as a single Python task.
with DAG(
    is_paused_upon_creation=False,
    dag_id="load_customers",
) as dag:
    PythonOperator(
        dag=dag,
        python_callable=load_customers,
        task_id="load_customers_to_warehouse",
    )

# Define some dummy DAGs to simulate a big Airflow instance
for dag_id in ("load_orders", "load_products", "load_payments", "load_sales"):
    with DAG(
        is_paused_upon_creation=False,
        dag_id=dag_id,
    ) as dag:
        PythonOperator(
            dag=dag,
            python_callable=lambda: None,
            task_id="task",
        )
    # Bind each DAG to a module-level name equal to its dag_id.
    globals()[dag_id] = dag
Oops, something went wrong.