DOC-591 Switch embedded example code to CodeExample component (dagster-io#27605)

## Summary & Motivation

See title -- need to do this before re-implementing versioned docs.

## How I Tested These Changes

## Changelog

> Insert changelog entry or delete this section.

---------

Signed-off-by: nikki everett <[email protected]>
Showing 67 changed files with 384 additions and 7,175 deletions.
@@ -58,54 +58,7 @@ To set up a branch deployment workflow to construct and test these tables, we wi

In production, we want to write three tables to Snowflake: `ITEMS`, `COMMENTS`, and `STORIES`. We can define these tables as assets as follows:

{/* TODO convert to <CodeExample> */}
```python file=/guides/dagster/development_to_production/assets.py startafter=start_assets endbefore=end_assets
# assets.py
import pandas as pd
import requests

from dagster import Config, asset


class ItemsConfig(Config):
    base_item_id: int


@asset(
    io_manager_key="snowflake_io_manager",
)
def items(config: ItemsConfig) -> pd.DataFrame:
    """Items from the Hacker News API: each is a story or a comment on a story."""
    rows = []
    max_id = requests.get(
        "https://hacker-news.firebaseio.com/v0/maxitem.json", timeout=5
    ).json()
    # Hacker News API is 1-indexed, so adjust range by 1
    for item_id in range(max_id - config.base_item_id + 1, max_id + 1):
        item_url = f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        rows.append(requests.get(item_url, timeout=5).json())

    # ITEM_FIELD_NAMES is a list of the column names in the Hacker News dataset
    result = pd.DataFrame(rows, columns=ITEM_FIELD_NAMES).drop_duplicates(subset=["id"])
    result.rename(columns={"by": "user_id"}, inplace=True)
    return result


@asset(
    io_manager_key="snowflake_io_manager",
)
def comments(items: pd.DataFrame) -> pd.DataFrame:
    """Comments from the Hacker News API."""
    return items[items["type"] == "comment"]


@asset(
    io_manager_key="snowflake_io_manager",
)
def stories(items: pd.DataFrame) -> pd.DataFrame:
    """Stories from the Hacker News API."""
    return items[items["type"] == "story"]
```
<CodeExample path="docs_snippets/docs_snippets/guides/dagster/development_to_production/assets.py" startAfter="start_assets" endBefore="end_assets" />
As you can see, our assets use an [I/O manager](/guides/build/io-managers/) named `snowflake_io_manager`. Using I/O managers and other resources allows us to swap out implementations per environment without modifying our business logic.
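For example, a purely local setup could point the same resource key at a different backend entirely. The sketch below is a hypothetical variation, not part of this guide's snippets; it assumes the `dagster-duckdb-pandas` package, whose `DuckDBPandasIOManager` writes pandas DataFrames to a local DuckDB file:

```python
# Hypothetical "local" mapping -- not part of this guide's snippets.
# Assumes: pip install dagster-duckdb-pandas
from dagster_duckdb_pandas import DuckDBPandasIOManager

local_resources = {
    # Same resource key the assets request, different backing store
    "snowflake_io_manager": DuckDBPandasIOManager(database="local_dev.duckdb"),
}
```

Because the assets only reference the `snowflake_io_manager` key, none of their code changes when the key is remapped.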
@@ -119,46 +72,7 @@ Dagster automatically sets certain [environment variables](/dagster-plus/deploym

Because we want to configure our assets to write to Snowflake using a different set of credentials and database in each environment, we'll configure a separate I/O manager for each environment:

{/* TODO convert to <CodeExample> */}
```python file=/guides/dagster/development_to_production/branch_deployments/repository_v1.py startafter=start_repository endbefore=end_repository
# definitions.py
import os

from dagster_snowflake_pandas import SnowflakePandasIOManager

from dagster import Definitions

from ..assets import comments, items, stories

snowflake_config = {
    "account": "abc1234.us-east-1",
    "user": "[email protected]",
    "password": {"env": "SYSTEM_SNOWFLAKE_PASSWORD"},
    "schema": "HACKER_NEWS",
}

resources = {
    "branch": {
        "snowflake_io_manager": SnowflakePandasIOManager(
            **snowflake_config,
            database=f"PRODUCTION_CLONE_{os.getenv('DAGSTER_CLOUD_PULL_REQUEST_ID')}",
        ),
    },
    "prod": {
        "snowflake_io_manager": SnowflakePandasIOManager(
            **snowflake_config,
            database="PRODUCTION",
        ),
    },
}


def get_current_env():
    is_branch_depl = os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT")
    assert is_branch_depl is not None  # env var must be set
    return "branch" if is_branch_depl == "1" else "prod"


defs = Definitions(
    assets=[items, comments, stories], resources=resources[get_current_env()]
)
```
<CodeExample path="docs_snippets/docs_snippets/guides/dagster/development_to_production/branch_deployments/repository_v1.py" startAfter="start_repository" endBefore="end_repository" />
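To sanity-check the selection logic outside of Dagster+, you could set the environment variable by hand before calling `get_current_env` — a hypothetical local check, not something the guide itself runs:

```python
# Hypothetical local check of get_current_env() from the snippet above.
import os

os.environ["DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT"] = "1"
assert get_current_env() == "branch"

os.environ["DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT"] = "0"
assert get_current_env() == "prod"
```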
Refer to the [Dagster+ environment variables documentation](/dagster-plus/deployment/management/environment-variables/) for more info about available environment variables.
@@ -177,89 +91,17 @@ these tasks, like viewing them in the Global Asset Graph.

:::

{/* TODO convert to <CodeExample> */}
```python file=/guides/dagster/development_to_production/branch_deployments/clone_and_drop_db.py startafter=start_clone_db endbefore=end_clone_db
import os

from dagster_snowflake import SnowflakeResource

from dagster import In, Nothing, graph, op


@op
def drop_database_clone(snowflake: SnowflakeResource):
    with snowflake.get_connection() as conn:
        cur = conn.cursor()
        cur.execute(
            "DROP DATABASE IF EXISTS"
            f" PRODUCTION_CLONE_{os.environ['DAGSTER_CLOUD_PULL_REQUEST_ID']}"
        )


@op(ins={"start": In(Nothing)})
def clone_production_database(snowflake: SnowflakeResource):
    with snowflake.get_connection() as conn:
        cur = conn.cursor()
        cur.execute(
            "CREATE DATABASE"
            f" PRODUCTION_CLONE_{os.environ['DAGSTER_CLOUD_PULL_REQUEST_ID']} CLONE"
            ' "PRODUCTION"'
        )


@graph
def clone_prod():
    clone_production_database(start=drop_database_clone())


@graph
def drop_prod_clone():
    drop_database_clone()
```
<CodeExample path="docs_snippets/docs_snippets/guides/dagster/development_to_production/branch_deployments/clone_and_drop_db.py" startAfter="start_clone_db" endBefore="end_clone_db" />
We've defined `drop_database_clone` and `clone_production_database` to utilize the <PyObject section="libraries" object="SnowflakeResource" module="dagster_snowflake" />. The Snowflake resource will use the same configuration as the Snowflake I/O manager to generate a connection to Snowflake. However, while our I/O manager writes outputs to Snowflake, the Snowflake resource executes queries against Snowflake.
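To illustrate that distinction, an op could use the same resource to run an ad-hoc verification query against the clone. This is a hypothetical sketch, not part of the guide's snippets; it assumes the `snowflake` resource key and the `HACKER_NEWS` schema configured above:

```python
# Hypothetical verification op -- not part of this guide's snippets.
import os

from dagster_snowflake import SnowflakeResource

from dagster import op


@op
def count_cloned_stories(snowflake: SnowflakeResource) -> int:
    """Check that the cloned STORIES table is populated."""
    clone_db = f"PRODUCTION_CLONE_{os.environ['DAGSTER_CLOUD_PULL_REQUEST_ID']}"
    with snowflake.get_connection() as conn:
        cur = conn.cursor()
        # A read-only query over the same connection the clone ops use
        cur.execute(f"SELECT COUNT(*) FROM {clone_db}.HACKER_NEWS.STORIES")
        return cur.fetchone()[0]
```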
We now need to define resources that configure our jobs for the current environment. We can modify the per-environment resource mapping as follows:
{/* TODO convert to <CodeExample> */}
```python file=/guides/dagster/development_to_production/branch_deployments/repository_v2.py startafter=start_resources endbefore=end_resources
resources = {
    "branch": {
        "snowflake_io_manager": SnowflakePandasIOManager(
            **snowflake_config,
            database=f"PRODUCTION_CLONE_{os.getenv('DAGSTER_CLOUD_PULL_REQUEST_ID')}",
        ),
        "snowflake": SnowflakeResource(
            **snowflake_config,
            database=f"PRODUCTION_CLONE_{os.getenv('DAGSTER_CLOUD_PULL_REQUEST_ID')}",
        ),
    },
    "prod": {
        "snowflake_io_manager": SnowflakePandasIOManager(
            **snowflake_config,
            database="PRODUCTION",
        ),
        "snowflake": SnowflakeResource(**snowflake_config, database="PRODUCTION"),
    },
}
```
<CodeExample path="docs_snippets/docs_snippets/guides/dagster/development_to_production/branch_deployments/repository_v2.py" startAfter="start_resources" endBefore="end_resources" />
Then, we can add the `clone_prod` and `drop_prod_clone` jobs, which now use the resources appropriate to the environment, to our definitions:
{/* TODO convert to <CodeExample> */}
```python file=/guides/dagster/development_to_production/branch_deployments/repository_v2.py startafter=start_repository endbefore=end_repository
branch_deployment_jobs = [
    clone_prod.to_job(),
    drop_prod_clone.to_job(),
]
defs = Definitions(
    assets=[items, comments, stories],
    resources=resources[get_current_env()],
    jobs=(
        branch_deployment_jobs
        if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT") == "1"
        else []
    ),
)
```
<CodeExample path="docs_snippets/docs_snippets/guides/dagster/development_to_production/branch_deployments/repository_v2.py" startAfter="start_repository" endBefore="end_repository" />

## Step 4: Create our database clone upon opening a branch
@@ -268,37 +110,7 @@ defs = Definitions(
The `branch_deployments.yml` file located in `.github/workflows/branch_deployments.yml` defines a `dagster_cloud_build_push` job with a series of steps that launch a branch deployment. Because we want to queue a run of `clone_prod` within each deployment after it launches, we'll add an additional step at the end of `dagster_cloud_build_push`. This job is triggered on multiple pull request events: `opened`, `synchronize`, `reopened`, and `closed`. This means that upon future pushes to the branch, we'll trigger a run of `clone_prod`. The `if` condition below ensures that `clone_prod` will not run if the pull request is closed:
{/* TODO convert to <CodeExample> */}
```yaml file=/guides/dagster/development_to_production/branch_deployments/clone_prod.yaml
# .github/workflows/branch_deployments.yml

name: Dagster Branch Deployments
on:
  pull_request:
    types: [opened, synchronize, reopened, closed]
env:
  DAGSTER_CLOUD_URL: ${{ secrets.DAGSTER_CLOUD_URL }}

jobs:
  dagster_cloud_build_push:
    runs-on: ubuntu-latest
    name: Dagster Branch Deployments
    strategy:
      ...
    steps:
      # Existing steps here
      ...
      - name: Clone Snowflake schema upon launch
        if: github.event.action != 'closed'
        uses: dagster-io/dagster-cloud-action/actions/utils/[email protected]
        with:
          location_name: ${{ matrix.location.name }}
          deployment: ${{ steps.deploy.outputs.deployment }}
          job_name: clone_prod
        env:
          DAGSTER_CLOUD_URL: ${{ secrets.DAGSTER_CLOUD_URL }}
          DAGSTER_CLOUD_API_TOKEN: ${{ secrets.DAGSTER_CLOUD_API_TOKEN }}
```
<CodeExample path="docs_snippets/docs_snippets/guides/dagster/development_to_production/branch_deployments/clone_prod.yaml" />
Opening a pull request for our current branch will automatically kick off a branch deployment. After the deployment launches, we can confirm that the `clone_prod` job has run:
@@ -315,53 +127,7 @@ We can also view our database in Snowflake to confirm that a clone exists for ea
The `.gitlab-ci.yml` script contains a `deploy` job that defines a series of steps that launch a branch deployment. Because we want to queue a run of `clone_prod` within each deployment after it launches, we'll add an additional step at the end of `deploy`. This job is triggered when a merge request is created or updated. This means that upon future pushes to the branch, we'll trigger a run of `clone_prod`.
```yaml file=/guides/dagster/development_to_production/branch_deployments/clone_prod.gitlab-ci.yml
# .gitlab-ci.yml
stages:
  - setup
  - build
  - deploy
workflow:
  rules:
    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
    - if: $CI_PIPELINE_SOURCE == 'merge_request_event'
parse-workspace:
  ...
build-image:
  ...
deploy-docker:
  ...
deploy-docker-branch:
  stage: deploy
  rules:
    - if: $CI_PIPELINE_SOURCE == 'merge_request_event'
  dependencies:
    - build-image
    - parse-workspace
  image: ghcr.io/dagster-io/dagster-cloud-action:0.1.23
  script:
    # Existing steps here
    ...
    # Add a step to launch the job cloning the prod db
    - dagster-plus job launch
      --url "$DAGSTER_CLOUD_URL/$DEPLOYMENT_NAME"
      --api-token "$DAGSTER_CLOUD_API_TOKEN"
      --location "location_name_containing_clone_prod_job"
      --job clone_prod
  environment:
    name: branch/$CI_COMMIT_REF_NAME
    on_stop: close_branch
close_branch:
  ...
```
<CodeExample path="docs_snippets/docs_snippets/guides/dagster/development_to_production/branch_deployments/clone_prod.gitlab-ci.yml" />
Opening a merge request for our current branch will automatically kick off a branch deployment. After the deployment launches, we can confirm that the `clone_prod` job has run:
@@ -382,91 +148,14 @@ We can also view our database in Snowflake to confirm that a clone exists for ea
Finally, we can add a step to our `branch_deployments.yml` file that queues a run of our `drop_prod_clone` job:

{/* TODO convert to <CodeExample> */}
```yaml file=/guides/dagster/development_to_production/branch_deployments/drop_db_clone.yaml
# .github/workflows/branch_deployments.yml
name: Dagster Branch Deployments
on:
  pull_request:
    types: [opened, synchronize, reopened, closed]
env:
  DAGSTER_CLOUD_URL: ${{ secrets.DAGSTER_CLOUD_URL }}
jobs:
  dagster_cloud_build_push:
    runs-on: ubuntu-latest
    name: Dagster Branch Deployments
    strategy:
      ...
    steps:
      # Existing steps here
      ...
      - name: Clone Snowflake schema upon launch
        ...
      - name: Delete schema clone upon PR close
        if: github.event.action == 'closed'
        uses: dagster-io/dagster-cloud-action/actions/utils/[email protected]
        with:
          location_name: ${{ matrix.location.name }}
          deployment: ${{ steps.deploy.outputs.deployment }}
          job_name: drop_prod_clone
        env:
          DAGSTER_CLOUD_URL: ${{ secrets.DAGSTER_CLOUD_URL }}
          DAGSTER_CLOUD_API_TOKEN: ${{ secrets.DAGSTER_CLOUD_API_TOKEN }}
```
<CodeExample path="docs_snippets/docs_snippets/guides/dagster/development_to_production/branch_deployments/drop_db_clone.yaml" />
</TabItem>
<TabItem value="Using Gitlab CI/CD">
Finally, we can add a step to our `.gitlab-ci.yml` file that queues a run of our `drop_prod_clone` job:
{/* TODO convert to <CodeExample> */}
```yaml file=/guides/dagster/development_to_production/branch_deployments/drop_db_clone.gitlab-ci.yml
# .gitlab-ci.yml
stages:
  - setup
  - build
  - deploy
workflow:
  rules:
    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
    - if: $CI_PIPELINE_SOURCE == 'merge_request_event'
parse-workspace:
  ...
build-image:
  ...
deploy-docker:
  ...
deploy-docker-branch:
  ...
close_branch:
  stage: deploy
  image: ghcr.io/dagster-io/dagster-cloud-action:0.1.23
  when: manual
  only:
    - merge_requests
  script:
    # Existing steps here
    ...
    # Add a step to launch the job dropping the cloned db
    - dagster-plus job launch
      --url "$DAGSTER_CLOUD_URL/$DEPLOYMENT_NAME"
      --api-token "$DAGSTER_CLOUD_API_TOKEN"
      --location "location_name_containing_drop_prod_clone_job"
      --job drop_prod_clone
  environment:
    name: branch/$CI_COMMIT_REF_NAME
    action: stop
```
<CodeExample path="docs_snippets/docs_snippets/guides/dagster/development_to_production/branch_deployments/drop_db_clone.gitlab-ci.yml" />
</TabItem>
</Tabs>