Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[client] Handle command_line as clear text and optional matching of parent process name(#1685) #38

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,28 @@ jobs:
- ms-teams/report:
only_on_fail: true
webhook_url: $MS_TEAMS_WEBHOOK_URL
tests:
docker:
- image: cimg/python:3.12
working_directory: ~/repo
steps:
- checkout
- run:
name: install dependencies
command: pip3 install -r requirements.txt --user
- run:
name: install test-dependencies
command: pip3 install -r test-requirements.txt --user
- run:
name: run tests
command: python -m unittest discover pyobas
- slack/notify:
branch_pattern: main
event: fail
template: basic_fail_1
- ms-teams/report:
only_on_fail: true
webhook_url: $MS_TEAMS_WEBHOOK_URL
build:
working_directory: ~/openbas-client
docker:
Expand Down Expand Up @@ -144,6 +166,10 @@ workflows:
filters:
tags:
only: /.*/
- tests:
filters:
tags:
only: /.*/
- build:
filters:
tags:
Expand Down
91 changes: 17 additions & 74 deletions pyobas/helpers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import base64
import json
import os
import re
Expand Down Expand Up @@ -457,78 +456,22 @@ def match_alert_element_fuzzy(self, signature_value, alert_values, fuzzy_scoring
return False

def match_alert_elements(self, signatures, alert_data):
return self._match_alert_elements_original(
signatures, alert_data
) or self._match_alert_elements_for_command_line(signatures, alert_data)

def _match_alert_elements_original(self, signatures, alert_data):
# Example for alert_data
# {"process_name": {"list": ["xx", "yy"], "fuzzy": 90}}
relevant_signatures = [
s for s in signatures if s["type"] in self.relevant_signatures_types
]

# Matching logics
signatures_number = len(relevant_signatures)
matching_number = 0
for signature in relevant_signatures:
alert_data_for_signature = alert_data[signature["type"]]
signature_result = False
if alert_data_for_signature["type"] == "fuzzy":
signature_result = self.match_alert_element_fuzzy(
signature["value"],
alert_data_for_signature["data"],
alert_data_for_signature["score"],
)
elif alert_data_for_signature["type"] == "simple":
signature_result = signature["value"] in str(
alert_data_for_signature["data"]
)

if signature_result:
matching_number = matching_number + 1

if signatures_number == matching_number:
return True
return False

def _match_alert_elements_for_command_line(self, signatures, alert_data):
command_line_signatures = [
signature
for signature in signatures
if signature.get("type") == "command_line"
]
if len(command_line_signatures) == 0:
return False
key_types = ["command_line", "process_name", "file_name"]
alert_datas = [alert_data.get(key) for key in key_types if key in alert_data]
for signature in command_line_signatures:
signature_result = False
signature_value = self._decode_value(signature["value"]).strip().lower()
for alert_data in alert_datas:
trimmed_lowered_datas = [s.strip().lower() for s in alert_data["data"]]
signature_result = any(
data in signature_value for data in trimmed_lowered_datas
)
if signature_result:
return self._match_parent_process(signatures, alert_data)

def _match_parent_process(self, signatures, alert_data):
signature_parent_process_name_value = next(
(
signature.get("value")
for signature in signatures
if signature.get("type") == "parent_process_name"
),
None,
)
obas_parent_process_name_pattern = r"^obas-implant-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
for process_name in alert_data.get("parent_process_name", {}).get("data", []):
if process_name and (
re.match(obas_parent_process_name_pattern, process_name)
and signature_parent_process_name_value == process_name
):
return True
return False

def _decode_value(self, signature_value):
if _is_base64_encoded(signature_value):
try:
decoded_bytes = base64.b64decode(signature_value)
decoded_str = decoded_bytes.decode("utf-8")
return decoded_str
except Exception as e:
self.logger.error(str(e))
else:
return signature_value


def _is_base64_encoded(str_maybe_base64):
# Check if the length is a multiple of 4 and matches the Base64 character set
base64_pattern = re.compile(r"^[A-Za-z0-9+/]*={0,2}$")
return len(str_maybe_base64) % 4 == 0 and bool(
base64_pattern.match(str_maybe_base64)
)
Empty file added pyobas/test/__init__.py
Empty file.
128 changes: 128 additions & 0 deletions pyobas/test/test_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import unittest
from unittest.mock import MagicMock

from helpers import OpenBASDetectionHelper


class TestOpenBASDetectionHelper(unittest.TestCase):

def setUp(self):
self.mock_logger = MagicMock()
self.relevant_signatures_types = [
"parent_process_name",
"command_line",
"command_line_base64",
]
self.detection_helper = OpenBASDetectionHelper(
self.mock_logger, self.relevant_signatures_types
)

def test_should_match_alert_parent_process_name(self):
signatures = [
{
"type": "parent_process_name",
"value": "obas-implant-04942182-fb2f-41e3-a3c9-cb0eac1cd2d9",
}
]
alert_data = {
"parent_process_name": {
"type": "fuzzy",
"data": ["obas-implant-04942182-fb2f-41e3-a3c9-cb0eac1cd2d9"],
"score": 80,
}
}

result = self.detection_helper.match_alert_elements(signatures, alert_data)
self.assertTrue(result)

def test_should_not_match_alert_parent_process_when_empty(self):
signatures = [
{
"type": "parent_process_name",
"value": "obas-implant-04942182-fb2f-41e3-a3c9-cb0eac1cd2d9",
}
]
alert_data = {}

result = self.detection_helper.match_alert_elements(signatures, alert_data)
self.assertFalse(result)

def test_should_not_match_alert_parent_process_name(self):
signatures = [
{
"type": "parent_process_name",
"value": "obas-implant-04942182-fb2f-41e3-a3c9-cb0eac1cd2d9",
}
]
alert_data = {
"parent_process_name": {
"type": "fuzzy",
"data": ["obas-implant-44942182-fb2f-41e3-a3c9-cb0eac1cd2d9"],
"score": 80,
}
}

result = self.detection_helper.match_alert_elements(signatures, alert_data)
self.assertFalse(result)

def test_should_match_alert_parent_process_name_from_list(self):
signatures = [
{
"type": "parent_process_name",
"value": "obas-implant-04942182-fb2f-41e3-a3c9-cb0eac1cd2d9",
}
]
alert_data = {
"parent_process_name": {
"type": "fuzzy",
"data": [
"not_matching_process_name",
"obas-implant-44942182-fb2f-41e3-a3c9-cb0eac1cd2d9",
"obas-implant-04942182-fb2f-41e3-a3c9-cb0eac1cd2d9",
],
"score": 80,
}
}

result = self.detection_helper.match_alert_elements(signatures, alert_data)
self.assertTrue(result)

def test_should_not_match_alert_parent_process_name_from_non_matching_list(self):
signatures = [
{
"type": "parent_process_name",
"value": "obas-implant-04942182-fb2f-41e3-a3c9-cb0eac1cd2d9",
}
]
alert_data = {
"parent_process_name": {
"type": "fuzzy",
"data": ["not_matching_process_name", "not_matching_process_name"],
"score": 80,
}
}

result = self.detection_helper.match_alert_elements(signatures, alert_data)
self.assertFalse(result)

def test_should_not_match_alert_parent_process_name_from_list_with_None(self):
signatures = [
{
"type": "parent_process_name",
"value": "obas-implant-04942182-fb2f-41e3-a3c9-cb0eac1cd2d9",
}
]
alert_data = {
"parent_process_name": {
"type": "fuzzy",
"data": [None],
"score": 80,
}
}

result = self.detection_helper.match_alert_elements(signatures, alert_data)
self.assertFalse(result)


if __name__ == "__main__":
unittest.main()