Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DOC-591 Switch embedded example code to CodeExample component #27605

Merged
merged 9 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/docs-beta/CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ To include code snippets, use the following format:
<CodeExample path="path/to/file.py" />
```

You can optionally include [additional properties](https://github.com/dagster-io/dagster/blob/master/docs/docs-beta/src/components/CodeExample.tsx#L4), such as `language`, `title`, `lineStart`, `lineEnd`, `startAfter`, and `endBefore`:
You can optionally include [additional properties](https://github.com/dagster-io/dagster/blob/master/docs/docs-beta/src/components/CodeExample.tsx#L6), such as `language`, `title`, `lineStart`, `lineEnd`, `startAfter`, and `endBefore`:

```
<CodeExample path="path/to/file.py" language="python" startAfter="start-after-comment" endBefore="end-before-comment" title="My example" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,54 +58,7 @@ To set up a branch deployment workflow to construct and test these tables, we wi

In production, we want to write three tables to Snowflake: `ITEMS`, `COMMENTS`, and `STORIES`. We can define these tables as assets as follows:

{/* TODO convert to <CodeExample> */}
```python file=/guides/dagster/development_to_production/assets.py startafter=start_assets endbefore=end_assets
# assets.py
import pandas as pd
import requests

from dagster import Config, asset


class ItemsConfig(Config):
base_item_id: int


@asset(
io_manager_key="snowflake_io_manager",
)
def items(config: ItemsConfig) -> pd.DataFrame:
"""Items from the Hacker News API: each is a story or a comment on a story."""
rows = []
max_id = requests.get(
"https://hacker-news.firebaseio.com/v0/maxitem.json", timeout=5
).json()
# Hacker News API is 1-indexed, so adjust range by 1
for item_id in range(max_id - config.base_item_id + 1, max_id + 1):
item_url = f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
rows.append(requests.get(item_url, timeout=5).json())

# ITEM_FIELD_NAMES is a list of the column names in the Hacker News dataset
result = pd.DataFrame(rows, columns=ITEM_FIELD_NAMES).drop_duplicates(subset=["id"])
result.rename(columns={"by": "user_id"}, inplace=True)
return result


@asset(
io_manager_key="snowflake_io_manager",
)
def comments(items: pd.DataFrame) -> pd.DataFrame:
"""Comments from the Hacker News API."""
return items[items["type"] == "comment"]


@asset(
io_manager_key="snowflake_io_manager",
)
def stories(items: pd.DataFrame) -> pd.DataFrame:
"""Stories from the Hacker News API."""
return items[items["type"] == "story"]
```
<CodeExample path="docs_snippets/docs_snippets/guides/dagster/development_to_production/assets.py" startAfter="start_assets" endBefore="end_assets" />

As you can see, our assets use an [I/O manager](/guides/build/io-managers/) named `snowflake_io_manager`. Using I/O managers and other resources allow us to swap out implementations per environment without modifying our business logic.

