Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid fetching runs in runs feed if backfill filter is set and we are hiding runs within backfills #25904

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ def _bulk_action_filters_from_run_filters(filters: RunsFilter) -> BulkActionsFil
_bulk_action_statuses_from_run_statuses(filters.statuses) if filters.statuses else None
)
backfill_ids = None
if filters.tags and filters.tags.get(BACKFILL_ID_TAG) is not None:
if filters.tags.get(BACKFILL_ID_TAG) is not None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In making the change in this PR, I noticed that the truthiness check on `filters.tags` is unnecessary: `filters.tags` is never None — it defaults to `{}`.

backfill_ids = filters.tags[BACKFILL_ID_TAG]
if isinstance(backfill_ids, str):
backfill_ids = [backfill_ids]
Expand Down Expand Up @@ -609,12 +609,18 @@ def get_runs_feed_entries(
else:
backfills = []

runs = [
GrapheneRun(run)
for run in instance.get_run_records(
limit=fetch_limit, cursor=runs_feed_cursor.run_cursor, filters=run_filters
)
]
# if we are not showing runs within backfills and the backfill_id filter is set, we know
# there will be no results, so we can skip fetching runs
should_fetch_runs = not (exclude_subruns and run_filters.tags.get(BACKFILL_ID_TAG) is not None)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we add the option to show only backfills, this condition will become more complicated and will probably warrant its own function.

if should_fetch_runs:
runs = [
GrapheneRun(run)
for run in instance.get_run_records(
limit=fetch_limit, cursor=runs_feed_cursor.run_cursor, filters=run_filters
)
]
else:
runs = []

# if we fetched limit+1 of either runs or backfills, we know there must be more results
# to fetch on the next call since we will return limit results for this call. Additionally,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import time
from typing import Mapping, Optional
from unittest import mock

import pytest
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
Expand Down Expand Up @@ -56,6 +57,30 @@
}
"""

# Minimal runs-feed query used by tests that only care about which entries come
# back (typename + id) and the paging metadata (cursor / hasMore), plus the
# matching runsFeedCountOrError count — avoids the heavier per-run fields of the
# full query so mocked-out storage calls are not triggered by field resolution.
MINIMAL_GET_RUNS_FEED_QUERY = """
query RunsFeedEntryQuery($cursor: String, $limit: Int!, $filter: RunsFilter, $includeRunsFromBackfills: Boolean!) {
runsFeedOrError(cursor: $cursor, limit: $limit, filter: $filter, includeRunsFromBackfills: $includeRunsFromBackfills) {
... on RunsFeedConnection {
results {
__typename
id
}
cursor
hasMore
}
... on PythonError {
stack
message
}
}
runsFeedCountOrError(filter: $filter, includeRunsFromBackfills: $includeRunsFromBackfills) {
... on RunsFeedCount {
count
}
}
}
"""

# when runs are inserted into the database, sqlite uses CURRENT_TIMESTAMP to set the creation time.
# CURRENT_TIMESTAMP only has second precision for sqlite, so if we create runs and backfills without any delay
# the resulting list is a chunk of runs and then a chunk of backfills when ordered by time. Adding a small
Expand Down Expand Up @@ -1304,3 +1329,34 @@ def test_get_backfill_id_filter(self, graphql_context):
assert not result.errors
assert result.data
_assert_results_match_count_match_expected(result, 0)

def test_runs_not_fetched_when_excluding_subruns_and_filtering_backfills(self, graphql_context):
    """Verify the runs feed skips the run query when it is known to be empty.

    When ``includeRunsFromBackfills`` is False and the filter pins a
    backfill id, every matching run would be a sub-run of that backfill and
    therefore hidden — so the resolver should not call ``get_run_records``
    at all. ``get_run_records`` is patched to raise, so any call to it
    fails this test.
    """
    # TestRunsFeedUniqueSetups::test_runs_not_fetched_when_excluding_subruns_and_filtering_backfills
    def _fake_get_run_records(*args, **kwargs):
        # Sentinel: reaching run storage here means the skip optimization regressed.
        raise Exception("get_run_records should not be called")

    # One backfill with several sub-runs, so a regression (runs being
    # fetched and returned) would be observable in the result set.
    backfill_id = _create_backfill(graphql_context)
    _create_run_for_backfill(graphql_context, backfill_id)
    _create_run_for_backfill(graphql_context, backfill_id)
    _create_run_for_backfill(graphql_context, backfill_id)

    with mock.patch.object(graphql_context.instance, "get_run_records", _fake_get_run_records):
        result = execute_dagster_graphql(
            graphql_context,
            MINIMAL_GET_RUNS_FEED_QUERY,
            variables={
                "limit": 20,
                "cursor": None,
                "filter": {
                    # Filter down to runs tagged with this backfill's id.
                    "tags": [
                        {"key": BACKFILL_ID_TAG, "value": backfill_id},
                    ]
                },
                "includeRunsFromBackfills": False,
            },
        )

    assert not result.errors
    assert result.data
    # Only the backfill entry itself is returned; its sub-runs are hidden.
    assert len(result.data["runsFeedOrError"]["results"]) == 1
    assert not result.data["runsFeedOrError"]["hasMore"]