Skip to content
This repository has been archived by the owner on Feb 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #33 from imranmomin/develop-improvements
Browse files Browse the repository at this point in the history
Develop improvements - 3.0.0
  • Loading branch information
imranmomin authored Dec 30, 2018
2 parents bbb3883 + 78a5eec commit 6825038
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 293 deletions.
17 changes: 2 additions & 15 deletions Hangfire.AzureDocumentDB/CountersAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@ internal class CountersAggregator : IServerComponent
private const string DISTRIBUTED_LOCK_KEY = "locks:counters:aggragator";
private readonly TimeSpan defaultLockTimeout;
private readonly DocumentDbStorage storage;
private readonly Uri spDeleteDocumentsUri;

public CountersAggregator(DocumentDbStorage storage)
{
this.storage = storage ?? throw new ArgumentNullException(nameof(storage));
defaultLockTimeout = TimeSpan.FromSeconds(30) + storage.Options.CountersAggregateInterval;
spDeleteDocumentsUri = UriFactory.CreateStoredProcedureUri(storage.Options.DatabaseName, storage.Options.CollectionName, "deleteDocuments");
}

public void Execute(CancellationToken cancellationToken)
Expand Down Expand Up @@ -97,21 +95,10 @@ public void Execute(CancellationToken cancellationToken)
{
if (t.Result.StatusCode == HttpStatusCode.Created || t.Result.StatusCode == HttpStatusCode.OK)
{
int deleted = 0;
ProcedureResponse response;
string ids = string.Join(",", data.Counters.Select(c => $"'{c.Id}'").ToArray());
string sql = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Counter} AND doc.counter_type = {(int)CounterTypes.Raw} AND doc.id IN ({ids})";

do
{
Task<StoredProcedureResponse<ProcedureResponse>> procedureTask = storage.Client.ExecuteStoredProcedureWithRetriesAsync<ProcedureResponse>(spDeleteDocumentsUri, sql);
procedureTask.Wait(cancellationToken);

response = procedureTask.Result;
deleted += response.Affected;

} while (response.Continuation); // if the continuation is true; run the procedure again
string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Counter} AND doc.counter_type = {(int)CounterTypes.Raw} AND doc.id IN ({ids})";

int deleted = storage.Client.ExecuteDeleteDocuments(query);

logger.Trace($"Total {deleted} records from the 'Counter:{aggregated.Key}' were aggregated.");
}
Expand Down
54 changes: 15 additions & 39 deletions Hangfire.AzureDocumentDB/DocumentDbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,16 @@ public override List<string> GetRangeFromSet(string key, int startingFrom, int e
{
if (key == null) throw new ArgumentNullException(nameof(key));

return Storage.Client.CreateDocumentQuery<Set>(Storage.CollectionUri)
FeedOptions feedOptions = new FeedOptions { MaxItemCount = endingAt + 1 };
endingAt += 1 - startingFrom;

return Storage.Client.CreateDocumentQuery<Set>(Storage.CollectionUri, feedOptions)
.Where(s => s.DocumentType == DocumentTypes.Set && s.Key == key)
.OrderBy(s => s.Score)
.ToQueryResult()
.OrderBy(s => s.CreatedOn)
.Select((s, i) => new { s.Value, Index = i })
.Where(s => s.Index >= startingFrom && s.Index <= endingAt)
.Select(s => s.Value)
.ToQueryResult()
.Skip(startingFrom)
.Take(endingAt)
.ToList();
}

Expand Down Expand Up @@ -347,25 +349,11 @@ public override int RemoveTimedOutServers(TimeSpan timeOut)
throw new ArgumentException(@"invalid timeout", nameof(timeOut));
}

int removed = 0;
ProcedureResponse response;
int lastHeartbeat = DateTime.UtcNow.Add(timeOut.Negate()).ToEpoch();

string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Server} AND IS_DEFINED(doc.last_heartbeat) " +
$"AND doc.last_heartbeat <= {lastHeartbeat}";
Uri spDeleteDocuments = UriFactory.CreateStoredProcedureUri(Storage.Options.DatabaseName, Storage.Options.CollectionName, "deleteDocuments");

do
{
Task<StoredProcedureResponse<ProcedureResponse>> task = Storage.Client.ExecuteStoredProcedureWithRetriesAsync<ProcedureResponse>(spDeleteDocuments, query);
task.Wait();

response = task.Result;
removed += response.Affected;

} while (response.Continuation); // if the continuation is true; run the procedure again

return removed;
return Storage.Client.ExecuteDeleteDocuments(query);
}

#endregion
Expand Down Expand Up @@ -415,21 +403,7 @@ public override void SetRangeInHash(string key, IEnumerable<KeyValuePair<string,
}
}

int affected = 0;
Uri spUpsertDocumentsUri = UriFactory.CreateStoredProcedureUri(Storage.Options.DatabaseName, Storage.Options.CollectionName, "upsertDocuments");

do
{
// process only remaining items
data.Items = data.Items.Skip(affected).ToList();

Task<StoredProcedureResponse<int>> task = Storage.Client.ExecuteStoredProcedureWithRetriesAsync<int>(spUpsertDocumentsUri, data);
task.Wait();

// know how much was processed
affected += task.Result;

} while (affected < data.Items.Count);
Storage.Client.ExecuteUpsertDocuments(data);
}

public override long GetHashCount(string key)
Expand Down Expand Up @@ -512,14 +486,16 @@ public override List<string> GetRangeFromList(string key, int startingFrom, int
{
if (key == null) throw new ArgumentNullException(nameof(key));

return Storage.Client.CreateDocumentQuery<List>(Storage.CollectionUri)
FeedOptions feedOptions = new FeedOptions { MaxItemCount = endingAt + 1 };
endingAt += 1 - startingFrom;

return Storage.Client.CreateDocumentQuery<List>(Storage.CollectionUri, feedOptions)
.Where(l => l.DocumentType == DocumentTypes.List && l.Key == key)
.OrderByDescending(l => l.CreatedOn)
.Select(l => l.Value)
.ToQueryResult()
.Select((l, i) => new { Value = l, Index = i })
.Where(l => l.Index >= startingFrom && l.Index <= endingAt)
.Select(l => l.Value)
.Skip(startingFrom)
.Take(endingAt)
.ToList();
}

Expand Down
5 changes: 4 additions & 1 deletion Hangfire.AzureDocumentDB/DocumentDbStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
using Hangfire.Logging;
using Newtonsoft.Json;
using Microsoft.Azure.Documents;
using Newtonsoft.Json.Serialization;
using Microsoft.Azure.Documents.Client;

using Hangfire.Azure.Queue;
using Newtonsoft.Json.Serialization;
using Hangfire.Azure.Helper;

namespace Hangfire.Azure
{
Expand Down Expand Up @@ -70,6 +71,8 @@ public DocumentDbStorage(string url, string authSecret, string database, string
Task continueTask = task.ContinueWith(t => Initialize(), TaskContinuationOptions.OnlyOnRanToCompletion);
continueTask.Wait();

StoredprocedureHelper.Setup(database, collection);

JobQueueProvider provider = new JobQueueProvider(this);
QueueProviders = new PersistentJobQueueProviderCollection(provider);
}
Expand Down
Loading

0 comments on commit 6825038

Please sign in to comment.