|
| 1 | +import copy |
1 | 2 | import logging
|
2 | 3 | import re
|
3 | 4 |
|
|
8 | 9 | redact = r"(eyJh[-_\w]*\.)([-_\w]*)\."
|
9 | 10 |
|
10 | 11 |
|
| 12 | +def gred(g): |
| 13 | + """Redact the payload of the JWT, keeping the header and signature""" |
| 14 | + return f"{g.group(1)}REDACTED." if len(g.groups()) > 1 else g |
| 15 | + |
| 16 | + |
11 | 17 | class TokenMaskingFilter(logging.Filter):
|
12 | 18 | """Mask access_tokens in logs"""
|
13 | 19 |
|
14 | 20 | def filter(self, record):
|
15 | 21 | record.msg = self.sanitize_line(record.msg)
|
| 22 | + record.args = self.sanitize_args(record.args) |
16 | 23 | return True
|
17 | 24 |
|
18 | 25 | @staticmethod
|
19 |
| - def sanitize_line(line): |
20 |
| - def gred(g): |
21 |
| - """Redact the payload of the JWT, keeping the header and signature""" |
22 |
| - return f"{g.group(1)}REDACTED." if len(g.groups()) > 1 else g |
| 26 | + def sanitize_args(d): |
| 27 | + if isinstance(d, dict): |
| 28 | + d = d.copy() # so we don't overwrite anything |
| 29 | + for k, v in d.items(): |
| 30 | + d[k] = self.sanitize_line(v) |
| 31 | + elif isinstance(d, tuple): |
| 32 | + # need a deepcopy of tuple turned to a list, as to not change the original values |
| 33 | + # otherwise we end up changing the items at the original memory location of the passed in tuple |
| 34 | + y = copy.deepcopy(list(d)) |
| 35 | + for x, value in enumerate(y): |
| 36 | + if isinstance(value, str): |
| 37 | + y[x] = re.sub(redact, gred, value) |
| 38 | + return tuple(y) # convert the list back to a tuple |
| 39 | + return d |
23 | 40 |
|
| 41 | + @staticmethod |
| 42 | + def sanitize_line(line): |
24 | 43 | return re.sub(redact, gred, line)
|
0 commit comments