diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index fa3195c4cab9b..86ffe88c49e58 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -1511,6 +1511,10 @@ type LaunchRunMutation { Output: LaunchRunResult! } +type LaunchMultipleRunsMutation { + Output: LaunchMultipleRunsResult! +} + type LaunchRunReexecutionMutation { Output: LaunchRunReexecutionResult! } @@ -2980,6 +2984,10 @@ union LaunchRunResult = | ConflictingExecutionParamsError | NoModeProvidedError +type LaunchMultipleRunsResult { + launchMultipleRunsResult: [LaunchRunResult!]! +} + union LaunchRunReexecutionResult = | LaunchRunSuccess | InvalidStepError @@ -3691,6 +3699,7 @@ type AutomationConditionEvaluationNode { type Mutation { launchPipelineExecution(executionParams: ExecutionParams!): LaunchRunResult! launchRun(executionParams: ExecutionParams!): LaunchRunResult! + launchMultipleRuns(executionParamsList: [ExecutionParams!]!): LaunchMultipleRunsResult! launchPipelineReexecution( executionParams: ExecutionParams reexecutionParams: ReexecutionParams diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index 36db38413ce80..b1ae65cd18139 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -2206,6 +2206,16 @@ export type LaunchBackfillSuccess = { launchedRunIds: Maybe>>; }; +export type LaunchMultipleRunsMutation = { + __typename: 'LaunchMultipleRunsMutation'; + Output: LaunchMultipleRunsResult; +}; + +export type LaunchMultipleRunsResult = { + __typename: 'LaunchMultipleRunsResult'; + launchMultipleRunsResult: Array; +}; + export type LaunchPipelineRunSuccess = { run: Run; }; @@ -2614,6 +2624,7 @@ export type Mutation = { deleteRun: DeletePipelineRunResult; freeConcurrencySlots: Scalars['Boolean']['output']; freeConcurrencySlotsForRun: Scalars['Boolean']['output']; + launchMultipleRuns: LaunchMultipleRunsResult; launchPartitionBackfill: LaunchBackfillResult; launchPipelineExecution: LaunchRunResult; launchPipelineReexecution: LaunchRunReexecutionResult; @@ -2681,6 +2692,10 @@ export type MutationFreeConcurrencySlotsForRunArgs = { runId: Scalars['String']['input']; }; +export type MutationLaunchMultipleRunsArgs = { + executionParamsList: Array; +}; + export type MutationLaunchPartitionBackfillArgs = { backfillParams: LaunchBackfillParams; }; @@ -9422,6 +9437,38 @@ export const buildLaunchBackfillSuccess = ( }; }; +export const buildLaunchMultipleRunsMutation = ( + overrides?: Partial, + _relationshipsToOmit: Set = new Set(), +): {__typename: 'LaunchMultipleRunsMutation'} & LaunchMultipleRunsMutation => { + const relationshipsToOmit: Set = new Set(_relationshipsToOmit); + relationshipsToOmit.add('LaunchMultipleRunsMutation'); + return { + __typename: 'LaunchMultipleRunsMutation', + Output: + overrides && overrides.hasOwnProperty('Output') + ? overrides.Output! + : relationshipsToOmit.has('LaunchMultipleRunsResult') + ? ({} as LaunchMultipleRunsResult) + : buildLaunchMultipleRunsResult({}, relationshipsToOmit), + }; +}; + +export const buildLaunchMultipleRunsResult = ( + overrides?: Partial, + _relationshipsToOmit: Set = new Set(), +): {__typename: 'LaunchMultipleRunsResult'} & LaunchMultipleRunsResult => { + const relationshipsToOmit: Set = new Set(_relationshipsToOmit); + relationshipsToOmit.add('LaunchMultipleRunsResult'); + return { + __typename: 'LaunchMultipleRunsResult', + launchMultipleRunsResult: + overrides && overrides.hasOwnProperty('launchMultipleRunsResult') + ? overrides.launchMultipleRunsResult! + : [], + }; +}; + export const buildLaunchPipelineRunSuccess = ( overrides?: Partial, _relationshipsToOmit: Set = new Set(), @@ -10203,6 +10250,12 @@ export const buildMutation = ( overrides && overrides.hasOwnProperty('freeConcurrencySlotsForRun') ? overrides.freeConcurrencySlotsForRun! : false, + launchMultipleRuns: + overrides && overrides.hasOwnProperty('launchMultipleRuns') + ? overrides.launchMultipleRuns! + : relationshipsToOmit.has('LaunchMultipleRunsResult') + ? ({} as LaunchMultipleRunsResult) + : buildLaunchMultipleRunsResult({}, relationshipsToOmit), launchPartitionBackfill: overrides && overrides.hasOwnProperty('launchPartitionBackfill') ? overrides.launchPartitionBackfill! diff --git a/python_modules/dagster-graphql/dagster_graphql/client/query.py b/python_modules/dagster-graphql/dagster_graphql/client/query.py index 579b4e6e7a046..68e05dc008c02 100644 --- a/python_modules/dagster-graphql/dagster_graphql/client/query.py +++ b/python_modules/dagster-graphql/dagster_graphql/client/query.py @@ -330,6 +330,66 @@ ) +LAUNCH_MULTIPLE_RUNS_MUTATION = ( + ERROR_FRAGMENT + + """ +mutation($executionParamsList: [ExecutionParams!]!) { + launchMultipleRuns(executionParamsList: $executionParamsList) { + __typename + launchMultipleRunsResult { + __typename + ... on InvalidStepError { + invalidStepKey + } + ... on InvalidOutputError { + stepKey + invalidOutputName + } + ... on LaunchRunSuccess { + run { + runId + pipeline { + name + } + tags { + key + value + } + status + runConfigYaml + mode + resolvedOpSelection + } + } + ... on ConflictingExecutionParamsError { + message + } + ... on PresetNotFoundError { + preset + message + } + ... on RunConfigValidationInvalid { + pipelineName + errors { + __typename + message + path + reason + } + } + ... on PipelineNotFoundError { + message + pipelineName + } + ... on PythonError { + ...errorFragment + } + } + } +} +""" +) + LAUNCH_PIPELINE_REEXECUTION_MUTATION = ( ERROR_FRAGMENT + """ diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/__init__.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/__init__.py index 6533d2d0b0f32..10f029145bc11 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/__init__.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/__init__.py @@ -10,6 +10,7 @@ def types(): GrapheneDeletePipelineRunSuccess, GrapheneDeleteRunMutation, GrapheneLaunchBackfillMutation, + GrapheneLaunchMultipleRunsMutation, GrapheneLaunchRunMutation, GrapheneLaunchRunReexecutionMutation, GrapheneReloadRepositoryLocationMutation, @@ -38,6 +39,7 @@ def types(): GrapheneExecutionPlanOrError, GrapheneLaunchBackfillMutation, GrapheneLaunchRunMutation, + GrapheneLaunchMultipleRunsMutation, GrapheneLaunchRunReexecutionMutation, GraphenePipelineOrError, GrapheneReloadRepositoryLocationMutation, diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py index 0e98ceda547ee..6da12aeb37488 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py @@ -1,4 +1,4 @@ -from typing import Optional, Sequence, Union +from typing import List, Optional, Sequence, Union import dagster._check as check import graphene @@ -78,6 +78,7 @@ ) from dagster_graphql.schema.pipelines.pipeline import GrapheneRun from dagster_graphql.schema.runs import ( + GrapheneLaunchMultipleRunsResult, GrapheneLaunchRunReexecutionResult, GrapheneLaunchRunResult, GrapheneLaunchRunSuccess, @@ -316,6 +317,35 @@ def mutate( return create_execution_params_and_launch_pipeline_exec(graphene_info, executionParams) +class GrapheneLaunchMultipleRunsMutation(graphene.Mutation): + """Launches multiple job runs.""" + + Output = graphene.NonNull(GrapheneLaunchMultipleRunsResult) + + class Arguments: + executionParamsList = non_null_list(GrapheneExecutionParams) + + class Meta: + name = "LaunchMultipleRunsMutation" + + @capture_error + @require_permission_check(Permissions.LAUNCH_PIPELINE_EXECUTION) + def mutate( + self, graphene_info: ResolveInfo, executionParamsList: List[GrapheneExecutionParams] + ): + launch_multiple_runs_result = [] + + for execution_params in executionParamsList: + result = GrapheneLaunchRunMutation.mutate( + None, graphene_info, executionParams=execution_params + ) + launch_multiple_runs_result.append(result) + + return GrapheneLaunchMultipleRunsResult( + launchMultipleRunsResult=launch_multiple_runs_result + ) + + class GrapheneLaunchBackfillMutation(graphene.Mutation): """Launches a set of partition backfill runs.""" @@ -984,6 +1014,7 @@ class Meta: launchPipelineExecution = GrapheneLaunchRunMutation.Field() launchRun = GrapheneLaunchRunMutation.Field() + launchMultipleRuns = GrapheneLaunchMultipleRunsMutation.Field() launchPipelineReexecution = GrapheneLaunchRunReexecutionMutation.Field() launchRunReexecution = GrapheneLaunchRunReexecutionMutation.Field() startSchedule = GrapheneStartScheduleMutation.Field() diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/runs.py b/python_modules/dagster-graphql/dagster_graphql/schema/runs.py index 9aabe08512373..d1f2271b07697 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/runs.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/runs.py @@ -11,6 +11,7 @@ from dagster_graphql.implementation.fetch_runs import get_run_ids, get_runs, get_runs_count from dagster_graphql.implementation.utils import UserFacingGraphQLError +from dagster_graphql.schema.backfill import pipeline_execution_error_types from dagster_graphql.schema.errors import ( GrapheneInvalidPipelineRunsFilterError, GraphenePythonError, @@ -73,17 +74,22 @@ class Meta: class GrapheneLaunchRunResult(graphene.Union): class Meta: - from dagster_graphql.schema.backfill import pipeline_execution_error_types - types = launch_pipeline_run_result_types + pipeline_execution_error_types name = "LaunchRunResult" -class GrapheneLaunchRunReexecutionResult(graphene.Union): +class GrapheneLaunchMultipleRunsResult(graphene.ObjectType): + """Contains results from multiple pipeline launches.""" + + launchMultipleRunsResult = non_null_list(GrapheneLaunchRunResult) + class Meta: - from dagster_graphql.schema.backfill import pipeline_execution_error_types + name = "LaunchMultipleRunsResult" + +class GrapheneLaunchRunReexecutionResult(graphene.Union): + class Meta: types = launch_pipeline_run_result_types + pipeline_execution_error_types name = "LaunchRunReexecutionResult" @@ -213,6 +219,7 @@ def parse_run_config_input( types = [ GrapheneLaunchRunResult, + GrapheneLaunchMultipleRunsResult, GrapheneLaunchRunReexecutionResult, GrapheneLaunchPipelineRunSuccess, GrapheneLaunchRunSuccess, diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_run_launcher.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_run_launcher.py index f33fc6139d7e7..bc484d1031053 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_run_launcher.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_run_launcher.py @@ -1,9 +1,13 @@ from typing import Any +from unittest.mock import patch from dagster._core.test_utils import wait_for_runs_to_finish from dagster._core.workspace.context import WorkspaceRequestContext -from dagster_graphql.client.query import LAUNCH_PIPELINE_EXECUTION_MUTATION -from dagster_graphql.test.utils import execute_dagster_graphql, infer_job_selector +from dagster_graphql.client.query import ( + LAUNCH_MULTIPLE_RUNS_MUTATION, + LAUNCH_PIPELINE_EXECUTION_MUTATION, +) +from dagster_graphql.test.utils import GqlResult, execute_dagster_graphql, infer_job_selector from dagster_graphql_tests.graphql.graphql_context_test_suite import ( GraphQLContextVariant, @@ -32,6 +36,12 @@ BaseTestSuite: Any = make_graphql_context_test_suite( context_variants=GraphQLContextVariant.all_executing_variants() ) +LaunchFailTestSuite: Any = make_graphql_context_test_suite( + context_variants=GraphQLContextVariant.all_non_launchable_variants() +) +ReadOnlyTestSuite: Any = make_graphql_context_test_suite( + context_variants=GraphQLContextVariant.all_readonly_variants() +) class TestBasicLaunch(BaseTestSuite): @@ -83,10 +93,127 @@ def test_run_launcher_subset(self, graphql_context: WorkspaceRequestContext): assert result.data["pipelineRunOrError"]["status"] == "SUCCESS" assert result.data["pipelineRunOrError"]["stats"]["stepsSucceeded"] == 1 + def test_run_launcher_unauthorized(self, graphql_context: WorkspaceRequestContext): + selector = infer_job_selector(graphql_context, "no_config_job") -LaunchFailTestSuite: Any = make_graphql_context_test_suite( - context_variants=GraphQLContextVariant.all_non_launchable_variants() -) + with patch.object(graphql_context, "has_permission_for_location", return_value=False): + with patch.object(graphql_context, "was_permission_checked", return_value=True): + result = execute_dagster_graphql( + context=graphql_context, + query=LAUNCH_PIPELINE_EXECUTION_MUTATION, + variables={"executionParams": {"selector": selector, "mode": "default"}}, + ) + assert result.data["launchPipelineExecution"]["__typename"] == "UnauthorizedError" + + +class TestMultipleLaunch(BaseTestSuite): + def test_multiple_run_launcher_same_job(self, graphql_context: WorkspaceRequestContext): + selector = infer_job_selector(graphql_context, "no_config_job") + + # test with multiple of the same job + executionParamsList = [ + {"selector": selector, "mode": "default"}, + {"selector": selector, "mode": "default"}, + {"selector": selector, "mode": "default"}, + ] + + result = execute_dagster_graphql( + context=graphql_context, + query=LAUNCH_MULTIPLE_RUNS_MUTATION, + variables={"executionParamsList": executionParamsList}, + ) + + assert "launchMultipleRuns" in result.data + launches = result.data["launchMultipleRuns"] + + assert launches["__typename"] == "LaunchMultipleRunsResult" + assert "launchMultipleRunsResult" in launches + results = launches["launchMultipleRunsResult"] + + run_ids = [] + + for result in results: + assert result["__typename"] == "LaunchRunSuccess" + run_ids.append(result["run"]["runId"]) + + wait_for_runs_to_finish(graphql_context.instance) + + for run_id in run_ids: + result = execute_dagster_graphql( + context=graphql_context, query=RUN_QUERY, variables={"runId": run_id} + ) + assert result.data["pipelineRunOrError"]["__typename"] == "Run" + assert result.data["pipelineRunOrError"]["status"] == "SUCCESS" + + def test_multiple_run_launcher_multiple_jobs(self, graphql_context: WorkspaceRequestContext): + selectors = [ + infer_job_selector(graphql_context, "no_config_job"), + infer_job_selector(graphql_context, "more_complicated_config", ["noop_op"]), + ] + + # test with multiple of the same job + executionParamsList = [ + {"selector": selectors[0], "mode": "default"}, + {"selector": selectors[1], "mode": "default"}, + {"selector": selectors[0], "mode": "default"}, + {"selector": selectors[1], "mode": "default"}, + ] + + result = execute_dagster_graphql( + context=graphql_context, + query=LAUNCH_MULTIPLE_RUNS_MUTATION, + variables={"executionParamsList": executionParamsList}, + ) + + assert "launchMultipleRuns" in result.data + launches = result.data["launchMultipleRuns"] + + assert launches["__typename"] == "LaunchMultipleRunsResult" + assert "launchMultipleRunsResult" in launches + results = launches["launchMultipleRunsResult"] + + run_ids = [] + + for result in results: + assert result["__typename"] == "LaunchRunSuccess" + run_ids.append(result["run"]["runId"]) + + wait_for_runs_to_finish(graphql_context.instance) + + for run_id in run_ids: + result = execute_dagster_graphql( + context=graphql_context, query=RUN_QUERY, variables={"runId": run_id} + ) + assert result.data["pipelineRunOrError"]["__typename"] == "Run" + assert result.data["pipelineRunOrError"]["status"] == "SUCCESS" + if result.data["pipelineRunOrError"].get("stats"): + assert result.data["pipelineRunOrError"]["stats"]["stepsSucceeded"] == 1 + + def test_multiple_launch_failure_unauthorized(self, graphql_context: WorkspaceRequestContext): + executionParamsList = [ + {"selector": infer_job_selector(graphql_context, "no_config_job"), "mode": "default"}, + {"selector": infer_job_selector(graphql_context, "no_config_job"), "mode": "default"}, + ] + + # mock no permissions + with patch.object(graphql_context, "has_permission_for_location", return_value=False): + with patch.object(graphql_context, "was_permission_checked", return_value=True): + result = execute_dagster_graphql( + context=graphql_context, + query=LAUNCH_MULTIPLE_RUNS_MUTATION, + variables={"executionParamsList": executionParamsList}, + ) + + assert "launchMultipleRuns" in result.data + result_data = result.data["launchMultipleRuns"] + + assert "launchMultipleRunsResult" in result_data + results = result_data["launchMultipleRunsResult"] + + assert len(results) == 2 + + for result in results: + assert result["__typename"] == "UnauthorizedError" class TestFailedLaunch(LaunchFailTestSuite): @@ -97,6 +224,7 @@ def test_launch_failure(self, graphql_context: WorkspaceRequestContext): query=LAUNCH_PIPELINE_EXECUTION_MUTATION, variables={"executionParams": {"selector": selector, "mode": "default"}}, ) + assert result.data["launchPipelineExecution"]["__typename"] != "LaunchRunSuccess" # fetch the most recent run, which should be this one that just failed to launch @@ -105,7 +233,141 @@ def test_launch_failure(self, graphql_context: WorkspaceRequestContext): result = execute_dagster_graphql( context=graphql_context, query=RUN_QUERY, variables={"runId": run.run_id} ) + assert result.data["pipelineRunOrError"]["__typename"] == "Run" assert result.data["pipelineRunOrError"]["status"] == "FAILURE" assert result.data["pipelineRunOrError"]["startTime"] assert result.data["pipelineRunOrError"]["endTime"] + + +class TestFailedMultipleLaunch(LaunchFailTestSuite): + def test_multiple_launch_failure(self, graphql_context: WorkspaceRequestContext): + executionParamsList = [ + {"selector": infer_job_selector(graphql_context, "no_config_job"), "mode": "default"}, + {"selector": infer_job_selector(graphql_context, "no_config_job"), "mode": "default"}, + ] + + result = execute_dagster_graphql( + context=graphql_context, + query=LAUNCH_MULTIPLE_RUNS_MUTATION, + variables={"executionParamsList": executionParamsList}, + ) + + assert "launchMultipleRuns" in result.data + result_data = result.data["launchMultipleRuns"] + + assert result_data["__typename"] == "LaunchMultipleRunsResult" + results = result_data["launchMultipleRunsResult"] + + assert len(results) == 2 + + for run_result in results: + assert run_result["__typename"] == "PythonError" + assert run_result["message"].startswith( + "NotImplementedError: The entire purpose of this is to throw on launch" + ) + assert run_result["className"] == "NotImplementedError" + + +class TestFailedMultipleLaunchReadOnly(ReadOnlyTestSuite): + def test_multiple_launch_failure_readonly(self, graphql_context: WorkspaceRequestContext): + executionParamsList = [ + {"selector": infer_job_selector(graphql_context, "no_config_job"), "mode": "default"}, + {"selector": infer_job_selector(graphql_context, "no_config_job"), "mode": "default"}, + ] + + result = execute_dagster_graphql( + context=graphql_context, + query=LAUNCH_MULTIPLE_RUNS_MUTATION, + variables={"executionParamsList": executionParamsList}, + ) + + assert "launchMultipleRuns" in result.data + result_data = result.data["launchMultipleRuns"] + + assert result_data["__typename"] == "LaunchMultipleRunsResult" + results = result_data["launchMultipleRunsResult"] + + assert len(results) == 2 + + for result in results: + assert result["__typename"] == "UnauthorizedError" + + +class TestSuccessAndFailureMultipleLaunch(BaseTestSuite): + def test_launch_multiple_runs_success_and_failure( + self, graphql_context: WorkspaceRequestContext + ): + launchSuccessExecutionParams = [ + { + "selector": { + "repositoryLocationName": "test", + "repositoryName": "test_repo", + "pipelineName": "no_config_job", + "solidSelection": None, + "assetSelection": None, + "assetCheckSelection": None, + }, + "mode": "default", + }, + { + "selector": { + "repositoryLocationName": "test", + "repositoryName": "test_repo", + "pipelineName": "no_config_job", + "solidSelection": None, + "assetSelection": None, + "assetCheckSelection": None, + }, + "mode": "default", + }, + ] + + pipelineNotFoundExecutionParams = [ + { + "selector": { + "repositoryLocationName": "test", + "repositoryName": "test_dict_repo", + "pipelineName": "no_config_job", + "solidSelection": None, + "assetSelection": None, + "assetCheckSelection": None, + }, + "mode": "default", + }, + { + "selector": { + "repositoryLocationName": "test", + "repositoryName": "test_dict_repo", + "pipelineName": "no_config_job", + "solidSelection": None, + "assetSelection": None, + "assetCheckSelection": None, + }, + "mode": "default", + }, + ] + + executionParamsList = [executionParams for executionParams in launchSuccessExecutionParams] + executionParamsList.extend( + [executionParams for executionParams in pipelineNotFoundExecutionParams] + ) + + result: GqlResult = execute_dagster_graphql( + context=graphql_context, + query=LAUNCH_MULTIPLE_RUNS_MUTATION, + variables={"executionParamsList": executionParamsList}, + ) + + assert "launchMultipleRuns" in result.data + result_data = result.data["launchMultipleRuns"] + + assert result_data["__typename"] == "LaunchMultipleRunsResult" + results = result_data["launchMultipleRunsResult"] + + assert len(results) == 4 + + assert results[0]["__typename"] == "LaunchRunSuccess" + assert results[1]["__typename"] == "LaunchRunSuccess" + assert results[2]["__typename"] == "PipelineNotFoundError" + assert results[3]["__typename"] == "PipelineNotFoundError"