|
1 | 1 | import base64 |
2 | | -from typing import Type |
| 2 | +from pathlib import Path |
| 3 | +from typing import Type, TypeVar, Any |
3 | 4 |
|
| 5 | +import capnp |
4 | 6 | from google.protobuf.message import Message |
5 | 7 |
|
6 | 8 | from xconn.client import connect_anonymous |
@@ -179,3 +181,109 @@ def get_profile_handler() -> ProfileGet: |
179 | 181 | assert profile.created_at == "2025-10-30T17:00:00Z" |
180 | 182 |
|
181 | 183 | session.leave() |
| 184 | + |
| 185 | + |
| 186 | +T = TypeVar("T") |
| 187 | +SCHEMA_PATH = Path(__file__).parent / "user.capnp" |
| 188 | +user_capnp = capnp.load(str(SCHEMA_PATH)) |
| 189 | + |
| 190 | +UserCreate = user_capnp.UserCreate |
| 191 | +UserGet = user_capnp.UserGet |
| 192 | + |
| 193 | + |
| 194 | +class CapnpProtoCodec(codec.Codec[T]): |
| 195 | + def name(self) -> str: |
| 196 | + return "capnproto" |
| 197 | + |
| 198 | + def encode(self, obj: Any) -> bytes: |
| 199 | + return obj.to_bytes_packed() |
| 200 | + |
| 201 | + def decode(self, data: bytes, out_type: Type[T]) -> T: |
| 202 | + return out_type.from_bytes_packed(data) |
| 203 | + |
| 204 | + |
| 205 | +def test_register_object_capnproto(): |
| 206 | + session = connect_anonymous("ws://localhost:8080/ws", "realm1") |
| 207 | + session.set_payload_codec(CapnpProtoCodec()) |
| 208 | + |
| 209 | + def create_handler(user_create: UserCreate) -> UserGet: |
| 210 | + user_get = UserGet.new_message() |
| 211 | + user_get.id = 999 |
| 212 | + user_get.name = user_create.name |
| 213 | + user_get.email = user_create.email |
| 214 | + user_get.age = user_create.age |
| 215 | + user_get.isAdmin = False |
| 216 | + |
| 217 | + return user_get |
| 218 | + |
| 219 | + session.register_object("io.xconn.user.create", create_handler) |
| 220 | + |
| 221 | + new_user = UserCreate.new_message() |
| 222 | + new_user.name = "john" |
| 223 | + new_user. email = "[email protected]" |
| 224 | + new_user.age = 35 |
| 225 | + |
| 226 | + result = session.call("io.xconn.user.create", [new_user.to_bytes_packed()]) |
| 227 | + user = UserGet.from_bytes_packed(result.args[0]) |
| 228 | + |
| 229 | + assert user.id == 999 |
| 230 | + assert user.name == "john" |
| 231 | + assert user. email == "[email protected]" |
| 232 | + assert user.age == 35 |
| 233 | + assert not user.isAdmin |
| 234 | + |
| 235 | + session.leave() |
| 236 | + |
| 237 | + |
| 238 | +def test_call_object_capnproto(): |
| 239 | + session = connect_anonymous("ws://localhost:8080/ws", "realm1") |
| 240 | + session.set_payload_codec(CapnpProtoCodec()) |
| 241 | + |
| 242 | + def invocation_handler(inv: Invocation) -> Result: |
| 243 | + user_create = UserCreate.from_bytes_packed(inv.args[0]) |
| 244 | + |
| 245 | + user_get = UserGet.new_message() |
| 246 | + user_get.id = 78 |
| 247 | + user_get.name = user_create.name |
| 248 | + user_get.email = user_create.email |
| 249 | + user_get.age = user_create.age |
| 250 | + user_get.isAdmin = True |
| 251 | + |
| 252 | + return Result(args=[user_get.to_bytes_packed()]) |
| 253 | + |
| 254 | + session.register("io.xconn.user.create", invocation_handler) |
| 255 | + new_user = UserCreate.new_message() |
| 256 | + new_user.name = "alice" |
| 257 | + new_user. email = "[email protected]" |
| 258 | + new_user.age = 23 |
| 259 | + |
| 260 | + result: UserGet = session.call_object("io.xconn.user.create", new_user, UserGet) |
| 261 | + assert result.id == 78 |
| 262 | + assert result.name == "alice" |
| 263 | + assert result. email == "[email protected]" |
| 264 | + assert result.age == 23 |
| 265 | + assert result.isAdmin |
| 266 | + |
| 267 | + session.leave() |
| 268 | + |
| 269 | + |
| 270 | +def test_pubsub_capnproto(): |
| 271 | + session = connect_anonymous("ws://localhost:8080/ws", "realm1") |
| 272 | + session.set_payload_codec(CapnpProtoCodec()) |
| 273 | + |
| 274 | + def event_handler(event: Event): |
| 275 | + user: UserGet = event.args[0] |
| 276 | + assert user.name == "alice" |
| 277 | + assert user. email == "[email protected]" |
| 278 | + assert user.age == 21 |
| 279 | + |
| 280 | + session.subscribe_object("io.xconn.object", event_handler, UserCreate) |
| 281 | + |
| 282 | + new_user = UserCreate.new_message() |
| 283 | + new_user.name = "alice" |
| 284 | + new_user. email = "[email protected]" |
| 285 | + new_user.age = 21 |
| 286 | + |
| 287 | + session.publish_object("io.xconn.object", new_user) |
| 288 | + |
| 289 | + session.leave() |
0 commit comments