Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update docs to reflect pools concept for concurrency #27171

Merged
merged 4 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 50 additions & 58 deletions docs/docs/guides/operate/managing-concurrency.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,105 +14,97 @@

:::

## Limit how many runs can be in progress at the same time

## Limit the number of total runs that can be in progress at the same time

* Dagster Core, add the following to your [dagster.yaml](/guides/deploy/dagster-yaml)
* In Dagster+, add the following to your [deployment settings](/dagster-plus/deployment/management/deployments/deployment-settings-reference)

```yaml
run_queue:
max_concurrent_runs: 15
concurrency:
runs:
max_concurrent_runs: 15
```

## Limit the number of runs that can be in progress for a set of ops

## Limit how many runs can be in progress by tag
You can assign assets and ops to concurrency pools which allow you to limit the number of in progress runs containing those assets or ops. You first assign your asset or op to a concurrency pool using the `pool` keyword argument.

You can configure a limit for runs that are tagged with a specific tag key or key-value pair.
<CodeExample path="docs_beta_snippets/docs_beta_snippets/guides/operate/concurrency-pool-api.py" language="python" title="Specifying pools on assets and ops" />

```yaml
# dagster.yaml for Dagster Core; Deployment Settings for Dagster+
run_queue:
tag_concurrency_limits:
- key: "dagster/concurrency_key"
value: "database"
limit: 1
```
You should be able to verify that you have set the pool correctly by viewing the details pane for the asset or op in the Dagster UI.

## Limit how many of a certain type of op or asset can run across all runs
![Viewing the pool tag](/images/guides/operate/managing-concurrency/asset-pool-tag.png)

For example, you might want to limit the number of ops or assets that are running with a key of `database` across all runs (to limit the load on that database).
Once you have assigned your assets and ops to a concurrency pool, you can configure a pool limit for that pool in your deployment by using the Dagster UI or the Dagster CLI.

Check failure on line 38 in docs/docs/guides/operate/managing-concurrency.md

View workflow job for this annotation

GitHub Actions / runner / vale

[vale] reported by reviewdog 🐶 [Vale.Terms] Use 'cli' instead of 'CLI'. Raw Output: {"message": "[Vale.Terms] Use 'cli' instead of 'CLI'.", "location": {"path": "docs/docs/guides/operate/managing-concurrency.md", "range": {"start": {"line": 38, "column": 170}}}, "severity": "ERROR"}

:::warning
This feature is experimental and is only supported with Postgres/MySQL storage.
:::
To specify a limit for the pool "database" using the UI, navigate to the `Deployments` &rarr; `Concurrency` settings page and click the `Add pool limit` button:

![Setting the pool limit](/images/guides/operate/managing-concurrency/add-pool-ui.png)

To specify a global concurrency limit using the CLI, use:
To specify a limit for the pool "database" using the CLI, use:

Check failure on line 44 in docs/docs/guides/operate/managing-concurrency.md

View workflow job for this annotation

GitHub Actions / runner / vale

[vale] reported by reviewdog 🐶 [Vale.Terms] Use 'cli' instead of 'CLI'. Raw Output: {"message": "[Vale.Terms] Use 'cli' instead of 'CLI'.", "location": {"path": "docs/docs/guides/operate/managing-concurrency.md", "range": {"start": {"line": 44, "column": 54}}}, "severity": "ERROR"}

```
dagster instance concurrency set database 1
```

A default concurrency limit can be configured for the instance, for any concurrency keys that don't have an explicit limit set:
### Setting a default limit for concurrency pools

* Dagster+: Edit the `concurrency` config in deployment settings via the [Dagster+ UI](/guides/operate/webserver) or the [`dagster-cloud` CLI](/dagster-plus/deployment/management/dagster-cloud-cli/).
* Dagster Open Source: Use your instance's [dagster.yaml](/guides/deploy/dagster-yaml)

To enable this default value, use `concurrency.default_op_concurrency_limit`. For example, the following would set the default concurrency value for the deployment to 1:
```yaml
concurrency:
default_op_concurrency_limit: 1
pools:
default_limit: 1
```

<Tabs>
<TabItem value="Asset Tag" label="Asset tag concurrency limits">
<CodeExample path="docs_beta_snippets/docs_beta_snippets/guides/tbd/concurrency-tag-key-asset.py" language="python" title="No more than 1 asset running with a tag of 'database' across all runs" />

</TabItem>
<TabItem value="Op Tag" label="Op tag concurrency limits">
<CodeExample path="docs_beta_snippets/docs_beta_snippets/guides/tbd/concurrency-tag-key-op.py" language="python" title="No more than 1 op running with a tag of 'database' across all runs" />
## Limit the number of runs that can be in progress by run tag

</TabItem>
</Tabs>

## Limit how many ops or assets can be executing at the same time for a specific job

You can control the number of assets or ops that are running concurrently within a job using the `config` argument of `dg.define_asset_job()` or `dg.@job()` for ops.

<Tabs>
<TabItem value="Assets" label="Asset job">
<CodeExample path="docs_beta_snippets/docs_beta_snippets/guides/tbd/concurrency-job-asset.py" language="python" title="Asset concurrency limits in a job" />

</TabItem>
You can also limit the number of in progress runs by run tag. This is useful for limiting sets of runs independent of which assets or ops it is executing. For example, you might want to limit the number of in-progress runs for a particular schedule. Or, you might want to limit the number of in-progress runs for all backfills.

