[dagster-airlift] Migration #25822
Conversation
To begin proxying tasks in a DAG, first you will need a file to track proxying state. In your Airflow DAG directory, create a `proxied_state` folder, and in it create a YAML file with the same name as your DAG. The included example at `airflow_dags/proxied_state` is used by `make airflow_run`, and can be used as a template for your own proxied state files.
Given our example DAG `rebuild_customers_list` with three tasks, `load_raw_customers`, `run_dbt_model`, and `export_customers`, `proxied_state/rebuild_customers_list.yaml` should look like the following:
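A minimal sketch of that file, assuming the tutorial's `tasks`/`proxied` schema (task ids per the reviewer's note below):

```yaml
tasks:
  - id: load_raw_customers
    proxied: False
  - id: build_dbt_models
    proxied: False
  - id: export_customers
    proxied: False
```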
The task name `run_dbt_model` appears to be incorrect here. The rest of the documentation and YAML examples use `build_dbt_models` instead. For consistency, this reference should be updated to match the actual task name.

Spotted by Graphite Reviewer
For all other operator types, we will need to build our own asset definitions. We recommend creating a factory function whose arguments match the inputs to your Airflow operator. Then, you can use this factory to build definitions for each Airflow task.
For example, our `load_raw_customers` task uses a custom `LoadCSVToDuckDB` operator. We'll define a factory function `load_csv_to_duckdb_defs` to build corresponding software-defined assets. Similarly, for `export_customers` we'll define a function `export_duckdb_to_csv_defs` to build SDAs:
There appears to be a discrepancy between the text and the code implementation. The text refers to a function named `load_csv_to_duckdb_defs`, but the actual implementation shows `load_csv_to_duckdb_asset`. The text should be updated to match the implementation for accuracy and to avoid confusion for readers.

Spotted by Graphite Reviewer
Approved w/ a couple of small comments.
Up until now, we have not touched the Airflow code base at all. Now it's time to begin the actual migration process.
Once you have created corresponding definitions in Dagster to your Airflow tasks, you can proxy execution to Dagster on a per-task basis while Airflow is still controlling scheduling and orchestration. Once a task has been proxied, Airflow will kick off materializations of corresponding Dagster assets in place of executing the business logic of that task.
Suggested change:

Once you have created the corresponding definitions in Dagster for your Airflow tasks, you can proxy execution to Dagster on a per-task basis while Airflow still controls scheduling and orchestration. Once a task has been proxied, Airflow will kick off materializations of corresponding Dagster assets instead of executing the business logic of that task.
Set `PROXYING` to `True` or eliminate the `if` statement.
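As a sketch of what this refers to, assuming the DAG file uses `dagster_airlift`'s proxying entry points (the guard shown here is illustrative):

```python
from pathlib import Path

from dagster_airlift.in_airflow import proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml

# ... DAG and task definitions for rebuild_customers_list above ...

PROXYING = True  # flip to True, or drop the guard entirely

if PROXYING:
    proxying_to_dagster(
        global_vars=globals(),
        proxied_state=load_proxied_state_from_yaml(
            Path(__file__).parent / "proxied_state"
        ),
    )
```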
The DAG will now display its proxied state in the Airflow UI. (There is some latency as Airflow evaluates the Python file periodically.)
Suggested change:

The DAG will now display its proxied state in the Airflow UI.
I think it's ok to forgo the latency warning here
In order to proxy a task, you must do two things:
1. First, ensure all associated assets are executable in Dagster by providing asset definitions in place of bare asset specs.
This may confuse some people. If by default we create AssetSpecs representing tasks through Airlift, maybe this can be rephrased to indicate that an asset must be defined for the task implementing the logic of that Airflow task. I think we still need to do a better job of teaching people asset specs vs assets in our docs.
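To illustrate the distinction this comment is drawing (a hedged sketch; the key and logic are made up):

```python
from dagster import AssetSpec, asset

# A bare spec only *represents* the Airflow task's output; Dagster
# cannot execute it.
raw_customers_spec = AssetSpec("raw_customers")


# An asset definition attaches executable logic to the same key, so
# Dagster can materialize it once the task is proxied.
@asset(key="raw_customers")
def raw_customers() -> None:
    ...  # business logic ported from the Airflow task
```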
Merged 1d9e857 into dpeng817/remove_sections_in_reference
Summary & Motivation
Migration section. I didn't pull in the asset checks section; I think that one is going to need to be split up and interspersed with the other sections.