forked from snok/container-retention-policy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
301 lines (238 loc) · 10.2 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
from __future__ import annotations
import asyncio
from collections import namedtuple
from dataclasses import dataclass
from distutils.util import strtobool
from enum import Enum
from functools import partial
from sys import argv
from typing import TYPE_CHECKING
from urllib.parse import quote_from_bytes
from dateparser import parse
from httpx import AsyncClient
if TYPE_CHECKING:
from datetime import datetime
from typing import Any, Callable, Coroutine, Optional, Union
from httpx import Response
BASE_URL = 'https://api.github.com'
ImageName = namedtuple('ImageName', ['value', 'encoded'])
class TimestampType(Enum):
UPDATED_AT = 'updated_at'
CREATED_AT = 'created_at'
class AccountType(Enum):
ORG = 'org'
PERSONAL = 'personal'
async def list_org_package_versions(org_name: str, image_name: ImageName, http_client: AsyncClient) -> list[dict]:
"""
List image versions for an organization.
:param org_name: The name of the organization.
:param image_name: The name of the container image.
:param http_client: HTTP client.
:return: List of image objects.
"""
response = await http_client.get(f'{BASE_URL}/orgs/{org_name}/packages/container/{image_name.encoded}/versions?per_page=100')
response.raise_for_status()
return response.json()
async def list_package_versions(image_name: ImageName, http_client: AsyncClient) -> list[dict]:
"""
List image versions for a personal account.
:param image_name: The name of the container image.
:param http_client: HTTP client.
:return: List of image objects.
"""
response = await http_client.get(f'{BASE_URL}/user/packages/container/{image_name.encoded}/versions?per_page=100')
response.raise_for_status()
return response.json()
def post_deletion_output(response: Response, image_name: ImageName, version_id: int) -> None:
"""
Output a little info to the user.
"""
if response.is_error:
print(f'\nCouldn\'t delete {image_name.value}:{version_id}.\nStatus code: {response.status_code}\nResponse: {response.json()}\n')
else:
print(f'Deleted old image: {image_name.value}:{version_id}')
async def delete_org_package_versions(org_name: str, image_name: ImageName, version_id: int, http_client: AsyncClient) -> None:
"""
Delete an image version for an organization.
:param org_name: The name of the org.
:param image_name: The name of the container image.
:param version_id: The ID of the image version we're deleting.
:param http_client: HTTP client.
:return: Nothing - the API returns a 204.
"""
url = f'{BASE_URL}/orgs/{org_name}/packages/container/{image_name.encoded}/versions/{version_id}'
response = await http_client.delete(url)
post_deletion_output(response, image_name, version_id)
async def delete_package_versions(image_name: ImageName, version_id: int, http_client: AsyncClient) -> None:
"""
Delete an image version for a personal account.
:param image_name: The name of the container image.
:param version_id: The ID of the image version we're deleting.
:param http_client: HTTP client.
:return: Nothing - the API returns a 204.
"""
url = f'{BASE_URL}/user/packages/container/{image_name.encoded}/versions/{version_id}'
response = await http_client.delete(url)
post_deletion_output(response, image_name, version_id)
def get_image_version_tags(version: dict) -> list[str]:
"""
Return the list of tags on a container image.
"""
if 'metadata' in version and 'container' in version['metadata'] and 'tags' in version['metadata']['container']:
return version['metadata']['container']['tags']
return []
@dataclass
class Inputs:
"""
Class holds validated inputs, and unifies the API for org- and personal functions.
"""
parsed_cutoff: datetime
timestamp_type: TimestampType
account_type: AccountType
untagged_only: bool
skip_tags: list[str]
keep_at_least: int
org_name: Optional[str] = None
def __post_init__(self) -> None:
"""
Cast keep-at-least to int.
"""
self.keep_at_least = int(self.keep_at_least)
@property
def is_org(self) -> bool:
"""
Whether the account type is an org or not.
"""
return self.account_type == AccountType.ORG
@property
def list_package_versions(self) -> Callable[[ImageName, Any], Coroutine[Any, Any, list[dict]]]:
"""
Unify the API for package version list functions.
"""
if self.is_org:
return partial(list_org_package_versions, self.org_name)
else:
return list_package_versions
@property
def delete_package(self) -> Callable[[ImageName, int, Any], Coroutine[Any, Any, None]]:
"""
Unify the API for package deletion functions.
"""
if self.is_org:
return partial(delete_org_package_versions, self.org_name)
else:
return delete_package_versions
async def get_and_delete_old_versions(image_name: ImageName, inputs: Inputs, http_client: AsyncClient) -> None:
"""
Delete old package versions for an image name.
"""
versions = await inputs.list_package_versions(image_name, http_client)
if inputs.keep_at_least >= 0:
versions = versions[inputs.keep_at_least :]
tasks = []
for version in versions:
updated_or_created_at = parse(version[inputs.timestamp_type.value])
if not updated_or_created_at:
print(f'Skipping image version {version["id"]}. Unable to parse timestamps.')
continue
if updated_or_created_at > inputs.parsed_cutoff:
# Skipping because it's not below the datetime cut-off
continue
image_tags = get_image_version_tags(version)
if inputs.untagged_only and image_tags:
# Skipping because no tagged images should be deleted
continue
if any(tag in inputs.skip_tags for tag in image_tags):
# Skipping because this image version is tagged with a protected tag
continue
tasks.append(asyncio.create_task(inputs.delete_package(image_name, version['id'], http_client)))
if not tasks:
print(f'No more versions to delete for {image_name.value}')
await asyncio.gather(*tasks)
def validate_inputs(
account_type: str,
org_name: str,
timestamp_type: str,
cut_off: str,
untagged_only: Union[bool, str],
skip_tags: Optional[str],
keep_at_least: Optional[str],
) -> Inputs:
"""
Perform basic validation on the incoming parameters and return an Inputs instance.
"""
# For date parsing we use `dateparser`. If you're having issues getting this to work,
# check out https://dateparser.readthedocs.io/en/latest/.
if not (parsed_cutoff := parse(cut_off)):
raise ValueError(f"Unable to parse '{cut_off}'")
elif parsed_cutoff.tzinfo is None or parsed_cutoff.tzinfo.utcoffset(parsed_cutoff) is None:
raise ValueError('Timezone is required for the cut-off')
if account_type == 'org' and not org_name:
raise ValueError('org-name is required when account-type is org')
if isinstance(untagged_only, str):
untagged_only_ = strtobool(untagged_only) == 1
else:
untagged_only_ = untagged_only
if skip_tags is None:
skip_tags_ = []
else:
skip_tags_ = [i.strip() for i in skip_tags.split(',')]
if keep_at_least is None:
keep_at_least_ = 0
else:
keep_at_least_ = int(keep_at_least)
if keep_at_least_ < 0:
raise ValueError('keep-at-least must be 0 or positive')
return Inputs(
parsed_cutoff=parsed_cutoff,
timestamp_type=TimestampType(timestamp_type),
account_type=AccountType(account_type),
org_name=org_name if account_type == 'org' else None,
untagged_only=untagged_only_,
skip_tags=skip_tags_,
keep_at_least=keep_at_least_,
)
def parse_image_names(image_names: str) -> list[ImageName]:
"""
Return an ImageName for each images name received.
The image_name can be one or multiple image names, and should be comma-separated.
For images with special characters in the name (e.g., `/`), we must url-encode
the image names before passing them to the Github API, so we save both the url-
encoded and raw value to a named tuple.
"""
return [ImageName(img_name.strip(), quote_from_bytes(img_name.strip().encode('utf-8'), safe='')) for img_name in image_names.split(',')]
async def main(
account_type: str,
org_name: str,
image_names: str,
timestamp_type: str,
cut_off: str,
token: str,
untagged_only: Union[bool, str] = False,
skip_tags: Optional[str] = None,
keep_at_least: Optional[str] = None,
) -> None:
"""
Delete old image versions.
See action.yml for additional descriptions of each parameter.
:param account_type: Account type, must be 'org' or 'personal'.
:param org_name: The name of the org. Required if account type is 'org'.
:param image_names: The image names to delete versions for.
Can be a single image name, or multiple comma-separated image names.
:param timestamp_type: Which timestamp to base our cut-off on. Can be 'updated_at' or 'created_at'.
:param cut_off: Can be a human readable relative time like '2 days ago UTC', or a timestamp.
Must contain a reference to the timezone.
:param token: The personal access token to authenticate with.
:param untagged_only: Whether to only delete untagged images.
:param skip_tags: Comma-separated list of tags to not delete.
:param keep_at_least: Number of images to always keep
"""
parsed_image_names: list[ImageName] = parse_image_names(image_names)
inputs: Inputs = validate_inputs(account_type, org_name, timestamp_type, cut_off, untagged_only, skip_tags, keep_at_least)
headers = {'accept': 'application/vnd.github.v3+json', 'Authorization': f'Bearer {token}'}
async with AsyncClient(headers=headers) as http_client:
await asyncio.gather(
*(asyncio.create_task(get_and_delete_old_versions(image_name, inputs, http_client)) for image_name in parsed_image_names)
)
if __name__ == '__main__':
asyncio.run(main(*argv[1:]))