Skip to content

Commit ebe4af2

Browse files
committed
recurrent interval update per handler and msg
1 parent 5e569f1 commit ebe4af2

File tree

2 files changed

+33
-6
lines changed

2 files changed

+33
-6
lines changed

sharded_queue/__init__.py

+13
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,23 @@ async def register(
102102

103103
if recurrent:
104104
if_not_exists = True
105+
tube = Tube(RecurrentHandler, Route())
106+
length = await self.storage.length(tube.pipe)
107+
messages = await self.storage.range(tube.pipe, length)
108+
105109
pipe_messages = RecurrentHandler.transform(
106110
pipe_messages, recurrent, self.serializer
107111
)
108112

113+
recurrent_tuples = [
114+
(request.pipe, request.msg) for (_, request) in pipe_messages
115+
]
116+
117+
for msg in reversed(messages):
118+
request = self.serializer.deserialize(RecurrentRequest, msg)
119+
if (request.pipe, request.msg) in recurrent_tuples:
120+
await self.storage.remove(tube.pipe, msg)
121+
109122
if defer:
110123
if_not_exists = True
111124
pipe_messages = DeferredHandler.transform(

tests/test_recurrent.py

+20-6
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
from asyncio import sleep
2-
from datetime import timedelta
2+
from datetime import datetime, timedelta
33
from typing import NamedTuple
44

55
from pytest import mark
66

7-
from sharded_queue import (DeferredHandler, Handler, Queue, RecurrentHandler,
8-
Route, Tube, Worker)
7+
from sharded_queue import (DeferredHandler, DeferredRequest, Handler, Queue,
8+
RecurrentHandler, RecurrentRequest, Route, Tube,
9+
Worker)
910
from sharded_queue.drivers import RuntimeLock, RuntimeStorage
1011
from sharded_queue.protocols import Lock, Storage
1112

@@ -41,7 +42,7 @@ async def stats() -> tuple[int, int, int]:
4142
)
4243

4344
await queue.register(
44-
ValidateAccess, CompanyRequest(1), recurrent=timedelta(milliseconds=10)
45+
ValidateAccess, CompanyRequest(1), recurrent=timedelta(seconds=1)
4546
)
4647
assert await stats() == (0, 1, 0), 'recurrent pipe contains request'
4748

@@ -58,7 +59,7 @@ async def stats() -> tuple[int, int, int]:
5859
await Worker(lock, queue).loop(1, handler=RecurrentHandler)
5960
assert await stats() == (1, 1, 0), 'no deffered duplicates'
6061

61-
await sleep(0.01)
62+
await sleep(1)
6263

6364
await lock.release(recurrent_pipe)
6465
await Worker(lock, queue).loop(1, handler=RecurrentHandler)
@@ -71,7 +72,20 @@ async def stats() -> tuple[int, int, int]:
7172
await worker.loop(1, handler=RecurrentHandler)
7273
assert await stats() == (1, 1, 1), 'deferred added'
7374

74-
await sleep(0.01)
75+
await sleep(1)
7576

7677
await worker.loop(1, handler=DeferredHandler)
7778
assert await stats() == (0, 1, 1), 'no validation duplicate'
79+
80+
[recurrent] = await queue.storage.range(recurrent_pipe, 1)
81+
request = queue.serializer.deserialize(RecurrentRequest, recurrent)
82+
assert request.interval == 1
83+
await queue.register(
84+
ValidateAccess, CompanyRequest(1), recurrent=timedelta(seconds=2)
85+
)
86+
87+
assert await queue.storage.length(recurrent_pipe) == 1
88+
89+
[recurrent] = await queue.storage.range(recurrent_pipe, 1)
90+
request = queue.serializer.deserialize(RecurrentRequest, recurrent)
91+
assert request.interval == 2

0 commit comments

Comments
 (0)