|
1 | 1 | import os |
2 | | -import base64 |
3 | 2 | from pathlib import Path |
4 | | -from typing import Type, TypeVar, Any |
| 3 | +from typing import Type, TypeVar |
5 | 4 |
|
6 | 5 | import capnp |
7 | 6 | from google.protobuf.message import Message |
8 | 7 |
|
9 | 8 | from xconn.client import connect_anonymous |
10 | | -from xconn import codec |
11 | | -from xconn.types import Event |
| 9 | +from xconn.codec import Codec |
| 10 | +from xconn.types import Event, OutgoingDataMessage, IncomingDataMessage |
12 | 11 | from tests.schemas.profile_pb2 import ProfileCreate, ProfileGet |
13 | 12 |
|
14 | 13 |
|
15 | | -class String(str): |
16 | | - pass |
| 14 | +T = TypeVar("T", bound=Message) |
17 | 15 |
|
18 | 16 |
|
19 | | -class Base64Codec(codec.Codec[String]): |
20 | | - def name(self) -> str: |
21 | | - return "base64" |
22 | | - |
23 | | - def encode(self, obj: String) -> str: |
24 | | - return base64.b64encode(obj.encode("utf-8")).decode("utf-8") |
25 | | - |
26 | | - def decode(self, data: str, out_type: Type[String]) -> String: |
27 | | - return out_type(base64.b64decode(data.encode("utf-8")).decode()) |
28 | | - |
29 | | - |
30 | | -def test_base64_codec(): |
31 | | - encoder = Base64Codec() |
32 | | - encoded = encoder.encode(String("hello")) |
33 | | - assert isinstance(encoded, str) |
34 | | - |
35 | | - decoded = encoder.decode(encoded, String) |
36 | | - assert isinstance(decoded, String) |
37 | | - assert decoded == "hello" |
38 | | - |
39 | | - |
40 | | -class ProtobufCodec(codec.Codec[Message]): |
| 17 | +class ProtobufCodec(Codec[T]): |
41 | 18 | def name(self) -> str: |
42 | 19 | return "protobuf" |
43 | 20 |
|
44 | | - def encode(self, obj: Message) -> bytes: |
45 | | - return obj.SerializeToString() |
| 21 | + def encode(self, obj: T) -> OutgoingDataMessage: |
| 22 | + payload = obj.SerializeToString() |
| 23 | + return OutgoingDataMessage(args=[payload], kwargs={}, details={}) |
46 | 24 |
|
47 | | - def decode(self, data: bytes, out_type: Type[Message]) -> Message: |
48 | | - msg = out_type() |
49 | | - msg.ParseFromString(data) |
| 25 | + def decode(self, msg: IncomingDataMessage, out_type: Type[T]) -> T: |
| 26 | + if len(msg.args) == 0 or not isinstance(msg.args[0], bytes): |
| 27 | + raise ValueError("ProtobufCodec: cannot decode, expected first arg to be bytes") |
50 | 28 |
|
51 | | - return msg |
| 29 | + obj = out_type() |
| 30 | + obj.ParseFromString(msg.args[0]) |
| 31 | + return obj |
52 | 32 |
|
53 | 33 |
|
54 | 34 | def test_rpc_object_protobuf(): |
@@ -78,20 +58,6 @@ def inv_handler(profile: ProfileCreate) -> ProfileGet: |
78 | 58 | session.leave() |
79 | 59 |
|
80 | 60 |
|
81 | | -def test_pubsub_object(): |
82 | | - session = connect_anonymous("ws://localhost:8080/ws", "realm1") |
83 | | - session.set_payload_codec(Base64Codec()) |
84 | | - |
85 | | - def event_handler(event: Event): |
86 | | - assert event.args[0] == "hello" |
87 | | - |
88 | | - session.subscribe_object("io.xconn.object", event_handler, String) |
89 | | - |
90 | | - session.publish_object("io.xconn.object", String("hello")) |
91 | | - |
92 | | - session.leave() |
93 | | - |
94 | | - |
95 | 61 | def test_pubsub_protobuf(): |
96 | 62 | session = connect_anonymous("ws://localhost:8080/ws", "realm1") |
97 | 63 | session.set_payload_codec(ProtobufCodec()) |
@@ -192,15 +158,19 @@ def get_profile_handler() -> ProfileGet: |
192 | 158 | UserGet = user_capnp.UserGet |
193 | 159 |
|
194 | 160 |
|
195 | | -class CapnpProtoCodec(codec.Codec[T]): |
| 161 | +class CapnpProtoCodec(Codec[T]): |
196 | 162 | def name(self) -> str: |
197 | 163 | return "capnproto" |
198 | 164 |
|
199 | | - def encode(self, obj: Any) -> bytes: |
200 | | - return obj.to_bytes_packed() |
| 165 | + def encode(self, obj: T) -> OutgoingDataMessage: |
| 166 | + payload = obj.to_bytes_packed() |
| 167 | + return OutgoingDataMessage(args=[payload], kwargs={}, details={}) |
| 168 | + |
| 169 | + def decode(self, msg: IncomingDataMessage, out_type: Type[T]) -> T: |
| 170 | + if len(msg.args) == 0 or not isinstance(msg.args[0], bytes): |
| 171 | + raise ValueError("CapnpProtoCodec: cannot decode, expected first arg to be bytes") |
201 | 172 |
|
202 | | - def decode(self, data: bytes, out_type: Type[T]) -> T: |
203 | | - return out_type.from_bytes_packed(data) |
| 173 | + return out_type.from_bytes_packed(msg.args[0]) |
204 | 174 |
|
205 | 175 |
|
206 | 176 | def test_rpc_object_capnproto(): |
|
0 commit comments