Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
improve typing
Browse files Browse the repository at this point in the history
alex-oleshkevich committed Aug 23, 2024
1 parent 7d58010 commit f11024e
Showing 4 changed files with 13 additions and 7 deletions.
4 changes: 2 additions & 2 deletions broadcaster/_backends/base.py
Original file line number Diff line number Diff line change
@@ -13,10 +13,10 @@ async def connect(self) -> None:
async def disconnect(self) -> None:
raise NotImplementedError()

async def subscribe(self, group: str) -> None:
async def subscribe(self, channel: str) -> None:
raise NotImplementedError()

async def unsubscribe(self, group: str) -> None:
async def unsubscribe(self, channel: str) -> None:
raise NotImplementedError()

async def publish(self, channel: str, message: Any) -> None:
12 changes: 9 additions & 3 deletions broadcaster/_backends/kafka.py
Original file line number Diff line number Diff line change
@@ -18,8 +18,8 @@ def __init__(self, urls: str | list[str]) -> None:
self._ready = asyncio.Event()

async def connect(self) -> None:
self._producer = AIOKafkaProducer(bootstrap_servers=self._servers)
self._consumer = AIOKafkaConsumer(bootstrap_servers=self._servers)
self._producer = AIOKafkaProducer(bootstrap_servers=self._servers) # pyright: ignore
self._consumer = AIOKafkaConsumer(bootstrap_servers=self._servers) # pyright: ignore
await self._producer.start()
await self._consumer.start()

@@ -41,7 +41,13 @@ async def publish(self, channel: str, message: typing.Any) -> None:
async def next_published(self) -> Event:
await self._ready.wait()
message = await self._consumer.getone()
return Event(channel=message.topic, message=message.value.decode("utf8"))
value = message.value

# for type compatibility:
# we declare Event.message as str, so convert None to empty string
if value is None:
value = b""
return Event(channel=message.topic, message=value.decode("utf8"))

async def _wait_for_assignment(self) -> None:
"""Wait for the consumer to be assigned to the partition."""
2 changes: 1 addition & 1 deletion broadcaster/_backends/redis.py
Original file line number Diff line number Diff line change
@@ -65,7 +65,7 @@ async def _pubsub_listener(self) -> None:
class RedisStreamBackend(BroadcastBackend):
def __init__(self, url: str):
url = url.replace("redis-stream", "redis", 1)
self.streams: dict[str, str] = {}
self.streams: dict[bytes | str | memoryview, int | bytes | str | memoryview] = {}
self._ready = asyncio.Event()
self._producer = redis.Redis.from_url(url)
self._consumer = redis.Redis.from_url(url)
2 changes: 1 addition & 1 deletion broadcaster/_base.py
Original file line number Diff line number Diff line change
@@ -110,7 +110,7 @@ class Subscriber:
def __init__(self, queue: asyncio.Queue[Event | None]) -> None:
self._queue = queue

async def __aiter__(self) -> AsyncGenerator[Event | None, None] | None:
async def __aiter__(self) -> AsyncGenerator[Event | None, None]:
try:
while True:
yield await self.get()

0 comments on commit f11024e

Please sign in to comment.