@@ -21,7 +21,7 @@ class SecretsManager:
21
21
22
22
def __init__ (self ):
23
23
self .crypto = CodeGateCrypto ()
24
- self ._session_store : dict [str , SecretEntry ] = {}
24
+ self ._session_store : dict [str , dict [ str , SecretEntry ] ] = {}
25
25
self ._encrypted_to_session : dict [str , str ] = {} # Reverse lookup index
26
26
27
27
def store_secret (self , value : str , service : str , secret_type : str , session_id : str ) -> str :
@@ -41,12 +41,14 @@ def store_secret(self, value: str, service: str, secret_type: str, session_id: s
41
41
encrypted_value = self .crypto .encrypt_token (value , session_id )
42
42
43
43
# Store mappings
44
- self ._session_store [session_id ] = SecretEntry (
44
+ session_secrets = self ._session_store .get (session_id , {})
45
+ session_secrets [encrypted_value ] = SecretEntry (
45
46
original = value ,
46
47
encrypted = encrypted_value ,
47
48
service = service ,
48
49
secret_type = secret_type ,
49
50
)
51
+ self ._session_store [session_id ] = session_secrets
50
52
self ._encrypted_to_session [encrypted_value ] = session_id
51
53
52
54
logger .debug ("Stored secret" , service = service , type = secret_type , encrypted = encrypted_value )
@@ -58,7 +60,9 @@ def get_original_value(self, encrypted_value: str, session_id: str) -> Optional[
58
60
try :
59
61
stored_session_id = self ._encrypted_to_session .get (encrypted_value )
60
62
if stored_session_id == session_id :
61
- return self ._session_store [session_id ].original
63
+ session_secrets = self ._session_store [session_id ].get (encrypted_value )
64
+ if session_secrets :
65
+ return session_secrets .original
62
66
except Exception as e :
63
67
logger .error ("Error retrieving secret" , error = str (e ))
64
68
return None
@@ -71,9 +75,10 @@ def cleanup(self):
71
75
"""Securely wipe sensitive data"""
72
76
try :
73
77
# Convert and wipe original values
74
- for entry in self ._session_store .values ():
75
- original_bytes = bytearray (entry .original .encode ())
76
- self .crypto .wipe_bytearray (original_bytes )
78
+ for secrets in self ._session_store .values ():
79
+ for entry in secrets .values ():
80
+ original_bytes = bytearray (entry .original .encode ())
81
+ self .crypto .wipe_bytearray (original_bytes )
77
82
78
83
# Clear the dictionaries
79
84
self ._session_store .clear ()
@@ -92,9 +97,9 @@ def cleanup_session(self, session_id: str):
92
97
"""
93
98
try :
94
99
# Get the secret entry for the session
95
- entry = self ._session_store .get (session_id )
100
+ secrets = self ._session_store .get (session_id , {} )
96
101
97
- if entry :
102
+ for entry in secrets . values () :
98
103
# Securely wipe the original value
99
104
original_bytes = bytearray (entry .original .encode ())
100
105
self .crypto .wipe_bytearray (original_bytes )
0 commit comments