Skip to content

Commit

Permalink
add client api methods
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Mar 4, 2024
1 parent fceca82 commit b97a5eb
Show file tree
Hide file tree
Showing 6 changed files with 625 additions and 42 deletions.
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ api_key = "<CENTRIFUGO_API_KEY>"

client = Client(api_url, api_key)
request = PublishRequest(channel="channel", data={"input": "Hello world!"})
result = client.send(request)
result = client._send(request)
print(result)
```

Expand Down Expand Up @@ -100,17 +100,18 @@ Note, that `BroadcastRequest` and `BatchRequest` are quite special – since the

```python
from cent import *

c = Client("http://localhost:8000/api", "api_key")
req = BroadcastRequest(channels=["1", "2"], data={})
c.send(req)
c._send(req)
# BroadcastResult(
# responses=[
# Response[PublishResult](error=None, result=PublishResult(offset=7, epoch='rqKx')),
# Response[PublishResult](error=None, result=PublishResult(offset=7, epoch='nUrf'))
# ]
# )
req = BroadcastRequest(channels=["invalid:1", "2"], data={})
c.send(req)
c._send(req)
# BroadcastResult(
# responses=[
# Response[PublishResult](error=Error(code=102, message='unknown channel'), result=None),
Expand Down Expand Up @@ -172,15 +173,15 @@ def main():

start = time()
for request in requests:
client.send(request)
client._send(request)
print("sequential", time() - start)

start = time()
client.send(batch)
client._send(batch)
print("batch", time() - start)

start = time()
client.send(broadcast)
client._send(broadcast)
print("broadcast", time() - start)


Expand Down
4 changes: 2 additions & 2 deletions benchmarks/test_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

