Skip to content

Commit

Permalink
[dagster-airlift] Customizing dag proxying operator reference
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Nov 8, 2024
1 parent c6e939c commit fa96690
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 1,295 deletions.
52 changes: 52 additions & 0 deletions docs/content/integrations/airlift/reference.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,55 @@ defs = Definitions.merge(
),
)
```

### Customizing DAG proxying operator

Similar to how we can customize the operator we construct on a per-dag basis, we can customize the operator we construct on a per-dag basis. We can use the `build_from_dag_fn` argument of `proxying_to_dagster` to provide a custom operator in place of the default.

For example, let's take a look at the following custom operator which expects an API key to be provided as a variable:

```python file=../../experimental/dagster-airlift/examples/tutorial-example/tutorial_example/snippets/custom_operator_examples/custom_dag_level_proxy.py
from pathlib import Path

import requests
from airflow import DAG
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseProxyDAGToDagsterOperator, proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml


class CustomProxyToDagsterOperator(BaseProxyDAGToDagsterOperator):
def get_dagster_session(self, context: Context) -> requests.Session:
if "var" not in context:
raise ValueError("No variables found in context")
api_key = context["var"]["value"].get("my_api_key")
session = requests.Session()
session.headers.update({"Authorization": f"Bearer {api_key}"})
return session

def get_dagster_url(self, context: Context) -> str:
return "https://dagster.example.com/"

# This method controls how the operator is built from the dag.
@classmethod
def build_from_dag(cls, dag: DAG):
return CustomProxyToDagsterOperator(dag=dag, task_id="OVERRIDDEN")


dag = DAG(
dag_id="custom_dag_level_proxy_example",
)

# At the end of your dag file
proxying_to_dagster(
global_vars=globals(),
proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
build_from_dag_fn=CustomProxyToDagsterOperator.build_from_dag,
)
```

`BaseProxyDAGToDagsterOperator` has three abstract methods which must be implemented:

- `get_dagster_session`, which controls the creation of a valid session to access the Dagster graphql API.
- `get_dagster_url`, which retrieves the domain at which the dagster webserver lives.
- `build_from_dag`, which controls how the proxying task is constructed from the provided DAG.
Loading

0 comments on commit fa96690

Please sign in to comment.