-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path__init__.py
executable file
·361 lines (313 loc) · 12.2 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
#!/usr/bin/env python
"""
NOTE: You should really run this with root access as it will probably be
needed if you want full access to the Time Machine harddrive.
Or if there's a mismatch between your current users ID and the original file owner's ID
NOTE: This only supports files and directories.
It skips over links or any other non file/directory items.
"""
import shutil
import os
import os.path
import sys
import logging
import collections
logger = logging.getLogger('time_machine')
if sys.version_info >= (3, 0):
raw_input = input
class TimeMachineException(Exception):
pass
class TimeMachine(object):
backup_directory = 'Backups.backupdb'
store_directory = '.HFS+ Private Directory Data\x0D'
def __init__(self, mount_path, host=None, version=None, partition=None):
logger.info('Set mount_path "%s"', mount_path)
self.mount_path = dst_path = self.get_absolute_path(mount_path)
self._host = self._version = self._partition = None
if host is not None:
self.host = host
if version is not None:
self.version = version
if partition is not None:
self.partition = partition
def __ensure_value(self, name):
# Make sure we were supplied the value
value = getattr(self, '_{}'.format(name))
if value is None:
# If only one option is available, we use that
options = getattr(self, '{}s'.format(name))
if len(options) == 1:
return options[0]
raise ValueError('You must set a "%s" value.', name)
return value
@property
def hosts(self):
return [
x
for x in os.listdir(
os.path.join(
self.mount_path,
self.backup_directory
)
)
if not x.startswith('.')
]
def get_host(self):
return self.__ensure_value('host')
def set_host(self, host):
logger.info('Set Host "%s"', host)
self._host = host
host = property(get_host, set_host)
@property
def host_path(self):
return os.path.join(
self.mount_path,
self.backup_directory,
self.host
)
@property
def versions(self):
return [x for x in os.listdir(self.host_path) if not x.startswith('.')]
def get_version(self):
return self.__ensure_value('version')
def set_version(self, version):
logger.info('Set Version "%s"', version)
self._version = version
version = property(get_version, set_version)
@property
def version_path(self):
return os.path.join(self.host_path, self.version)
@property
def partitions(self):
return [x for x in os.listdir(self.version_path) if not x.startswith('.')]
def get_partition(self):
return self.__ensure_value('partition')
def set_partition(self, partition):
logger.info('Set Partition "%s"', partition)
self._partition = partition
partition = property(get_partition, set_partition)
@property
def partition_path(self):
return os.path.join(
self.version_path,
self.partition
)
def get_real_path(self, path):
if path.startswith('/'):
path = path[1:]
if path.endswith('/'):
path = path[:-1]
bits = path.split('/')
new_path = self.partition_path
while bits:
item = bits.pop(0)
new_path = os.path.join(new_path, item)
if os.path.isfile(new_path):
stats = os.stat(new_path)
size = stats.st_size
link = stats.st_nlink
if not size and link > 3:
new_path = os.path.join(
self.mount_path,
self.store_directory,
'dir_{}'.format(link)
)
elif not os.path.isdir(new_path):
# TODO: Handle non file and directory items, such as links
return None
logger.debug('Real Path "%s" -> "%s"', path, new_path)
return new_path
@staticmethod
def get_absolute_path(path):
return os.path.abspath(os.path.expanduser(path))
def copy_path(self, path, dst_path):
"""
Given a path that would have worked on the backed up machine,
copy the path out of the timemachine to a specified location.
"""
dst_path = self.get_absolute_path(dst_path)
real_path = self.get_real_path(path)
logger.info('Copy Path "%s" -> "%s" -> "%s"', path, real_path, dst_path)
if not real_path or not os.path.exists(real_path):
logger.warn('Src path does not exist. Skipping. "%s" -> "%s"', path, real_path)
elif os.path.isfile(real_path):
self.copy_file(path, dst_path)
elif os.path.isdir(real_path):
self.copy_directory(path, dst_path)
def copy_file(self, path, dst_path):
"""
Given a path to a file that would have worked on the backed up machine,
copy the path out of the timemachine to a specified location.
"""
dst_path = self.get_absolute_path(dst_path)
real_path = self.get_real_path(path)
logger.info('Copy File "%s" -> "%s" -> "%s"', path, real_path, dst_path)
if not real_path or not os.path.exists(real_path):
logger.warn('Src file path does not exist. Skipping. "%s" -> "%s"', path, real_path)
else:
try:
shutil.copy(real_path, dst_path)
except IOError as e:
raise
logger.warn('Could not copy file "%s" -> "%s" -> "%s"\n%s', path, real_path, dst_path, str(e))
def copy_directory(self, path, dst_path):
"""
Given a path to a directory that would
have worked on the backed up machine,
copy the path out of the timemachine to a specified location.
"""
dst_path = self.get_absolute_path(dst_path)
real_path = self.get_real_path(path)
logger.info('Copy Directory "%s" -> "%s" -> "%s"', path, real_path, dst_path)
if not real_path or not os.path.exists(real_path):
logger.warn('Src directory path does not exist. Skipping. "%s" -> "%s"', path, real_path)
else:
if path.endswith('/'):
path = path[:-1]
dst_path = os.path.join(dst_path, os.path.split(path)[1])
if not os.path.exists(dst_path):
os.makedirs(dst_path)
for item in self.listdir(real_path):
new_item_path = os.path.join(path, item)
self.copy_path(new_item_path, dst_path)
@staticmethod
def listdir(path):
# TODO: Handle non file and directory items, such as links
return [
x for x in os.listdir(path) if
os.path.isdir(os.path.join(path, x)) or
os.path.isfile(os.path.join(path, x))
]
class InteractiveTimeMachine(TimeMachine):
def __init__(self, mount_path, host=None, version=None, partition=None, path=None, dst_path=None):
super(InteractiveTimeMachine, self).__init__(mount_path, host, version, partition)
try:
self.host
except ValueError:
self.host = self.interactive_select(
self.hosts,
'Select the host to use:'
)
try:
self.version
except ValueError:
self.version = self.interactive_select(
self.versions,
'Select the version to use:'
)
try:
self.partition
except ValueError:
self.partition = self.interactive_select(
self.partitions,
'Select the partition to use:'
)
if path and dst_path:
self.copy_path(path, dst_path)
else:
self.interactive_directory_select(path)
def interactive_select(self, choices, message, depth=0):
if isinstance(choices, list):
choices = collections.OrderedDict([(i, choice) for i, choice in enumerate(choices)])
while 1:
print(message)
for i, choice in choices.items():
print('{}. {}'.format(i, choice))
choice = raw_input('Enter 0-{}: '.format(choices.keys()[-1]))
if (choice == '0' or choice == '..') and depth:
choice = None
break
if choice.isdigit() and int(choice) in choices:
choice = choices[int(choice)]
break
cs = collections.OrderedDict()
cs[0] = '..'
c = None
for i, c in choices.items():
if c == choice:
break
if c.startswith(choice):
cs[i] = c
if c == choice:
break
if len(cs) == 2:
choice = cs[cs.keys()[1]]
break
if len(cs) == 1:
continue
choice = self.interactive_select(cs, message, depth + 1)
if choice is not None:
break
return choice
def interactive_directory_select(self, path=None):
path = path if path else '/'
while 1:
real_path = self.get_real_path(path)
print(real_path)
if os.path.isfile(real_path):
self.interactive_copy_path(path)
break
else:
choices = ['.', '..'] + self.listdir(real_path)
message = '\n{}'.format('-' * 50)
message += '\nSelect a directory to enter or file to copy:'
message += '\nPWD: "{}"'.format(path)
choice = self.interactive_select(choices, message)
if choice == '.':
self.interactive_copy_path(path)
break
elif choice == '..':
path = '/'.join(path.split('/')[:-1])
elif os.path.isfile(self.get_real_path(os.path.join(path, choice))):
self.interactive_copy_path(os.path.join(path, choice))
break
else:
path = os.path.join(path, choice)
def interactive_copy_path(self, path):
dst_path = raw_input('Please enter a destination directory: ')
self.copy_path(path, dst_path)
def usage(msg=''):
if msg:
msg = '\nERROR: {}\n'.format(msg)
return '''
> timemachine --mount_path PATH [--host HOST] [--version VERSION] [--partition PARTITION] [--path PATH] [--dst_path PATH]
{}
REQUIRED:
--mount_path PATH Location where the timemachine HD is mounted. Usually something like "/Volumes/__HD_NAME__".
OPTIONAL:
--path PATH Local path to directory to start in. We'll try to infer anything not provided.
--host HOST Name of the host who's timemachine we want to access. Defaults to one with the most recent version.
--version VERSION The name of the timemachine versions we want to access. Defaults to "Latest".
--partition PARTITION The name of the partition in the timemachine we want to access. Usually "Macintosh HD"
--dst_path PATH If provided we just copy path to dst_path, no interactive folder walking.
'''.format(msg)
def get_arg(args, name):
value = None
flag = '--{}'.format(name)
equals = [x for x in args if x.startswith('{}='.format(flag))]
if equals:
value = equals[0]
value = value[value.find('=') + 1:]
if not value and flag in args:
loc = args.index(flag)
if len(args) > loc + 1 and not args[loc + 1].startswith('--'):
value = args[loc + 1]
return value
if __name__ == "__main__":
logging.basicConfig(level=logging.WARN, format='%(asctime)s - %(levelname)s - %(message)s')
kwargs = {
'mount_path': None,
'host': None,
'version': None,
'partition': None,
'path': None,
'dst_path': None,
}
for n in kwargs.keys():
kwargs[n] = get_arg(sys.argv, n)
if '--help' in sys.argv:
print(usage())
elif kwargs['mount_path'] is None:
print(usage('You MUST provide a "--mount_path" value.'))
else:
InteractiveTimeMachine(**kwargs)