Skip to content

Commit

Permalink
refactor layout
Browse files Browse the repository at this point in the history
  • Loading branch information
nekufa committed Sep 13, 2023
1 parent d141b81 commit cf2c985
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 199 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ When a handler is described you can use queue and worker api to manage and proce
```py
from asyncio import gather
from notifications import NotifyHandler, NotifyRequest
from sharded_queue import Queue, RuntimeLock, RuntimeStorage, Worker
from sharded_queue import Queue, Worker
from sharded_queue.drivers import RuntimeLock, RuntimeStorage


async def example():
Expand Down Expand Up @@ -100,6 +101,7 @@ There are several implementations of components:
- `RedisStorage` persist msgs using lists and lrange/lpop/rpush api
- `RuntimeLock` persist locks in memory (process in-memory distribution)
- `RuntimeStorage` persist msgs in dict (process in-memory distribution)

## Handler lifecycle

As you can notice, routing is made using static method, but perform is an instance method. When a worker start processing requests it can bootstrap and tear down the handler using `start` and `stop` methods
Expand Down Expand Up @@ -172,7 +174,9 @@ Worker pause in seconds on empty queue
You can import and change settings manually
```py
from sharded_queue import settings
from sharded_queue.settings import settings
from sharded_queue import Queue, Worker
from sharded_queue.drivers import RuntimeLock, RuntimeStorage
settings.worker_acquire_delay = 5
settings.worker_batch_size = 64
Expand Down
203 changes: 11 additions & 192 deletions sharded_queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,69 +3,16 @@
from dataclasses import dataclass
from functools import cache
from importlib import import_module
from json import dumps, loads
from typing import (AsyncGenerator, Generic, List, NamedTuple, Optional,
Protocol, Self, Sequence, TypeVar, get_type_hints)
from typing import (AsyncGenerator, Generic, NamedTuple, Optional, Self,
TypeVar, get_type_hints)

from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
from redis.asyncio import Redis
from sharded_queue.drivers import JsonTupleSerializer
from sharded_queue.protocols import Lock, Serializer, Storage
from sharded_queue.settings import settings

T = TypeVar('T')


class ShardedQueueSettings(BaseSettings):
default_priority: int = Field(
default='0',
title='Default queue priority'
)

default_thread: int = Field(
default='0',
title='Default queue thread'
)

lock_prefix: str = Field(
default="lock_",
title="Lock key prefix"
)

lock_timeout: int = Field(
default=24*60*60,
title="Lock key ttl"
)

model_config = SettingsConfigDict(env_prefix='queue_')

tube_prefix: str = Field(
default="tube_",
title="Queue prefix"
)

worker_acquire_delay: float = Field(
default=1,
title="Worker acquire delay in seconds on empty queues"
)

worker_batch_size: int = Field(
default=128,
title='Worker batch processing size'
)

worker_empty_limit: int = Field(
default=16,
title="Worker empty queue attempt limit berfore queue rebind",
)

worker_empty_pause: float = Field(
default=0.1,
title="Worker pause in seconds on empty queue",
)


settings = ShardedQueueSettings()


class Route(NamedTuple):
thread: int = settings.default_thread
priority: int = settings.default_priority
Expand Down Expand Up @@ -129,48 +76,13 @@ async def context(self) -> AsyncGenerator:
await instance.stop()


class Serializer(Protocol[T]):
def serialize(self, request: T) -> str:
raise NotImplementedError

def unserialize(self, cls: type[T], source: str) -> T:
raise NotImplementedError


class JsonTupleSerializer(Serializer):
def serialize(self, request: T) -> str:
if isinstance(request, Sequence):
values = [k for k in request]
else:
values = list(request.__dict__)

return dumps(values)

def unserialize(self, cls: type[T], source: str) -> T:
return cls(*loads(source))


class Storage(Protocol):
async def append(self, tube: str, *msgs: str) -> int:
raise NotImplementedError

async def length(self, tube: str) -> int:
raise NotImplementedError

async def pipes(self) -> list[str]:
raise NotImplementedError

async def pop(self, tube: str, max: int) -> list[str]:
raise NotImplementedError

async def range(self, tube: str, max: int) -> list[str]:
raise NotImplementedError


