Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-airlift] Customizing dag proxying operator reference #25830

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 53 additions & 1 deletion docs/content/integrations/airlift/reference.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description: "dagster-airlift is a toolkit for observing and migrating Airflow D

# `dagster-airlift` integration reference

`dagster-airlift` is a toolkit for observing and migrating Airflow DAGs within Dagster. This reference page provides additional information for working with `dagster-airlift` that is not provided within the tutorial. You should start by reading the [dagster-airlift tutorial](/integrations/airlift/tutorial) before using this reference page.
`dagster-airlift` is a toolkit for observing and migrating Airflow DAGs within Dagster. This reference page provides additional information for working with `dagster-airlift` that is not provided within the tutorial. You should start by reading the [dagster-airlift tutorial](/integrations/airlift/tutorial/overview) before using this reference page.

- [Supporting custom authorization](#supporting-custom-authorization)
- [Dagster Plus Authorization](#dagster-plus-authorization)
Expand Down Expand Up @@ -123,3 +123,55 @@ defs = Definitions.merge(
),
)
```

### Customizing DAG proxying operator

Similar to how we can customize the operator we construct on a per-dag basis, we can customize the operator we construct on a per-dag basis. We can use the `build_from_dag_fn` argument of `proxying_to_dagster` to provide a custom operator in place of the default.

For example, in the following example we can see that the operator is customized to provide an authorization header which authenticates Dagster.

```python file=../../experimental/dagster-airlift/examples/tutorial-example/tutorial_example/snippets/custom_operator_examples/custom_dag_level_proxy.py
from pathlib import Path

import requests
from airflow import DAG
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseProxyDAGToDagsterOperator, proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml


class CustomProxyToDagsterOperator(BaseProxyDAGToDagsterOperator):
def get_dagster_session(self, context: Context) -> requests.Session:
if "var" not in context:
raise ValueError("No variables found in context")
api_key = context["var"]["value"].get("my_api_key")
session = requests.Session()
session.headers.update({"Authorization": f"Bearer {api_key}"})
return session

def get_dagster_url(self, context: Context) -> str:
return "https://dagster.example.com/"

# This method controls how the operator is built from the dag.
@classmethod
def build_from_dag(cls, dag: DAG):
return CustomProxyToDagsterOperator(dag=dag, task_id="OVERRIDDEN")


dag = DAG(
dag_id="custom_dag_level_proxy_example",
)

# At the end of your dag file
proxying_to_dagster(
global_vars=globals(),
proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
build_from_dag_fn=CustomProxyToDagsterOperator.build_from_dag,
)
```

`BaseProxyDAGToDagsterOperator` has three abstract methods which must be implemented:

- `get_dagster_session`, which controls the creation of a valid session to access the Dagster graphql API.
- `get_dagster_url`, which retrieves the domain at which the dagster webserver lives.
- `build_from_dag`, which controls how the proxying task is constructed from the provided DAG.
Loading