def sync_requests(client: Client) -> None:
channel_number = random.randint(0, 1000) # noqa: S311
client.send(
client._send(
PublishRequest(
channel=f"personal_{channel_number}",
data={"message": "Hello world!"},
Expand All @@ -16,7 +16,7 @@ def sync_requests(client: Client) -> None:

async def async_requests(client: AsyncClient) -> None:
channel_number = random.randint(0, 1000) # noqa: S311
await client.send(
await client.publish(
PublishRequest(
channel=f"personal_{channel_number}",
data={"message": "Hello world!"},
Expand Down
295 changes: 293 additions & 2 deletions cent/client/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,74 @@
from aiohttp import ClientSession

from cent.client.session import AiohttpSession
from cent.dto import CentRequest, CentResultType
from cent.dto import (
CentRequest,
CentResultType,
PublishResult,
PublishRequest,
BroadcastRequest,
BroadcastResult,
BatchResult,
BatchRequest,
CancelPushResult,
CancelPushRequest,
UpdatePushStatusResult,
UpdatePushStatusRequest,
SendPushNotificationResult,
SendPushNotificationRequest,
UserTopicUpdateResult,
UserTopicUpdateRequest,
UserTopicListResult,
UserTopicListRequest,
DeviceTopicUpdateResult,
DeviceTopicUpdateRequest,
DeviceTopicListResult,
DeviceTopicListRequest,
DeviceListResult,
DeviceListRequest,
DeviceRemoveResult,
DeviceRemoveRequest,
DeviceUpdateResult,
DeviceUpdateRequest,
DeviceRegisterResult,
DeviceRegisterRequest,
InvalidateUserTokensResult,
InvalidateUserTokensRequest,
RevokeTokenResult,
RevokeTokenRequest,
UnblockUserResult,
UnblockUserRequest,
BlockUserResult,
BlockUserRequest,
DeleteUserStatusResult,
DeleteUserStatusRequest,
GetUserStatusResult,
GetUserStatusRequest,
UpdateUserStatusResult,
UpdateUserStatusRequest,
ConnectionsResult,
ConnectionsRequest,
ChannelsResult,
ChannelsRequest,
RefreshResult,
RefreshRequest,
InfoResult,
InfoRequest,
HistoryRemoveResult,
HistoryRemoveRequest,
HistoryResult,
HistoryRequest,
PresenceStatsResult,
PresenceStatsRequest,
PresenceResult,
PresenceRequest,
DisconnectResult,
DisconnectRequest,
UnsubscribeResult,
UnsubscribeRequest,
SubscribeResult,
SubscribeRequest,
)


class AsyncClient:
Expand All @@ -27,7 +94,7 @@ def __init__(
session=session,
)

async def send(
async def _send(
self,
request: CentRequest[CentResultType],
timeout: Optional[float] = None,
Expand All @@ -43,6 +110,230 @@ async def send(
response = request.parse_response(content)
return cast(CentResultType, response.result)

async def publish(
self,
request: PublishRequest,
timeout: Optional[float] = None,
) -> PublishResult:
return await self._send(request, timeout=timeout)

async def broadcast(
self,
request: BroadcastRequest,
timeout: Optional[float] = None,
) -> BroadcastResult:
return await self._send(request, timeout=timeout)

async def subscribe(
self,
request: SubscribeRequest,
timeout: Optional[float] = None,
) -> SubscribeResult:
return await self._send(request, timeout=timeout)

async def unsubscribe(
self,
request: UnsubscribeRequest,
timeout: Optional[float] = None,
) -> UnsubscribeResult:
return await self._send(request, timeout=timeout)

async def disconnect(
self,
request: DisconnectRequest,
timeout: Optional[float] = None,
) -> DisconnectResult:
return await self._send(request, timeout=timeout)

async def presence(
self,
request: PresenceRequest,
timeout: Optional[float] = None,
) -> PresenceResult:
return await self._send(request, timeout=timeout)

async def presence_stats(
self,
request: PresenceStatsRequest,
timeout: Optional[float] = None,
) -> PresenceStatsResult:
return await self._send(request, timeout=timeout)

async def history(
self,
request: HistoryRequest,
timeout: Optional[float] = None,
) -> HistoryResult:
return await self._send(request, timeout=timeout)

async def history_remove(
self,
request: HistoryRemoveRequest,
timeout: Optional[float] = None,
) -> HistoryRemoveResult:
return await self._send(request, timeout=timeout)

async def info(
self,
request: InfoRequest,
timeout: Optional[float] = None,
) -> InfoResult:
return await self._send(request, timeout=timeout)

async def refresh(
self,
request: RefreshRequest,
timeout: Optional[float] = None,
) -> RefreshResult:
return await self._send(request, timeout=timeout)

async def channels(
self,
request: ChannelsRequest,
timeout: Optional[float] = None,
) -> ChannelsResult:
return await self._send(request, timeout=timeout)

async def connections(
self,
request: ConnectionsRequest,
timeout: Optional[float] = None,
) -> ConnectionsResult:
return await self._send(request, timeout=timeout)

async def update_user_status(
self,
request: UpdateUserStatusRequest,
timeout: Optional[float] = None,
) -> UpdateUserStatusResult:
return await self._send(request, timeout=timeout)

async def get_user_status(
self,
request: GetUserStatusRequest,
timeout: Optional[float] = None,
) -> GetUserStatusResult:
return await self._send(request, timeout=timeout)

async def delete_user_status(
self,
request: DeleteUserStatusRequest,
timeout: Optional[float] = None,
) -> DeleteUserStatusResult:
return await self._send(request, timeout=timeout)

async def block_user(
self,
request: BlockUserRequest,
timeout: Optional[float] = None,
) -> BlockUserResult:
return await self._send(request, timeout=timeout)

async def unblock_user(
self,
request: UnblockUserRequest,
timeout: Optional[float] = None,
) -> UnblockUserResult:
return await self._send(request, timeout=timeout)

async def revoke_token(
self,
request: RevokeTokenRequest,
timeout: Optional[float] = None,
) -> RevokeTokenResult:
return await self._send(request, timeout=timeout)

async def invalidate_user_tokens(
self,
request: InvalidateUserTokensRequest,
timeout: Optional[float] = None,
) -> InvalidateUserTokensResult:
return await self._send(request, timeout=timeout)

async def device_register(
self,
request: DeviceRegisterRequest,
timeout: Optional[float] = None,
) -> DeviceRegisterResult:
return await self._send(request, timeout=timeout)

async def device_update(
self,
request: DeviceUpdateRequest,
timeout: Optional[float] = None,
) -> DeviceUpdateResult:
return await self._send(request, timeout=timeout)

async def device_remove(
self,
request: DeviceRemoveRequest,
timeout: Optional[float] = None,
) -> DeviceRemoveResult:
return await self._send(request, timeout=timeout)

async def device_list(
self,
request: DeviceListRequest,
timeout: Optional[float] = None,
) -> DeviceListResult:
return await self._send(request, timeout=timeout)

async def device_topic_list(
self,
request: DeviceTopicListRequest,
timeout: Optional[float] = None,
) -> DeviceTopicListResult:
return await self._send(request, timeout=timeout)

async def device_topic_update(
self,
request: DeviceTopicUpdateRequest,
timeout: Optional[float] = None,
) -> DeviceTopicUpdateResult:
return await self._send(request, timeout=timeout)

async def user_topic_list(
self,
request: UserTopicListRequest,
timeout: Optional[float] = None,
) -> UserTopicListResult:
return await self._send(request, timeout=timeout)

async def user_topic_update(
self,
request: UserTopicUpdateRequest,
timeout: Optional[float] = None,
) -> UserTopicUpdateResult:
return await self._send(request, timeout=timeout)

async def send_push_notification(
self,
request: SendPushNotificationRequest,
timeout: Optional[float] = None,
) -> SendPushNotificationResult:
return await self._send(request, timeout=timeout)

async def update_push_status(
self,
request: UpdatePushStatusRequest,
timeout: Optional[float] = None,
) -> UpdatePushStatusResult:
return await self._send(request, timeout=timeout)

async def cancel_push(
self,
request: CancelPushRequest,
timeout: Optional[float] = None,
) -> CancelPushResult:
return await self._send(request, timeout=timeout)

async def batch(
self,
request: BatchRequest,
timeout: Optional[float] = None,
) -> BatchResult:
return await self._send(request, timeout=timeout)

async def close(self) -> None:
await self._session.close()

Expand Down
Loading

0 comments on commit b97a5eb

Please sign in to comment.