@dataclass
class Queue(Generic[T]):
storage: Storage
serializer: Serializer = JsonTupleSerializer()
def __init__(
self, storage: Storage, serializer: Optional[Serializer] = None
):
self.storage = storage
self.serializer = serializer or JsonTupleSerializer()

async def register(self, handler: type[Handler], *requests: T) -> None:
routes = await handler.route(*requests)
Expand All @@ -187,14 +99,6 @@ async def register(self, handler: type[Handler], *requests: T) -> None:
])


class Lock(Protocol):
async def acquire(self, tube: str) -> bool:
raise NotImplementedError

async def release(self, tube: str) -> None:
raise NotImplementedError


@dataclass
class Worker:
lock: Lock
Expand Down Expand Up @@ -281,90 +185,5 @@ async def process(self, tube: Tube, limit: Optional[int] = None) -> int:
await sleep(settings.worker_empty_pause)

await self.lock.release(tube.pipe)
return processed_counter


class RuntimeLock(Lock):
def __init__(self) -> None:
self.storage: dict[str, bool] = {}

async def acquire(self, pipe: str) -> bool:
if pipe in self.storage:
return False
self.storage[pipe] = True
return True

async def release(self, pipe: str) -> None:
del self.storage[pipe]


class RuntimeStorage(Storage):
data: dict[str, List[str]]

def __init__(self) -> None:
self.data = {}

async def append(self, tube: str, *msgs: str) -> int:
if tube not in self.data:
self.data[tube] = list(msgs)
else:
self.data[tube].extend(list(msgs))

return len(self.data[tube])

async def length(self, tube: str) -> int:
return len(self.data[tube]) if tube in self.data else 0

async def pop(self, tube: str, max: int) -> list[str]:
res = await self.range(tube, max)
if len(res):
self.data[tube] = self.data[tube][len(res):]
return res

async def pipes(self) -> list[str]:
return list(self.data.keys())

async def range(self, tube: str, max: int) -> list[str]:
return self.data[tube][0:max] if tube in self.data else []


class RedisLock(Lock):
def __init__(self, redis: Redis) -> None:
self.redis = redis

async def acquire(self, tube: str) -> bool:
return None is not await self.redis.set(
name=settings.lock_prefix + tube,
ex=settings.lock_timeout,
nx=True,
value=1,
)

async def release(self, tube: str) -> None:
await self.redis.delete(settings.lock_prefix + tube)


class RedisStorage(Storage):
def __init__(self, redis: Redis) -> None:
self.redis = redis

async def append(self, tube: str, *msgs: str) -> int:
return await self.redis.rpush(self.key(tube), *msgs)

def key(self, tube):
return settings.tube_prefix + tube

async def length(self, tube: str) -> int:
return await self.redis.llen(self.key(tube))

async def pipes(self) -> list[str]:
return [
key[len(settings.tube_prefix):]
for key in await self.redis.keys(self.key('*'))
]

async def pop(self, tube: str, max: int) -> list[str]:
return await self.redis.lpop(self.key(tube), max) or []

async def range(self, tube: str, max: int) -> list[str]:
return await self.redis.lrange(self.key(tube), 0, max-1) or []
return processed_counter
108 changes: 108 additions & 0 deletions sharded_queue/drivers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
from json import dumps, loads
from typing import List, Sequence, TypeVar

from redis.asyncio import Redis

from sharded_queue.protocols import Lock, Serializer, Storage
from sharded_queue.settings import settings

T = TypeVar('T')


class JsonTupleSerializer(Serializer):
    """JSON serializer that encodes a request as a flat list of values.

    Sequences (tuples, NamedTuples, lists) are dumped positionally; any
    other object is dumped as the values of its instance ``__dict__``.
    Round-tripping assumes ``cls(*values)`` can rebuild the request.
    """

    def serialize(self, request: T) -> str:
        """Return the JSON-encoded positional values of *request*."""
        if isinstance(request, Sequence):
            # a plain copy is list(...), not a pass-through comprehension
            values = list(request)
        else:
            # NOTE(review): relies on attribute insertion order matching
            # the constructor's parameter order — true for dataclasses
            values = list(request.__dict__.values())

        return dumps(values)

    def unserialize(self, cls: type[T], source: str) -> T:
        """Rebuild an instance of *cls* from a JSON list made by serialize."""
        return cls(*loads(source))


