Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds events for delivered and dropped messages. #1866

Merged
merged 5 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ReleaseNotes.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
* [Client] Added support for custom CA chain validation (#1851, thanks to @rido-min).
* [Client] Added support for custom CA chain validation (#1851, thanks to @rido-min).
* [Client] Fixed handling of unobserved tasks exceptions (#1871).
* [Client] Fixed not specified ReasonCode when using _SendExtendedAuthenticationExchangeDataAsync_ (#1882, thanks to @rido-min).
* [Server] Fixed not working _UpdateRetainedMessageAsync_ public api (#1858, thanks to @kimdiego2098).
* [Server] Added support for custom DISCONNECT packets when stopping the server or disconnect a client (BREAKING CHANGE!, #1846).
* [Server] Added new property to stop the server from accepting new connections even if it is running (#1846).
* [Server] Added new events for delivered and dropped messages (#1866, thanks to @kallayj).
9 changes: 7 additions & 2 deletions Source/MQTTnet/Internal/MqttPacketBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,22 @@ public void Dispose()
_signal.Dispose();
}

public void DropFirstItem(MqttPacketBusPartition partition)
public MqttPacketBusItem DropFirstItem(MqttPacketBusPartition partition)
{
lock (_syncRoot)
{
var partitionInstance = _partitions[(int)partition];

if (partitionInstance.Any())
if (partitionInstance.Count > 0)
{
var firstItem = partitionInstance.First.Value;
partitionInstance.RemoveFirst();

return firstItem;
}
}

return null;
}

public void EnqueueItem(MqttPacketBusItem item, MqttPacketBusPartition partition)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;

namespace MQTTnet.Server
{
public sealed class ApplicationMessageEnqueuedEventArgs : EventArgs
{
public ApplicationMessageEnqueuedEventArgs(string senderClientId, string receiverClientId, MqttApplicationMessage applicationMessage, bool isDropped)
{
SenderClientId = senderClientId ?? throw new ArgumentNullException( nameof(senderClientId));
ReceiverClientId = receiverClientId ?? throw new ArgumentNullException(nameof(receiverClientId));
ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage));
IsDropped = isDropped;
}

public string SenderClientId { get; }

public string ReceiverClientId { get; }

public bool IsDropped { get; }

public MqttApplicationMessage ApplicationMessage { get; }
}
}
22 changes: 22 additions & 0 deletions Source/MQTTnet/Server/Events/QueueMessageOverwrittenEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using MQTTnet.Packets;

namespace MQTTnet.Server
{
public sealed class QueueMessageOverwrittenEventArgs : EventArgs
{
public QueueMessageOverwrittenEventArgs(string receiverClientId, MqttPacket packet)
{
ReceiverClientId = receiverClientId ?? throw new ArgumentNullException(nameof(receiverClientId));
Packet = packet ?? throw new ArgumentNullException(nameof(packet));
}

public MqttPacket Packet { get; }

public string ReceiverClientId { get; }
}
}
12 changes: 12 additions & 0 deletions Source/MQTTnet/Server/Internal/EnqueueDataPacketResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

namespace MQTTnet.Server
{
public enum EnqueueDataPacketResult
{
Enqueued,
Dropped
}
}
9 changes: 8 additions & 1 deletion Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,14 @@ public async Task<DispatchApplicationMessageResult> DispatchApplicationMessage(

matchingSubscribersCount++;

session.EnqueueDataPacket(new MqttPacketBusItem(publishPacketCopy));
var result = session.EnqueueDataPacket(new MqttPacketBusItem(publishPacketCopy));

if (_eventContainer.ApplicationMessageEnqueuedOrDroppedEvent.HasHandlers)
{
var eventArgs = new ApplicationMessageEnqueuedEventArgs(senderId, session.Id, applicationMessage, result == EnqueueDataPacketResult.Dropped);
await _eventContainer.ApplicationMessageEnqueuedOrDroppedEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
}

_logger.Verbose("Client '{0}': Queued PUBLISH packet with topic '{1}'", session.Id, applicationMessage.Topic);
}

Expand Down
4 changes: 4 additions & 0 deletions Source/MQTTnet/Server/Internal/MqttServerEventContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ public sealed class MqttServerEventContainer
public AsyncEvent<ClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopicEvent { get; } = new AsyncEvent<ClientUnsubscribedTopicEventArgs>();

public AsyncEvent<InterceptingClientApplicationMessageEnqueueEventArgs> InterceptingClientEnqueueEvent { get; } = new AsyncEvent<InterceptingClientApplicationMessageEnqueueEventArgs>();

public AsyncEvent<ApplicationMessageEnqueuedEventArgs> ApplicationMessageEnqueuedOrDroppedEvent { get; } = new AsyncEvent<ApplicationMessageEnqueuedEventArgs>();

public AsyncEvent<QueueMessageOverwrittenEventArgs> QueuedApplicationMessageOverwrittenEvent { get; } = new AsyncEvent<QueueMessageOverwrittenEventArgs>();

public AsyncEvent<InterceptingPacketEventArgs> InterceptingInboundPacketEvent { get; } = new AsyncEvent<InterceptingPacketEventArgs>();

Expand Down
14 changes: 11 additions & 3 deletions Source/MQTTnet/Server/Internal/MqttSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace MQTTnet.Server
public sealed class MqttSession : IDisposable
{
readonly MqttClientSessionsManager _clientSessionsManager;
readonly MqttServerEventContainer _eventContainer;
readonly MqttPacketBus _packetBus = new MqttPacketBus();
readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();

Expand All @@ -44,6 +45,7 @@ public MqttSession(
_connectPacket = connectPacket ?? throw new ArgumentNullException(nameof(connectPacket));
_serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions));
_clientSessionsManager = clientSessionsManager ?? throw new ArgumentNullException(nameof(clientSessionsManager));
_eventContainer = eventContainer ?? throw new ArgumentNullException(nameof(eventContainer));

_subscriptionsManager = new MqttClientSubscriptionsManager(this, eventContainer, retainedMessagesManager, clientSessionsManager);
}
Expand Down Expand Up @@ -117,20 +119,25 @@ public void EnqueueControlPacket(MqttPacketBusItem packetBusItem)
_packetBus.EnqueueItem(packetBusItem, MqttPacketBusPartition.Control);
}

public void EnqueueDataPacket(MqttPacketBusItem packetBusItem)
public EnqueueDataPacketResult EnqueueDataPacket(MqttPacketBusItem packetBusItem)
{
if (_packetBus.ItemsCount(MqttPacketBusPartition.Data) >= _serverOptions.MaxPendingMessagesPerClient)
{
if (_serverOptions.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
{
return;
return EnqueueDataPacketResult.Dropped;
}

if (_serverOptions.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
{
// Only drop from the data partition. Dropping from control partition might break the connection
// because the client does not receive PINGREQ packets etc. any longer.
_packetBus.DropFirstItem(MqttPacketBusPartition.Data);
var firstItem = _packetBus.DropFirstItem(MqttPacketBusPartition.Data);
if (firstItem != null && _eventContainer.QueuedApplicationMessageOverwrittenEvent.HasHandlers)
{
var eventArgs = new QueueMessageOverwrittenEventArgs(Id, firstItem.Packet);
_eventContainer.QueuedApplicationMessageOverwrittenEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
}
}
}

Expand All @@ -147,6 +154,7 @@ public void EnqueueDataPacket(MqttPacketBusItem packetBusItem)
}

_packetBus.EnqueueItem(packetBusItem, MqttPacketBusPartition.Data);
return EnqueueDataPacketResult.Enqueued;
}

public void EnqueueHealthPacket(MqttPacketBusItem packetBusItem)
Expand Down
12 changes: 12 additions & 0 deletions Source/MQTTnet/Server/MqttServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ public MqttServer(MqttServerOptions options, IEnumerable<IMqttServerAdapter> ada
_keepAliveMonitor = new MqttServerKeepAliveMonitor(options, _clientSessionsManager, _rootLogger);
}

public event Func<ApplicationMessageEnqueuedEventArgs, Task> ApplicationMessageEnqueuedOrDroppedAsync
{
add => _eventContainer.ApplicationMessageEnqueuedOrDroppedEvent.AddHandler(value);
remove => _eventContainer.ApplicationMessageEnqueuedOrDroppedEvent.RemoveHandler(value);
}

public event Func<ApplicationMessageNotConsumedEventArgs, Task> ApplicationMessageNotConsumedAsync
{
add => _eventContainer.ApplicationMessageNotConsumedEvent.AddHandler(value);
Expand Down Expand Up @@ -135,6 +141,12 @@ public event Func<EventArgs, Task> PreparingSessionAsync
remove => _eventContainer.PreparingSessionEvent.RemoveHandler(value);
}

public event Func<QueueMessageOverwrittenEventArgs, Task> QueuedApplicationMessageOverwrittenAsync
{
add => _eventContainer.QueuedApplicationMessageOverwrittenEvent.AddHandler(value);
remove => _eventContainer.QueuedApplicationMessageOverwrittenEvent.RemoveHandler(value);
}

public event Func<RetainedMessageChangedEventArgs, Task> RetainedMessageChangedAsync
{
add => _eventContainer.RetainedMessageChangedEvent.AddHandler(value);
Expand Down
Loading