#!/usr/bin/env python3
"""
Script to synchronize (local>remote and viceversa) test data files from/to GCS.
//test/data files are not checked in the codebase because they are large binary
file and change frequently. Instead we check-in only xxx.sha256 files, which
contain the SHA-256 of the actual binary file, and sync them from a GCS bucket.
File in the GCS bucket are content-indexed as gs://bucket/file_name-a1b2c3f4 .
Usage:
./test_data status # Prints the status of new & modified files.
./test_data download # To sync remote>local (used by install-build-deps).
./test_data upload # To upload newly created and modified files.
"""
import argparse
import logging
import os
import sys
import hashlib
import subprocess

from multiprocessing.pool import ThreadPool
from collections import namedtuple, defaultdict

ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
BUCKET = 'gs://perfetto/test_data'
SUFFIX = '.sha256'

FS_MATCH = 'matches'
FS_NEW_FILE = 'needs upload'
FS_MODIFIED = 'modified'
FS_MISSING = 'needs download'

FileStat = namedtuple('FileStat',
                      ['path', 'status', 'actual_digest', 'expected_digest'])

args = None


def relpath(path):
  return os.path.relpath(path, ROOT_DIR)


def download(url, out_file):
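  """Fetches url into out_file via curl (follow redirects, no progress bar)."""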
  subprocess.check_call(['curl', '-L', '-s', '-o', out_file, url])


def list_files(path, scan_new_files=False):
  """Lists files recursively in path.

  If scan_new_files=False, returns only files with a matching xxx.sha256
  tracker. If scan_new_files=True, returns all files, including untracked ones.
  """
seen = set()
for root, _, files in os.walk(path):
for fname in files:
      if fname.endswith('.swp'):
        continue  # Temp files left around when CTRL+C-ing mid-download.
      if fname in ["OWNERS", "README.md"]:
        continue  # OWNERS and README.md files should not be uploaded.
fpath = os.path.join(root, fname)
if not os.path.isfile(fpath) or fname.startswith('.'):
continue
if fpath.endswith(SUFFIX):
fpath = fpath[:-len(SUFFIX)]
elif not scan_new_files:
continue
if fpath not in seen:
seen.add(fpath)
        yield fpath


def hash_file(fpath):
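  """Returns the SHA-256 hex digest of fpath, reading it in 32 KiB chunks."""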
hasher = hashlib.sha256()
with open(fpath, 'rb') as f:
for chunk in iter(lambda: f.read(32768), b''):
hasher.update(chunk)
  return hasher.hexdigest()


def map_concurrently(fn, files):
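  """Applies fn to each file on a thread pool, printing a progress counter.

  files must be a list (not a generator) because its length drives the
  [done/total] progress display.
  """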
done = 0
for fs in ThreadPool(args.jobs).imap_unordered(fn, files):
assert (isinstance(fs, FileStat))
done += 1
if not args.quiet:
print(
'[%d/%d] %-60s' % (done, len(files), relpath(fs.path)[-60:]),
end='\r')
if not args.quiet:
    print('')


def get_file_status(fpath):
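  """Compares fpath against its .sha256 tracker and returns a FileStat.

  status is one of: FS_MATCH (digests agree), FS_NEW_FILE (no tracker yet),
  FS_MODIFIED (digests diverge), FS_MISSING (tracker exists, file doesn't).
  """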
sha_file = fpath + SUFFIX
sha_exists = os.path.exists(sha_file)
file_exists = os.path.exists(fpath)
actual_digest = None
expected_digest = None
if sha_exists:
with open(sha_file, 'r') as f:
expected_digest = f.readline().strip()
if file_exists:
actual_digest = hash_file(fpath)
if sha_exists and not file_exists:
status = FS_MISSING
elif not sha_exists and file_exists:
status = FS_NEW_FILE
  elif not sha_exists and not file_exists:
    raise Exception('%s: neither the file nor its %s tracker exists' %
                    (fpath, SUFFIX))
elif expected_digest == actual_digest:
status = FS_MATCH
else:
status = FS_MODIFIED
  return FileStat(fpath, status, actual_digest, expected_digest)


def cmd_upload(dir):
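  """Uploads new/modified files to GCS and refreshes their .sha256 trackers."""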
all_files = list_files(dir, scan_new_files=True)
files_to_upload = []
for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
if fs.status in (FS_NEW_FILE, FS_MODIFIED):
files_to_upload.append(fs)
if len(files_to_upload) == 0:
if not args.quiet:
print('No modified or new files require uploading')
return 0
if args.dry_run:
return 0
if not args.quiet:
print('\n'.join(relpath(f.path) for f in files_to_upload))
print('')
print('About to upload %d files' % len(files_to_upload))
    input('Press Enter to continue or CTRL-C to abort')

  def upload_one_file(fs):
assert (fs.actual_digest is not None)
dst_name = '%s/%s-%s' % (args.bucket, os.path.basename(
fs.path), fs.actual_digest)
cmd = ['gsutil', '-q', 'cp', '-n', '-a', 'public-read', fs.path, dst_name]
logging.debug(' '.join(cmd))
subprocess.check_call(cmd)
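    # Write the digest to a temp .swp file and atomically rename it over the
    # tracker, so an interrupted run can't leave a half-written .sha256 file.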
with open(fs.path + SUFFIX + '.swp', 'w') as f:
f.write(fs.actual_digest)
os.replace(fs.path + SUFFIX + '.swp', fs.path + SUFFIX)
    return fs

  map_concurrently(upload_one_file, files_to_upload)
  return 0


def cmd_clean(dir):
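  """Deletes local files that are new or have diverged from their trackers."""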
all_files = list_files(dir, scan_new_files=True)
files_to_clean = []
for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
if fs.status in (FS_NEW_FILE, FS_MODIFIED):
files_to_clean.append(fs.path)
if len(files_to_clean) == 0:
if not args.quiet:
print('No modified or new files require cleaning')
return 0
if args.dry_run:
return 0
if not args.quiet:
print('\n'.join(relpath(f) for f in files_to_clean))
print('')
print('About to remove %d files' % len(files_to_clean))
    input('Press Enter to continue or CTRL-C to abort')
  list(map(os.remove, files_to_clean))
  return 0


def cmd_download(dir, overwrite_locally_modified=False):
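  """Downloads missing files; with --overwrite, also locally modified ones."""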
files_to_download = []
modified = []
all_files = list_files(dir, scan_new_files=False)
for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
if fs.status == FS_MISSING:
files_to_download.append(fs)
elif fs.status == FS_MODIFIED:
modified.append(fs)
if len(modified) > 0 and not overwrite_locally_modified:
print('WARNING: The following files diverged locally and will NOT be ' +
'overwritten if you continue')
print('\n'.join(relpath(f.path) for f in modified))
print('')
    print('Re-run `download --overwrite` to overwrite locally modified files')
    print('or `upload` to sync them to the GCS bucket')
    print('')
    input('Press Enter to continue or CTRL-C to abort')
elif overwrite_locally_modified:
files_to_download += modified
if len(files_to_download) == 0:
if not args.quiet:
print('Nothing to do, all files are synced')
return 0
if not args.quiet:
    print('Downloading %d files in //%s' %
          (len(files_to_download), relpath(dir)))
  if args.dry_run:
    print('\n'.join(relpath(f.path) for f in files_to_download))
    return 0

  def download_one_file(fs):
assert (fs.expected_digest is not None)
uri = '%s/%s-%s' % (args.bucket, os.path.basename(
fs.path), fs.expected_digest)
uri = uri.replace('gs://', 'https://storage.googleapis.com/')
logging.debug(uri)
tmp_path = fs.path + '.swp'
download(uri, tmp_path)
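    # Verify the fetched bytes against the expected digest before atomically
    # renaming into place; a corrupted or truncated download is never kept.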
digest = hash_file(tmp_path)
if digest != fs.expected_digest:
raise Exception('Mismatching digest for %s. expected=%s, actual=%s' %
(uri, fs.expected_digest, digest))
os.replace(tmp_path, fs.path)
    return fs

  map_concurrently(download_one_file, files_to_download)
  return 0


def cmd_status(dir):
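  """Prints per-file sync status. Returns 1 if anything is out of sync."""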
files = list_files(dir, scan_new_files=True)
file_by_status = defaultdict(list)
num_files = 0
num_out_of_sync = 0
for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, files):
file_by_status[fs.status].append(relpath(fs.path))
num_files += 1
for status, rpaths in sorted(file_by_status.items()):
if status == FS_NEW_FILE and args.ignore_new:
continue
if status != FS_MATCH:
for rpath in rpaths:
num_out_of_sync += 1
if not args.quiet:
print('%-15s: %s' % (status, rpath))
if num_out_of_sync == 0:
if not args.quiet:
print('Scanned %d files in //%s, everything in sync.' %
(num_files, relpath(dir)))
return 0
  return 1


def main():
parser = argparse.ArgumentParser()
parser.add_argument('--dir', default=os.path.join(ROOT_DIR, 'test/data'))
parser.add_argument('--overwrite', action='store_true')
parser.add_argument('--bucket', default=BUCKET)
parser.add_argument('--jobs', '-j', default=10, type=int)
parser.add_argument('--dry-run', '-n', action='store_true')
parser.add_argument('--quiet', '-q', action='store_true')
parser.add_argument('--verbose', '-v', action='store_true')
parser.add_argument('--ignore-new', action='store_true')
parser.add_argument('cmd', choices=['status', 'download', 'upload', 'clean'])
global args
args = parser.parse_args()
logging.basicConfig(
format='%(asctime)s %(levelname).1s %(message)s',
level=logging.DEBUG if args.verbose else logging.INFO,
datefmt=r'%H:%M:%S')
if args.cmd == 'status':
return cmd_status(args.dir)
if args.cmd == 'download':
return cmd_download(args.dir, overwrite_locally_modified=args.overwrite)
if args.cmd == 'upload':
return cmd_upload(args.dir)
  if args.cmd == 'clean':
    return cmd_clean(args.dir)
  print('Unknown command: %s' % args.cmd)
  return 1


if __name__ == '__main__':
sys.exit(main())