class RuntimeLock(Lock):
    """In-memory, in-process lock driver backed by a plain dict."""

    def __init__(self) -> None:
        # pipe name -> held flag; presence of the key means the lock is taken
        self.storage: dict[str, bool] = {}

    async def acquire(self, pipe: str) -> bool:
        """Try to take the lock for *pipe*; False when it is already held."""
        if pipe in self.storage:
            return False
        self.storage[pipe] = True
        return True

    async def release(self, pipe: str) -> None:
        """Release the lock for *pipe*.

        Uses ``dict.pop`` with a default so releasing a pipe that is not
        held (e.g. a double release) is a no-op instead of a KeyError.
        """
        self.storage.pop(pipe, None)


class RuntimeStorage(Storage):
    """In-memory, in-process storage driver: one FIFO list per tube."""

    # tube name -> list of serialized messages, head first
    data: dict[str, List[str]]

    def __init__(self) -> None:
        self.data = {}

    async def append(self, tube: str, *msgs: str) -> int:
        """Append *msgs* to *tube* and return the resulting tube length."""
        # setdefault replaces the manual "create or extend" branching
        queue = self.data.setdefault(tube, [])
        queue.extend(msgs)
        return len(queue)

    async def length(self, tube: str) -> int:
        """Return the number of pending messages in *tube* (0 if unknown)."""
        return len(self.data[tube]) if tube in self.data else 0

    async def pop(self, tube: str, max: int) -> list[str]:
        """Remove and return up to *max* messages from the head of *tube*."""
        res = await self.range(tube, max)
        if res:
            # drop exactly the prefix that is being returned
            self.data[tube] = self.data[tube][len(res):]
        return res

    async def pipes(self) -> list[str]:
        """Return the names of all known tubes."""
        return list(self.data.keys())

    async def range(self, tube: str, max: int) -> list[str]:
        """Return up to *max* head messages of *tube* without removing them."""
        return self.data[tube][0:max] if tube in self.data else []


class RedisLock(Lock):
    """Distributed lock driver built on redis ``SET NX`` with a ttl."""

    def __init__(self, redis: Redis) -> None:
        self.redis = redis

    async def acquire(self, tube: str) -> bool:
        """Take the lock for *tube*; True only when this call set the key."""
        # SET ... NX returns None when the key already exists
        acquired = await self.redis.set(
            name=settings.lock_prefix + tube,
            ex=settings.lock_timeout,
            nx=True,
            value=1,
        )
        return acquired is not None

    async def release(self, tube: str) -> None:
        """Drop the lock key for *tube*."""
        await self.redis.delete(settings.lock_prefix + tube)


class RedisStorage(Storage):
    """Redis storage driver: each tube is a redis list under a prefixed key."""

    def __init__(self, redis: Redis) -> None:
        self.redis = redis

    def key(self, tube):
        """Map a tube name to its redis key."""
        return settings.tube_prefix + tube

    async def append(self, tube: str, *msgs: str) -> int:
        """Push *msgs* onto the tail of *tube*; returns the new list length."""
        return await self.redis.rpush(self.key(tube), *msgs)

    async def length(self, tube: str) -> int:
        """Return the number of pending messages in *tube*."""
        return await self.redis.llen(self.key(tube))

    async def pipes(self) -> list[str]:
        """Return all tube names by stripping the prefix from matching keys."""
        prefix_len = len(settings.tube_prefix)
        keys = await self.redis.keys(self.key('*'))
        return [key[prefix_len:] for key in keys]

    async def pop(self, tube: str, max: int) -> list[str]:
        """Remove and return up to *max* messages from the head of *tube*."""
        msgs = await self.redis.lpop(self.key(tube), max)
        return msgs if msgs else []

    async def range(self, tube: str, max: int) -> list[str]:
        """Peek at up to *max* head messages of *tube* without removing them."""
        # lrange end index is inclusive, hence max - 1
        items = await self.redis.lrange(self.key(tube), 0, max - 1)
        return items if items else []
Loading

0 comments on commit cf2c985

Please sign in to comment.