-
Notifications
You must be signed in to change notification settings - Fork 1
/
MagicBuffer.py
66 lines (56 loc) · 1.94 KB
/
MagicBuffer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from collections import deque
class MagicBuffer:
"""Multi-key data buffer"""
def __init__(self, buffer_size=1, max_count=1024):
"""
Args:
buffer_size: Size of buffer.
max_count: Maximum number of update before dropping idle keys.
"""
self.__buffer_size = buffer_size
self.__max_count = max_count
self.__data = {}
"""key-data pair"""
self.__count = {}
"""counter"""
self.__n = 0
"""always increasing counter"""
def append(self, key:str, value):
"""Insert key and data to buffer. Return None if number of data points
is less than BUFFER_SIZE, otherwise return a tuple of data points
Args:
key: key
value: any object
Returns:
list(str, tuple(object)): key and a tuple of BUFFER_SIZE of data
"""
self.__n += 1
self.__cleanup()
# new key
if key not in self.__data.keys():
self.__data[key] = deque()
self.__count[key] = 0
# append data
self.__data[key].appendleft(value)
self.__count[key] = self.__n
# return None or return chunk of data
if len(self.__data[key]) >= self.__buffer_size:
value = tuple(self.__data[key].pop() for i in range(0, self.__buffer_size))
del self.__data[key]
del self.__count[key]
return key, value
else:
return None, None
def __cleanup(self):
"""Drop keys that are idle for too long"""
drop_keys = tuple(
key for key in self.__data.keys()
if self.__n - self.__count[key] > self.__max_count
or self.__n - self.__count[key] < 0
)
for key in drop_keys:
del self.__data[key]
del self.__count[key]
# reset counter if it gets too large
if self.__n > 99999999:
self.__n = 0