-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathutils.py
192 lines (146 loc) · 5.46 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
from __future__ import annotations
import contextlib
import functools
import itertools
import json
from collections.abc import Iterable
from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterator, Optional, Tuple, TypeVar
import django
import tqdm
from django.apps import apps
from django.conf import settings as django_settings
from django.db.models import Model
if TYPE_CHECKING:
from .strategies import Strategy
# Generic element type used by the iterable/iterator helpers in this module.
T = TypeVar("T")
# Type variable bounded to Django's ``Model``.
TModel = TypeVar("TModel", bound=Model)
@functools.lru_cache(maxsize=1024)
def to_app_model_label(model: Model) -> str:
    """Return the ``"<app_label>.<ClassName>"`` label for *model*.

    The label is built from ``model._meta.app_label`` and ``model.__name__``.
    Results are memoized, so *model* must be hashable.
    """
    app_label = model._meta.app_label
    class_name = model.__name__
    return f"{app_label}.{class_name}"
@functools.lru_cache(maxsize=1024)
def to_model(app_model_label: str) -> Optional[Model]:
    """Resolve an ``"app_label.ModelName"`` string via the Django app registry.

    Returns ``None`` when the registry raises ``LookupError`` for the label.
    The label must contain exactly one dot; anything else raises
    ``ValueError`` from the tuple unpacking. Results are memoized.
    """
    app_label, model_name = app_model_label.split(".")
    try:
        resolved = apps.get_model(app_label, model_name)
    except LookupError:
        # App is not installed, that's ok.
        return None
    return resolved
def get_all_models() -> list[type[Model]]:
    """Return the models known to the app registry, auto-created ones included."""
    registered = apps.get_models(include_auto_created=True)
    return registered
def migrations_file_path(dir: Path) -> Path:
    """Return the path of the ``migrations.json`` file located inside *dir*."""
    return dir.joinpath("migrations.json")
def progress(sequence: Iterable[T]) -> tqdm.tqdm[T]:
    """Wrap *sequence* in a ``tqdm`` progress bar and return the wrapper."""
    bar = tqdm.tqdm(sequence)
    return bar
def sort_model_strategies(
    model_strategies: dict[str, list[Strategy]],
) -> list[tuple[str, Strategy]]:
    """Order strategies so every model comes after the models it depends on.

    Args:
        model_strategies: Mapping of ``"app_label.ModelName"`` labels to the
            strategies registered for that model. Labels for which
            ``to_model`` returns ``None`` are skipped.

    Returns:
        A flat list of ``(app_model_label, strategy)`` pairs. Models appear in
        dependency order; the strategies of one model keep their input order.

    Raises:
        RuntimeError: If a full pass over the remaining models resolves none
            of them (e.g. a dependency cycle among the listed models).

    A model's dependencies are collected from:
      * ``natural_key.dependencies`` (when the model defines ``natural_key``),
      * the remote model of each field in ``_meta.fields`` that has one,
      * the remote model of each many-to-many field whose through model is
        auto-created,
      * each strategy's ``depends_on`` labels.
    Self-references are ignored for the field-based sources.
    """
    model_dependencies = []
    models = set()
    for app_model_label, strategies in model_strategies.items():
        model = to_model(app_model_label)
        if not model:
            continue

        models.add(model)
        # Always build a *new* list here. Reusing the object stored on
        # ``natural_key.dependencies`` (as happens when it is empty) would let
        # the appends below mutate shared state on the model, or fail outright
        # if the attribute is a tuple or None.
        if hasattr(model, "natural_key"):
            declared = getattr(model.natural_key, "dependencies", None) or ()
            deps = [apps.get_model(dep) for dep in declared]
        else:
            deps = []

        for field in model._meta.fields:
            if field.remote_field and field.remote_field.model != model:
                deps.append(field.remote_field.model)
        for field in model._meta.many_to_many:
            if (
                field.remote_field.through._meta.auto_created
                and field.remote_field.model != model
            ):
                deps.append(field.remote_field.model)
        for strategy in strategies:
            for dep in strategy.depends_on:
                # May append None when the label cannot be resolved; None is
                # never in ``models`` so it counts as already satisfied below.
                deps.append(to_model(dep))
        model_dependencies.append((model, deps))

    # Reversed so that ``pop()`` below consumes entries in input order.
    model_dependencies.reverse()
    model_list = []
    # Mirrors ``model_list`` for O(1) membership tests in the loop below.
    resolved = set()
    while model_dependencies:
        skipped = []
        changed = False
        while model_dependencies:
            model, deps = model_dependencies.pop()

            # If all of the models in the dependency list are either already
            # on the final model list, or not on the original serialization
            # list, then we've found another model with all its dependencies
            # satisfied.
            if all(d not in models or d in resolved for d in deps):
                model_list.append(model)
                resolved.add(model)
                changed = True
            else:
                skipped.append((model, deps))
        if not changed:
            # A whole pass made no progress: the rest can never be resolved.
            raise RuntimeError(
                "Can't resolve dependencies for {} in serialized app list.".format(
                    ", ".join(
                        "{}.{}".format(
                            model._meta.app_label, model._meta.object_name
                        )
                        for model, _ in sorted(
                            skipped, key=lambda obj: obj[0].__name__
                        )
                    ),
                ),
            )
        model_dependencies = skipped

    # NOTE(review): the lookup uses the label produced by to_app_model_label;
    # this presumably matches the keys of ``model_strategies`` exactly --
    # confirm that callers always use that canonical form.
    ordered = []
    for model in model_list:
        label = to_app_model_label(model)
        ordered.extend((label, strategy) for strategy in model_strategies[label])
    return ordered
