Implement storage- and relationship-aware cleanup #973

Draft · wants to merge 15 commits into base: main
1 change: 1 addition & 0 deletions requirements.in
@@ -52,6 +52,7 @@ sentry-sdk>=2.13.0
 sentry-sdk[celery]
 SQLAlchemy
 SQLAlchemy-Utils
+sqlparse
 statsd
 stripe>=11.4.1
 time-machine
4 changes: 3 additions & 1 deletion requirements.txt
@@ -400,7 +400,9 @@ sqlalchemy-utils==0.36.8
     # -r requirements.in
     # pytest-sqlalchemy
 sqlparse==0.5.0
-    # via django
+    # via
+    #   -r requirements.in
+    #   django
 statsd==3.3.0
     # via -r requirements.in
 stripe==11.4.1
Empty file added services/cleanup/__init__.py
Empty file.
52 changes: 52 additions & 0 deletions services/cleanup/cleanup.py
@@ -0,0 +1,52 @@
import logging

from django.db.models.query import QuerySet

from services.cleanup.models import MANUAL_CLEANUP
from services.cleanup.relations import build_relation_graph
from services.cleanup.utils import CleanupContext, CleanupResult, CleanupSummary

log = logging.getLogger(__name__)


def run_cleanup(
    query: QuerySet,
) -> CleanupSummary:
    """
    Cleans up all the models and storage files reachable from the given `QuerySet`.

    This deletes all database models in topological sort order, and also removes
    all the files in storage for any of the models in the relationship graph.

    Returns the total number of models and files cleaned up, as well as a
    per-model breakdown.
    """
    context = CleanupContext()
    models_to_cleanup = build_relation_graph(query)

    summary = {}
    cleaned_models = 0
    cleaned_files = 0

    for model, query in models_to_cleanup:
        manual_cleanup = MANUAL_CLEANUP.get(model)
        if manual_cleanup is not None:
            result = manual_cleanup(context, query)
        else:
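            # `_raw_delete` issues a single bulk `DELETE` without fetching rows
            # or firing signals/cascades (a private Django QuerySet API).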
            result = CleanupResult(query._raw_delete(query.db))

        if result.cleaned_models > 0 or result.cleaned_files > 0:
            summary[model] = result

        log.info(
            f"Finished cleaning up {model.__name__}",
            extra={
                "cleaned_models": result.cleaned_models,
                "cleaned_files": result.cleaned_files,
            },
        )

        cleaned_models += result.cleaned_models
        cleaned_files += result.cleaned_files

    totals = CleanupResult(cleaned_models, cleaned_files)
    return CleanupSummary(totals, summary)
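
For context, a minimal usage sketch of the new `run_cleanup` entry point (hypothetical, not part of this diff; the `totals` attribute name on `CleanupSummary` is an assumption, since `services/cleanup/utils.py` is not shown here):

# Hypothetical usage sketch: clean up one repository and everything
# reachable from it. `cleanup_repository` and the `totals` attribute
# are illustrative assumptions, not part of this PR.
from shared.django_apps.core.models import Repository

from services.cleanup.cleanup import run_cleanup

def cleanup_repository(repo_id: int) -> None:
    summary = run_cleanup(Repository.objects.filter(repoid=repo_id))
    print(summary.totals.cleaned_models, summary.totals.cleaned_files)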
199 changes: 199 additions & 0 deletions services/cleanup/models.py
@@ -0,0 +1,199 @@
import dataclasses
import itertools
from collections import defaultdict
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from functools import partial

from django.db.models import Model
from django.db.models.query import QuerySet
from shared.bundle_analysis import StoragePaths
from shared.django_apps.compare.models import CommitComparison
from shared.django_apps.core.models import Commit, Pull
from shared.django_apps.profiling.models import ProfilingUpload
from shared.django_apps.reports.models import CommitReport, ReportDetails
from shared.django_apps.reports.models import ReportSession as Upload
from shared.django_apps.staticanalysis.models import StaticAnalysisSingleFileSnapshot

from services.archive import ArchiveService, MinioEndpoints
from services.cleanup.utils import CleanupContext, CleanupResult

MANUAL_QUERY_CHUNKSIZE = 5_000
DELETE_FILES_BATCHSIZE = 50


def cleanup_files_batched(
    context: CleanupContext, buckets_paths: dict[str, list[str]]
) -> int:
    cleaned_files = 0

    # TODO: maybe reuse the executor across calls?
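    # NOTE: `itertools.batched` used below requires Python 3.12+.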
    with ThreadPoolExecutor() as e:
        for bucket, paths in buckets_paths.items():
            for batched_paths in itertools.batched(paths, DELETE_FILES_BATCHSIZE):
                e.submit(context.storage.delete_files, bucket, list(batched_paths))
            cleaned_files += len(paths)

    return cleaned_files


def cleanup_with_storage_field(
    path_field: str,
    context: CleanupContext,
    query: QuerySet,
) -> CleanupResult:
    cleaned_files = 0

    # delete `None` `path_field`s right away
    cleaned_models = query.filter(**{f"{path_field}__isnull": True})._raw_delete(
        query.db
    )

    # delete all those files from storage, using chunks based on the `id` column
    storage_query = query.filter(**{f"{path_field}__isnull": False}).order_by("id")

    while True:
        storage_paths = storage_query.values_list(path_field, flat=True)[
            :MANUAL_QUERY_CHUNKSIZE
        ]
        if len(storage_paths) == 0:
            break

        cleaned_files += cleanup_files_batched(
            context, {context.default_bucket: storage_paths}
        )
        cleaned_models += query.filter(
            id__in=storage_query[:MANUAL_QUERY_CHUNKSIZE]
        )._raw_delete(query.db)

    return CleanupResult(cleaned_models, cleaned_files)


def cleanup_archivefield(
    field_name: str, context: CleanupContext, query: QuerySet
) -> CleanupResult:
    model_field_name = f"_{field_name}_storage_path"

    return cleanup_with_storage_field(model_field_name, context, query)


# This has all the `Repository` fields needed by `get_archive_hash`
@dataclasses.dataclass
class FakeRepository:
    repoid: int
    service: str
    service_id: str


def cleanup_commitreport(context: CleanupContext, query: QuerySet) -> CleanupResult:
    coverage_reports = query.values_list(
        "report_type",
        "code",
        "external_id",
        "commit__commitid",
        "commit__repository__repoid",
        "commit__repository__author__service",
        "commit__repository__service_id",
    ).order_by("id")

    cleaned_models = 0
    cleaned_files = 0
    repo_hashes: dict[int, str] = {}

    while True:
        reports = coverage_reports[:MANUAL_QUERY_CHUNKSIZE]
        if len(reports) == 0:
            break

        buckets_paths: dict[str, list[str]] = defaultdict(list)
        for (
            report_type,
            report_code,
            external_id,
            commit_sha,
            repoid,
            repo_service,
            repo_service_id,
        ) in reports:
            if repoid not in repo_hashes:
                fake_repo = FakeRepository(
                    repoid=repoid, service=repo_service, service_id=repo_service_id
                )
                repo_hashes[repoid] = ArchiveService.get_archive_hash(fake_repo)
            repo_hash = repo_hashes[repoid]

            # depending on the `report_type`, we have:
            # - a `chunks` file for coverage
            # - a `bundle_report.sqlite` for BA
            if report_type == "bundle_analysis":
                path = StoragePaths.bundle_report.path(
                    repo_key=repo_hash, report_key=external_id
                )
                buckets_paths[context.bundleanalysis_bucket].append(path)
            elif report_type == "test_results":
                # TA has cached rollups, but those are based on `Branch`
                pass
            else:
                chunks_file_name = report_code if report_code is not None else "chunks"
                path = MinioEndpoints.chunks.get_path(
                    version="v4",
                    repo_hash=repo_hash,
                    commitid=commit_sha,
                    chunks_file_name=chunks_file_name,
                )
                buckets_paths[context.default_bucket].append(path)

        cleaned_files += cleanup_files_batched(context, buckets_paths)
        cleaned_models += query.filter(
            id__in=query.order_by("id")[:MANUAL_QUERY_CHUNKSIZE]
        )._raw_delete(query.db)

    return CleanupResult(cleaned_models, cleaned_files)


def cleanup_upload(context: CleanupContext, query: QuerySet) -> CleanupResult:
    cleaned_files = 0

    # delete `None` `storage_path`s right away
    cleaned_models = query.filter(storage_path__isnull=True)._raw_delete(query.db)

    # delete all those files from storage, using chunks based on the `id` column
    storage_query = query.filter(storage_path__isnull=False).order_by("id")

    while True:
        uploads = storage_query.values_list("report__report_type", "storage_path")[
            :MANUAL_QUERY_CHUNKSIZE
        ]
        if len(uploads) == 0:
            break

        buckets_paths: dict[str, list[str]] = defaultdict(list)
        for report_type, storage_path in uploads:
            if report_type == "bundle_analysis":
                buckets_paths[context.bundleanalysis_bucket].append(storage_path)
            else:
                buckets_paths[context.default_bucket].append(storage_path)

        cleaned_files += cleanup_files_batched(context, buckets_paths)
        cleaned_models += query.filter(
            id__in=storage_query[:MANUAL_QUERY_CHUNKSIZE]
        )._raw_delete(query.db)

    return CleanupResult(cleaned_models, cleaned_files)


# All the models that need custom Python code for deletion, because a plain
# bulk `DELETE` query is not sufficient for them.
MANUAL_CLEANUP: dict[
    type[Model], Callable[[CleanupContext, QuerySet], CleanupResult]
] = {
    Commit: partial(cleanup_archivefield, "report"),
    Pull: partial(cleanup_archivefield, "flare"),
    ReportDetails: partial(cleanup_archivefield, "files_array"),
    CommitReport: cleanup_commitreport,
    Upload: cleanup_upload,
    CommitComparison: partial(cleanup_with_storage_field, "report_storage_path"),
    ProfilingUpload: partial(cleanup_with_storage_field, "raw_upload_location"),
    StaticAnalysisSingleFileSnapshot: partial(
        cleanup_with_storage_field, "content_location"
    ),
}
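
Adding cleanup support for another storage-backed model is a one-line entry in this mapping. A hypothetical sketch (the `LabelAnalysisRequest` model and its `result_path` field are illustrative assumptions, not part of this PR):

# Hypothetical: any model whose storage path lives in a plain text field
# can reuse `cleanup_with_storage_field` via `functools.partial`.
# `LabelAnalysisRequest` and "result_path" are illustrative names only.
MANUAL_CLEANUP[LabelAnalysisRequest] = partial(
    cleanup_with_storage_field, "result_path"
)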
54 changes: 54 additions & 0 deletions services/cleanup/owner.py
@@ -0,0 +1,54 @@
import logging

from django.db import transaction
from django.db.models import Q
from shared.django_apps.codecov_auth.models import Owner, OwnerProfile
from shared.django_apps.core.models import Commit, Pull, Repository

from services.cleanup.cleanup import run_cleanup
from services.cleanup.utils import CleanupSummary

log = logging.getLogger(__name__)

CLEAR_ARRAY_FIELDS = ["plan_activated_users", "organizations", "admins"]


def cleanup_owner(owner_id: int) -> CleanupSummary:
    log.info("Started/Continuing Owner cleanup", extra={"owner_id": owner_id})

    clear_owner_references(owner_id)
    owner_query = Owner.objects.filter(ownerid=owner_id)
    summary = run_cleanup(owner_query)

    log.info("Owner cleanup finished", extra={"owner_id": owner_id, "summary": summary})
    return summary


# TODO: maybe turn this into a `MANUAL_CLEANUP`?
def clear_owner_references(owner_id: int):
    """
    This clears the `ownerid` from the various FK fields and DB arrays where
    it is being referenced.
    """

    OwnerProfile.objects.filter(default_org=owner_id).update(default_org=None)
    Owner.objects.filter(bot=owner_id).update(bot=None)
    Repository.objects.filter(bot=owner_id).update(bot=None)
    Commit.objects.filter(author=owner_id).update(author=None)
    Pull.objects.filter(author=owner_id).update(author=None)

    # This uses a transaction / `select_for_update` to ensure consistency when
    # modifying these `ArrayField`s in Python.
    # I don't think we have such consistency anywhere else in the codebase, so
    # if this is causing lock contention issues, it's also fair to avoid it.
    with transaction.atomic():
        filter = Q()
        for field in CLEAR_ARRAY_FIELDS:
            filter = filter | Q(**{f"{field}__contains": [owner_id]})

        owners_with_reference = Owner.objects.select_for_update().filter(filter)
        for owner in owners_with_reference:
            for field in CLEAR_ARRAY_FIELDS:
                array = getattr(owner, field)
                setattr(owner, field, [x for x in array if x != owner_id])

            owner.save(update_fields=CLEAR_ARRAY_FIELDS)
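
For clarity, the `Q` filter built in the loop above expands to the following equivalent query for the three current `CLEAR_ARRAY_FIELDS` (a sketch of the same semantics, written out by hand):

# Equivalent hand-written form of the OR-filter constructed above.
owners_with_reference = Owner.objects.select_for_update().filter(
    Q(plan_activated_users__contains=[owner_id])
    | Q(organizations__contains=[owner_id])
    | Q(admins__contains=[owner_id])
)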