-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSimpleRedis.py
85 lines (69 loc) · 2.65 KB
/
SimpleRedis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from multiprocessing import shared_memory
class SimpleRedis:
def __init__(self):
self.SHARED_MEMORY_LENGTH_BYTES = 256
try:
self.shm = shared_memory.SharedMemory(
create=True,
size=self.SHARED_MEMORY_LENGTH_BYTES,
name="simple_redis", # noqa: E501
)
print("Created new shared memory")
except FileExistsError as e:
print(f"Got {e}. So re-using existing shared memory")
self.shm = shared_memory.SharedMemory(
create=False,
size=self.SHARED_MEMORY_LENGTH_BYTES,
name="simple_redis", # noqa: E501
)
def put(self, value: bytes):
"""Put into shared memory"""
# Verify of type bytes
if not isinstance(value, bytes):
msg = (
"You must pass a value of type bytes. "
"Tip: use variable.encode('utf-8') to encode strings to bytes"
)
raise Exception(msg)
# Get length of value
value_size = len(value)
# Validate length of value is <= SHARED_MEMORY_LENGTH_BYTES
if value_size > self.SHARED_MEMORY_LENGTH_BYTES:
raise Exception(
f"Refusing to put value larger than SHARED_MEMORY_LENGTH_BYTES: {self.SHARED_MEMORY_LENGTH_BYTES}" # noqa: E501
)
# Clear current shared memory
self.free()
# Store value in shared memory
self.shm.buf[:value_size] = value
def read(self, parse_int=False) -> bytes | int:
"""Prints & returns contents of shared memory
parse_int: bool Try to parse shared memory value into an int
and return it. Default false.
"""
if parse_int:
contents = int(self.read().decode("utf-8").rstrip("\x00"))
else:
contents = bytes(self.shm.buf)
return contents
def debug_shm(self):
"""Puts a debug string into the shared memory & prints it"""
self.shm.buf[:] = b"hello12345"
print(bytes(self.shm.buf))
def free(self):
"""Clears the shared memory and set it all to
empty bytes (\x00\00) of lenth
SHARED_MEMORY_LENGTH_BYTES
Essentially calloc.
"""
self.shm.buf[:] = bytes(self.SHARED_MEMORY_LENGTH_BYTES)
def destroy(self):
"""Destroy the shared memory"""
self.shm.close()
self.shm.unlink()
def __del__(self):
pass
# self.shm.close()
# self.shm.unlink() # Probably not needed/here be dragons
# Who decides when we don't need the shared memory :)
simple_redis = SimpleRedis()