Skip to content

Commit 3fb75fa

Browse files
authored
Merge pull request #61 from deveel/41-batch-sending-of-webhooks
Batch Sending of Webhooks
2 parents 6ca5334 + d772220 commit 3fb75fa

29 files changed

+788
-160
lines changed

samples/WebhookNotifierApp/Services/UserCreatedWebhookFactory.cs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,22 @@ public UserCreatedWebhookFactory(IUserResolver userResolver) {
88
this.userResolver = userResolver;
99
}
1010

11-
public async Task<IdentityWebhook> CreateAsync(IWebhookSubscription subscription, EventInfo eventInfo, CancellationToken cancellationToken = default) {
12-
var userCreated = (UserCreatedEvent?)eventInfo.Data;
11+
public async Task<IList<IdentityWebhook>> CreateAsync(IWebhookSubscription subscription, EventNotification notification, CancellationToken cancellationToken = default) {
12+
var @event = notification.Events[0];
13+
14+
var userCreated = (UserCreatedEvent?)@event.Data;
1315
var user = await userResolver.ResolveUserAsync(userCreated!.UserId, cancellationToken);
1416

1517
if (user == null)
1618
throw new InvalidOperationException();
1719

18-
return new IdentityWebhook {
19-
EventId = eventInfo.Id,
20-
EventType = "user_created",
21-
TimeStamp = eventInfo.TimeStamp,
22-
User = user
20+
return new [] {
21+
new IdentityWebhook {
22+
EventId = @event.Id,
23+
EventType = @event.EventType,
24+
TimeStamp = @event.TimeStamp,
25+
User = user
26+
}
2327
};
2428
}
2529
}

src/Deveel.Webhooks.DynamicLinq/Webhooks/LinqWebhookFilterEvaluator.cs

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
using System.Collections.Concurrent;
1516
using System.Linq.Dynamic.Core;
1617
using System.Linq.Expressions;
1718

@@ -23,7 +24,7 @@ namespace Deveel.Webhooks {
2324
/// </summary>
2425
/// <typeparam name="TWebhook"></typeparam>
2526
public sealed class LinqWebhookFilterEvaluator<TWebhook> : IWebhookFilterEvaluator<TWebhook> where TWebhook : class {
26-
private readonly IDictionary<string, Func<object, bool>> filterCache;
27+
private readonly IDictionary<FilterKey, Func<object, bool>> filterCache;
2728
private readonly WebhookSenderOptions<TWebhook> senderOptions;
2829

2930
/// <summary>
@@ -35,7 +36,7 @@ public sealed class LinqWebhookFilterEvaluator<TWebhook> : IWebhookFilterEvaluat
3536
/// the filter evaluator.
3637
/// </param>
3738
public LinqWebhookFilterEvaluator(IOptions<WebhookSenderOptions<TWebhook>> senderOptions) {
38-
filterCache = new Dictionary<string, Func<object, bool>>();
39+
filterCache = new ConcurrentDictionary<FilterKey, Func<object, bool>>();
3940
this.senderOptions = senderOptions.Value;
4041
}
4142

@@ -51,15 +52,16 @@ static LinqWebhookFilterEvaluator() {
5152
string IWebhookFilterEvaluator<TWebhook>.Format => "linq";
5253

5354
private Func<object, bool> Compile(Type objType, string filter) {
54-
if (!filterCache.TryGetValue(filter, out var compiled)) {
55+
var key = new FilterKey(objType.FullName!, filter);
56+
if (!filterCache.TryGetValue(key, out var compiled)) {
5557
var config = ParsingConfig.Default;
5658

5759
var parameters = new[] {
5860
Expression.Parameter(objType, "hook")
5961
};
6062
var parsed = DynamicExpressionParser.ParseLambda(config, parameters, typeof(bool), filter).Compile();
6163
compiled = hook => (bool)(parsed.DynamicInvoke(hook)!);
62-
filterCache[filter] = compiled;
64+
filterCache[key] = compiled;
6365
}
6466

6567
return compiled;
@@ -84,10 +86,8 @@ private Func<object, bool> Compile(Type objType, IList<string> filters) {
8486

8587
/// <inheritdoc/>
8688
public async Task<bool> MatchesAsync(WebhookSubscriptionFilter filter, TWebhook webhook, CancellationToken cancellationToken) {
87-
if (filter is null)
88-
throw new ArgumentNullException(nameof(filter));
89-
if (webhook is null)
90-
throw new ArgumentNullException(nameof(webhook));
89+
ArgumentNullException.ThrowIfNull(filter, nameof(filter));
90+
ArgumentNullException.ThrowIfNull(webhook, nameof(webhook));
9191

9292
if (filter.FilterFormat != "linq")
9393
throw new ArgumentException($"Filter format '{filter.FilterFormat}' not supported by the LINQ evaluator");
@@ -111,7 +111,27 @@ public async Task<bool> MatchesAsync(WebhookSubscriptionFilter filter, TWebhook
111111
} catch(Exception ex) {
112112
throw new WebhookException("Unable to evaluate the filter", ex);
113113
}
114+
}
115+
116+
readonly struct FilterKey {
117+
public FilterKey(string typeName, string filter) : this() {
118+
TypeName = typeName;
119+
Filter = filter;
120+
}
121+
122+
public string TypeName { get; }
114123

124+
public string Filter { get; }
125+
126+
public override bool Equals(object? obj) {
127+
return obj is FilterKey key &&
128+
TypeName == key.TypeName &&
129+
Filter == key.Filter;
130+
}
131+
132+
public override int GetHashCode() {
133+
return HashCode.Combine(TypeName, Filter);
134+
}
115135
}
116136
}
117137
}

src/Deveel.Webhooks.Service.MongoDb/Webhooks/DefaultMongoWebhookConverter.cs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ public class DefaultMongoWebhookConverter<TWebhook> : IMongoWebhookConverter<TWe
3232
/// Converts the given <see cref="EventInfo"/> and the webhook object into
3333
/// a <see cref="MongoWebhook"/> object.
3434
/// </summary>
35-
/// <param name="eventInfo">
36-
/// The event information that is being notified.
35+
/// <param name="notification">
36+
/// The event notification that was sent to the subscribers.
3737
/// </param>
3838
/// <param name="webhook">
3939
/// The webhook that was notified to the subscribers.
@@ -42,7 +42,7 @@ public class DefaultMongoWebhookConverter<TWebhook> : IMongoWebhookConverter<TWe
4242
/// Returns an instance of <see cref="MongoWebhook"/> that represents the
4343
/// webhook that can be stored into the database.
4444
/// </returns>
45-
public MongoWebhook ConvertWebhook(EventInfo eventInfo, TWebhook webhook) {
45+
public MongoWebhook ConvertWebhook(EventNotification notification, TWebhook webhook) {
4646
if (webhook is IWebhook obj) {
4747
return new MongoWebhook {
4848
WebhookId = obj.Id,
@@ -52,10 +52,14 @@ public MongoWebhook ConvertWebhook(EventInfo eventInfo, TWebhook webhook) {
5252
};
5353
}
5454

55+
// TODO: we should support multiple events in a single notification
56+
57+
var firstEvent = notification.Events.First();
58+
5559
return new MongoWebhook {
56-
EventType = eventInfo.EventType,
57-
TimeStamp = eventInfo.TimeStamp,
58-
WebhookId = eventInfo.Id,
60+
EventType = notification.EventType,
61+
TimeStamp = firstEvent.TimeStamp,
62+
WebhookId = notification.NotificationId,
5963
Data = BsonValueUtil.ConvertData(webhook)
6064
};
6165
}

src/Deveel.Webhooks.Service.MongoDb/Webhooks/IWebhookConverter.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,8 @@ public interface IMongoWebhookConverter<TWebhook>
2626
/// Converts the given webhook to an object that can be stored
2727
/// in a MongoDB database.
2828
/// </summary>
29-
/// <param name="eventInfo">
30-
/// The information about the event that triggered the
31-
/// notification of the webhook.
29+
/// <param name="notification">
30+
/// The event notification that was sent to the subscribers.
3231
/// </param>
3332
/// <param name="webhook">
3433
/// The instance of the webhook to be converted.
@@ -37,6 +36,6 @@ public interface IMongoWebhookConverter<TWebhook>
3736
/// Returns an instance of <see cref="MongoWebhook"/>
3837
/// that can be stored in a MongoDB database.
3938
/// </returns>
40-
MongoWebhook ConvertWebhook(EventInfo eventInfo, TWebhook webhook);
39+
MongoWebhook ConvertWebhook(EventNotification notification, TWebhook webhook);
4140
}
4241
}

src/Deveel.Webhooks.Service.MongoDb/Webhooks/MongoDbWebhookDeliveryResultLogger.cs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,9 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
using System.Collections;
16-
using System.Reflection;
17-
1815
using Microsoft.Extensions.Logging;
1916
using Microsoft.Extensions.Logging.Abstractions;
2017

21-
using MongoDB.Bson;
22-
2318
namespace Deveel.Webhooks {
2419
/// <summary>
2520
/// An implementation of <see cref="IWebhookDeliveryResultLogger{TWebhook}"/> that
@@ -75,6 +70,9 @@ public MongoDbWebhookDeliveryResultLogger(
7570
/// Converts the given result to an object that can be stored in the
7671
/// MongoDB database collection.
7772
/// </summary>
73+
/// <param name="notification">
74+
/// The aggregate of the events that are being delivered to the receiver.
75+
/// </param>
7876
/// <param name="eventInfo">
7977
/// The information about the event that triggered the delivery of the webhook.
8078
/// </param>
@@ -87,14 +85,14 @@ public MongoDbWebhookDeliveryResultLogger(
8785
/// <returns>
8886
/// Returns an object that can be stored in the MongoDB database collection.
8987
/// </returns>
90-
protected virtual TResult ConvertResult(EventInfo eventInfo, IWebhookSubscription subscription, WebhookDeliveryResult<TWebhook> result) {
88+
protected virtual TResult ConvertResult(EventNotification notification, EventInfo eventInfo, IWebhookSubscription subscription, WebhookDeliveryResult<TWebhook> result) {
9189
var obj = new TResult();
9290

9391
obj.TenantId = subscription.TenantId;
9492
obj.OperationId = result.OperationId;
9593
obj.EventInfo = CreateEvent(eventInfo);
9694
obj.Receiver = CreateReceiver(subscription);
97-
obj.Webhook = ConvertWebhook(eventInfo, result.Webhook);
95+
obj.Webhook = ConvertWebhook(notification, result.Webhook);
9896
obj.DeliveryAttempts = result.Attempts?.Select(ConvertDeliveryAttempt).ToList()
9997
?? new List<MongoWebhookDeliveryAttempt>();
10098

@@ -162,8 +160,8 @@ protected virtual MongoWebhookDeliveryAttempt ConvertDeliveryAttempt(WebhookDeli
162160
/// Converts the given webhook to an object that can be stored in the
163161
/// MongoDB database collection.
164162
/// </summary>
165-
/// <param name="eventInfo">
166-
/// The information about the event that triggered the delivery of the webhook.
163+
/// <param name="notification">
164+
/// The aggregate of the events that are being delivered to the receiver.
167165
/// </param>
168166
/// <param name="webhook">
169167
/// The instance of the webhook to convert.
@@ -175,15 +173,15 @@ protected virtual MongoWebhookDeliveryAttempt ConvertDeliveryAttempt(WebhookDeli
175173
/// Thrown when the given type of webhook is not supported by this instance and
176174
/// no converter was provided.
177175
/// </exception>
178-
protected virtual MongoWebhook ConvertWebhook(EventInfo eventInfo, TWebhook webhook) {
176+
protected virtual MongoWebhook ConvertWebhook(EventNotification notification, TWebhook webhook) {
179177
if (webhookConverter != null)
180-
return webhookConverter.ConvertWebhook(eventInfo, webhook);
178+
return webhookConverter.ConvertWebhook(notification, webhook);
181179

182180
throw new NotSupportedException("The given type of webhook is not supported by this instance of the logger");
183181
}
184182

185183
/// <inheritdoc/>
186-
public async Task LogResultAsync(EventInfo eventInfo, IWebhookSubscription subscription, WebhookDeliveryResult<TWebhook> result, CancellationToken cancellationToken) {
184+
public async Task LogResultAsync(EventNotification notification, IWebhookSubscription subscription, WebhookDeliveryResult<TWebhook> result, CancellationToken cancellationToken) {
187185
ArgumentNullException.ThrowIfNull(result, nameof(result));
188186
ArgumentNullException.ThrowIfNull(subscription, nameof(subscription));
189187

@@ -195,11 +193,11 @@ public async Task LogResultAsync(EventInfo eventInfo, IWebhookSubscription subsc
195193
typeof(TWebhook), subscription.TenantId);
196194

197195
try {
198-
var resultObj = ConvertResult(eventInfo, subscription, result);
196+
var results = notification.Select(e => ConvertResult(notification, e, subscription, result));
199197

200198
var repository = await RepositoryProvider.GetRepositoryAsync(subscription.TenantId, cancellationToken);
201199

202-
await repository.AddAsync(resultObj, cancellationToken);
200+
await repository.AddRangeAsync(results, cancellationToken);
203201
} catch (Exception ex) {
204202
Logger.LogError(ex, "Could not log the result of the delivery of the Webhook of type '{WebhookType}' for tenant '{TenantId}' because of an error",
205203
typeof(TWebhook), subscription.TenantId);

src/Deveel.Webhooks.Service.MongoDb/Webhooks/MongoDbWebhookStorageBuilder.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ public MongoDbWebhookStorageBuilder<TSubscription> UseWebhookConverter<TWebhook,
202202
/// <returns>
203203
/// Returns the current instance of the builder for chaining.
204204
/// </returns>
205-
public MongoDbWebhookStorageBuilder<TSubscription> UseWebhookConverter<TWebhook>(Func<EventInfo, TWebhook, MongoWebhook> converter)
205+
public MongoDbWebhookStorageBuilder<TSubscription> UseWebhookConverter<TWebhook>(Func<EventNotification, TWebhook, MongoWebhook> converter)
206206
where TWebhook : class {
207207

208208
Services.AddSingleton<IMongoWebhookConverter<TWebhook>>(new MongoWebhookConverterWrapper<TWebhook>(converter));
@@ -211,13 +211,14 @@ public MongoDbWebhookStorageBuilder<TSubscription> UseWebhookConverter<TWebhook>
211211
}
212212

213213
private class MongoWebhookConverterWrapper<TWebhook> : IMongoWebhookConverter<TWebhook> where TWebhook : class {
214-
private readonly Func<EventInfo, TWebhook, MongoWebhook> converter;
214+
private readonly Func<EventNotification, TWebhook, MongoWebhook> converter;
215215

216-
public MongoWebhookConverterWrapper(Func<EventInfo, TWebhook, MongoWebhook> converter) {
216+
public MongoWebhookConverterWrapper(Func<EventNotification, TWebhook, MongoWebhook> converter) {
217217
this.converter = converter;
218218
}
219219

220-
public MongoWebhook ConvertWebhook(EventInfo eventInfo, TWebhook webhook) => converter.Invoke(eventInfo, webhook);
220+
public MongoWebhook ConvertWebhook(EventNotification notification, TWebhook webhook)
221+
=> converter.Invoke(notification, webhook);
221222
}
222223
}
223224
}

src/Deveel.Webhooks.Service/Webhooks/WebhookSubscriptionResolver.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public async Task<IList<IWebhookSubscription>> ResolveSubscriptionsAsync(string
7676
var list = await GetCachedAsync(eventType, cancellationToken);
7777

7878
if (list == null) {
79-
logger.LogTrace("No webhook subscriptions to event {EventType} of tenant {TenantId} were found in cache", eventType);
79+
logger.LogTrace("No webhook subscriptions to event {EventType} were found in cache", eventType);
8080

8181
var result = await repository.GetByEventTypeAsync(eventType, activeOnly, cancellationToken);
8282
list = result.Cast<IWebhookSubscription>().ToList();

src/Deveel.Webhooks/Webhooks/DefaultWebhookFactory.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,16 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
using Microsoft.Extensions.Options;
16+
1517
namespace Deveel.Webhooks {
1618
/// <summary>
1719
/// A default implementation of the <see cref="IWebhookFactory{TWebhook}"/>
1820
/// that creates a <see cref="Webhook"/> instance using the information
1921
/// provided by the subscription and the event.
2022
/// </summary>
2123
public sealed class DefaultWebhookFactory : DefaultWebhookFactory<Webhook> {
24+
public DefaultWebhookFactory(IOptions<WebhookFactoryOptions<Webhook>> options) : base(options) {
25+
}
2226
}
2327
}

0 commit comments

Comments
 (0)