A thin, intuitive, idiomatic and high-performance API for Azure Web PubSub protobuf subprotocol.
Azure Web PubSub protobuf subprotocol is super awesome and general purpose and I can see endless applications for this new service from Azure. The message-based nature of its "API" is not very intuitive or idiomatic for a dotnet developer though, I think.
I wanted to create a super thin layer on top that didn't incur unnecessary allocations or buffer handling or extra threads, since that would detract from the amazing work on performance that .NET 6 brings to the table. I use the best practices for sending binary payloads using low-level (and quite new!) protobuf APIs for avoiding unnecessary buffer creation/handling.
In order to also squeeze every bit of performance, this project uses the protobuf subprotocol exclusively, even though there is support in the service for JSON payloads.
The actual binary payloads you send/receive can of course be decoded into any format you need, including JSON if you just encode/decode it as UTF8 bytes.
First acquire a proper client access URI for Azure Web PubSub using the official client API, such as:
var serviceClient = new WebPubSubServiceClient([WEB_PUB_SUB_CONNECTION_STRING], [HUB_NAME]);
var serviceUri = serviceClient.GenerateClientAccessUri(
userId: Guid.NewGuid().ToString("N"),
roles: new[]
{
"webpubsub.joinLeaveGroup",
"webpubsub.sendToGroup"
});
Next simply connect the WebSocketeer
:
await using IWebSocketeer socketeer = WebSocketeer.ConnectAsync(serviceUri);
NOTE: the
IWebSocketeer
interface implements bothIAsyncDisposable
, which allows theawait using
pattern above, but also the regularIDisposable
interface. The former will perform a gracefulWebSocket
disconnect/close. The latter will simply dispose the underlyingWebSocket
.
At this point, the socketeer
variable contains a properly connected
Web PubSub client, and you can inspect its ConnectionId
and UserId
properties, for example.
Next step is perhaps to join some groups:
IWebSocketeerGroup group = await socketeer.JoinAsync("MyGroup");
The IWebSocketeerGroup
is an observable of ReadOnlyMemory<byte>
, exposing
the incoming messages to that group, and it also provides a
SendAsync(ReadOnlyMemory<byte> message)
method to post messages to the group.
To write all incoming messages for the group to the console, you could write:
using var subscription = group.Subscribe(bytes =>
Console.WriteLine(Encoding.UTF8.GetString(bytes.Span)));
In order to start processing incoming messages, though, you need to start
the socketeer "message loop" first. This would typically be done on a separate thread,
using Task.Run
, for example:
var started = Task.Run(() => socketeer.RunAsync());
The returned task from RunAsync
will remain in progress until the socketeer is disposed,
or the underlying WebSocket
is closed (either by the client or the server), or when an
optional cancellation token passed to it is cancelled.
You can also send messages to a group you haven't joined (provided the roles
specified when opening the connection allow it) via the IWebSocketeer.SendAsync
method too:
await socketeer.SendAsync("YourGroup", Encoding.UTF8.GetBytes("Hello World"));
Sometimes, it's useful to perform group join up-front, but at some
later time you might also need to get the previously joined group
from the same IWebSocketeer
instance.
IWebSocketeer socketeer = /* connect, join some groups, etc. */;
// If group hasn't been joined previously, no incoming messages would arrive in this case.
IWebSocketeerGroup group = socketeer.Joined("incoming");
group.Subscribe(x => /* process incoming */);
You can alternatively handle the WebSocket
yourself. Instead of passing the
service Uri
to ConnectAsync
, you can create and connect a WebSocket
manually
and pass it to the WebSocketeer.ConnectAsync(WebSocket, CancellationToken)
overload.
In this case, it's important to remember to add the protobuf.webpubsub.azure.v1
required subprotocol:
using Devlooped.Net;
var client = new ClientWebSocket();
client.Options.AddSubProtocol("protobuf.webpubsub.azure.v1");
await client.ConnectAsync(serverUri, CancellationToken.None);
await using var socketeer = WebSocketeer.ConnectAsync(client);
You may want to simulate request/response communication patterns over the socketeer. In cases like this, you would typically do the following:
- Server joined to a client-specific group, such as
SERVER_ID-CLIENT_ID
(with a[TO]-[FROM]
format, so, TO=server, FROM=client) - Server replying to requests in that group by sending responses to
CLIENT_ID-SERVER_ID
(TO=client, FROM=server); - Client joined to the responses group
CLIENT_ID-SERVER_ID
and sending requests as needed toSERVER_ID-CLIENT_ID
.
Note that the client must not join the SERVER_ID-CLIENT_ID
group because
otherwise it would also receive its own messages that are intended for the
server only. Likewise, the server cannot join the CLIENT_ID-SERVER_ID
group
either. This is why this pattern might be more common than it would otherwise
seem.
Server-side:
IWebSocketeer socketeer = ...;
var serverId = socketeer.UserId;
// Perhaps from an initial exchange over a shared channel
var clientId = ...;
await using IWebSocketeerGroup clientChannel = socketeer.Split(
await socketeer.JoinAsync($"{serverId}-{clientId}"),
$"{clientId}-{serverId}");
clientChannel.Subscribe(async x =>
{
// do some processing on incoming requests.
...
// send a response via the outgoing group
await clientChannel.SendAsync(response);
});
Client-side:
IWebSocketeer socketeer = ...;
var clientId = socketeer.UserId;
// Perhaps a known identifier, or looked up somehow
var serverId = ...;
await using IWebSocketeerGroup serverChannel = socketeer.Split(
await socketeer.JoinAsync($"{clientId}-{serverId}""),
$"{serverId}-{clientId}");
serverChannel.Subscribe(async x => /* process responses */);
await serverChannel.SendAsync(request);