-
Notifications
You must be signed in to change notification settings - Fork 42
/
Copy pathshmem.py
73 lines (59 loc) · 2.24 KB
/
shmem.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
"""Shared memory with memory mapping"""
import ctypes
import mmap
import os
from functools import reduce
import numpy as np
from holodeck.exceptions import HolodeckException
class Shmem:
    """Shared memory block exposed as a NumPy array through memory mapping.

    On Windows (``os.name == "nt"``) the block is a tagged mapping named
    ``/HOLODECK_MEM<uuid>_<name>``; on POSIX it is a file at
    ``/dev/shm/HOLODECK_MEM<uuid>_<name>`` that is created (or truncated),
    resized and mapped. The mapped bytes are available as ``np_array``.

    Args:
        name (:obj:`str`): Name that points to the beginning of the shared memory block
        shape (:obj:`int` or sequence of :obj:`int`): Shape of the memory block
        dtype (type, optional): data type of the shared memory. Defaults to np.float32
        uuid (:obj:`str`, optional): UUID of the memory block. Defaults to ""

    Raises:
        HolodeckException: if ``os.name`` is neither ``"nt"`` nor ``"posix"``.
    """

    # NumPy scalar type -> ctypes type. The constructor no longer needs this
    # table (the array is built directly on the mapping), but it is kept so
    # that any code reading it keeps working.
    _numpy_to_ctype = {
        np.float32: ctypes.c_float,
        np.uint8: ctypes.c_uint8,
        # ``np.bool`` was removed in NumPy 1.24 (and was the builtin ``bool``
        # before that), so register both spellings explicitly.
        np.bool_: ctypes.c_bool,
        bool: ctypes.c_bool,
        np.byte: ctypes.c_byte,
    }

    def __init__(self, name, shape, dtype=np.float32, uuid=""):
        self.shape = shape
        self.dtype = dtype

        # Accept a bare integer as well as a sequence, and seed the product
        # with 1 so an empty shape ``()`` means a single element.
        if isinstance(shape, (int, np.integer)):
            dims = (shape,)
        else:
            dims = tuple(shape)
        size = reduce(lambda x, y: x * y, dims, 1)
        size_bytes = np.dtype(dtype).itemsize * size

        self._mem_path = None
        self._mem_pointer = None

        if os.name == "nt":
            self._mem_path = "/HOLODECK_MEM" + uuid + "_" + name
            # The third positional argument is the mapping's tag name.
            self._mem_pointer = mmap.mmap(0, size_bytes, self._mem_path)
        elif os.name == "posix":
            self._mem_path = "/dev/shm/HOLODECK_MEM" + uuid + "_" + name
            fd = os.open(self._mem_path, os.O_CREAT | os.O_TRUNC | os.O_RDWR)
            try:
                os.ftruncate(fd, size_bytes)
                os.fsync(fd)
                self._mem_pointer = mmap.mmap(fd, size_bytes)
            finally:
                # The mapping does not need the descriptor to stay open (see
                # the mmap man page); closing in ``finally`` also prevents a
                # descriptor leak when resizing or mapping fails.
                os.close(fd)
        else:
            raise HolodeckException("Currently unsupported os: " + os.name)

        # Build the array directly on top of the mapping instead of assigning
        # to ``ndarray.data`` (deprecated by NumPy). This works for any dtype,
        # not only those listed in ``_numpy_to_ctype``.
        self.np_array = np.ndarray(shape, dtype=dtype, buffer=self._mem_pointer)

    def unlink(self):
        """unlinks the shared memory

        Raises:
            HolodeckException: if ``os.name`` is neither ``"posix"`` nor ``"nt"``.
        """
        if os.name == "posix":
            self.__linux_unlink__()
        elif os.name == "nt":
            self.__windows_unlink__()
        else:
            raise HolodeckException("Currently unsupported os: " + os.name)

    def __linux_unlink__(self):
        """Drop the array attribute and remove the backing file."""
        del self.np_array
        os.remove(self._mem_path)

    def __windows_unlink__(self):
        """No cleanup is performed on Windows."""
        pass