Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4/n] [RFC] add launch multiple runs backend functionality #25880

Open
wants to merge 1 commit into
base: dliu27/add-manual-tick-to-automation-rows
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 53 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 60 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/client/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,66 @@
)


LAUNCH_MULTIPLE_RUNS_MUTATION = (
ERROR_FRAGMENT
+ """
mutation($executionParamsList: [ExecutionParams!]!) {
launchMultipleRuns(executionParamsList: $executionParamsList) {
__typename
launchMultipleRunsResult {
__typename
... on InvalidStepError {
invalidStepKey
}
... on InvalidOutputError {
stepKey
invalidOutputName
}
... on LaunchRunSuccess {
run {
runId
pipeline {
name
}
tags {
key
value
}
status
runConfigYaml
mode
resolvedOpSelection
}
}
... on ConflictingExecutionParamsError {
message
}
... on PresetNotFoundError {
preset
message
}
... on RunConfigValidationInvalid {
pipelineName
errors {
__typename
message
path
reason
}
}
... on PipelineNotFoundError {
message
pipelineName
}
... on PythonError {
...errorFragment
}
}
}
}
"""
)

LAUNCH_PIPELINE_REEXECUTION_MUTATION = (
ERROR_FRAGMENT
+ """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def types():
GrapheneDeletePipelineRunSuccess,
GrapheneDeleteRunMutation,
GrapheneLaunchBackfillMutation,
GrapheneLaunchMultipleRunsMutation,
GrapheneLaunchRunMutation,
GrapheneLaunchRunReexecutionMutation,
GrapheneReloadRepositoryLocationMutation,
Expand Down Expand Up @@ -38,6 +39,7 @@ def types():
GrapheneExecutionPlanOrError,
GrapheneLaunchBackfillMutation,
GrapheneLaunchRunMutation,
GrapheneLaunchMultipleRunsMutation,
GrapheneLaunchRunReexecutionMutation,
GraphenePipelineOrError,
GrapheneReloadRepositoryLocationMutation,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional, Sequence, Union
from typing import List, Optional, Sequence, Union

import dagster._check as check
import graphene
Expand Down Expand Up @@ -78,6 +78,7 @@
)
from dagster_graphql.schema.pipelines.pipeline import GrapheneRun
from dagster_graphql.schema.runs import (
GrapheneLaunchMultipleRunsResult,
GrapheneLaunchRunReexecutionResult,
GrapheneLaunchRunResult,
GrapheneLaunchRunSuccess,
Expand Down Expand Up @@ -316,6 +317,35 @@ def mutate(
return create_execution_params_and_launch_pipeline_exec(graphene_info, executionParams)


class GrapheneLaunchMultipleRunsMutation(graphene.Mutation):
"""Launches multiple job runs."""

Output = graphene.NonNull(GrapheneLaunchMultipleRunsResult)

class Arguments:
executionParamsList = non_null_list(GrapheneExecutionParams)

class Meta:
name = "LaunchMultipleRunsMutation"

@capture_error
@require_permission_check(Permissions.LAUNCH_PIPELINE_EXECUTION)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This permission check here can make the whole mutation fail, and in this case the output would be a single GrapheneUnauthorizedError so I believe your output should be a union of GrapheneLaunchMultipleRunsResult and GrapheneUnauthorizedError and also GraphenePythonError since we're using @capture_error.

def mutate(
self, graphene_info: ResolveInfo, executionParamsList: List[GrapheneExecutionParams]
):
launch_multiple_runs_result = []

for execution_params in executionParamsList:
result = GrapheneLaunchRunMutation.mutate(
salazarm marked this conversation as resolved.
Show resolved Hide resolved
None, graphene_info, executionParams=execution_params
)
launch_multiple_runs_result.append(result)

return GrapheneLaunchMultipleRunsResult(
launchMultipleRunsResult=launch_multiple_runs_result
)


class GrapheneLaunchBackfillMutation(graphene.Mutation):
"""Launches a set of partition backfill runs."""

Expand Down Expand Up @@ -984,6 +1014,7 @@ class Meta:

launchPipelineExecution = GrapheneLaunchRunMutation.Field()
launchRun = GrapheneLaunchRunMutation.Field()
launchMultipleRuns = GrapheneLaunchMultipleRunsMutation.Field()
launchPipelineReexecution = GrapheneLaunchRunReexecutionMutation.Field()
launchRunReexecution = GrapheneLaunchRunReexecutionMutation.Field()
startSchedule = GrapheneStartScheduleMutation.Field()
Expand Down
15 changes: 11 additions & 4 deletions python_modules/dagster-graphql/dagster_graphql/schema/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from dagster_graphql.implementation.fetch_runs import get_run_ids, get_runs, get_runs_count
from dagster_graphql.implementation.utils import UserFacingGraphQLError
from dagster_graphql.schema.backfill import pipeline_execution_error_types
from dagster_graphql.schema.errors import (
GrapheneInvalidPipelineRunsFilterError,
GraphenePythonError,
Expand Down Expand Up @@ -73,17 +74,22 @@ class Meta:

class GrapheneLaunchRunResult(graphene.Union):
class Meta:
from dagster_graphql.schema.backfill import pipeline_execution_error_types
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why was this removed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and why was it there in the first place 🤔


types = launch_pipeline_run_result_types + pipeline_execution_error_types

name = "LaunchRunResult"


class GrapheneLaunchRunReexecutionResult(graphene.Union):
class GrapheneLaunchMultipleRunsResult(graphene.ObjectType):
"""Contains results from multiple pipeline launches."""

launchMultipleRunsResult = non_null_list(GrapheneLaunchRunResult)

class Meta:
from dagster_graphql.schema.backfill import pipeline_execution_error_types
name = "LaunchMultipleRunsResult"


class GrapheneLaunchRunReexecutionResult(graphene.Union):
class Meta:
types = launch_pipeline_run_result_types + pipeline_execution_error_types

name = "LaunchRunReexecutionResult"
Expand Down Expand Up @@ -213,6 +219,7 @@ def parse_run_config_input(

types = [
GrapheneLaunchRunResult,
GrapheneLaunchMultipleRunsResult,
GrapheneLaunchRunReexecutionResult,
GrapheneLaunchPipelineRunSuccess,
GrapheneLaunchRunSuccess,
Expand Down
Loading