Skip to content

Commit

Permalink
worker lock prolongate
Browse files Browse the repository at this point in the history
  • Loading branch information
nekufa committed Sep 18, 2023
1 parent 7c43827 commit db60246
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 7 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,10 @@ Worker pause in seconds on empty queue

You can import and change settings manually
```py
from sharded_queue.settings import settings
from sharded_queue import Queue, Worker
from sharded_queue.drivers import RuntimeLock RuntimeStorage
from sharded_queue.drivers import RuntimeLock, RuntimeStorage
from sharded_queue.settings import settings

settings.worker_acquire_delay = 5
settings.worker_batch_size = 64

Expand Down
14 changes: 10 additions & 4 deletions sharded_queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,16 @@ async def process(self, tube: Tube, limit: Optional[int] = None) -> int:
break
await sleep(settings.worker_empty_pause)

if tube.handler is DeferredHandler:
await self.lock.ttl(tube.pipe, settings.deferred_retry_delay)
else:
await self.lock.release(tube.pipe)
if tube.handler is DeferredHandler:
await self.lock.ttl(
key=tube.pipe,
ttl=settings.deferred_retry_delay,
)
return processed_counter
else:
await self.lock.ttl(tube.pipe, settings.lock_timeout)

await self.lock.release(tube.pipe)

return processed_counter

Expand Down
35 changes: 34 additions & 1 deletion tests/test_lock.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,41 @@
from asyncio import gather, sleep

from pytest import mark
from redis.asyncio import Redis

from sharded_queue.drivers import RedisLock, RuntimeLock
from sharded_queue import Handler, Queue, Route, Tube, Worker
from sharded_queue.drivers import RedisLock, RuntimeLock, RuntimeStorage
from sharded_queue.protocols import Lock
from sharded_queue.settings import settings


class Request:
...


class TestHandler(Handler):
async def handle(self, *requests: Request) -> None:
...


@mark.asyncio
async def test_lock_prolongate() -> None:
queue: Queue = Queue(RuntimeStorage())
redis: Redis = Redis(decode_responses=True)
await redis.flushall()
worker: Worker = Worker(RedisLock(redis), queue)
pipe: str = settings.lock_prefix + Tube(TestHandler, Route()).pipe

async def register_task():
ttl1 = await redis.ttl(pipe)
await sleep(1)
ttl2 = await redis.ttl(pipe)
assert ttl1 == ttl2, 'ttl is updated'
await queue.register(TestHandler, Request())

await queue.register(TestHandler, Request())
await gather(worker.loop(2), register_task())
await redis.close()


@mark.asyncio
Expand Down

0 comments on commit db60246

Please sign in to comment.