Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 37 additions & 21 deletions src/FishyFlip/ATJetStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public sealed class ATJetStream : IDisposable

private readonly JsonSerializerOptions jsonSerializerOptions;
private readonly SourceGenerationContext sourceGenerationContext;
private readonly OutOfOrderCompletionTracker outOfOrderCompletionTracker = new();
private ClientWebSocket client;
private bool disposedValue;
private ILogger? logger;
Expand Down Expand Up @@ -60,6 +61,11 @@ public ATJetStream(ATJetStreamOptions options)
/// </summary>
public event EventHandler<JetStreamATWebSocketRecordEventArgs>? OnRecordReceived;

/// <summary>
/// Gets the last definitely processed firehose cursor.
/// </summary>
public long? LastDefinitelyProcessedCursor => this.outOfOrderCompletionTracker.LastDefinitelyProcessedSeq;

/// <inheritdoc/>
void IDisposable.Dispose()
{
Expand Down Expand Up @@ -286,7 +292,8 @@ private async Task ReceiveMessages(ClientWebSocket webSocket, CancellationToken
var message = Encoding.UTF8.GetString(messageBytes);
#endif
this.OnRawMessageReceived?.Invoke(this, new JetStreamRawMessageEventArgs(message));
this.options.TaskFactory.StartNew(() => this.HandleMessage(message))
var eventId = this.outOfOrderCompletionTracker.OnEventGenerated();
this.options.TaskFactory.StartNew(() => this.HandleMessage(message, eventId))
.FireAndForgetSafeAsync(this.logger);
}
catch (OperationCanceledException)
Expand All @@ -302,35 +309,44 @@ private async Task ReceiveMessages(ClientWebSocket webSocket, CancellationToken
this.OnConnectionUpdated?.Invoke(this, new SubscriptionConnectionStatusEventArgs(webSocket.State));
}

private void HandleMessage(string json)
private void HandleMessage(string json, long eventId)
{
if (string.IsNullOrEmpty(json))
{
this.logger?.LogDebug("WSS: Empty message received.");
return;
}

long? seq = null;
try
{
var atWebSocketRecord = JsonSerializer.Deserialize<ATWebSocketRecord>(json, this.sourceGenerationContext.ATWebSocketRecord);
if (atWebSocketRecord is null)
if (string.IsNullOrEmpty(json))
{
this.logger?.LogError("WSS: Failed to deserialize ATWebSocketRecord.");
this.logger?.LogError(json);
this.logger?.LogDebug("WSS: Empty message received.");
return;
}

this.OnRecordReceived?.Invoke(this, new JetStreamATWebSocketRecordEventArgs(atWebSocketRecord, json));
}
catch (JsonException ex)
{
this.logger?.LogError(ex, "WSS: Failed to deserialize ATWebSocketRecord.");
this.logger?.LogError(json);
try
{
var atWebSocketRecord = JsonSerializer.Deserialize<ATWebSocketRecord>(json, this.sourceGenerationContext.ATWebSocketRecord);
if (atWebSocketRecord is null)
{
this.logger?.LogError("WSS: Failed to deserialize ATWebSocketRecord.");
this.logger?.LogError(json);
return;
}

seq = atWebSocketRecord.TimeUs;
this.OnRecordReceived?.Invoke(this, new JetStreamATWebSocketRecordEventArgs(atWebSocketRecord, json));
}
catch (JsonException ex)
{
this.logger?.LogError(ex, "WSS: Failed to deserialize ATWebSocketRecord.");
this.logger?.LogError(json);
}
catch (Exception ex)
{
this.logger?.LogError(ex, "WSS: An unknown error occurred.");
this.logger?.LogError(json);
}
}
catch (Exception ex)
finally
{
this.logger?.LogError(ex, "WSS: An unknown error occurred.");
this.logger?.LogError(json);
this.outOfOrderCompletionTracker.OnEventProcessed(eventId, seq);
}
}

Expand Down
86 changes: 54 additions & 32 deletions src/FishyFlip/ATWebSocketProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace FishyFlip;
public sealed class ATWebSocketProtocol : IDisposable
{
private readonly TaskFactory taskFactory;
private readonly OutOfOrderCompletionTracker outOfOrderCompletionTracker = new();
private bool subscribedLabels;
private WebSocketWrapper webSocketWrapper;
private IReadOnlyList<ICustomATObjectCBORConverter> customConverters;
Expand Down Expand Up @@ -73,6 +74,11 @@ public ATWebSocketProtocol(ATWebSocketProtocolOptions options)
/// </summary>
public bool IsConnected => this.webSocketWrapper.IsConnected;

/// <summary>
/// Gets the last definitely processed firehose cursor.
/// </summary>
public long? LastDefinitelyProcessedCursor => this.outOfOrderCompletionTracker.LastDefinitelyProcessedSeq;

/// <summary>
/// Connect to the BlueSky instance via a WebSocket connection.
/// </summary>
Expand Down Expand Up @@ -205,48 +211,56 @@ private void Dispose(bool disposing)
}
}

