Skip to content

Commit

Permalink
Revert "Adds missing parts of the fix in PR minio#1110 (minio#1130)"
Browse files Browse the repository at this point in the history
This reverts commit 93007be.

# Conflicts:
#	Minio/Minio.csproj
  • Loading branch information
ramondeklein committed Dec 19, 2024
1 parent e9b3a81 commit 90d87ed
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 60 deletions.
2 changes: 1 addition & 1 deletion Minio.Examples/Cases/ListenNotifications.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static void Run(IMinioClient minio,
Console.WriteLine();
events ??= new List<EventType> { EventType.BucketCreatedAll };
var args = new ListenBucketNotificationsArgs().WithEvents(events);
var observable = minio.ListenNotifications(args);
var observable = minio.ListenNotificationsAsync(events);

subscription = observable.Subscribe(
notification => Console.WriteLine($"Notification: {notification.Json}"),
Expand Down
151 changes: 96 additions & 55 deletions Minio.Functional.Tests/FunctionalTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public static class FunctionalTest
"IObservable<MinioNotificationRaw> ListenBucketNotificationsAsync(ListenBucketNotificationsArgs args, CancellationToken cancellationToken = default(CancellationToken))";

private const string listenNotificationsSignature =
"IObservable<MinioNotificationRaw> ListenNotifications(ListenBucketNotificationsArgs args, CancellationToken cancellationToken = default(CancellationToken))";
"IObservable<MinioNotificationRaw> ListenNotificationsAsync(IList<EventType> events, CancellationToken cancellationToken = default(CancellationToken))";

private const string copyObjectSignature =
"Task<CopyObjectResult> CopyObjectAsync(CopyObjectArgs args, CancellationToken cancellationToken = default(CancellationToken))";
Expand Down Expand Up @@ -1041,11 +1041,13 @@ internal static async Task<bool> CreateBucket_Tester(IMinioClient minio, string
{
// Create a new bucket
await minio.MakeBucketAsync(new MakeBucketArgs().WithBucket(bucketName)).ConfigureAwait(false);
await Task.Delay(800).ConfigureAwait(false);

// Verify the bucket exists
return await minio.BucketExistsAsync(new BucketExistsArgs().WithBucket(bucketName))
var bucketExists = await minio.BucketExistsAsync(new BucketExistsArgs().WithBucket(bucketName))
.ConfigureAwait(false);
Assert.IsTrue(bucketExists, $"Bucket {bucketName} was not created successfully.");

return bucketExists;
}

internal static async Task StatObject_Test1(IMinioClient minio)
Expand Down Expand Up @@ -1280,7 +1282,7 @@ internal static async Task RemoveObjects_Test3(IMinioClient minio)
.WithBucket(bucketName)
.WithObjectsVersions(objVersions);

_ = await minio.RemoveObjectsAsync(removeObjectsArgs).ConfigureAwait(false);
await minio.RemoveObjectsAsync(removeObjectsArgs).ConfigureAwait(false);

await TearDown(minio, bucketName).ConfigureAwait(false);

Expand Down Expand Up @@ -2637,7 +2639,7 @@ await ListObjects_Test(minio, bucketName, singleObjectName, 1, headers: extractH

#region Global Notifications

internal static async Task ListenNotifications_Test1(IMinioClient minio)
internal static async Task ListenNotificationsAsync_Test1(IMinioClient minio)
{
var startTime = DateTime.Now;
var bucketName = GetRandomName(15);
Expand All @@ -2646,54 +2648,61 @@ internal static async Task ListenNotifications_Test1(IMinioClient minio)
try
{
var received = new List<MinioNotificationRaw>();

var eventsList = new List<EventType> { EventType.BucketCreatedAll };

// No need to define a new "ListenNotificationArgs"
// "ListenBucketNotificationsArgs" is good here
var listenArgs = new ListenBucketNotificationsArgs()
.WithEvents(eventsList);
var events = minio.ListenNotifications(listenArgs);
var eventDetected = false;
using (var subscription = events.Subscribe(
received.Add,
_ => { },
() => { }))
{
await Task.Delay(200).ConfigureAwait(false);
// Trigger the event by creating a new bucket
_ = await CreateBucket_Tester(minio, bucketName).ConfigureAwait(false);
}
var events = minio.ListenNotificationsAsync(eventsList);
var subscription = events.Subscribe(
received.Add,
ex => Console.WriteLine($"OnError: {ex}"),
() => Console.WriteLine("Stopped listening for bucket notifications\n"));

// Ensure the subscription is established
await Task.Delay(1000).ConfigureAwait(false);

// Trigger the event by creating a new bucket
var isBucketCreated1 = await CreateBucket_Tester(minio, bucketName).ConfigureAwait(false);

var eventDetected = false;
for (var attempt = 0; attempt < 20; attempt++)
// Check if there is a caught event
if (received.Count == 1)
{
if (received.Count > 0)
{
var notification = JsonSerializer.Deserialize<MinioNotification>(received[0].Json);

if (notification.Records is not null)
{
Assert.AreEqual(1, notification.Records.Count);
Assert.IsTrue(notification.Records[0].EventName
.Contains("s3:BucketCreated:*", StringComparison.OrdinalIgnoreCase));
eventDetected = true;
break;
}
}

if (eventDetected)
new MintLogger(nameof(ListenNotifications_Test1),
listenNotificationsSignature,
"Tests whether ListenNotifications notifies user about \"BucketCreatedAll\" event",
TestStatus.PASS, DateTime.Now - startTime, args: args).Log();
else
throw new UnexpectedMinioException(
"Failed to detect the expected notification event, \"BucketCreatedAll\".");
await Task.Delay(500).ConfigureAwait(false); // Delay between attempts
}

subscription.Dispose();
if (!eventDetected)
throw new UnexpectedMinioException("Failed to detect the expected bucket notification event.");

new MintLogger(nameof(ListenNotificationsAsync_Test1),
listenNotificationsSignature,
"Tests whether ListenNotifications passes",
TestStatus.PASS, DateTime.Now - startTime, args: args).Log();
}
catch (NotImplementedException ex)
{
new MintLogger(nameof(ListenNotificationsAsync_Test1),
listenNotificationsSignature,
"Tests whether ListenNotifications passes",
TestStatus.NA, DateTime.Now - startTime, ex.Message,
ex.ToString(), args: args).Log();
}
catch (Exception ex)
{
new MintLogger(nameof(ListenNotifications_Test1),
new MintLogger(nameof(ListenNotificationsAsync_Test1),
listenNotificationsSignature,
"Tests whether ListenNotifications notifies user about \"BucketCreatedAll\" event",
"Tests whether ListenNotifications passes",
TestStatus.FAIL, DateTime.Now - startTime, ex.Message,
ex.ToString(), args: args).Log();
throw;
Expand Down Expand Up @@ -2729,26 +2738,57 @@ internal static async Task ListenBucketNotificationsAsync_Test1(IMinioClient min
var received = new List<MinioNotificationRaw>();

var eventsList = new List<EventType> { EventType.ObjectCreatedAll };
var eventDetected = false;

var listenArgs = new ListenBucketNotificationsArgs()
.WithBucket(bucketName)
.WithEvents(eventsList);
var events = minio.ListenBucketNotificationsAsync(listenArgs);
using (var subscription = events.Subscribe(
received.Add,
_ => { },
() => { }))
{
_ = await PutObject_Tester(minio, bucketName, objectName, null, contentType,
0, null, rsg.GenerateStreamFromSeed(1 * KB)).ConfigureAwait(false);
}
var subscription = events.Subscribe(
received.Add,
ex => { },
() => { }
);

for (var attempt = 0; attempt < 20; attempt++)
// Check if there is a caught event
if (received.Count == 1)

_ = await PutObject_Tester(minio, bucketName, objectName, null, contentType,
0, null, rsg.GenerateStreamFromSeed(1 * KB)).ConfigureAwait(false);

// wait for notifications
var eventDetected = false;
for (var attempt = 0; attempt < 10; attempt++)
if (received.Count > 0)
{
// Check if there is any unexpected error returned
// and captured in the receivedJson list, like
// "NotImplemented" api error. If so, we throw an exception
// and skip running this test
if (received.Count > 1 &&
received[1].Json.StartsWith("<Error><Code>", StringComparison.OrdinalIgnoreCase))
{
// Although the attribute is called "json",
// returned data in list "received" is in xml
// format and it is an error.Here, we convert xml
// into json format.
var receivedJson = XmlStrToJsonStr(received[1].Json);

// Cleanup the "Error" key encapsulating "receivedJson"
// data. This is required to match and convert json data
// "receivedJson" into class "ErrorResponse"
var len = "{'Error':".Length;
var trimmedFront = receivedJson[len..];
var trimmedFull = trimmedFront[..^1];

var err = JsonSerializer.Deserialize<ErrorResponse>(trimmedFull);

Exception ex = new UnexpectedMinioException(err.Message);
if (string.Equals(err.Code, "NotImplemented", StringComparison.OrdinalIgnoreCase))
ex = new NotImplementedException(err.Message);
await TearDown(minio, bucketName).ConfigureAwait(false);
throw ex;
}

var notification = JsonSerializer.Deserialize<MinioNotification>(received[0].Json);

if (notification.Records is not null)
{
Assert.AreEqual(1, notification.Records.Count);
Expand All @@ -2764,19 +2804,20 @@ internal static async Task ListenBucketNotificationsAsync_Test1(IMinioClient min
}
}

if (eventDetected)
new MintLogger(nameof(ListenBucketNotificationsAsync_Test1),
listenBucketNotificationsSignature,
"Tests whether ListenBucketNotifications passes for small object",
TestStatus.PASS, DateTime.Now - startTime, args: args).Log();
else
// subscription.Dispose();
if (!eventDetected)
throw new UnexpectedMinioException("Failed to detect the expected bucket notification event.");

new MintLogger(nameof(ListenBucketNotificationsAsync_Test1),
listenBucketNotificationsSignature,
"Tests whether ListenBucketNotifications passes for small object",
TestStatus.PASS, DateTime.Now - startTime, args: args).Log();
}
catch (NotImplementedException ex)
{
new MintLogger(nameof(ListenBucketNotificationsAsync_Test1),
listenBucketNotificationsSignature,
"Tests whether ListenBucketNotifications generates notification for a newly created object.",
"Tests whether ListenBucketNotifications passes for small object",
TestStatus.NA, DateTime.Now - startTime, ex.Message,
ex.ToString(), args: args).Log();
}
Expand All @@ -2800,14 +2841,14 @@ static bool isAWS(string endPoint)
// This is a PASS
new MintLogger(nameof(ListenBucketNotificationsAsync_Test1),
listenBucketNotificationsSignature,
"Tests whether ListenBucketNotifications generates notification for a newly created object.",
"Tests whether ListenBucketNotifications passes for small object",
TestStatus.PASS, DateTime.Now - startTime, args: args).Log();
}
else
{
new MintLogger(nameof(ListenBucketNotificationsAsync_Test1),
listenBucketNotificationsSignature,
"Tests whether ListenBucketNotifications generates notification for a newly created object.",
"Tests whether ListenBucketNotifications passes for small object",
TestStatus.FAIL, DateTime.Now - startTime, ex.Message,
ex.ToString(), args: args).Log();
throw;
Expand Down
2 changes: 1 addition & 1 deletion Minio.Functional.Tests/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public static async Task Main(string[] args)
ConcurrentBag<Task> functionalTestTasks = new();

// Global Notification
await FunctionalTest.ListenNotifications_Test1(minioClient).ConfigureAwait(false);
await FunctionalTest.ListenNotificationsAsync_Test1(minioClient).ConfigureAwait(false);

// Try catch as 'finally' section needs to run in the Functional Tests
// Bucket notification is a minio specific feature.
Expand Down
5 changes: 3 additions & 2 deletions Minio/ApiEndpoints/BucketOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -762,12 +762,13 @@ await this.ExecuteTaskAsync(ResponseErrorHandlers, requestMessageBuilder,
/// <summary>
/// Subscribes to global change notifications (a Minio-only extension)
/// </summary>
/// <param name="args">ListenBucketNotificationsArgs to listen events</param>
/// <param name="events">Events to listen for</param>
/// <param name="cancellationToken">Optional cancellation token to cancel the operation</param>
/// <returns>An observable of JSON-based notification events</returns>
public IObservable<MinioNotificationRaw> ListenNotifications(ListenBucketNotificationsArgs args,
public IObservable<MinioNotificationRaw> ListenNotificationsAsync(IList<EventType> events,
CancellationToken cancellationToken = default)
{
var args = new ListenBucketNotificationsArgs().WithEvents(events);
return ListenBucketNotificationsAsync(args, cancellationToken);
}

Expand Down
2 changes: 1 addition & 1 deletion Minio/ApiEndpoints/IBucketOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ Task<ReplicationConfiguration> GetBucketReplicationAsync(GetBucketReplicationArg

Task<string> GetPolicyAsync(GetPolicyArgs args, CancellationToken cancellationToken = default);

IObservable<MinioNotificationRaw> ListenNotifications(ListenBucketNotificationsArgs args,
IObservable<MinioNotificationRaw> ListenNotificationsAsync(IList<EventType> events,
CancellationToken cancellationToken = default);

IObservable<MinioNotificationRaw> ListenBucketNotificationsAsync(string bucketName, IList<EventType> events,
Expand Down

0 comments on commit 90d87ed

Please sign in to comment.