White-Phoenix.py
import logging
import os.path
import queue
import concurrent.futures
import sys
import threading

import utils
from extractors.pdf_extractor import PdfExtractor
from extractors.zip_extractor import ZipExtractor
from extractors.vm_extractor import VMExtractor
from identifiers.pdf_identifier import PdfIdentifier
from identifiers.zip_identifier import ZipIdentifier

path_queue = queue.Queue()
lock = threading.Lock()


def delete_folder_contents(folder_path):
    # List all files and directories in the given folder
    folder_contents = os.listdir(folder_path)
    for item in folder_contents:
        item_path = os.path.join(folder_path, item)
        if os.path.isfile(item_path):
            # Delete the file
            os.remove(item_path)
        elif os.path.isdir(item_path):
            # Delete the subdirectory and its contents recursively
            delete_folder_contents(item_path)
            os.rmdir(item_path)


def get_paths(dir):
    # Scan the given directory if one was supplied; otherwise scan every
    # available drive root (Windows) or the filesystem root (macOS/Linux)
    if dir:
        return [dir]
    else:
        drives = []
        if os.name == "nt":  # For Windows
            import string
            drives = [d + ":" for d in string.ascii_uppercase if os.path.exists(d + ":")]
        elif os.name == "posix":  # For macOS and Linux
            drives = ["/"]
        return drives


def extract_data_from_file(output, separated_files, file_path, is_vm):
    # Identify the file type and run the matching extractor on its raw bytes
    file_content = utils.read_file(file_path)
    utils.verify_output(output)
    if is_vm:
        extractor = VMExtractor(file_content, output, file_path)
    elif PdfIdentifier(file_content):
        sys.stdout.flush()
        extractor = PdfExtractor(file_content, output, separated_files, file_path)
    elif ZipIdentifier(file_content):
        extractor = ZipExtractor(file_content, output)
    else:
        logging.error("file not supported")
        sys.exit(-1)
    extractor.extract_content()


def find_all_files_path(folder_path, output, vm):
    # Walk the folder tree and queue every file that looks like a PDF or ZIP
    # (or every file, when scanning a VM image)
    fifo_queue = queue.Queue()
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            file_path = os.path.join(root, file)
            try:
                if vm:
                    fifo_queue.put(file_path)
                else:
                    logging.info(f'checking: {file_path}')
                    with open(file_path, "rb") as open_file:
                        content = open_file.read()
                    if PdfIdentifier(content) or ZipIdentifier(content):
                        fifo_queue.put(file_path)
            except Exception as e:
                logging.error(f'in file: {file_path} - {e}')
    return fifo_queue


def main():
    global path_queue
    # Make sure a clean 'temp' working directory exists
    if not os.path.exists('temp'):
        os.mkdir('temp')
    else:
        delete_folder_contents('temp')
    args = utils.argparse()
    utils.init_logger(args.disable_log)
    if args.filename:
        # Single-file mode
        extract_data_from_file(args.output, args.separated_files, file_path=args.filename, is_vm=args.vm)
    else:
        # Scan mode: collect candidate files, then extract them on a thread pool
        thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
        starting_paths = get_paths(args.dir)
        for starting_path in starting_paths:
            temp_queue = find_all_files_path(starting_path, args.output, args.vm)
            while not temp_queue.empty():
                path_queue.put(temp_queue.get())
        while not path_queue.empty():
            lock.acquire()
            file_path = path_queue.get()
            lock.release()
            thread_pool.submit(extract_data_from_file, args.output, args.separated_files, file_path, args.vm)
        thread_pool.shutdown(wait=True)
    try:
        delete_folder_contents('temp')
    except Exception as e:
        logging.error(e)


if __name__ == '__main__':
    main()
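

# Usage sketch (assumptions): the actual CLI flags are defined in
# utils.argparse(), which is not shown here, so the option names below are
# inferred from the attributes read in main() (filename, output,
# separated_files, vm, dir, disable_log) and may differ from the real parser.
#
#   # Recover content from a single PDF/ZIP into an output folder:
#   python White-Phoenix.py --filename encrypted.pdf --output recovered
#
#   # Or scan a whole directory tree for candidate files instead:
#   python White-Phoenix.py --dir /path/to/scan --output recovered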