private void HandleMessage(ReadOnlySpan<byte> span)
private void HandleMessage(ReadOnlySpan<byte> span, long eventId)
{
byte[] byteArray = span.ToArray();
if (byteArray.Length == 0)
{
this.logger?.LogDebug("WSS: ATError reading message. Empty byte array.");
return;
}

CBORObject[]? objects = null;
long? seq = null;
try
{
objects = CBORObject.DecodeSequenceFromBytes(byteArray, new CBOREncodeOptions("useIndefLengthStrings=true;float64=true;allowduplicatekeys=true;allowEmpty=true"));
}
catch (Exception e)
{
this.logger?.LogError(e, "WSS: ATError reading message.");
}
byte[] byteArray = span.ToArray();
if (byteArray.Length == 0)
{
this.logger?.LogDebug("WSS: ATError reading message. Empty byte array.");
return;
}

if (objects is null)
{
return;
}
CBORObject[]? objects = null;
try
{
objects = CBORObject.DecodeSequenceFromBytes(byteArray, new CBOREncodeOptions("useIndefLengthStrings=true;float64=true;allowduplicatekeys=true;allowEmpty=true"));
}
catch (Exception e)
{
this.logger?.LogError(e, "WSS: ATError reading message.");
}

if (objects.Length != 2)
{
return;
}
if (objects is null)
{
return;
}

var frameHeader = new FrameHeader(objects[0]);
if (objects.Length != 2)
{
return;
}

if (this.subscribedLabels)
{
this.HandleLabelMessage(frameHeader, objects[1]);
var frameHeader = new FrameHeader(objects[0]);

if (this.subscribedLabels)
{
seq = this.HandleLabelMessage(frameHeader, objects[1]);
}
else
{
seq = this.HandleRepoMessage(frameHeader, objects[1]);
}
}
else
finally
{
this.HandleRepoMessage(frameHeader, objects[1]);
this.outOfOrderCompletionTracker.OnEventProcessed(eventId, seq);
}
}

private void HandleLabelMessage(FrameHeader header, CBORObject obj)
private long? HandleLabelMessage(FrameHeader header, CBORObject obj)
{
var message = new SubscribeLabelMessage();
message.Header = header;
Expand Down Expand Up @@ -281,9 +295,10 @@ private void HandleLabelMessage(FrameHeader header, CBORObject obj)
}

this.OnSubscribedLabelMessage?.Invoke(this, new SubscribedLabelEventArgs(message));
return message.Labels?.Seq;
}

private void HandleRepoMessage(FrameHeader header, CBORObject obj)
private long? HandleRepoMessage(FrameHeader header, CBORObject obj)
{
var message = new SubscribeRepoMessage();
message.Header = header;
Expand Down Expand Up @@ -377,15 +392,22 @@ private void HandleRepoMessage(FrameHeader header, CBORObject obj)
}

this.OnSubscribedRepoMessage?.Invoke(this, new SubscribedRepoEventArgs(message));

return message.Commit?.Seq;
}

private Task OnInternalMessageReceived(ReadOnlySequence<byte> message)
{
this.OnMessageReceived?.Invoke(this, message);
var eventId = this.outOfOrderCompletionTracker.OnEventGenerated();
if (this.OnRecordReceived is not null || this.OnSubscribedRepoMessage is not null || this.OnSubscribedLabelMessage is not null)
{
var newMessage = message.ToArray();
this.taskFactory.StartNew(() => this.HandleMessage(newMessage)).FireAndForgetSafeAsync(this.logger);
this.taskFactory.StartNew(() => this.HandleMessage(newMessage, eventId)).FireAndForgetSafeAsync(this.logger);
}
else
{
this.outOfOrderCompletionTracker.OnEventProcessed(eventId, null);
}

return Task.CompletedTask;
Expand Down
76 changes: 76 additions & 0 deletions src/FishyFlip/OutOfOrderCompletionTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// <copyright file="OutOfOrderCompletionTracker.cs" company="Drastic Actions">
// Copyright (c) Drastic Actions. All rights reserved.
// </copyright>
using System.Runtime.CompilerServices;

namespace FishyFlip
{
/// <summary>
/// Keeps track of the most recent event ID such that all previous events have definitely been processed.
/// </summary>
internal class OutOfOrderCompletionTracker
{
private readonly Dictionary<long, long?> eventIdToSeq = new();
private long lastDefinitelyProcessedEventId;
private long? lastDefinitelyProcessedSeq;
private long lastGeneratedEventId;

/// <summary>
/// Gets the last firehose cursor such that this and all previous events have been already processed on the threadpool.
/// </summary>
public long? LastDefinitelyProcessedSeq
{
get
{
lock (this)
{
return this.lastDefinitelyProcessedSeq;
}
}
}

/// <summary>
/// To be called when an event is generated but not processed yet. Each call must eventually result in a <see cref="OnEventProcessed"/>.
/// </summary>
/// <returns>A sequentially increasing event ID that must be eventually passed to <see cref="OnEventProcessed"/>.</returns>
public long OnEventGenerated()
{
return Interlocked.Increment(ref this.lastGeneratedEventId);
}

/// <summary>
/// To be called when an event is fully processed (successfully or unsuccessfully). Each call must come after a corresponding <see cref="OnEventGenerated"/>.
/// </summary>
/// <param name="eventId">The event ID.</param>
/// <param name="seq">The firehose cursor of the current entry, if available.</param>
public void OnEventProcessed(long eventId, long? seq)
{
lock (this)
{
if (eventId <= this.lastDefinitelyProcessedEventId)
{
throw new InvalidOperationException("OnEventProcessed was called with an eventId that was already supposedly processed.");
}

this.eventIdToSeq.Add(eventId, seq);
this.UpdateLastDefinitelyProcessedEvent();
}
}

private void UpdateLastDefinitelyProcessedEvent()
{
while (this.eventIdToSeq.TryGetValue(this.lastDefinitelyProcessedEventId + 1, out var seq))
{
this.lastDefinitelyProcessedEventId++;
if (seq != null)
{
this.lastDefinitelyProcessedSeq = seq;
}

this.eventIdToSeq.Remove(this.lastDefinitelyProcessedEventId);
}
}
}
}


Loading