-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexport_encryption_domains.py
302 lines (240 loc) · 8.69 KB
/
export_encryption_domains.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
from datetime import datetime
import traceback
import subprocess
import os
import sys
import json
import time
import csv
import logging
import argparse
from argparse import RawTextHelpFormatter
###Global Variables###
# bash scripting
global shell
shell = '#!/bin/bash'
global cpprofile
cpprofile = '''source /etc/profile.d/CP.sh
source /etc/profile.d/vsenv.sh
source $MDSDIR/scripts/MDSprofile.sh
source $MDS_SYSTEM/shared/mds_environment_utils.sh
source $MDS_SYSTEM/shared/sh_utilities.sh
'''
# timestamp
global now
nowtmp = datetime.now()
now = nowtmp.strftime("%m-%d-%y_%H-%M-%S")
# filepaths
global gwpath, gwbin, gwout
gwpath = os.path.dirname(os.path.abspath(__file__))
gwbin = f'{gwpath}/scripts'
gwout = f'{gwpath}/output'
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%a, %d %b %Y %H:%M:%S',
filename=f'{gwpath}/log.log',
filemode='w')
class Log:
@classmethod
def debug(cls, msg):
logging.debug(msg)
@classmethod
def info(cls, msg):
logging.info(msg)
@classmethod
def error(cls, msg):
logging.error(msg)
###Debugging Functions###
# pause script, take any input to continue
def pause_debug():
input("[ DEBUG ] Press any key to continue...\n\n")
# script exit
def end():
sys.exit(0)
def args():
parser = argparse.ArgumentParser(add_help=False,
formatter_class=RawTextHelpFormatter,
prog=f'python3 {os.path.basename(__file__)}',
description='Collect Gateway Encryption Domains',
epilog=f'''
[ Notes ]
None.
[ Scope ]
For MDM environment only.
[ Description ]
Collect networks/hosts of each encryption domain.
[ Folders ]
Main Path: {gwpath}
script and log
Output: {gwout}
encryption_domains.csv and .json '''
)
parser.add_argument('-d', '--debug', action='store_true') # enable debugging in logs
parser.add_argument('-h', '--help', action='help', default=argparse.SUPPRESS,
help='')
a = vars(parser.parse_args())
global debug
if a['debug'] is True:
debug = 1
else:
debug = 0
# make log directory / clear old log files
def mkdir():
Log.info(f'[ mkdir | {gwpath} | {gwbin} | {gwout}]\n')
if os.path.isdir(gwpath) and os.path.isdir(gwbin) and os.path.isdir(gwout):
Log.info(f'... Exists!\n')
else:
Log.info(f'... Does not exist\n')
os.system(f'mkdir -v {gwpath}')
os.system(f'mkdir -v {gwbin}')
os.system(f'mkdir -v {gwout}')
# create bash scripts
def runcmd(cmd, script):
script = f'{gwbin}/{script}'
bash=f"""{shell}
{cpprofile}
{cmd}
exit 0
"""
if debug == 1:
Log.debug(f'''[runcmd]\n-----\n{bash}\n [ script]\n{script}\n-----''')
with open(script, 'w') as f:
f.write(bash)
os.system(f"chmod +x {script}")
try:
response = subprocess.check_output(script, shell=True, text=True, timeout=120)
except subprocess.TimeoutExpired as e:
Log.error(traceback.print_exc())
Log.error(f"[runcmd] : Error : {e}")
if debug == 1:
Log.debug(f"[runcmd]\n-----\n{response}\n-----\n")
return response
# make list of CMA IP Addresses
def domains():
global domain_ips, domain_names, domain_map
cmd = "mdsstat | grep -i cma | awk '{print $6}' | grep -v 138.108.2.29"
domain_ips = runcmd(cmd, 'domains_ips.sh').split()
if debug == 1:
Log.debug(f"[ DOMAIN LIST ]\n{domain_ips}\n")
cmd = "mdsstat | grep -i cma | awk '{print $4}' | grep -v TCS"
domain_names = runcmd(cmd, 'domain_names').split()
domain_map = {}
for x,y in zip(domain_ips,domain_names):
domain_map[x] = y
Log.info(f'Domain Mapping : {domain_map}')
def vpndomains():
global vpngws
vpngws = {}
for domain in domain_ips:
Log.info(f'[vpndomains] : {domain_map[domain]}')
vpngws[domain_map[domain]] = {}
cmds = {
'simclu' : f'mgmt_cli -r true -d {domain} show simple-clusters details-level full limit 500 --format json',
'simgw' : f'mgmt_cli -r true -d {domain} show simple-gateways details-level full limit 500 --format json'
}
for dev,cmd in cmds.items():
Log.info(f'[vpndomains] : {dev}')
gws = json.loads(runcmd(cmd, f'show_simple_{dev}_{domain}.sh'))
if gws.get('code') == 'generic_error':
Log.info(f'Generic Error : {domain} : {dev} : ignoring...')
pass
else:
try:
for gw in gws['objects']:
Log.info(f"[Gateway] : {gw['name']}")
if gw.get('vpn') == False or gw.get('externally-managed') == True or gw.get('vpn-settings').get('vpn-domain') == None:
Log.info(f'Externally Managed or no vpn-domain')
pass
else:
devices = show_group(gw['vpn-settings']['vpn-domain']['name'], domain)
vpngws[domain_map[domain]].update({gw['name'] : devices})
except KeyError as e:
Log.error(traceback.print_exc())
Log.error(f"[runcmd] : Error : {domain} : {gw['name']} {e}")
pass
# get json object from mgmt api
def show_group(groupName, domain):
Log.info(f'[show_group] : {groupName}')
groups = []
cmd = f"mgmt_cli -r true -d {domain} show group name {groupName} details-level full --format json"
result = runcmd(cmd, f'show_{groupName}.sh')
groupjson = json.loads(result)
parsed = parser(groupjson, domain)
return parsed
# parse json object
def parser(data, domain):
Log.info(f'[parser] : {domain}')
if debug == 1:
Log.debug(f'-----\n\n{data}\n\n-----')
pause_debug()
global hosts,ranges,networks
hosts = {}
ranges = {}
networks = {}
cluster = {}
clustermember = {}
groups = {}
cphost = {}
try:
for ip in data['members']:
if ip['type'] == 'host':
hosts[ip['name']] = ip['ipv4-address']
elif ip['type'] == 'address-range':
ranges[ip['name']] = str(ip['ipv4-address-first']) + '-' + str(ip['ipv4-address-last'])
elif ip['type'] == 'network':
networks[ip['name']] = ip['subnet4'] + '/' + str(ip['mask-length4'])
elif ip['type'] == 'cluster-member':
clustermember[ip['name']] = ip['ip-address']
elif ip['type'] == 'simple-cluster':
for clumem in ip['cluster-members']:
cluster[clumem['name']] = clumem['ip-address']
elif ip['type'] == 'checkpoint-host':
cphost[ip['name']] = ip['ipv4-address']
elif ip['type'] == 'group':
groups[ip['name']] = show_group(ip['name'], domain)
else:
Log.error(f"[parser] {domain} : Mising Object Type: {ip['name']} : {ip['type']}\n")
Log.error(f"[parser] Screenshot and RFE to Cody Ellis\n")
except Exception as e:
print(f"[parser] Error: {e}\n{domain} : {ip['name']} : {ip['type']}")
print(traceback.format_exc())
pout = {}
for x in hosts, networks, ranges, cluster, clustermember, cphost, groups:
pout.update(x)
return pout
def output(dict, fn):
fn = f'{gwout}/{fn}'
# gateway command output
with open(f'{fn}.json', 'w') as f:
f.write(json.dumps(dict, indent=4, sort_keys=False))
# make csv of stdout information
fcsv = f'{fn}.csv'
with open(fcsv, 'w') as f:
w = csv.writer(f)
w.writerows(dict.items())
def cleanup():
# remove undeleted tmp scripts
os.system(f"rm {gwbin}/*")
def main():
# Help Menu and configuration
args()
# create direcotries
mkdir()
# get domains list
domains()
# get list of gateways from domains
vpndomains()
# create file
output(vpngws, 'vpn_gateways_encryption_domain_names')
if __name__ == "__main__":
try:
#time start
starttime = time.time()
Log.info(f"Start Time: {starttime}")
cleanup()
main()
except Exception as e:
Log.error(f"[main] : Error : {e}\n")
Log.error(traceback.print_exc())
finally:
end()