Expand All @@ -119,46 +72,7 @@ Dagster automatically sets certain [environment variables](/dagster-plus/deploym

Because we want to configure our assets to write to Snowflake using a different set of credentials and database in each environment, we'll configure a separate I/O manager for each environment:

{/* TODO convert to <CodeExample> */}
```python file=/guides/dagster/development_to_production/branch_deployments/repository_v1.py startafter=start_repository endbefore=end_repository
# definitions.py
from dagster import Definitions

from ..assets import comments, items, stories

snowflake_config = {
"account": "abc1234.us-east-1",
"user": "[email protected]",
"password": {"env": "SYSTEM_SNOWFLAKE_PASSWORD"},
"schema": "HACKER_NEWS",
}

resources = {
"branch": {
"snowflake_io_manager": SnowflakePandasIOManager(
**snowflake_config,
database=f"PRODUCTION_CLONE_{os.getenv('DAGSTER_CLOUD_PULL_REQUEST_ID')}",
),
},
"prod": {
"snowflake_io_manager": SnowflakePandasIOManager(
**snowflake_config,
database="PRODUCTION",
),
},
}


def get_current_env():
is_branch_depl = os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT") == "1"
assert is_branch_depl is not None # env var must be set
return "branch" if is_branch_depl else "prod"


defs = Definitions(
assets=[items, comments, stories], resources=resources[get_current_env()]
)
```
<CodeExample path="docs_snippets/docs_snippets/guides/dagster/development_to_production/branch_deployments/repository_v1.py" startAfter="start_repository" endBefore="end_repository" />

Refer to the [Dagster+ environment variables documentation](/dagster-plus/deployment/management/environment-variables/) for more info about available environment variables.

Expand All @@ -177,89 +91,17 @@ these tasks, like viewing them in the Global Asset Graph.

:::

{/* TODO convert to <CodeExample> */}
```python file=/guides/dagster/development_to_production/branch_deployments/clone_and_drop_db.py startafter=start_clone_db endbefore=end_clone_db
from dagster_snowflake import SnowflakeResource

from dagster import In, Nothing, graph, op


@op
def drop_database_clone(snowflake: SnowflakeResource):
with snowflake.get_connection() as conn:
cur = conn.cursor()
cur.execute(
"DROP DATABASE IF EXISTS"
f" PRODUCTION_CLONE_{os.environ['DAGSTER_CLOUD_PULL_REQUEST_ID']}"
)


@op(ins={"start": In(Nothing)})
def clone_production_database(snowflake: SnowflakeResource):
with snowflake.get_connection() as conn:
cur = conn.cursor()
cur.execute(
"CREATE DATABASE"
f" PRODUCTION_CLONE_{os.environ['DAGSTER_CLOUD_PULL_REQUEST_ID']} CLONE"
' "PRODUCTION"'
)


@graph
def clone_prod():
clone_production_database(start=drop_database_clone())


@graph
def drop_prod_clone():
drop_database_clone()
```
<CodeExample path="docs_snippets/docs_snippets/guides/dagster/development_to_production/branch_deployments/clone_and_drop_db.py" startAfter="start_clone_db" endBefore="end_clone_db" />

We've defined `drop_database_clone` and `clone_production_database` to utilize the <PyObject section="libraries" object="SnowflakeResource" module="dagster_snowflake" />. The Snowflake resource will use the same configuration as the Snowflake I/O manager to generate a connection to Snowflake. However, while our I/O manager writes outputs to Snowflake, the Snowflake resource executes queries against Snowflake.

We now need to define resources that configure our jobs to the current environment. We can modify the resource mapping by environment as follows:

{/* TODO convert to <CodeExample> */}
```python file=/guides/dagster/development_to_production/branch_deployments/repository_v2.py startafter=start_resources endbefore=end_resources
resources = {
"branch": {
"snowflake_io_manager": SnowflakePandasIOManager(
**snowflake_config,
database=f"PRODUCTION_CLONE_{os.getenv('DAGSTER_CLOUD_PULL_REQUEST_ID')}",
),
"snowflake": SnowflakeResource(
**snowflake_config,
database=f"PRODUCTION_CLONE_{os.getenv('DAGSTER_CLOUD_PULL_REQUEST_ID')}",
),
},
"prod": {
"snowflake_io_manager": SnowflakePandasIOManager(
**snowflake_config,
database="PRODUCTION",
),
"snowflake": SnowflakeResource(**snowflake_config, database="PRODUCTION"),
},
}
```
<CodeExample path="docs_snippets/docs_snippets/guides/dagster/development_to_production/branch_deployments/repository_v2.py" startAfter="start_resources" endBefore="end_resources" />

Then, we can add the `clone_prod` and `drop_prod_clone` jobs that now use the appropriate resource to the environment and add them to our definitions:

{/* TODO convert to <CodeExample> */}
```python file=/guides/dagster/development_to_production/branch_deployments/repository_v2.py startafter=start_repository endbefore=end_repository
branch_deployment_jobs = [
clone_prod.to_job(),
drop_prod_clone.to_job(),
]
defs = Definitions(
assets=[items, comments, stories],
resources=resources[get_current_env()],
jobs=(
branch_deployment_jobs
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT") == "1"
else []
),
)
```
<CodeExample path="docs_snippets/docs_snippets/guides/dagster/development_to_production/branch_deployments/repository_v2.py" startAfter="start_repository" endBefore="end_repository" />

## Step 4: Create our database clone upon opening a branch

Expand All @@ -268,37 +110,7 @@ defs = Definitions(

The `branch_deployments.yml` file located in `.github/workflows/branch_deployments.yml` defines a `dagster_cloud_build_push` job with a series of steps that launch a branch deployment. Because we want to queue a run of `clone_prod` within each deployment after it launches, we'll add an additional step at the end `dagster_cloud_build_push`. This job is triggered on multiple pull request events: `opened`, `synchronize`, `reopen`, and `closed`. This means that upon future pushes to the branch, we'll trigger a run of `clone_prod`. The `if` condition below ensures that `clone_prod` will not run if the pull request is closed:

{/* TODO convert to <CodeExample> */}
```yaml file=/guides/dagster/development_to_production/branch_deployments/clone_prod.yaml
# .github/workflows/branch_deployments.yml

name: Dagster Branch Deployments
on:
pull_request:
types: [opened, synchronize, reopened, closed]
env:
DAGSTER_CLOUD_URL: ${{ secrets.DAGSTER_CLOUD_URL }}

jobs:
dagster_cloud_build_push:
runs-on: ubuntu-latest
name: Dagster Branch Deployments
strategy:
...
steps:
# Existing steps here
...
- name: Clone Snowflake schema upon launch
if: github.event.action != 'closed'
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected]
with:
location_name: ${{ matrix.location.name }}
deployment: ${{ steps.deploy.outputs.deployment }}
job_name: clone_prod
env:
DAGSTER_CLOUD_URL: ${{ secrets.DAGSTER_CLOUD_URL }}
DAGSTER_CLOUD_API_TOKEN: ${{ secrets.DAGSTER_CLOUD_API_TOKEN }}
```
<CodeExample path="docs_snippets/docs_snippets/guides/dagster/development_to_production/branch_deployments/clone_prod.yaml" />

Opening a pull request for our current branch will automatically kick off a branch deployment. After the deployment launches, we can confirm that the `clone_prod` job has run:

Expand All @@ -315,53 +127,7 @@ We can also view our database in Snowflake to confirm that a clone exists for ea

The `.gitlab-ci.yaml` script contains a `deploy` job that defines a series of steps that launch a branch deployment. Because we want to queue a run of `clone_prod` within each deployment after it launches, we'll add an additional step at the end of `deploy`. This job is triggered on when a merge request is created or updated. This means that upon future pushes to the branch, we'll trigger a run of `clone_prod`.

```yaml file=/guides/dagster/development_to_production/branch_deployments/clone_prod.gitlab-ci.yml
# .gitlab-ci.yml

stages:
- setup
- build
- deploy

workflow:
rules:
- if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
- if: $CI_PIPELINE_SOURCE == 'merge_request_event'

parse-workspace:
...

build-image:
...

deploy-docker:
...

deploy-docker-branch:
stage: deploy
rules:
- if: $CI_PIPELINE_SOURCE == 'merge_request_event'
dependencies:
- build-image
- parse-workspace
image: ghcr.io/dagster-io/dagster-cloud-action:0.1.23
script:
# Existing steps here
...

# Add a step to launch the job cloning the prod db
- dagster-plus job launch
--url "$DAGSTER_CLOUD_URL/$DEPLOYMENT_NAME"
--api-token "$DAGSTER_CLOUD_API_TOKEN"
--location "location_name_containing_clone_prod_job"
--job clone_prod
environment:
name: branch/$CI_COMMIT_REF_NAME
on_stop: close_branch

close_branch:
...
```
<CodeExample path="docs_snippets/docs_snippets/guides/dagster/development_to_production/branch_deployments/clone_prod.gitlab-ci.yml" />

Opening a merge request for our current branch will automatically kick off a branch deployment. After the deployment launches, we can confirm that the `clone_prod` job has run:

Expand All @@ -382,91 +148,14 @@ We can also view our database in Snowflake to confirm that a clone exists for ea

Finally, we can add a step to our `branch_deployments.yml` file that queues a run of our `drop_prod_clone` job:

{/* TODO convert to <CodeExample> */}
```yaml file=/guides/dagster/development_to_production/branch_deployments/drop_db_clone.yaml
# .github/workflows/branch_deployments.yml

name: Dagster Branch Deployments
on:
pull_request:
types: [opened, synchronize, reopened, closed]
env:
DAGSTER_CLOUD_URL: ${{ secrets.DAGSTER_CLOUD_URL }}

jobs:
dagster_cloud_build_push:
runs-on: ubuntu-latest
name: Dagster Branch Deployments
strategy:
...
steps:
# Existing steps here
...
- name: Clone Snowflake schema upon launch
...
- name: Delete schema clone upon PR close
if: github.event.action == 'closed'
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected]
with:
location_name: ${{ matrix.location.name }}
deployment: ${{ steps.deploy.outputs.deployment }}
job_name: drop_prod_clone
env:
DAGSTER_CLOUD_URL: ${{ secrets.DAGSTER_CLOUD_URL }}
DAGSTER_CLOUD_API_TOKEN: ${{ secrets.DAGSTER_CLOUD_API_TOKEN }}
```
<CodeExample path="docs_snippets/docs_snippets/guides/dagster/development_to_production/branch_deployments/drop_db_clone.yaml" />

</TabItem>
<TabItem value="Using Gitlab CI/CD">

Finally, we can add a step to our `.gitlab-ci.yml` file that queues a run of our `drop_prod_clone` job:

{/* TODO convert to <CodeExample> */}
```yaml file=/guides/dagster/development_to_production/branch_deployments/drop_db_clone.gitlab-ci.yml
# .gitlab-ci.yml

stages:
- setup
- build
- deploy

workflow:
rules:
- if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
- if: $CI_PIPELINE_SOURCE == 'merge_request_event'

parse-workspace:
...

build-image:
...

deploy-docker:
...

deploy-docker-branch:
...

close_branch:
stage: deploy
image: ghcr.io/dagster-io/dagster-cloud-action:0.1.23
when: manual
only:
- merge_requests
script:
# Existing steps here
...

# Add a step to launch the job dropping the cloned db
- dagster-plus job launch
--url "$DAGSTER_CLOUD_URL/$DEPLOYMENT_NAME"
--api-token "$DAGSTER_CLOUD_API_TOKEN"
--location "location_name_containing_drop_prod_clone_job"
--job drop_prod_clone
environment:
name: branch/$CI_COMMIT_REF_NAME
action: stop
```
<CodeExample path="docs_snippets/docs_snippets/guides/dagster/development_to_production/branch_deployments/drop_db_clone.gitlab-ci.yml" />

</TabItem>
</Tabs>
Expand Down
Loading
Loading