diff --git a/src/FishyFlip/ATJetStream.cs b/src/FishyFlip/ATJetStream.cs index adaa3042..db7d38c1 100644 --- a/src/FishyFlip/ATJetStream.cs +++ b/src/FishyFlip/ATJetStream.cs @@ -4,6 +4,7 @@ using System; using System.Buffers; +using System.Runtime.InteropServices; using FishyFlip.Events; using FishyFlip.Lexicon; using FishyFlip.Tools.Json; @@ -28,6 +29,7 @@ public sealed class ATJetStream : IDisposable private bool compression; private byte[]? dictionary; private Decompressor? decompressor; + private long lastEventId; /// /// Initializes a new instance of the class. @@ -60,6 +62,11 @@ public ATJetStream(ATJetStreamOptions options) /// public event EventHandler? OnRecordReceived; + /// + /// On AT WebSocket Record Error. + /// + public event EventHandler? OnRecordError; + /// void IDisposable.Dispose() { @@ -280,13 +287,14 @@ private async Task ReceiveMessages(ClientWebSocket webSocket, CancellationToken messageBytes = this.decompressor!.Unwrap(messageBytes); } + var eventId = Interlocked.Increment(ref this.lastEventId); #if NETSTANDARD var message = Encoding.UTF8.GetString(messageBytes.ToArray()); #else var message = Encoding.UTF8.GetString(messageBytes); #endif - this.OnRawMessageReceived?.Invoke(this, new JetStreamRawMessageEventArgs(message)); - this.options.TaskFactory.StartNew(() => this.HandleMessage(message)) + this.OnRawMessageReceived?.Invoke(this, new JetStreamRawMessageEventArgs(message, eventId)); + this.options.TaskFactory.StartNew(() => this.HandleMessage(message, eventId)) .FireAndForgetSafeAsync(this.logger); } catch (OperationCanceledException) @@ -302,33 +310,32 @@ private async Task ReceiveMessages(ClientWebSocket webSocket, CancellationToken this.OnConnectionUpdated?.Invoke(this, new SubscriptionConnectionStatusEventArgs(webSocket.State)); } - private void HandleMessage(string json) + private void HandleMessage(string json, long eventId) { - if (string.IsNullOrEmpty(json)) - { - this.logger?.LogDebug("WSS: Empty message received."); - return; - } - try { + if (string.IsNullOrEmpty(json)) + { + throw new ArgumentException("Empty message received."); + } + var atWebSocketRecord = JsonSerializer.Deserialize(json, this.sourceGenerationContext.ATWebSocketRecord); if (atWebSocketRecord is null) { - this.logger?.LogError("WSS: Failed to deserialize ATWebSocketRecord."); - this.logger?.LogError(json); - return; + throw new ArgumentException("Deserialized event is null."); } - this.OnRecordReceived?.Invoke(this, new JetStreamATWebSocketRecordEventArgs(atWebSocketRecord, json)); + this.OnRecordReceived?.Invoke(this, new JetStreamATWebSocketRecordEventArgs(atWebSocketRecord, json, eventId)); } catch (JsonException ex) { + this.OnRecordError?.Invoke(this, new JetStreamATWebSocketRecordErrorEventArgs(eventId)); this.logger?.LogError(ex, "WSS: Failed to deserialize ATWebSocketRecord."); this.logger?.LogError(json); } catch (Exception ex) { + this.OnRecordError?.Invoke(this, new JetStreamATWebSocketRecordErrorEventArgs(eventId)); this.logger?.LogError(ex, "WSS: An unknown error occurred."); this.logger?.LogError(json); } diff --git a/src/FishyFlip/Events/JetStreamATWebSocketRecordErrorEventArgs.cs b/src/FishyFlip/Events/JetStreamATWebSocketRecordErrorEventArgs.cs new file mode 100644 index 00000000..92762ebe --- /dev/null +++ b/src/FishyFlip/Events/JetStreamATWebSocketRecordErrorEventArgs.cs @@ -0,0 +1,26 @@ +// +// Copyright (c) Drastic Actions. All rights reserved. +// + +namespace FishyFlip.Events +{ + /// + /// Provides event data for errors encountered while parsing a JetStream AT WebSocket record. + /// + public class JetStreamATWebSocketRecordErrorEventArgs : EventArgs + { + /// + /// Initializes a new instance of the class. + /// + /// The event ID of the record whose parsing failed. + public JetStreamATWebSocketRecordErrorEventArgs(long eventId) + { + this.EventId = eventId; + } + + /// + /// Gets the event ID of the record whose parsing failed. + /// + public long EventId { get; } + } +} diff --git a/src/FishyFlip/Events/JetStreamATWebSocketRecordEventArgs.cs b/src/FishyFlip/Events/JetStreamATWebSocketRecordEventArgs.cs index 1c47ce53..9aadcb06 100644 --- a/src/FishyFlip/Events/JetStreamATWebSocketRecordEventArgs.cs +++ b/src/FishyFlip/Events/JetStreamATWebSocketRecordEventArgs.cs @@ -14,10 +14,12 @@ public class JetStreamATWebSocketRecordEventArgs : EventArgs /// /// . /// JSON. - public JetStreamATWebSocketRecordEventArgs(ATWebSocketRecord record, string json) + /// An ID that can be used to correlate this parsed record to its original . + public JetStreamATWebSocketRecordEventArgs(ATWebSocketRecord record, string json, long eventId) { this.Record = record; this.Json = json; + this.EventId = eventId; } /// @@ -29,4 +31,9 @@ public JetStreamATWebSocketRecordEventArgs(ATWebSocketRecord record, string json /// Gets the JSON representation of the AT WebSocket Record. /// public string Json { get; } + + /// + /// Gets an ID that can be used to correlate this parsed record to its original . + /// + public long EventId { get; } } \ No newline at end of file diff --git a/src/FishyFlip/Events/JetStreamRawMessageEventArgs.cs b/src/FishyFlip/Events/JetStreamRawMessageEventArgs.cs index 2754049b..8625c168 100644 --- a/src/FishyFlip/Events/JetStreamRawMessageEventArgs.cs +++ b/src/FishyFlip/Events/JetStreamRawMessageEventArgs.cs @@ -13,13 +13,20 @@ public class JetStreamRawMessageEventArgs : EventArgs /// Initializes a new instance of the class. /// /// Raw Message JSON. - public JetStreamRawMessageEventArgs(string messageJson) + /// A unique, sequentially increasing ID that can be used to correlate this raw message to its future corresponding . + public JetStreamRawMessageEventArgs(string messageJson, long eventId) { this.MessageJson = messageJson; + this.EventId = eventId; } /// /// Gets the Message JSON. /// public string MessageJson { get; } + + /// + /// Gets the unique, sequentially increasing ID that can be used to correlate this raw message to its future corresponding . + /// + public long EventId { get; } } \ No newline at end of file