-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhelpers.py
431 lines (367 loc) · 13.4 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
import re
import asyncio
from datetime import datetime
import logging
import os.path
import aiohttp
import certifi
import requests
from shutil import copyfile
from azure.common.credentials import get_azure_cli_credentials
import openpyxl
from openpyxl.worksheet.table import Table, TableStyleInfo
from openpyxl.utils.cell import get_column_letter
logger = logging.getLogger(__name__)
def handle_custom_ssl():
"""
This function is used to inject custom CA certificate into the system.
It is used to overcome the SSL certificate verification problem
by injecting locally stored PEM certificate into certifi store
in virtual env.
"""
certifi_ca_file = certifi.where()
certifi_ca_file_original = f"{certifi_ca_file}.orig"
if os.path.isfile(certifi_ca_file_original):
# Restore original certifi pem if .orig exists
copyfile(certifi_ca_file_original, certifi_ca_file)
else:
# Backup original certifi pem
copyfile(certifi_ca_file, certifi_ca_file_original)
custom_ca_file = "certificate.pem"
if os.path.isfile(custom_ca_file):
logger.info(f"Found custom CA file {custom_ca_file} ..")
custom_ca_file_bytes = open(custom_ca_file, "rb").read()
venv_ca_path = certifi_ca_file
logger.info(f"Injecting custom CA into {venv_ca_path} ..")
with open(venv_ca_path, "ab") as f:
f.write(custom_ca_file_bytes)
logger.info("Done ..")
def set_headers(credentials):
"""
This function is used to set the headers for the requests to the Azure API.
It takes the credentials object as an argument and returns the headers.
The headers are used to authenticate the requests to the Azure API.
The credentials object is used to get the access token.
Parameters:
credentials (object): The credentials object is used to get the access token.
Returns:
headers (dict):
The headers are used to authenticate the requests to the Azure API.
"""
headers = credentials.signed_session().headers
headers["Content-type"] = "application/json"
return headers
def get_tenant(headers):
"""
This function returns the tenant details of the organization.
Parameters:
headers (dict): A dictionary of HTTP headers containing the authorization token.
Returns:
dict: The tenant detils of the organization.
"""
try:
response = requests.get(
"https://graph.microsoft.com/v1.0/organization", headers=headers, timeout=50
).json()
return response["value"][0]
except Exception as e:
raise Exception(e) from e
def get_azure_credentials(endpoint):
"""
This function is used to get the credentials from the Azure CLI.
Parameters:
endpoint: The resource to get the credentials for.
Returns:
credentials: The credentials for the resource.
"""
try:
credentials, subscription_id = get_azure_cli_credentials(resource=endpoint)
return credentials
except Exception as e:
raise Exception(e) from e
def get_auth_user_details(headers, endpoint, api_version="beta"):
"""
This function takes a list of user ids and returns a list of dictionaries
containing the user's MFA registration details.
Parameters:
headers: A dictionary containing the authorization header.
endpoint: The endpoint to query.
user_ids: A list of user ids.
api_version: The version of the API to query.
Returns:
A list of dictionaries containing the user's MFA registration details.
"""
try:
params = {"api-version": api_version}
response = requests.get(
f"{endpoint}/myorganization/activities/authenticationMethodUserDetails",
headers=headers,
params=params,
timeout=50,
)
if hasattr(response, "json"):
response = response.json()
else:
raise Exception(response.content)
if "value" not in response:
raise Exception(response)
response = response["value"]
return response
except Exception as e:
raise Exception(e) from e
async def get_url(url, user_id, headers, session, params):
"""
This generic function is used to get the data from the API
using parallel aiohttp library.
Parameters:
url: The url of the API.
headers: The headers of the API.
session: The session of the API.
params: The parameters of the API.
Returns:
The data from the API.
"""
try:
async with session.get(url=url, headers=headers, params=params) as response:
response = await response.json()
# Handle Windows AD accounts.
if "id" not in response:
response["id"] = user_id
if "signInActivity" not in response:
response["signInActivity"] = {}
response["signInActivity"]["lastSignInDateTime"] = None
response["signInActivity"]["lastNonInteractiveSignInDateTime"] = None
return response
except Exception as e:
raise Exception(e) from e
async def get_aad_users(headers, endpoint, user_ids, query_select, api_version="beta"):
"""
This function takes a list of user ids and returns a list of dictionaries
containing the user's details.
Parameters:
headers: A dictionary containing the authorization header.
endpoint: The endpoint to query.
user_ids: A list of user ids.
api_version: The version of the API to query.
Returns:
A list of dictionaries containing the user's details.
"""
connector = aiohttp.TCPConnector(
limit=10, force_close=True, enable_cleanup_closed=True
)
async with aiohttp.ClientSession(connector=connector) as session:
user_details = await asyncio.gather(
*[
get_url(
f"{endpoint}/{api_version}/users/{user_id}",
user_id,
headers,
session,
params={"$select": ",".join(query_select)},
)
for user_id in user_ids
]
)
return user_details
def item_to_string(item):
"""
Converts an item to a string.
Parameters
----------
item : object
The item to convert to a string.
Returns
-------
str
The item converted to a string.
Notes
-----
If the item is None, then "N/A" is returned.
If the item is True, then "Yes" is returned.
If the item is False, then "No" is returned.
If the item is a string, then the string is returned.
If the item is a datetime,
then the string representation of the datetime is returned.
"""
if item is None:
return "N/A"
if item is True:
return "Yes"
if item is False:
return "No"
if isinstance(item, str):
try:
return is_datetime(item)
except ValueError:
return item
return item
def is_external(item):
"""
This function takes a string as an argument and returns a string.
The string is either "True" or "False".
The function checks if the string contains the "#EXT#" substring
which indicates AAD guest user.
"""
item = bool(re.search("#EXT#", item))
return item_to_string(item)
def is_datetime(item):
"""
This function takes a string and returns a datetime object if the string
is a valid date/time representation. Otherwise it returns "Never".
Parameters:
item (str): A string representing a date/time.
Returns:
str: A string representing a date/time, or "Never".
"""
if item and item is not None:
date = datetime.strptime(item, "%Y-%m-%dT%H:%M:%SZ")
if date.year > 1999:
return date.isoformat()
return "Never"
return "N/A"
def is_external_domain(item):
"""
This function takes a string as input and returns the external domain name.
It uses a regular expression to extract the guest account external domain name.
It returns "N/A" if the URL is not a valid external guest account domain.
Parameters:
item (str): The string to be processed.
Returns:
str: The domain name of the URL.
"""
item = re.findall("(?<=_)(.*)(?=#EXT#)", item)
if item:
return item[0]
return "N/A"
def get_tenant_domain(item):
"""
This function takes a string as input and returns the tenant domain name.
It uses a regular expression to extract the tenant domain name user is member of.
It returns "N/A" if the parameter is not a valid tenant domain.
Parameters:
item (str): The string to be processed.
Returns:
str: The domain name of the URL.
"""
item = re.findall("(?<=@)(.*)(?=$)", item)
if item:
return item[0]
return "N/A"
def get_mfa_methods(item):
"""
This function returns the MFA methods configured for a user.
It accepts a single argument, which is the user object.
It returns a string with the MFA methods separated by a comma.
If no MFA methods are configured, it returns "No AAD MFA configured".
"""
if item:
return ",".join(set(item))
return "No AAD MFA configured"
def xlsx_dict_prep(data):
"""
This function takes a list of dictionaries and returns a list of dictionaries.
The output list of dictionaries has the following keys:
- userId
- isEnabled
- userDisplayName
- userPrincipalName
- isExternal
- externalDomain
- externalUserState
- externalUserStateLastChangeUTC
- tenantDomain
- methodsRegistered
- onPremisesSyncEnabled
- lastInteractiveSignInUTC
- lastNonInteractiveSignInUTC
The input list of dictionaries is expected to be a list of dictionaries
representing the merged output of the `get_aad_users` and
`get_auth_user_details` functions.
The output list of dictionaries is a list of dictionaries, each representing
a single user.
"""
return [
{
"userId": x["id"],
"isEnabled": item_to_string(x["accountEnabled"]),
"userDisplayName": x["userDisplayName"],
"userPrincipalName": x["userPrincipalName"],
"isExternal": is_external(x["userPrincipalName"]),
"externalDomain": is_external_domain(x["userPrincipalName"]),
"externalUserState": item_to_string(x["externalUserState"]),
"externalUserStateLastChangeUTC": item_to_string(
x["externalUserStateChangeDateTime"]
),
"tenantDomain": get_tenant_domain(x["userPrincipalName"]),
"methodsRegistered": get_mfa_methods(x["methodsRegistered"]),
"onPremisesSyncEnabled": item_to_string(x["onPremisesSyncEnabled"]),
"lastInteractiveSignInUTC": is_datetime(
x["signInActivity"]["lastSignInDateTime"]
),
"lastNonInteractiveSignInUTC": is_datetime(
x["signInActivity"]["lastNonInteractiveSignInDateTime"]
),
}
for x in data
]
def adjust_column_width(sheet):
"""
Adjust the column width of a worksheet.
:param sheet: The worksheet to be adjusted.
:type sheet: openpyxl.worksheet.worksheet.Worksheet
This function takes a sheet as an argument and adjusts the width of each column
to fit the contents of that column.
Uses the column_letter function from the openpyxl module to get the
column letter of each column.
"""
for col in sheet.columns:
max_length = 0
column = col[0].column_letter # Get the column letter
for cell in col:
try: # Necessary to avoid error on empty cells
if len(str(cell.value)) > max_length:
max_length = len(cell.value)
except ValueError:
pass
adjusted_width = (max_length + 2) * 1.2
sheet.column_dimensions[column].width = adjusted_width
def generate_xlsx(data, sheet_title, table_style, filename):
"""
Generate an Excel file from a list of dictionaries.
:param data: A list of dictionaries.
:param sheet_title: The title of the Excel sheet.
:param table_style: The name of the Excel table style.
:param filename: The name of the Excel file.
:return: None
Example:
>>> data = [{'name': 'John', 'age': 20}, {'name': 'Brian', 'age': 25}]
>>> generate_xlsx(data, 'My Table', 'Table Style Medium 15', 'my_table.xlsx')
"""
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.title = sheet_title
headers = list(data[0].keys())
for i in range(1, len(headers) + 1):
cell = sheet.cell(row=1, column=i)
cell.value = headers[i - 1]
for i in range(2, len(data) + 2):
for j in range(1, len(headers) + 1):
cell = sheet.cell(row=i, column=j)
cell.value = data[i - 2][headers[j - 1]]
xlsx_header_letter = get_column_letter(len(headers))
xlsx_header_number = len(data) + 1
tab = Table(
displayName=sheet_title.replace(" ", "_").lower(),
ref=f"A1:{xlsx_header_letter}{xlsx_header_number}",
)
style = TableStyleInfo(
name=table_style,
showFirstColumn=False,
showLastColumn=False,
showRowStripes=True,
showColumnStripes=True,
)
tab.tableStyleInfo = style
sheet.add_table(tab)
adjust_column_width(sheet)
workbook.save(filename)