<TabItem value="Ops" label="Op job">
<CodeExample path="docs_beta_snippets/docs_beta_snippets/guides/tbd/concurrency-job-op.py" language="python" title="Op concurrency limits in a job" />
```yaml
concurrency:
runs:
tag_concurrency_limits:
- key: "dagster/sensor_name"
value: "my_cool_sensor"
limit: 5
- key: "dagster/backfill"
limit: 10
```

</TabItem>
</Tabs>
### Limit the number of runs that can be in progress by unique tag value

## Limit how many ops or assets can be executing at the same time by tag
To apply separate limits to each unique value of a run tag, set a limit for each unique value using `applyLimitPerUniqueValue`. For example, instead of limiting the number of backfill runs across all backfills, you may want to limit the number of runs for each backfill in progress:

You can also limit concurrency for a tag within the job definition, for example to limit the number of specific assets running at the same time *within* that run.
```yaml
concurrency:
runs:
tag_concurrency_limits:
- key: "dagster/backfill"
value:
applyLimitPerUniqueValue: true
limit: 10
```

<Tabs>
<TabItem value="Asset Tag with Job" label="Asset tag concurrency limits in a run">
<CodeExample path="docs_beta_snippets/docs_beta_snippets/guides/tbd/concurrency-tag-key-job-asset.py" language="python" title="No more than 1 asset running with a tag of 'database' within a run" />
## [Advanced] Limit the number of assets or ops actively in execution across a large set of runs

</TabItem>
<TabItem value="Op Tag with Job" label="Op tag concurrency limits in a run">
<CodeExample path="docs_beta_snippets/docs_beta_snippets/guides/tbd/concurrency-tag-key-job-op.py" language="python" title="No more than 1 op running with a tag of 'database' within a run" />
</TabItem>
</Tabs>
For deployments with complex jobs containing many ops, blocking entire runs for a small number of concurrency-limited ops may be too coarse-grained for your requirements. Instead of enforcing concurrency limits at the run level, Dagster will ensure that the concurrency limit will be applied at the individual asset or op execution level. This means that if one run completes its materialization of a pool's asset, a materialization of another pool asset in a different run may begin even if the first run is still in progress.

You can set the granularity of the concurrency limit enforcement to be at the op level instead of at the run level:

```yaml
concurrency:
pools:
granularity: op
```

## Prevent runs from starting if another run is already occurring (advanced)

You can use Dagster's rich metadata to use a schedule or a sensor to only start a run when there are no currently running jobs.

<CodeExample path="docs_beta_snippets/docs_beta_snippets/guides/tbd/concurrency-no-more-than-1-job.py" language="python" title="No more than 1 running job from a schedule" />
<CodeExample path="docs_beta_snippets/docs_beta_snippets/guides/operate/concurrency-no-more-than-1-job.py" language="python" title="No more than 1 running job from a schedule" />


## Troubleshooting
Expand All @@ -125,7 +117,7 @@
This only applies to Dagster Open Source.
:::

The `run_queue` key may not be set in your instance's settings. In the Dagster UI, navigate to Deployment > Configuration and verify that the `run_queue` key is set.
The `run_queue` key may not be set in your instance's settings. In the Dagster UI, navigate to **Deployment > Configuration** and verify that the `run_queue` key is set.

### Runs remaining in QUEUED status

Expand All @@ -149,7 +141,7 @@
**Troubleshoot the run queue configuration**
If the daemon is running, runs may intentionally be left in the queue due to concurrency rules. To investigate:
* **Check the output logged from the daemon process**, as this will include skipped runs.
* **Check the max_concurrent_runs setting in your instance's dagster.yaml**. If set to 0, this may block the queue. You can check this setting in the Dagster UI by navigating to Deployment > Configuration and locating the run_queue.max_concurrent_runs setting. Refer to the Limiting overall runs section for more info.
* **Check the max_concurrent_runs setting in your instance's dagster.yaml**. If set to 0, this may block the queue. You can check this setting in the Dagster UI by navigating to Deployment > Configuration and locating the concurrency.runs.max_concurrent_runs setting. Refer to the [Limit the number of total runs that can be in progress at the same time](#limit-the-number-of-total-runs-that-can-be-in-progress-at-the-same-time) section for more info.

Check failure on line 144 in docs/docs/guides/operate/managing-concurrency.md

View workflow job for this annotation

GitHub Actions / runner / vale

[vale] reported by reviewdog 🐶 [Vale.Spelling] Did you really mean 'max_concurrent_runs'? Raw Output: {"message": "[Vale.Spelling] Did you really mean 'max_concurrent_runs'?", "location": {"path": "docs/docs/guides/operate/managing-concurrency.md", "range": {"start": {"line": 144, "column": 19}}}, "severity": "ERROR"}
* **Check the state of your run queue**. In some cases, the queue may be blocked by some number of in-progress runs. To view the status of your run queue, click **Runs** in the top navigation of the Dagster UI and then open the **Queued** and **In Progress** tabs.

If there are queued or in-progress runs blocking the queue, you can terminate them to allow other runs to proceed.
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import time

import dagster as dg


@dg.asset(pool="foo")
def my_asset():
pass


@dg.op(pool="bar")
def my_op():
pass


@dg.op(pool="barbar")
def my_downstream_op(inp):
return inp


@dg.graph_asset
def my_graph_asset():
return my_downstream_op(my_op())


defs = dg.Definitions(
assets=[my_asset, my_graph_asset],
)

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
run_queue:
max_concurrent_runs: 1
concurrency:
runs:
max_concurrent_runs: 15
Loading