Skip to content

Commit

Permalink
[client] handle command_line as clear text and parent process name
Browse files Browse the repository at this point in the history
  • Loading branch information
isselparra committed Nov 12, 2024
1 parent 9c15bd9 commit 524fe8b
Show file tree
Hide file tree
Showing 2 changed files with 249 additions and 35 deletions.
78 changes: 43 additions & 35 deletions pyobas/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,19 +457,51 @@ def match_alert_element_fuzzy(self, signature_value, alert_values, fuzzy_scoring
return False

def match_alert_elements(self, signatures, alert_data):
return self._match_alert_elements_original(
signatures, alert_data
) or self._match_alert_elements_for_command_line(signatures, alert_data)
filtered_signatures = [signature for signature in signatures if signature.get('type') != "parent_process_name"]
filtered_alert_data = {key: value for key, value in alert_data.items() if key != "parent_process_name"}
return self._match_parent_process(signatures, alert_data) and (self._match_alert_elements_all_signatures(filtered_signatures, filtered_alert_data)
or self._match_alert_elements_at_least_one_signature(filtered_signatures, filtered_alert_data)
or self._match_alert_elements_for_command_line_detected_as_file(filtered_signatures, filtered_alert_data))

def _match_parent_process(self, signatures, alert_data):
found_signature_parent_process_name = next((signature for signature in signatures if signature.get('type') == 'parent_process_name'), None)
if found_signature_parent_process_name is None:
return True
found_alert_parent_process_name = alert_data.get("parent_process_name")
if found_alert_parent_process_name is None:
return True
alert_parent_process_name_value = next(iter(found_alert_parent_process_name.get("data")), None)
if alert_parent_process_name_value is None:
return True
obas_parent_process_name_pattern = r"^obas-implant-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
if re.match(obas_parent_process_name_pattern, alert_parent_process_name_value):
signature_parent_process_name_value = found_signature_parent_process_name.get("value")
if signature_parent_process_name_value == alert_parent_process_name_value:
return True
else:
return False
return True

def _match_alert_elements_original(self, signatures, alert_data):
def _match_alert_elements_all_signatures(self, signatures, alert_data):
matching_number, signatures_number = self._get_number_of_matches(signatures, alert_data)
if signatures_number == matching_number:
return True
return False

def _match_alert_elements_at_least_one_signature(self, signatures, alert_data):
matching_number, _ = self._get_number_of_matches(signatures, alert_data)
if matching_number > 0:
return True
return False

def _get_number_of_matches(self, signatures, alert_data):
# Example for alert_data
# {"process_name": {"list": ["xx", "yy"], "fuzzy": 90}}
relevant_signatures = [
s for s in signatures if s["type"] in self.relevant_signatures_types
]

# Matching logics
signatures_number = len(relevant_signatures)
matching_number = 0
for signature in relevant_signatures:
alert_data_for_signature = alert_data[signature["type"]]
Expand All @@ -484,51 +516,27 @@ def _match_alert_elements_original(self, signatures, alert_data):
signature_result = signature["value"] in str(
alert_data_for_signature["data"]
)

if signature_result:
matching_number = matching_number + 1
return matching_number, len(relevant_signatures)

if signatures_number == matching_number:
return True
return False

def _match_alert_elements_for_command_line(self, signatures, alert_data):
def _match_alert_elements_for_command_line_detected_as_file(self, signatures, alert_data):
command_line_signatures = [
signature
for signature in signatures
if signature.get("type") == "command_line"
]
if len(command_line_signatures) == 0:
return False
key_types = ["command_line", "command_line_base64", "process_name", "file_name"]
key_types = ["file_name", "process_name"]
alert_datas = [alert_data.get(key) for key in key_types if key in alert_data]
for signature in command_line_signatures:
signature_result = False
signature_value = self._decode_value(signature["value"]).strip().lower()
signature_value = signature["value"].strip().lower()
for alert_data in alert_datas:
trimmed_lowered_datas = [s.strip().lower() for s in alert_data["data"]]
signature_result = any(
data in signature_value for data in trimmed_lowered_datas
)
if signature_result:
return True
if signature_result:
return True
return False

def _decode_value(self, signature_value):
if _is_base64_encoded(signature_value):
try:
decoded_bytes = base64.b64decode(signature_value)
decoded_str = decoded_bytes.decode("utf-8")
return decoded_str
except Exception as e:
self.logger.error(str(e))
else:
return signature_value


def _is_base64_encoded(str_maybe_base64):
# Check if the length is a multiple of 4 and matches the Base64 character set
base64_pattern = re.compile(r"^[A-Za-z0-9+/]*={0,2}$")
return len(str_maybe_base64) % 4 == 0 and bool(
base64_pattern.match(str_maybe_base64)
)
206 changes: 206 additions & 0 deletions pyobas/test_open_bas_detection_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
import unittest
from unittest.mock import MagicMock

from helpers import OpenBASDetectionHelper


class TestOpenBASDetectionHelper(unittest.TestCase):

def setUp(self):
self.mock_logger = MagicMock()
self.relevant_signatures_types = ["parent_process_name", "command_line", "command_line_base64"]
self.detection_helper = OpenBASDetectionHelper(self.mock_logger, self.relevant_signatures_types)

def test_should_match_alert_with_clear_alert_command_line(self):
signatures = [{
"type": "parent_process_name",
"value": 'obas-implant-04942182-fb2f-41e3-a3c9-cb0eac1cd2d9'
},
{
"type": "command_line",
"value": "SCHTASKS /Create /SC ONCE /TN spawn /TR C:\\windows\\system32\\cmd.exe /ST 20:10\n"
},{
"type": "command_line_base64",
"value": "U0NIVEFTS1MgL0NyZWF0ZSAvU0MgT05DRSAvVE4gc3Bhd24gL1RSIEM6XHdpbmRvd3Ncc3lzdGVtMzJcY21kLmV4ZSAvU1QgMjA6MTAK"
}
]
alert_data = {
"parent_process_name": {
"type": "fuzzy",
"data": ["pwsh.exe"],
"score": 80
},
"command_line": {
"type": "fuzzy",
"data": ['"schtasks.exe" /Create /SC ONCE /TN spawn /TR C:\\windows\\system32\\cmd.exe /ST 20:10'],
"score": 60
},
"command_line_base64": {
"type": "fuzzy",
"data": ['"schtasks.exe" /Create /SC ONCE /TN spawn /TR C:\\windows\\system32\\cmd.exe /ST 20:10'],
"score": 60
}
}

result = self.detection_helper.match_alert_elements(signatures, alert_data)
self.assertTrue(result)

def test_should_match_alert_with_command_line_detected_as_file_on_alert(self):
signatures = [{
"type": "parent_process_name",
"value": 'obas-implant-82dc95ac-2d98-41a5-8a74-4103286e0342'
},
{
"type": "command_line",
"value": 'curl -o /tmp/eicar.com.txt https://secure.eicar.org/eicar.com.txt'
},
{
"type": "command_line_base64",
"value": 'Y3VybCAtbyAvdG1wL2VpY2FyLmNvbS50eHQgaHR0cHM6Ly9zZWN1cmUuZWljYXIub3JnL2VpY2FyLmNvbS50eHQ='
}]
alert_data = {
"parent_process_name": {
"type": "fuzzy",
"data": [],
"score": 80
},
"file_name": {
"type": "fuzzy",
"data": ["eicar.com.txt"],
"score": 60
},
"process_name": {
"type": "fuzzy",
"data": ["eicar.com.txt"],
"score": 60
},
"command_line": {
"type": "fuzzy",
"data": [],
"score": 60
},
"command_line_base64": {
"type": "fuzzy",
"data": [],
"score": 60
}
}

result = self.detection_helper.match_alert_elements(signatures, alert_data)
self.assertTrue(result)

def test_should_not_match_alert_with_command_line_detected_as_file_on_alert(self):
signatures = [{
"type": "parent_process_name",
"value": 'obas-implant-82dc95ac-2d98-41a5-8a74-4103286e0342'
},
{
"type": "command_line",
"value": 'curl -o /tmp/eicar.com.txt https://secure.eicar.org/eicar.com.txt'
},
{
"type": "command_line_base64",
"value": 'Y3VybCAtbyAvdG1wL2VpY2FyLmNvbS50eHQgaHR0cHM6Ly9zZWN1cmUuZWljYXIub3JnL2VpY2FyLmNvbS50eHQ='
}]
alert_data = {
"parent_process_name": {
"type": "fuzzy",
"data": [],
"score": 80
},
"file_name": {
"type": "fuzzy",
"data": ["eicar.zip"],
"score": 60
},
"process_name": {
"type": "fuzzy",
"data": ["eicar.zip"],
"score": 60
},
"command_line": {
"type": "fuzzy",
"data": [],
"score": 60
},
"command_line_base64": {
"type": "fuzzy",
"data": [],
"score": 60
}
}

result = self.detection_helper.match_alert_elements(signatures, alert_data)
self.assertFalse(result)


def test_should_match_alert_parent_process_name(self):
signatures = [{
"type": "parent_process_name",
"value": 'obas-implant-04942182-fb2f-41e3-a3c9-cb0eac1cd2d9'
},
{
"type": "command_line",
"value": "SCHTASKS /Create /SC ONCE /TN spawn /TR C:\\windows\\system32\\cmd.exe /ST 20:10\n"
},
{
"type": "command_line_base64",
"value": "U0NIVEFTS1MgL0NyZWF0ZSAvU0MgT05DRSAvVE4gc3Bhd24gL1RSIEM6XHdpbmRvd3Ncc3lzdGVtMzJcY21kLmV4ZSAvU1QgMjA6MTAK"
}]
alert_data = {
"parent_process_name": {
"type": "fuzzy",
"data": ["obas-implant-04942182-fb2f-41e3-a3c9-cb0eac1cd2d9"],
"score": 80
},
"command_line": {
"type": "fuzzy",
"data": ['"schtasks.exe" /Create /SC ONCE /TN spawn /TR C:\\windows\\system32\\cmd.exe /ST 20:10'],
"score": 60
},
"command_line_base64": {
"type": "fuzzy",
"data": ['"schtasks.exe" /Create /SC ONCE /TN spawn /TR C:\\windows\\system32\\cmd.exe /ST 20:10'],
"score": 60
}
}

result = self.detection_helper.match_alert_elements(signatures, alert_data)
self.assertTrue(result)

def test_should_not_match_alert_parent_process_name(self):
signatures = [{
"type": "parent_process_name",
"value": 'obas-implant-04942182-fb2f-41e3-a3c9-cb0eac1cd2d9'
},
{
"type": "command_line",
"value": "SCHTASKS /Create /SC ONCE /TN spawn /TR C:\\windows\\system32\\cmd.exe /ST 20:10\n"
},
{
"type": "command_line_base64",
"value": "U0NIVEFTS1MgL0NyZWF0ZSAvU0MgT05DRSAvVE4gc3Bhd24gL1RSIEM6XHdpbmRvd3Ncc3lzdGVtMzJcY21kLmV4ZSAvU1QgMjA6MTAK"
}]
alert_data = {
"parent_process_name": {
"type": "fuzzy",
"data": ["obas-implant-44942182-fb2f-41e3-a3c9-cb0eac1cd2d9"],
"score": 80
},
"command_line": {
"type": "fuzzy",
"data": ['"schtasks.exe" /Create /SC ONCE /TN spawn /TR C:\\windows\\system32\\cmd.exe /ST 20:10'],
"score": 60
},
"command_line_base64": {
"type": "fuzzy",
"data": ['"schtasks.exe" /Create /SC ONCE /TN spawn /TR C:\\windows\\system32\\cmd.exe /ST 20:10'],
"score": 60
}
}

result = self.detection_helper.match_alert_elements(signatures, alert_data)
self.assertFalse(result)

if __name__ == '__main__':
unittest.main()

0 comments on commit 524fe8b

Please sign in to comment.