|
| 1 | +from .SnapshotValue import SnapshotValue |
| 2 | +from collections import OrderedDict |
| 3 | + |
| 4 | + |
| 5 | +class Snapshot: |
| 6 | + def __init__(self, subject, facet_data): |
| 7 | + self._subject = subject |
| 8 | + self._facet_data = facet_data |
| 9 | + |
| 10 | + @property |
| 11 | + def facets(self): |
| 12 | + return OrderedDict(sorted(self._facet_data.items())) |
| 13 | + |
| 14 | + def __eq__(self, other): |
| 15 | + if not isinstance(other, Snapshot): |
| 16 | + return NotImplemented |
| 17 | + return self._subject == other._subject and self._facet_data == other._facet_data |
| 18 | + |
| 19 | + def __hash__(self): |
| 20 | + return hash((self._subject, frozenset(self._facet_data.items()))) |
| 21 | + |
| 22 | + def plus_facet(self, key, value): |
| 23 | + if isinstance(value, bytes): |
| 24 | + value = SnapshotValue.of(value) |
| 25 | + elif isinstance(value, str): |
| 26 | + value = SnapshotValue.of(value) |
| 27 | + return self._plus_facet(key, value) |
| 28 | + |
| 29 | + def _plus_facet(self, key, value): |
| 30 | + if not key: |
| 31 | + raise ValueError("The empty string is reserved for the subject.") |
| 32 | + facet_data = dict(self._facet_data) |
| 33 | + facet_data[self._unix_newlines(key)] = value |
| 34 | + return Snapshot(self._subject, facet_data) |
| 35 | + |
| 36 | + def plus_or_replace(self, key, value): |
| 37 | + if not key: |
| 38 | + return Snapshot(value, self._facet_data) |
| 39 | + facet_data = dict(self._facet_data) |
| 40 | + facet_data[self._unix_newlines(key)] = value |
| 41 | + return Snapshot(self._subject, facet_data) |
| 42 | + |
| 43 | + def subject_or_facet_maybe(self, key): |
| 44 | + if not key: |
| 45 | + return self._subject |
| 46 | + return self._facet_data.get(key) |
| 47 | + |
| 48 | + def subject_or_facet(self, key): |
| 49 | + value = self.subject_or_facet_maybe(key) |
| 50 | + if value is None: |
| 51 | + raise KeyError(f"'{key}' not found in {list(self._facet_data.keys())}") |
| 52 | + return value |
| 53 | + |
| 54 | + def all_entries(self): |
| 55 | + entries = [("", self._subject)] |
| 56 | + entries.extend(self._facet_data.items()) |
| 57 | + return entries |
| 58 | + |
| 59 | + def __bytes__(self): |
| 60 | + return f"[{self._subject} {self._facet_data}]" |
| 61 | + |
| 62 | + @staticmethod |
| 63 | + def of(data): |
| 64 | + if isinstance(data, bytes): |
| 65 | + # Handling binary data |
| 66 | + return Snapshot(SnapshotValue.of(data), {}) |
| 67 | + elif isinstance(data, str): |
| 68 | + # Handling string data |
| 69 | + return Snapshot(SnapshotValue.of(data), {}) |
| 70 | + elif isinstance(data, SnapshotValue): |
| 71 | + return Snapshot(data, {}) |
| 72 | + else: |
| 73 | + raise TypeError("Data must be either binary or string" + data) |
| 74 | + |
| 75 | + @staticmethod |
| 76 | + def of_entries(entries): |
| 77 | + subject = None |
| 78 | + facet_data = {} |
| 79 | + for key, value in entries: |
| 80 | + if not key: |
| 81 | + if subject is not None: |
| 82 | + raise ValueError( |
| 83 | + f"Duplicate root snapshot.\n first: {subject}\nsecond: {value}" |
| 84 | + ) |
| 85 | + subject = value |
| 86 | + else: |
| 87 | + facet_data[key] = value |
| 88 | + if subject is None: |
| 89 | + subject = SnapshotValue.of("") |
| 90 | + return Snapshot(subject, facet_data) |
| 91 | + |
| 92 | + @staticmethod |
| 93 | + def _unix_newlines(string): |
| 94 | + return string.replace("\\r\\n", "\\n") |
0 commit comments