Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add IPv6 obfuscation to cleaner #4362

Merged
merged 6 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions insights/cleaner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

from insights.cleaner.filters import AllowFilter
from insights.cleaner.hostname import Hostname
from insights.cleaner.ip import IPv4 # IPv6
from insights.cleaner.ip import IPv4, IPv6
from insights.cleaner.keyword import Keyword

# from insights.cleaner.mac import Mac
Expand Down Expand Up @@ -92,8 +92,8 @@ def __init__(self, config, rm_conf, fqdn=None):
if config and config.obfuscate:
# - IPv4 obfuscation
self.obfuscate.update(ip=IPv4())
# # - IPv6 obfuscation
# self.obfuscate.update(ipv6=IPv6()) if config.obfuscate_ipv6 else None
# - IPv6 obfuscation
self.obfuscate.update(ipv6=IPv6()) if config.obfuscate_ipv6 else None
# - Hostname obfuscation
(
self.obfuscate.update(hostname=Hostname(self.fqdn))
Expand Down Expand Up @@ -140,7 +140,6 @@ def _clean_line(line):
# - Keyword
# - Mac
# - Password
no_obfuscate.append('ipv6') if no_obfuscate and 'ip' in no_obfuscate else None
for obf in set(self.obfuscate.keys()) - set(no_obfuscate or []):
if self.obfuscate[obf]:
parsers.append((self.obfuscate[obf], {'width': width}))
Expand Down Expand Up @@ -211,14 +210,17 @@ def generate_rhsm_facts(self):
ipv4 = self.obfuscate.get('ip')
ipv4_mapping = ipv4.mapping() if ipv4 else []

ipv6 = self.obfuscate.get('ipv6')
ipv6_mapping = ipv6.mapping() if ipv6 else []

facts = {
'insights_client.hostname': self.fqdn,
'insights_client.obfuscate_ip_enabled': 'ip' in self.obfuscate,
# 'insights_client.obfuscate_ipv6_enabled': 'ipv6' in self.obfuscate,
'insights_client.obfuscate_ipv6_enabled': 'ipv6' in self.obfuscate,
# 'insights_client.obfuscate_mac_enabled': 'mac' in self.obfuscate,
'insights_client.obfuscate_hostname_enabled': 'hostname' in self.obfuscate,
'insights_client.obfuscated_ipv4': json.dumps(ipv4_mapping),
# 'insights_client.obfuscated_ipv6': json.dumps(),
'insights_client.obfuscated_ipv6': json.dumps(ipv6_mapping),
# 'insights_client.obfuscated_mac': json.dumps(),
'insights_client.obfuscated_keyword': json.dumps(kw_mapping),
'insights_client.obfuscated_hostname': json.dumps(hn_mapping),
Expand Down
106 changes: 95 additions & 11 deletions insights/cleaner/ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@

"""

import hashlib
import logging
import os
import re
import six
import socket
import struct

Expand All @@ -32,6 +34,8 @@ def __init__(self):
self._ip_db = dict() # IP database
self._start_ip = '10.230.230.1'
self._ignore_list = ["127.0.0.1"]
# self.pattern = r'((?<!(\.|\d))([0-9]{1,3}\.){3}([0-9]){1,3}(\/([0-9]{1,2}))?)'
self.pattern = r"(((\b25[0-5]|\b2[0-4][0-9]|\b1[0-9][0-9]|\b[1-9][0-9]|\b[1-9]))(\.(\b25[0-5]|\b2[0-4][0-9]|\b1[0-9][0-9]|\b[1-9][0-9]|\b[0-9])){3})"

def _ip2int(self, ipstr):
# converts a dotted decimal IP address into an integer that can be incremented
Expand Down Expand Up @@ -114,15 +118,13 @@ def _sub_ip_keep_width(line, ip):
if not line:
return line
try:
pattern = r"(((\b25[0-5]|\b2[0-4][0-9]|\b1[0-9][0-9]|\b[1-9][0-9]|\b[1-9]))(\.(\b25[0-5]|\b2[0-4][0-9]|\b1[0-9][0-9]|\b[1-9][0-9]|\b[0-9])){3})"
ips = [each[0] for each in re.findall(pattern, line)]
if len(ips) > 0:
for ip in sorted(ips, key=len, reverse=True):
if ip not in self._ignore_list and ip in line:
if kwargs.get('width', False):
line = _sub_ip_keep_width(line, ip)
else:
line = _sub_ip(line, ip)
ips = [each[0] for each in re.findall(self.pattern, line)]
for ip in sorted(ips or [], key=len, reverse=True):
if ip not in self._ignore_list: # ip must in line
if kwargs.get('width', False):
line = _sub_ip_keep_width(line, ip)
else:
line = _sub_ip(line, ip)
return line
except Exception as e: # pragma: no cover
logger.warning(e)
Expand Down Expand Up @@ -150,10 +152,92 @@ def generate_report(self, report_dir, archive_name):
logger.info('Completed IP Report.')


# TODO
class IPv6(object):
    """
    Class for obfuscating IPv6.

    Addresses found by ``self.pattern`` are rewritten group by group (see
    ``_ip2db``) and the original/obfuscated pairs are remembered in
    ``self._ipv6_db`` so the same address always maps to the same result.
    """

    def __init__(self):
        self._ipv6_db = dict()  # IPv6 database: {original: obfuscated}
        # Ignore list for IPv6: regexes; a match containing any of them is
        # left untouched by parse_line().
        self._ignore_list = [r'\s+']  # ignore whitespace
        # IPv6 pattern, stolen from sos
        # FIXME:
        # This pattern is not perfect, e.g. it cannot match "::1" perfectly.
        # Hence, the ignore list above is used to skip the rough matches
        # that include whitespace.
        # NOTE(review): inside the first character class, ``\\-a`` parses as
        # a range from backslash to "a" rather than a literal "-" followed by
        # "a-z" -- confirm whether ``\-a-z`` was intended.
        self.pattern = (
            r"(?<![:\\.\\-a-z0-9])((([0-9a-f]{1,4})(:[0-9a-f]{1,4}){7})|"
            r"(([0-9a-f]{1,4}(:[0-9a-f]{0,4}){0,5}))([^.])::(([0-9a-f]{1,4}"
            r"(:[0-9a-f]{1,4}){0,5})?))(/\d{1,3})?(?![:\\a-z0-9])"
        )

    def _ip2db(self, ip):
        '''
        Add an IPv6 address to IPv6 database and return obfuscated address.

        Returns the stored value when ``ip`` is already known, and ``None``
        when ``ip`` is itself a previously generated obfuscated address.

        FORMAT:
            {$original_ip: $obfuscated_ip}

        Raises:
            Exception: when the address cannot be substituted.
        '''

        def obfuscate_hex(_hex):
            # Obfuscate one ":"-separated group, keeping its width and any
            # leading zeros.
            if not _hex:
                # empty group, e.g. produced by splitting "::"
                return ''
            n_0_hex = _hex.lstrip('0').lower()
            if not n_0_hex:
                # the group is all zeros: keep it as-is
                return '0' * len(_hex)
            # hashlib needs bytes; encoding unconditionally yields the same
            # bytes for ASCII hex digits on both Python 2 and Python 3.
            old_hex = n_0_hex.encode('utf-8')
            new_hex = hashlib.sha1(old_hex).hexdigest()[: len(old_hex)]
            return '0' * (len(_hex) - len(n_0_hex)) + new_hex

        try:
            if ip in self._ipv6_db:
                return self._ipv6_db[ip]
            if ip in self._ipv6_db.values():  # pragma: no cover
                # avoid nested obfuscating
                return None
            self._ipv6_db[ip] = ':'.join(obfuscate_hex(h) for h in ip.split(':'))
            return self._ipv6_db[ip]
        except Exception as e:  # pragma: no cover
            logger.warning(e)
            # Format the address into the message; passing it as a second
            # argument would leave the "%s" placeholder unexpanded.
            raise Exception(
                'SubIPv6Error: Unable to Substitute IPv6 Address - %s' % (ip,)
            )

    def parse_line(self, line, **kwargs):
        """
        Return ``line`` with every IPv6 address matched by ``self.pattern``
        replaced by its obfuscated form.

        Empty/None lines are returned unchanged. ``kwargs`` is accepted but
        not used here.
        """

        def _sub_ip(line, ip):
            new_ip = self._ip2db(ip)
            if new_ip:
                logger.debug("Obfuscating IPv6 - %s > %s", ip, new_ip)
                return line.replace(ip, new_ip)
            # it's an obfuscated IP
            return line

        if not line:
            return line

        # Group 1 of each match is the address without the optional
        # "/prefix" suffix.
        ips = [each[0] for each in re.findall(self.pattern, line, re.I)]
        # Longest first (as IPv4.parse_line does): str.replace() works on
        # plain substrings, so replacing a short address first would corrupt
        # a longer address that starts with it.
        for ip in sorted(ips, key=len, reverse=True):
            if any(re.search(_i, ip, re.I) for _i in self._ignore_list):
                continue
            line = _sub_ip(line, ip)
        return line

    def mapping(self):
        """
        Return the database as a list of
        ``{'original': ..., 'obfuscated': ...}`` dicts.
        """
        return [{'original': k, 'obfuscated': v} for k, v in self._ipv6_db.items()]

    def generate_report(self, report_dir, archive_name):
        """
        Build the "<archive_name>-ipv6.csv" report lines under ``report_dir``
        and hand them to ``write_report``.

        Raises:
            Exception: when the report lines cannot be created.
        """
        try:
            ip_report_file = os.path.join(report_dir, "%s-ipv6.csv" % archive_name)
            logger.info('Creating IPv6 Report - %s', ip_report_file)
            lines = ['Obfuscated IPv6,Original IPv6']
            for k, v in self._ipv6_db.items():
                lines.append('{0},{1}'.format(v, k))
        except Exception as e:  # pragma: no cover
            logger.exception(e)
            raise Exception('CreateReport Error: Error Creating IPv6 Report')

        write_report(lines, ip_report_file)

        logger.info('Completed IPv6 Report.')
4 changes: 4 additions & 0 deletions insights/client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@
# non-CLI
'default': False
},
'obfuscate_ipv6': {
# non-CLI
'default': False
},
'obfuscate_hostname': {
# non-CLI
'default': False
Expand Down
Loading
Loading