Skip to content

Commit

Permalink
Message loss in migrate-to-quorum command (#1233)
Browse files Browse the repository at this point in the history
* Add failing test

* Decode UTF8 byte array to get message ID string

* Clarify empty string usage by removing emptyRoutingKey

* Add net7.0 TFM to CommandLine project
  • Loading branch information
bording authored Apr 27, 2023
1 parent e2a98c3 commit 10d9edc
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,22 @@ public async Task Should_preserve_existing_messages()
Assert.AreEqual(numExistingMessages, MessageCount(endpointName));
}

[Test]
public async Task Should_preserve_existing_messages_with_messageIds()
{
var endpointName = "EndpointWithExistingMessages";
var numExistingMessages = 10;

PrepareTestEndpoint(endpointName);

AddMessages(endpointName, numExistingMessages, properties => properties.Headers = new Dictionary<string, object> { { NServiceBus.Headers.MessageId, Guid.NewGuid().ToString() } });

await ExecuteMigration(endpointName);

Assert.True(QueueIsQuorum(endpointName));
Assert.AreEqual(numExistingMessages, MessageCount(endpointName));
}

[Test]
public async Task Should_preserve_existing_messages_in_holding_queue()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{
using System;
using System.CommandLine;
using System.Text;
using global::RabbitMQ.Client;
using global::RabbitMQ.Client.Exceptions;

Expand Down Expand Up @@ -88,11 +89,11 @@ MigrationStage MoveMessagesToHoldingQueue(IConnection connection, CancellationTo

// bind the holding queue to the exchange of the queue under migration
// this will throw if the exchange for the queue doesn't exist
channel.QueueBind(holdingQueueName, queueName, emptyRoutingKey);
channel.QueueBind(holdingQueueName, queueName, string.Empty);
console.WriteLine($"Bound '{holdingQueueName}' to exchange '{queueName}'");

// unbind the queue under migration to stop more messages from coming in
channel.QueueUnbind(queueName, queueName, emptyRoutingKey);
channel.QueueUnbind(queueName, queueName, string.Empty);
console.WriteLine($"Unbound '{queueName}' from exchange '{queueName}' ");

// move all existing messages to the holding queue
Expand All @@ -103,7 +104,7 @@ MigrationStage MoveMessagesToHoldingQueue(IConnection connection, CancellationTo
queueName,
message =>
{
channel.BasicPublish(emptyRoutingKey, holdingQueueName, message.BasicProperties, message.Body);
channel.BasicPublish(string.Empty, holdingQueueName, message.BasicProperties, message.Body);
channel.WaitForConfirmsOrDie();
},
cancellationToken);
Expand Down Expand Up @@ -143,10 +144,10 @@ MigrationStage RestoreMessages(IConnection connection, CancellationToken cancell
{
using var channel = connection.CreateModel();

channel.QueueBind(queueName, queueName, emptyRoutingKey);
channel.QueueBind(queueName, queueName, string.Empty);
console.WriteLine($"Re-bound '{queueName}' to exchange '{queueName}'");

channel.QueueUnbind(holdingQueueName, queueName, emptyRoutingKey);
channel.QueueUnbind(holdingQueueName, queueName, string.Empty);
console.WriteLine($"Unbound '{holdingQueueName}' from exchange '{queueName}'");

var messageIds = new Dictionary<string, string>();
Expand All @@ -163,15 +164,18 @@ MigrationStage RestoreMessages(IConnection connection, CancellationToken cancell
if (message.BasicProperties.Headers.TryGetValue("NServiceBus.MessageId", out var messageId))
{
messageIdString = messageId?.ToString();
if (messageId is byte[] bytes)
{
messageIdString = Encoding.UTF8.GetString(bytes);
}
if (messageIdString != null && messageIds.ContainsKey(messageIdString))
{
return;
}
}
channel.BasicPublish(emptyRoutingKey, queueName, message.BasicProperties, message.Body);
channel.BasicPublish(string.Empty, queueName, message.BasicProperties, message.Body);
channel.WaitForConfirmsOrDie();
if (messageIdString != null)
Expand Down Expand Up @@ -231,7 +235,6 @@ uint ProcessMessages(IModel channel, string sourceQueue, Action<BasicGetResult>
readonly IConsole console;
readonly MigrationState migrationState;

static string emptyRoutingKey = string.Empty;
static Dictionary<string, object> quorumQueueArguments = new Dictionary<string, object> { { "x-queue-type", "quorum" } };

enum MigrationStage
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<OutputType>Exe</OutputType>
<ToolCommandName>rabbitmq-transport</ToolCommandName>
<PackAsTool>True</PackAsTool>
Expand Down

0 comments on commit 10d9edc

Please sign in to comment.