Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds missing parts of the fix in PR #1110 #1130

Merged
merged 7 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Minio.Examples/Cases/ListenNotifications.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static void Run(IMinioClient minio,
Console.WriteLine();
events ??= new List<EventType> { EventType.BucketCreatedAll };
var args = new ListenBucketNotificationsArgs().WithEvents(events);
var observable = minio.ListenNotificationsAsync(events);
var observable = minio.ListenNotifications(args);

subscription = observable.Subscribe(
notification => Console.WriteLine($"Notification: {notification.Json}"),
Expand Down
151 changes: 55 additions & 96 deletions Minio.Functional.Tests/FunctionalTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public static class FunctionalTest
"IObservable<MinioNotificationRaw> ListenBucketNotificationsAsync(ListenBucketNotificationsArgs args, CancellationToken cancellationToken = default(CancellationToken))";

private const string listenNotificationsSignature =
"IObservable<MinioNotificationRaw> ListenNotificationsAsync(IList<EventType> events, CancellationToken cancellationToken = default(CancellationToken))";
"IObservable<MinioNotificationRaw> ListenNotifications(ListenBucketNotificationsArgs args, CancellationToken cancellationToken = default(CancellationToken))";

private const string copyObjectSignature =
"Task<CopyObjectResult> CopyObjectAsync(CopyObjectArgs args, CancellationToken cancellationToken = default(CancellationToken))";
Expand Down Expand Up @@ -1041,13 +1041,11 @@ internal static async Task<bool> CreateBucket_Tester(IMinioClient minio, string
{
// Create a new bucket
await minio.MakeBucketAsync(new MakeBucketArgs().WithBucket(bucketName)).ConfigureAwait(false);
await Task.Delay(800).ConfigureAwait(false);
ramondeklein marked this conversation as resolved.
Show resolved Hide resolved

// Verify the bucket exists
var bucketExists = await minio.BucketExistsAsync(new BucketExistsArgs().WithBucket(bucketName))
return await minio.BucketExistsAsync(new BucketExistsArgs().WithBucket(bucketName))
.ConfigureAwait(false);
Assert.IsTrue(bucketExists, $"Bucket {bucketName} was not created successfully.");

return bucketExists;
ramondeklein marked this conversation as resolved.
Show resolved Hide resolved
}

internal static async Task StatObject_Test1(IMinioClient minio)
Expand Down Expand Up @@ -1282,7 +1280,7 @@ internal static async Task RemoveObjects_Test3(IMinioClient minio)
.WithBucket(bucketName)
.WithObjectsVersions(objVersions);

await minio.RemoveObjectsAsync(removeObjectsArgs).ConfigureAwait(false);
_ = await minio.RemoveObjectsAsync(removeObjectsArgs).ConfigureAwait(false);

await TearDown(minio, bucketName).ConfigureAwait(false);

Expand Down Expand Up @@ -2639,7 +2637,7 @@ await ListObjects_Test(minio, bucketName, singleObjectName, 1, headers: extractH

#region Global Notifications

internal static async Task ListenNotificationsAsync_Test1(IMinioClient minio)
internal static async Task ListenNotifications_Test1(IMinioClient minio)
{
var startTime = DateTime.Now;
var bucketName = GetRandomName(15);
Expand All @@ -2648,61 +2646,54 @@ internal static async Task ListenNotificationsAsync_Test1(IMinioClient minio)
try
{
var received = new List<MinioNotificationRaw>();

var eventsList = new List<EventType> { EventType.BucketCreatedAll };

var events = minio.ListenNotificationsAsync(eventsList);
var subscription = events.Subscribe(
received.Add,
ex => Console.WriteLine($"OnError: {ex}"),
() => Console.WriteLine("Stopped listening for bucket notifications\n"));

// Ensure the subscription is established
await Task.Delay(1000).ConfigureAwait(false);

// Trigger the event by creating a new bucket
var isBucketCreated1 = await CreateBucket_Tester(minio, bucketName).ConfigureAwait(false);

// No need to define a new "ListenNotificationArgs"
// "ListenBucketNotificationsArgs" is good here
var listenArgs = new ListenBucketNotificationsArgs()
.WithEvents(eventsList);
var events = minio.ListenNotifications(listenArgs);
var eventDetected = false;
using (var subscription = events.Subscribe(
received.Add,
_ => { },
() => { }))
{
await Task.Delay(200).ConfigureAwait(false);
ramondeklein marked this conversation as resolved.
Show resolved Hide resolved
// Trigger the event by creating a new bucket
_ = await CreateBucket_Tester(minio, bucketName).ConfigureAwait(false);
}

for (var attempt = 0; attempt < 20; attempt++)
{
if (received.Count > 0)
// Check if there is a caught event
if (received.Count == 1)
{
var notification = JsonSerializer.Deserialize<MinioNotification>(received[0].Json);

if (notification.Records is not null)
{
Assert.AreEqual(1, notification.Records.Count);
Assert.IsTrue(notification.Records[0].EventName
.Contains("s3:BucketCreated:*", StringComparison.OrdinalIgnoreCase));
eventDetected = true;
break;
}
}

await Task.Delay(500).ConfigureAwait(false); // Delay between attempts
}

subscription.Dispose();
if (!eventDetected)
throw new UnexpectedMinioException("Failed to detect the expected bucket notification event.");

new MintLogger(nameof(ListenNotificationsAsync_Test1),
listenNotificationsSignature,
"Tests whether ListenNotifications passes",
TestStatus.PASS, DateTime.Now - startTime, args: args).Log();
}
catch (NotImplementedException ex)
{
new MintLogger(nameof(ListenNotificationsAsync_Test1),
listenNotificationsSignature,
"Tests whether ListenNotifications passes",
TestStatus.NA, DateTime.Now - startTime, ex.Message,
ex.ToString(), args: args).Log();
if (eventDetected)
new MintLogger(nameof(ListenNotifications_Test1),
listenNotificationsSignature,
"Tests whether ListenNotifications notifies user about \"BucketCreatedAll\" event",
TestStatus.PASS, DateTime.Now - startTime, args: args).Log();
else
throw new UnexpectedMinioException(
"Failed to detect the expected notification event, \"BucketCreatedAll\".");
}
catch (Exception ex)
{
new MintLogger(nameof(ListenNotificationsAsync_Test1),
new MintLogger(nameof(ListenNotifications_Test1),
listenNotificationsSignature,
"Tests whether ListenNotifications passes",
"Tests whether ListenNotifications notifies user about \"BucketCreatedAll\" event",
TestStatus.FAIL, DateTime.Now - startTime, ex.Message,
ex.ToString(), args: args).Log();
throw;
Expand Down Expand Up @@ -2738,57 +2729,26 @@ internal static async Task ListenBucketNotificationsAsync_Test1(IMinioClient min
var received = new List<MinioNotificationRaw>();

var eventsList = new List<EventType> { EventType.ObjectCreatedAll };
var eventDetected = false;

var listenArgs = new ListenBucketNotificationsArgs()
.WithBucket(bucketName)
.WithEvents(eventsList);
var events = minio.ListenBucketNotificationsAsync(listenArgs);
var subscription = events.Subscribe(
received.Add,
ex => { },
() => { }
);


_ = await PutObject_Tester(minio, bucketName, objectName, null, contentType,
0, null, rsg.GenerateStreamFromSeed(1 * KB)).ConfigureAwait(false);
using (var subscription = events.Subscribe(
received.Add,
_ => { },
() => { }))
{
_ = await PutObject_Tester(minio, bucketName, objectName, null, contentType,
0, null, rsg.GenerateStreamFromSeed(1 * KB)).ConfigureAwait(false);
}

// wait for notifications
var eventDetected = false;
for (var attempt = 0; attempt < 10; attempt++)
if (received.Count > 0)
for (var attempt = 0; attempt < 20; attempt++)
// Check if there is a caught event
if (received.Count == 1)
{
// Check if there is any unexpected error returned
// and captured in the receivedJson list, like
// "NotImplemented" api error. If so, we throw an exception
// and skip running this test
if (received.Count > 1 &&
received[1].Json.StartsWith("<Error><Code>", StringComparison.OrdinalIgnoreCase))
{
// Although the attribute is called "json",
// returned data in list "received" is in xml
// format and it is an error.Here, we convert xml
// into json format.
var receivedJson = XmlStrToJsonStr(received[1].Json);

// Cleanup the "Error" key encapsulating "receivedJson"
// data. This is required to match and convert json data
// "receivedJson" into class "ErrorResponse"
var len = "{'Error':".Length;
var trimmedFront = receivedJson[len..];
var trimmedFull = trimmedFront[..^1];

var err = JsonSerializer.Deserialize<ErrorResponse>(trimmedFull);

Exception ex = new UnexpectedMinioException(err.Message);
if (string.Equals(err.Code, "NotImplemented", StringComparison.OrdinalIgnoreCase))
ex = new NotImplementedException(err.Message);
await TearDown(minio, bucketName).ConfigureAwait(false);
throw ex;
}

var notification = JsonSerializer.Deserialize<MinioNotification>(received[0].Json);

if (notification.Records is not null)
{
Assert.AreEqual(1, notification.Records.Count);
Expand All @@ -2804,20 +2764,19 @@ internal static async Task ListenBucketNotificationsAsync_Test1(IMinioClient min
}
}

// subscription.Dispose();
if (!eventDetected)
if (eventDetected)
new MintLogger(nameof(ListenBucketNotificationsAsync_Test1),
listenBucketNotificationsSignature,
"Tests whether ListenBucketNotifications passes for small object",
TestStatus.PASS, DateTime.Now - startTime, args: args).Log();
else
throw new UnexpectedMinioException("Failed to detect the expected bucket notification event.");

new MintLogger(nameof(ListenBucketNotificationsAsync_Test1),
listenBucketNotificationsSignature,
"Tests whether ListenBucketNotifications passes for small object",
TestStatus.PASS, DateTime.Now - startTime, args: args).Log();
}
catch (NotImplementedException ex)
{
new MintLogger(nameof(ListenBucketNotificationsAsync_Test1),
listenBucketNotificationsSignature,
"Tests whether ListenBucketNotifications passes for small object",
"Tests whether ListenBucketNotifications generates notification for a newly created object.",
TestStatus.NA, DateTime.Now - startTime, ex.Message,
ex.ToString(), args: args).Log();
}
Expand All @@ -2841,14 +2800,14 @@ static bool isAWS(string endPoint)
// This is a PASS
new MintLogger(nameof(ListenBucketNotificationsAsync_Test1),
listenBucketNotificationsSignature,
"Tests whether ListenBucketNotifications passes for small object",
"Tests whether ListenBucketNotifications generates notification for a newly created object.",
TestStatus.PASS, DateTime.Now - startTime, args: args).Log();
}
else
{
new MintLogger(nameof(ListenBucketNotificationsAsync_Test1),
listenBucketNotificationsSignature,
"Tests whether ListenBucketNotifications passes for small object",
"Tests whether ListenBucketNotifications generates notification for a newly created object.",
TestStatus.FAIL, DateTime.Now - startTime, ex.Message,
ex.ToString(), args: args).Log();
throw;
Expand Down
2 changes: 1 addition & 1 deletion Minio.Functional.Tests/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public static async Task Main(string[] args)
ConcurrentBag<Task> functionalTestTasks = new();

// Global Notification
await FunctionalTest.ListenNotificationsAsync_Test1(minioClient).ConfigureAwait(false);
await FunctionalTest.ListenNotifications_Test1(minioClient).ConfigureAwait(false);

// Try catch as 'finally' section needs to run in the Functional Tests
// Bucket notification is a minio specific feature.
Expand Down
5 changes: 2 additions & 3 deletions Minio/ApiEndpoints/BucketOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -762,13 +762,12 @@ await this.ExecuteTaskAsync(ResponseErrorHandlers, requestMessageBuilder,
/// <summary>
/// Subscribes to global change notifications (a Minio-only extension)
/// </summary>
/// <param name="events">Events to listen for</param>
/// <param name="args">ListenBucketNotificationsArgs to listen events</param>
/// <param name="cancellationToken">Optional cancellation token to cancel the operation</param>
/// <returns>An observable of JSON-based notification events</returns>
public IObservable<MinioNotificationRaw> ListenNotificationsAsync(IList<EventType> events,
public IObservable<MinioNotificationRaw> ListenNotifications(ListenBucketNotificationsArgs args,
CancellationToken cancellationToken = default)
{
var args = new ListenBucketNotificationsArgs().WithEvents(events);
return ListenBucketNotificationsAsync(args, cancellationToken);
}

Expand Down
2 changes: 1 addition & 1 deletion Minio/ApiEndpoints/IBucketOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ Task<ReplicationConfiguration> GetBucketReplicationAsync(GetBucketReplicationArg

Task<string> GetPolicyAsync(GetPolicyArgs args, CancellationToken cancellationToken = default);

IObservable<MinioNotificationRaw> ListenNotificationsAsync(IList<EventType> events,
IObservable<MinioNotificationRaw> ListenNotifications(ListenBucketNotificationsArgs args,
CancellationToken cancellationToken = default);

IObservable<MinioNotificationRaw> ListenBucketNotificationsAsync(string bucketName, IList<EventType> events,
Expand Down
2 changes: 1 addition & 1 deletion Minio/Minio.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<PackageReference Include="System.Net.Http" Version="4.3.4" />
<PackageReference Include="System.Net.Primitives" Version="4.3.1" />
<PackageReference Include="System.ValueTuple" Version="4.5.0" />
<PackageReference Include="System.Text.Json" Version="8.0.3" />
<PackageReference Include="System.Text.Json" Version="8.0.4" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />
<PackageReference Include="System.Security.Cryptography.Algorithms" Version="4.3.1" />
</ItemGroup>
Expand Down
Loading