@functools.lru_cache(maxsize=32)
def get_exported_pks_for_model(dest: Path, model: type[Model]) -> list[str]:
    """Return the ``"pk"`` of every exported object of *model*, as strings.

    The objects come from ``get_exported_objects_for_model(dest, model)``.
    Results are memoized per ``(dest, model)`` pair.
    """
    exported = get_exported_objects_for_model(dest, model)  # type: ignore[arg-type]  # mypy can't see that models are hashable
    return [str(record["pk"]) for record in exported]
@functools.lru_cache(maxsize=8)
def get_exported_objects_for_model(
    dest: Path,
    model: type[Model],
) -> list[dict[str, Any]]:
    """Load every exported object stored for *model* underneath *dest*.

    All ``*.json`` files in ``dest / "<app_label>.<ModelName>"`` are parsed and
    their contents concatenated into one list, in the order ``glob`` yields
    the files. A file that is not valid JSON is reported on stdout and the
    ``json.JSONDecodeError`` is re-raised. Results are memoized per
    ``(dest, model)`` pair.
    """
    label = to_app_model_label(model)  # type: ignore[arg-type]  # mypy can't see that models are hashable
    collected = []
    for json_path in (dest / label).glob("*.json"):
        with json_path.open() as handle:
            try:
                payload = json.load(handle)
            except json.JSONDecodeError:
                print("Invalid file {}".format(json_path))
                raise
            collected.extend(payload)
    return collected
def is_empty_iterator(iterator: Iterator[T]) -> Tuple[Iterator[T], bool]:
    """Report whether *iterator* is exhausted without losing its first item.

    Returns a ``(iterator, empty)`` pair. When the input yields nothing, the
    same iterator is handed back with ``empty`` set to ``True``. Otherwise the
    consumed first item is chained back in front of the remainder and
    ``empty`` is ``False``. Callers must use the returned iterator rather
    than the one they passed in.
    """
    try:
        head = next(iterator)
    except StopIteration:
        return (iterator, True)
    # Put the peeked element back at the front.
    return (itertools.chain([head], iterator), False)
@contextlib.contextmanager
def disable_migrations() -> Iterator[None]:
    """Temporarily replace ``settings.MIGRATION_MODULES`` with a catch-all stub.

    While the context is active, ``MIGRATION_MODULES`` is an object that
    claims to contain every key and maps each of them to ``None``. Per
    Django's settings reference, an app mapped to ``None`` is treated as
    having no migrations. The original value is restored on exit, including
    when the ``with`` body raises.
    """
    original_migration_modules = django_settings.MIGRATION_MODULES

    class DisableMigrations:
        """Mapping stand-in: every key is present and its value is ``None``."""

        def __contains__(self, item: object) -> bool:
            return True

        def __getitem__(self, item: object) -> None:
            return None

    django_settings.MIGRATION_MODULES = DisableMigrations()
    try:
        yield
    finally:
        # Without ``finally`` an exception inside the ``with`` body would
        # leave the stub installed for the rest of the process.
        django_settings.MIGRATION_MODULES = original_migration_modules
def nodb_cursor(connection):  # type: ignore[no-untyped-def]
    """Return the "no db" cursor of *connection*, whatever the Django version.

    Django 3.1 and later: the result of ``connection._nodb_cursor()``.
    Earlier versions: ``connection._nodb_connection.cursor()``.
    Both are private Django attributes; presumably they give access to the
    server without selecting a database (confirm against Django's backend
    code).
    """
    if django.VERSION >= (3, 1):
        return connection._nodb_cursor()
    return connection._nodb_connection.cursor()