I just wanted to share this recipe for setting up the queue to unpickle only the latest item and skip all the others. This is useful when processes run at different rates and one of them needs to efficiently drop every message that isn't the latest, which is especially applicable to real-time applications.
This can be done quite simply with a wrapper class as shown, but maybe people would find it a nice general feature? Wrapping it into the core would avoid the slightly hacky thing with self.actual_loads, but it's no big deal either way:
import faster_fifo
import faster_fifo_reduction
import queue


class Queue(faster_fifo.Queue):
    def __init__(self, *a, **kw):
        super().__init__(*a, **kw)
        # have the loads function be a noop
        self.actual_loads = self.loads
        self.loads = lambda x: x

    def get_many(self, *a, raw=False, **kw):
        xs = super().get_many(*a, **kw)
        return xs if raw else [self.actual_loads(x) for x in xs]

    def get_latest(self, block=True, **kw):
        '''Get the latest value, don't waste time unpickling values
        you're not going to use.
        '''
        xs = self.get_many(block=block, raw=True, **kw)
        return self.actual_loads(xs[-1]) if xs else None

    def get_latest_nowait(self):
        return self.get_latest(block=False)
if __name__ == '__main__':
    # have the deserializer print out so we know what's actually being unpickled
    og_loads = Queue.loads

    def loads(x):
        x = og_loads(q, x)
        print('loaded', x)
        return x

    # setup the queue
    q = Queue(loads=loads)
    for i in range(6):
        print('put', i)
        q.put(i)

    # get the latest value
    print(q.get_latest())

    # no more values, this throws an error
    try:
        print(q.get_latest_nowait())
    except queue.Empty:
        print('no more messages, as expected.')

    # try it again for good measure
    for i in range(6):
        print('put', i)
        q.put(i)
    print(q.get_latest_nowait())
Another addition that might(?) also be useful would be getting the N latest values.
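For the N-latest idea, here is a rough sketch of how it might look. It is not part of faster_fifo: `latest_n` and `counting_loads` are hypothetical names, and a plain list of pickled bytes stands in for what `get_many(..., raw=True)` returns in the wrapper above, so the snippet runs with the standard library alone.

```python
import pickle
from typing import Any, Callable, List, Sequence


def latest_n(raw_items: Sequence[bytes], n: int,
             loads: Callable[[bytes], Any] = pickle.loads) -> List[Any]:
    """Deserialize only the last ``n`` raw messages, oldest first."""
    if n <= 0:
        # raw_items[-0:] would return everything, so handle this explicitly
        return []
    return [loads(x) for x in raw_items[-n:]]


# Stand-in for the raw (still pickled) messages drained from the queue.
raw = [pickle.dumps(i) for i in range(6)]

# Count deserializer calls to confirm skipped messages are never unpickled.
calls = []


def counting_loads(x):
    calls.append(x)
    return pickle.loads(x)


result = latest_n(raw, 2, loads=counting_loads)
print(result)      # [4, 5]
print(len(calls))  # 2
```

In the wrapper class this could become a hypothetical `get_latest_n(self, n, block=True, **kw)` method that returns `latest_n(self.get_many(block=block, raw=True, **kw), n, self.actual_loads)`. Asking for more items than are available just returns everything that was drained.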