Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 20 additions & 13 deletions src/FishyFlip/ATJetStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System;
using System.Buffers;
using System.Runtime.InteropServices;
using FishyFlip.Events;
using FishyFlip.Lexicon;
using FishyFlip.Tools.Json;
Expand All @@ -28,6 +29,7 @@ public sealed class ATJetStream : IDisposable
private bool compression;
private byte[]? dictionary;
private Decompressor? decompressor;
private long lastEventId;

/// <summary>
/// Initializes a new instance of the <see cref="ATJetStream"/> class.
Expand Down Expand Up @@ -60,6 +62,11 @@ public ATJetStream(ATJetStreamOptions options)
/// </summary>
public event EventHandler<JetStreamATWebSocketRecordEventArgs>? OnRecordReceived;

/// <summary>
/// On AT WebSocket Record Error.
/// </summary>
public event EventHandler<JetStreamATWebSocketRecordErrorEventArgs>? OnRecordError;

/// <inheritdoc/>
void IDisposable.Dispose()
{
Expand Down Expand Up @@ -280,13 +287,14 @@ private async Task ReceiveMessages(ClientWebSocket webSocket, CancellationToken
messageBytes = this.decompressor!.Unwrap(messageBytes);
}

var eventId = Interlocked.Increment(ref this.lastEventId);
#if NETSTANDARD
var message = Encoding.UTF8.GetString(messageBytes.ToArray());
#else
var message = Encoding.UTF8.GetString(messageBytes);
#endif
this.OnRawMessageReceived?.Invoke(this, new JetStreamRawMessageEventArgs(message));
this.options.TaskFactory.StartNew(() => this.HandleMessage(message))
this.OnRawMessageReceived?.Invoke(this, new JetStreamRawMessageEventArgs(message, eventId));
this.options.TaskFactory.StartNew(() => this.HandleMessage(message, eventId))
.FireAndForgetSafeAsync(this.logger);
}
catch (OperationCanceledException)
Expand All @@ -302,33 +310,32 @@ private async Task ReceiveMessages(ClientWebSocket webSocket, CancellationToken
this.OnConnectionUpdated?.Invoke(this, new SubscriptionConnectionStatusEventArgs(webSocket.State));
}

private void HandleMessage(string json)
private void HandleMessage(string json, long eventId)
{
if (string.IsNullOrEmpty(json))
{
this.logger?.LogDebug("WSS: Empty message received.");
return;
}

try
{
if (string.IsNullOrEmpty(json))
{
throw new ArgumentException("Empty message received.");
}

var atWebSocketRecord = JsonSerializer.Deserialize<ATWebSocketRecord>(json, this.sourceGenerationContext.ATWebSocketRecord);
if (atWebSocketRecord is null)
{
this.logger?.LogError("WSS: Failed to deserialize ATWebSocketRecord.");
this.logger?.LogError(json);
return;
throw new ArgumentException("Deserialized event is null.");
}

this.OnRecordReceived?.Invoke(this, new JetStreamATWebSocketRecordEventArgs(atWebSocketRecord, json));
this.OnRecordReceived?.Invoke(this, new JetStreamATWebSocketRecordEventArgs(atWebSocketRecord, json, eventId));
}
catch (JsonException ex)
{
this.OnRecordError?.Invoke(this, new JetStreamATWebSocketRecordErrorEventArgs(eventId));
this.logger?.LogError(ex, "WSS: Failed to deserialize ATWebSocketRecord.");
this.logger?.LogError(json);
}
catch (Exception ex)
{
this.OnRecordError?.Invoke(this, new JetStreamATWebSocketRecordErrorEventArgs(eventId));
this.logger?.LogError(ex, "WSS: An unknown error occurred.");
this.logger?.LogError(json);
}
Expand Down
26 changes: 26 additions & 0 deletions src/FishyFlip/Events/JetStreamATWebSocketRecordErrorEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// <copyright file="JetStreamATWebSocketRecordErrorEventArgs.cs" company="Drastic Actions">
// Copyright (c) Drastic Actions. All rights reserved.
// </copyright>

namespace FishyFlip.Events
{
/// <summary>
/// Provides event data for errors encountered while parsing a JetStream AT WebSocket record.
/// </summary>
public class JetStreamATWebSocketRecordErrorEventArgs : EventArgs
{
/// <summary>
/// Initializes a new instance of the <see cref="JetStreamATWebSocketRecordErrorEventArgs"/> class.
/// </summary>
/// <param name="eventId">The event ID of the record whose parsing failed.</param>
public JetStreamATWebSocketRecordErrorEventArgs(long eventId)
{
this.EventId = eventId;
}

/// <summary>
/// Gets the event ID of the record whose parsing failed.
/// </summary>
public long EventId { get; }
}
}
9 changes: 8 additions & 1 deletion src/FishyFlip/Events/JetStreamATWebSocketRecordEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ public class JetStreamATWebSocketRecordEventArgs : EventArgs
/// </summary>
/// <param name="record"><see cref="ATWebSocketRecord"/>.</param>
/// <param name="json">JSON.</param>
public JetStreamATWebSocketRecordEventArgs(ATWebSocketRecord record, string json)
/// <param name="eventId">An ID that can be used to correlate this parsed record to its original <see cref="JetStreamRawMessageEventArgs"/>.</param>
public JetStreamATWebSocketRecordEventArgs(ATWebSocketRecord record, string json, long eventId)
{
this.Record = record;
this.Json = json;
this.EventId = eventId;
}

/// <summary>
Expand All @@ -29,4 +31,9 @@ public JetStreamATWebSocketRecordEventArgs(ATWebSocketRecord record, string json
/// Gets the JSON representation of the AT WebSocket Record.
/// </summary>
public string Json { get; }

/// <summary>
/// Gets an ID that can be used to correlate this parsed record to its original <see cref="JetStreamRawMessageEventArgs"/>.
/// </summary>
public long EventId { get; }
}
9 changes: 8 additions & 1 deletion src/FishyFlip/Events/JetStreamRawMessageEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,20 @@ public class JetStreamRawMessageEventArgs : EventArgs
/// Initializes a new instance of the <see cref="JetStreamRawMessageEventArgs"/> class.
/// </summary>
/// <param name="messageJson">Raw Message JSON.</param>
public JetStreamRawMessageEventArgs(string messageJson)
/// <param name="eventId">A unique, sequentially increasing ID that can be used to correlate this raw message to its future corresponding <see cref="JetStreamATWebSocketRecordEventArgs"/>.</param>
public JetStreamRawMessageEventArgs(string messageJson, long eventId)
{
this.MessageJson = messageJson;
this.EventId = eventId;
}

/// <summary>
/// Gets the Message JSON.
/// </summary>
public string MessageJson { get; }

/// <summary>
/// Gets the unique, sequentially increasing ID that can be used to correlate this raw message to its future corresponding <see cref="JetStreamATWebSocketRecordEventArgs">.
/// </summary>
public long EventId { get; }
}
Loading