Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backfill daemon run retries 3/n] retries of runs in completed backfills should not be considered part of the backfill #25900

Draft
wants to merge 3 commits into
base: jamie/backfill-daemon-accounts-for-retries
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from dagster_graphql.client.query import (
LAUNCH_PARTITION_BACKFILL_MUTATION,
LAUNCH_PIPELINE_EXECUTION_MUTATION,
LAUNCH_PIPELINE_REEXECUTION_MUTATION,
)
from dagster_graphql.test.utils import (
execute_dagster_graphql,
Expand Down Expand Up @@ -2302,3 +2303,63 @@ def test_retry_successful_job_backfill(self, graphql_context):

assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id
assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id

def test_run_retry_not_part_of_completed_backfill(self, graphql_context):
# TestLaunchDaemonBackfillFromFailure::test_run_retry_not_part_of_completed_backfill
repository_selector = infer_repository_selector(graphql_context)
result = execute_dagster_graphql(
graphql_context,
LAUNCH_PARTITION_BACKFILL_MUTATION,
variables={
"backfillParams": {
"selector": {
"repositorySelector": repository_selector,
"partitionSetName": "integers_partition_set",
},
"partitionNames": ["2", "3", "4", "5"],
}
},
)

assert not result.errors
assert result.data
assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
backfill_id = result.data["launchPartitionBackfill"]["backfillId"]

_seed_runs(
graphql_context,
[
(DagsterRunStatus.SUCCESS, "5"),
(DagsterRunStatus.SUCCESS, "2"),
(DagsterRunStatus.SUCCESS, "3"),
(DagsterRunStatus.SUCCESS, "4"),
(DagsterRunStatus.SUCCESS, "5"),
(DagsterRunStatus.SUCCESS, "2"),
(DagsterRunStatus.FAILURE, "3"),
(DagsterRunStatus.SUCCESS, "4"),
],
backfill_id,
)

backfill = graphql_context.instance.get_backfill(backfill_id)
graphql_context.instance.update_backfill(
backfill.with_status(BulkActionStatus.COMPLETED_SUCCESS)
)

failed_run = graphql_context.instance.get_runs(
filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE])
)[0]

retry_run_result = execute_dagster_graphql(
graphql_context,
LAUNCH_PIPELINE_REEXECUTION_MUTATION,
variables={
"reexecutionParams": {"parentRunId": failed_run.run_id, "strategy": "ALL_STEPS"}
},
)
assert not retry_run_result.errors
assert retry_run_result.data
assert (
retry_run_result.data["launchPipelineReexecution"]["__typename"]
== "LaunchPipelineReexecutionSuccess"
)
28 changes: 23 additions & 5 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,13 @@
from dagster._core.storage.tags import (
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
BACKFILL_ID_TAG,
BACKFILL_TAGS,
PARENT_RUN_ID_TAG,
PARTITION_NAME_TAG,
RESUME_RETRY_TAG,
ROOT_RUN_ID_TAG,
TAGS_TO_MAYBE_OMIT_ON_RETRY,
TAGS_TO_OMIT_ON_RETRY,
)
from dagster._serdes import ConfigurableClass
Expand Down Expand Up @@ -1627,6 +1630,7 @@ def create_reexecuted_run(
run_config: Optional[Mapping[str, Any]] = None,
use_parent_run_tags: bool = False,
) -> DagsterRun:
from dagster._core.execution.backfill import BulkActionStatus
from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
from dagster._core.execution.plan.state import KnownExecutionState
from dagster._core.remote_representation import CodeLocation, RemoteJob
Expand All @@ -1644,11 +1648,25 @@ def create_reexecuted_run(
parent_run_id = parent_run.run_id

# these can differ from remote_job.tags if tags were added at launch time
parent_run_tags = (
{key: val for key, val in parent_run.tags.items() if key not in TAGS_TO_OMIT_ON_RETRY}
if use_parent_run_tags
else {}
)
parent_run_tags = {}
if use_parent_run_tags:
parent_run_tags = {
key: val
for key, val in parent_run.tags.items()
if key not in TAGS_TO_OMIT_ON_RETRY and key not in TAGS_TO_MAYBE_OMIT_ON_RETRY
}
# for all tags in TAGS_TO_MAYBE_OMIT_ON_RETRY, add a condition that determines
# whether the tag should be added to the retried run

# condition for BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG
if parent_run.tags.get(BACKFILL_ID_TAG) is not None:
# if the run was part of a backfill and the backfill is complete, we do not want the
# retry to be considered part of the backfill, so remove all backfill-related tags
backfill = self.get_backfill(parent_run.tags[BACKFILL_ID_TAG])
if backfill and backfill.status == BulkActionStatus.REQUESTED:
for tag in BACKFILL_TAGS:
if parent_run.tags.get(tag) is not None:
parent_run_tags[tag] = parent_run.tags[tag]

tags = merge_dicts(
remote_job.tags,
Expand Down
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/_core/storage/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,13 @@
RUN_METRICS_POLLING_INTERVAL_TAG = f"{HIDDEN_TAG_PREFIX}run_metrics_polling_interval"
RUN_METRICS_PYTHON_RUNTIME_TAG = f"{HIDDEN_TAG_PREFIX}python_runtime_metrics"

BACKFILL_TAGS = {BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG}


TAGS_TO_OMIT_ON_RETRY = {*RUN_METRIC_TAGS, RUN_FAILURE_REASON_TAG}

TAGS_TO_MAYBE_OMIT_ON_RETRY = {*BACKFILL_TAGS}


class TagType(Enum):
# Custom tag provided by a user
Expand Down
95 changes: 95 additions & 0 deletions python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -3164,3 +3164,98 @@ def test_asset_backfill_retries_make_downstreams_runnable(
backfill.asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
== 0
)


def test_run_retry_not_part_of_completed_backfill(
instance: DagsterInstance,
workspace_context: WorkspaceProcessContext,
remote_repo: RemoteRepository,
):
del remote_repo
backfill_id = "run_retries_backfill"
partition_keys = static_partitions.get_partition_keys()
asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")]
instance.add_backfill(
PartitionBackfill.from_asset_partitions(
asset_graph=workspace_context.create_request_context().asset_graph,
backfill_id=backfill_id,
tags={"custom_tag_key": "custom_tag_value"},
backfill_timestamp=get_current_timestamp(),
asset_selection=asset_selection,
partition_names=partition_keys,
dynamic_partitions_store=instance,
all_partitions=False,
title=None,
description=None,
)
)
assert instance.get_runs_count() == 0
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.REQUESTED

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
assert instance.get_runs_count() == 3
wait_for_all_runs_to_start(instance, timeout=30)
assert instance.get_runs_count() == 3
wait_for_all_runs_to_finish(instance, timeout=30)

assert instance.get_runs_count() == 3
runs = reversed(list(instance.get_runs()))
for run in runs:
assert run.tags[BACKFILL_ID_TAG] == backfill_id
assert run.tags["custom_tag_key"] == "custom_tag_value"
assert step_succeeded(instance, run, "foo")
assert step_succeeded(instance, run, "reusable")
assert step_succeeded(instance, run, "bar")

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS

# manual retry of a run
instance.create_reexecuted_run()

# simulate a retry of a run
run_to_retry = instance.get_runs()[0]
retried_run = create_run_for_test(
instance=instance,
job_name=run_to_retry.job_name,
tags=run_to_retry.tags,
root_run_id=run_to_retry.run_id,
parent_run_id=run_to_retry.run_id,
)

# since there is a run in progress, the backfill should not be marked as complete, even though
# all targeted asset partitions have a completed state
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.asset_backfill_data
assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status()
assert backfill.status == BulkActionStatus.REQUESTED

# manually mark the run as successful to show that the backfill will be marked as complete
# since there are no in progress runs
instance.handle_new_event(
EventLogEntry(
error_info=None,
level="debug",
user_message="",
run_id=retried_run.run_id,
timestamp=time.time(),
dagster_event=DagsterEvent(
event_type_value=DagsterEventType.RUN_SUCCESS.value,
job_name=retried_run.job_name,
),
)
)

retried_run = instance.get_runs(filters=RunsFilter(run_ids=[retried_run.run_id]))[0]
assert retried_run.status == DagsterRunStatus.SUCCESS

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS