From b3862ad0ed809c238dfbe196bea1cad28bac5d8e Mon Sep 17 00:00:00 2001 From: imranmomin Date: Tue, 25 Dec 2018 16:54:42 -0500 Subject: [PATCH 1/6] moved all the common stored procedures into a helper class --- .../CountersAggregator.cs | 17 +- .../DocumentDbConnection.cs | 32 +-- .../DocumentDbWriteOnlyTransaction.cs | 217 ++---------------- Hangfire.AzureDocumentDB/ExpirationManager.cs | 19 +- .../Helper/StoredprocedureHelper.cs | 76 ++++++ 5 files changed, 99 insertions(+), 262 deletions(-) create mode 100644 Hangfire.AzureDocumentDB/Helper/StoredprocedureHelper.cs diff --git a/Hangfire.AzureDocumentDB/CountersAggregator.cs b/Hangfire.AzureDocumentDB/CountersAggregator.cs index adb2f10..d9c5622 100644 --- a/Hangfire.AzureDocumentDB/CountersAggregator.cs +++ b/Hangfire.AzureDocumentDB/CountersAggregator.cs @@ -23,13 +23,11 @@ internal class CountersAggregator : IServerComponent private const string DISTRIBUTED_LOCK_KEY = "locks:counters:aggragator"; private readonly TimeSpan defaultLockTimeout; private readonly DocumentDbStorage storage; - private readonly Uri spDeleteDocumentsUri; public CountersAggregator(DocumentDbStorage storage) { this.storage = storage ?? throw new ArgumentNullException(nameof(storage)); defaultLockTimeout = TimeSpan.FromSeconds(30) + storage.Options.CountersAggregateInterval; - spDeleteDocumentsUri = UriFactory.CreateStoredProcedureUri(storage.Options.DatabaseName, storage.Options.CollectionName, "deleteDocuments"); } public void Execute(CancellationToken cancellationToken) @@ -97,21 +95,10 @@ public void Execute(CancellationToken cancellationToken) { if (t.Result.StatusCode == HttpStatusCode.Created || t.Result.StatusCode == HttpStatusCode.OK) { - int deleted = 0; - ProcedureResponse response; string ids = string.Join(",", data.Counters.Select(c => $"'{c.Id}'").ToArray()); - string sql = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Counter} AND doc.counter_type = {(int)CounterTypes.Raw} AND doc.id IN ({ids})"; - - do - { - Task> procedureTask = storage.Client.ExecuteStoredProcedureWithRetriesAsync(spDeleteDocumentsUri, sql); - procedureTask.Wait(cancellationToken); - - response = procedureTask.Result; - deleted += response.Affected; - - } while (response.Continuation); // if the continuation is true; run the procedure again + string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Counter} AND doc.counter_type = {(int)CounterTypes.Raw} AND doc.id IN ({ids})"; + int deleted = storage.Client.ExecuteDeleteDocuments(query); logger.Trace($"Total {deleted} records from the 'Counter:{aggregated.Key}' were aggregated."); } diff --git a/Hangfire.AzureDocumentDB/DocumentDbConnection.cs b/Hangfire.AzureDocumentDB/DocumentDbConnection.cs index 7ef1f89..fa64288 100644 --- a/Hangfire.AzureDocumentDB/DocumentDbConnection.cs +++ b/Hangfire.AzureDocumentDB/DocumentDbConnection.cs @@ -347,25 +347,11 @@ public override int RemoveTimedOutServers(TimeSpan timeOut) throw new ArgumentException(@"invalid timeout", nameof(timeOut)); } - int removed = 0; - ProcedureResponse response; int lastHeartbeat = DateTime.UtcNow.Add(timeOut.Negate()).ToEpoch(); - string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Server} AND IS_DEFINED(doc.last_heartbeat) " + $"AND doc.last_heartbeat <= {lastHeartbeat}"; - Uri spDeleteDocuments = UriFactory.CreateStoredProcedureUri(Storage.Options.DatabaseName, Storage.Options.CollectionName, "deleteDocuments"); - - do - { - Task> task = Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spDeleteDocuments, query); - task.Wait(); - - response = task.Result; - removed += response.Affected; - - } while (response.Continuation); // if the continuation is true; run the procedure again - return removed; + return Storage.Client.ExecuteDeleteDocuments(query); } #endregion @@ -415,21 +401,7 @@ public override void SetRangeInHash(string key, IEnumerable> task = Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spUpsertDocumentsUri, data); - task.Wait(); - - // know how much was processed - affected += task.Result; - - } while (affected < data.Items.Count); + Storage.Client.ExecuteUpsertDocuments(data); } public override long GetHashCount(string key) diff --git a/Hangfire.AzureDocumentDB/DocumentDbWriteOnlyTransaction.cs b/Hangfire.AzureDocumentDB/DocumentDbWriteOnlyTransaction.cs index ce129cc..ceddd32 100644 --- a/Hangfire.AzureDocumentDB/DocumentDbWriteOnlyTransaction.cs +++ b/Hangfire.AzureDocumentDB/DocumentDbWriteOnlyTransaction.cs @@ -19,19 +19,8 @@ internal class DocumentDbWriteOnlyTransaction : JobStorageTransaction { private readonly DocumentDbConnection connection; private readonly List commands = new List(); - private readonly Uri spPersistDocumentsUri; - private readonly Uri spExpireDocumentsUri; - private readonly Uri spDeleteDocumentsUri; - private readonly Uri spUpsertDocumentsUri; - public DocumentDbWriteOnlyTransaction(DocumentDbConnection connection) - { - this.connection = connection; - spPersistDocumentsUri = UriFactory.CreateStoredProcedureUri(connection.Storage.Options.DatabaseName, connection.Storage.Options.CollectionName, "persistDocuments"); - spExpireDocumentsUri = UriFactory.CreateStoredProcedureUri(connection.Storage.Options.DatabaseName, connection.Storage.Options.CollectionName, "expireDocuments"); - spDeleteDocumentsUri = UriFactory.CreateStoredProcedureUri(connection.Storage.Options.DatabaseName, connection.Storage.Options.CollectionName, "deleteDocuments"); - spUpsertDocumentsUri = UriFactory.CreateStoredProcedureUri(connection.Storage.Options.DatabaseName, connection.Storage.Options.CollectionName, "upsertDocuments"); - } + public DocumentDbWriteOnlyTransaction(DocumentDbConnection connection) => this.connection = connection; private void QueueCommand(Action command) => commands.Add(command); public override void Commit() => commands.ForEach(command => command()); @@ -142,17 +131,7 @@ public override void ExpireJob(string jobId, TimeSpan expireIn) { int epoch = DateTime.UtcNow.Add(expireIn).ToEpoch(); string query = $"SELECT * FROM doc WHERE doc.type = {(int)DocumentTypes.Job} AND doc.id = '{jobId}'"; - ProcedureResponse response; - - do - { - Task> task = connection.Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spExpireDocumentsUri, query, epoch); - task.Wait(); - - response = task.Result; - - // if the continuation is true; run the procedure again - } while (response.Continuation); + connection.Storage.Client.ExecuteExpireDocuments(query, epoch); }); } @@ -163,17 +142,7 @@ public override void PersistJob(string jobId) QueueCommand(() => { string query = $"SELECT * FROM doc WHERE doc.type = {(int)DocumentTypes.Job} AND doc.id = '{jobId}'"; - ProcedureResponse response; - - do - { - Task> task = connection.Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spPersistDocumentsUri, query); - task.Wait(); - - response = task.Result; - - // if the continuation is true; run the procedure again - } while (response.Continuation); + connection.Storage.Client.ExecutePersistDocuments(query); }); } @@ -247,18 +216,7 @@ public override void RemoveFromSet(string key, string value) string ids = string.Join(",", sets.Select(s => $"'{s}'")); string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Set} AND doc.id IN ({ids})"; - ProcedureResponse response; - - do - { - Task> task = connection.Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spDeleteDocumentsUri, query); - task.Wait(); - - response = task.Result; - - // if the continuation is true; run the procedure again - } while (response.Continuation); - + connection.Storage.Client.ExecuteDeleteDocuments(query); }); } @@ -293,21 +251,8 @@ public override void AddToSet(string key, string value, double score) sets.ForEach(s => s.Score = score); } - int affected = 0; Data data = new Data(sets); - - do - { - // process only remaining items - data.Items = data.Items.Skip(affected).ToList(); - - Task> task = connection.Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spUpsertDocumentsUri, data); - task.Wait(); - - // know how much was processed - affected += task.Result; - - } while (affected < data.Items.Count); + connection.Storage.Client.ExecuteUpsertDocuments(data); }); } @@ -318,17 +263,7 @@ public override void PersistSet(string key) QueueCommand(() => { string query = $"SELECT * FROM doc WHERE doc.type = {(int)DocumentTypes.Set} AND doc.key = '{key}'"; - ProcedureResponse response; - - do - { - Task> task = connection.Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spPersistDocumentsUri, query); - task.Wait(); - - response = task.Result; - - // if the continuation is true; run the procedure again - } while (response.Continuation); + connection.Storage.Client.ExecutePersistDocuments(query); }); } @@ -340,17 +275,7 @@ public override void ExpireSet(string key, TimeSpan expireIn) { string query = $"SELECT * FROM doc WHERE doc.type = {(int)DocumentTypes.Set} AND doc.key = '{key}'"; int epoch = DateTime.UtcNow.Add(expireIn).ToEpoch(); - ProcedureResponse response; - - do - { - Task> task = connection.Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spExpireDocumentsUri, query, epoch); - task.Wait(); - - response = task.Result; - - // if the continuation is true; run the procedure again - } while (response.Continuation); + connection.Storage.Client.ExecuteExpireDocuments(query, epoch); }); } @@ -369,21 +294,8 @@ public override void AddRangeToSet(string key, IList items) CreatedOn = DateTime.UtcNow }).ToList(); - int affected = 0; Data data = new Data(sets); - - do - { - // process only remaining items - data.Items = data.Items.Skip(affected).ToList(); - - Task> task = connection.Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spUpsertDocumentsUri, data); - task.Wait(); - - // know how much was processed - affected += task.Result; - - } while (affected < data.Items.Count); + connection.Storage.Client.ExecuteUpsertDocuments(data); }); } @@ -394,17 +306,7 @@ public override void RemoveSet(string key) QueueCommand(() => { string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Set} AND doc.key == '{key}'"; - ProcedureResponse response; - - do - { - Task> task = connection.Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spDeleteDocumentsUri, query); - task.Wait(); - - response = task.Result; - - // if the continuation is true; run the procedure again - } while (response.Continuation); + connection.Storage.Client.ExecuteDeleteDocuments(query); }); } @@ -419,17 +321,7 @@ public override void RemoveHash(string key) QueueCommand(() => { string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Hash} AND doc.key == '{key}'"; - ProcedureResponse response; - - do - { - Task> task = connection.Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spDeleteDocumentsUri, query); - task.Wait(); - - response = task.Result; - - // if the continuation is true; run the procedure again - } while (response.Continuation); + connection.Storage.Client.ExecuteDeleteDocuments(query); }); } @@ -467,20 +359,7 @@ public override void SetRangeInHash(string key, IEnumerable> task = connection.Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spUpsertDocumentsUri, data); - task.Wait(); - - // know how much was processed - affected += task.Result; - - } while (affected < data.Items.Count); + connection.Storage.Client.ExecuteUpsertDocuments(data); }); } @@ -492,17 +371,7 @@ public override void ExpireHash(string key, TimeSpan expireIn) { string query = $"SELECT * FROM doc WHERE doc.type = {(int)DocumentTypes.Hash} AND doc.key = '{key}'"; int epoch = DateTime.UtcNow.Add(expireIn).ToEpoch(); - ProcedureResponse response; - - do - { - Task> task = connection.Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spExpireDocumentsUri, query, epoch); - task.Wait(); - - response = task.Result; - - // if the continuation is true; run the procedure again - } while (response.Continuation); + connection.Storage.Client.ExecuteExpireDocuments(query, epoch); }); } @@ -513,17 +382,7 @@ public override void PersistHash(string key) QueueCommand(() => { string query = $"SELECT * FROM doc WHERE doc.type = {(int)DocumentTypes.Hash} AND doc.key = '{key}'"; - ProcedureResponse response; - - do - { - Task> task = connection.Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spPersistDocumentsUri, query); - task.Wait(); - - response = task.Result; - - // if the continuation is true; run the procedure again - } while (response.Continuation); + connection.Storage.Client.ExecutePersistDocuments(query); }); } @@ -569,18 +428,7 @@ public override void RemoveFromList(string key, string value) string ids = string.Join(",", lists.Select(l => $"'{l}'")); string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.List} AND doc.id IN ({ids})"; - ProcedureResponse response; - - do - { - Task> task = connection.Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spDeleteDocumentsUri, query); - task.Wait(); - - response = task.Result; - - // if the continuation is true; run the procedure again - } while (response.Continuation); - + connection.Storage.Client.ExecuteDeleteDocuments(query); }); } @@ -604,18 +452,7 @@ public override void TrimList(string key, int keepStartingFrom, int keepEndingAt string ids = string.Join(",", lists.Select(l => $"'{l}'")); string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.List} AND doc.id IN ({ids})"; - ProcedureResponse response; - - do - { - Task> task = connection.Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spDeleteDocumentsUri, query); - task.Wait(); - - response = task.Result; - - // if the continuation is true; run the procedure again - } while (response.Continuation); - + connection.Storage.Client.ExecuteDeleteDocuments(query); }); } @@ -627,17 +464,7 @@ public override void ExpireList(string key, TimeSpan expireIn) { string query = $"SELECT * FROM doc WHERE doc.type = {(int)DocumentTypes.List} AND doc.key = '{key}'"; int epoch = DateTime.UtcNow.Add(expireIn).ToEpoch(); - ProcedureResponse response; - - do - { - Task> task = connection.Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spExpireDocumentsUri, query, epoch); - task.Wait(); - - response = task.Result; - - // if the continuation is true; run the procedure again - } while (response.Continuation); + connection.Storage.Client.ExecuteExpireDocuments(query, epoch); }); } @@ -648,17 +475,7 @@ public override void PersistList(string key) QueueCommand(() => { string query = $"SELECT * FROM doc WHERE doc.type = {(int)DocumentTypes.List} AND doc.key = '{key}'"; - ProcedureResponse response; - - do - { - Task> task = connection.Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spPersistDocumentsUri, query); - task.Wait(); - - response = task.Result; - - // if the continuation is true; run the procedure again - } while (response.Continuation); + connection.Storage.Client.ExecutePersistDocuments(query); }); } diff --git a/Hangfire.AzureDocumentDB/ExpirationManager.cs b/Hangfire.AzureDocumentDB/ExpirationManager.cs index b3c4d41..e8d1ce7 100644 --- a/Hangfire.AzureDocumentDB/ExpirationManager.cs +++ b/Hangfire.AzureDocumentDB/ExpirationManager.cs @@ -1,10 +1,8 @@ using System; using System.Threading; -using System.Threading.Tasks; using Hangfire.Server; using Hangfire.Logging; -using Microsoft.Azure.Documents.Client; using Hangfire.Azure.Helper; using Hangfire.Azure.Documents; @@ -21,13 +19,11 @@ internal class ExpirationManager : IServerComponent private readonly TimeSpan defaultLockTimeout; private readonly DocumentTypes[] documents = { DocumentTypes.Lock, DocumentTypes.Job, DocumentTypes.List, DocumentTypes.Set, DocumentTypes.Hash, DocumentTypes.Counter }; private readonly DocumentDbStorage storage; - private readonly Uri spDeleteDocumentsUri; public ExpirationManager(DocumentDbStorage storage) { this.storage = storage ?? throw new ArgumentNullException(nameof(storage)); defaultLockTimeout = TimeSpan.FromSeconds(30) + storage.Options.ExpirationCheckInterval; - spDeleteDocumentsUri = UriFactory.CreateStoredProcedureUri(storage.Options.DatabaseName, storage.Options.CollectionName, "deleteDocuments"); } public void Execute(CancellationToken cancellationToken) @@ -50,19 +46,8 @@ public void Execute(CancellationToken cancellationToken) query += $" AND doc.counter_type = {(int)CounterTypes.Aggregate}"; } - int deleted = 0; - ProcedureResponse response; - do - { - Task> procedureTask = storage.Client.ExecuteStoredProcedureWithRetriesAsync(spDeleteDocumentsUri, query); - procedureTask.Wait(cancellationToken); - - response = procedureTask.Result; - deleted += response.Affected; - - // if the continuation is true; run the procedure again - } while (response.Continuation); - + int deleted = storage.Client.ExecuteDeleteDocuments(query); + logger.Trace($"Outdated {deleted} records removed from the '{type}' document."); } } diff --git a/Hangfire.AzureDocumentDB/Helper/StoredprocedureHelper.cs b/Hangfire.AzureDocumentDB/Helper/StoredprocedureHelper.cs new file mode 100644 index 0000000..3fd24a7 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Helper/StoredprocedureHelper.cs @@ -0,0 +1,76 @@ +using System; +using System.Linq; +using System.Threading.Tasks; + +using Microsoft.Azure.Documents.Client; + +using Hangfire.Azure.Documents; + +namespace Hangfire.Azure.Helper +{ + internal static class StoredprocedureHelper + { + private static Uri spPersistDocumentsUri; + private static Uri spExpireDocumentsUri; + private static Uri spDeleteDocumentsUri; + private static Uri spUpsertDocumentsUri; + + internal static void Setup(string databaseName, string collectionName) + { + spPersistDocumentsUri = UriFactory.CreateStoredProcedureUri(databaseName, collectionName, "persistDocuments"); + spExpireDocumentsUri = UriFactory.CreateStoredProcedureUri(databaseName, collectionName, "expireDocuments"); + spDeleteDocumentsUri = UriFactory.CreateStoredProcedureUri(databaseName, collectionName, "deleteDocuments"); + spUpsertDocumentsUri = UriFactory.CreateStoredProcedureUri(databaseName, collectionName, "upsertDocuments"); + } + + internal static int ExecuteUpsertDocuments(this DocumentClient client, Data data) + { + int affected = 0; + Data records = new Data(data.Items); + do + { + records.Items = data.Items.Skip(affected).ToList(); + Task> task = client.ExecuteStoredProcedureWithRetriesAsync(spUpsertDocumentsUri, records); + task.Wait(); + affected += task.Result; + } while (affected < data.Items.Count); + return affected; + } + + internal static int ExecuteDeleteDocuments(this DocumentClient client, string query) + { + int affected = 0; + ProcedureResponse response; + do + { + Task> task = client.ExecuteStoredProcedureWithRetriesAsync(spDeleteDocumentsUri, query); + task.Wait(); + response = task.Result; + affected += response.Affected; + } while (response.Continuation); + return affected; + } + + internal static void ExecutePersistDocuments(this DocumentClient client, string query) + { + ProcedureResponse response; + do + { + Task> task = client.ExecuteStoredProcedureWithRetriesAsync(spPersistDocumentsUri, query); + task.Wait(); + response = task.Result; + } while (response.Continuation); + } + + internal static void ExecuteExpireDocuments(this DocumentClient client, string query, int epoch) + { + ProcedureResponse response; + do + { + Task> task = client.ExecuteStoredProcedureWithRetriesAsync(spExpireDocumentsUri, query, epoch); + task.Wait(); + response = task.Result; + } while (response.Continuation); + } + } +} From 8ef9a098e7dace20aaf388e846ad08256d9836ca Mon Sep 17 00:00:00 2001 From: imranmomin Date: Tue, 25 Dec 2018 16:55:55 -0500 Subject: [PATCH 2/6] changed the class access modifiers to internal --- Hangfire.AzureDocumentDB/DocumentDbStorage.cs | 5 ++++- Hangfire.AzureDocumentDB/Entities/Queue.cs | 2 +- .../Helper/ClientHelper.cs | 18 +++++++++--------- Hangfire.AzureDocumentDB/Helper/HashHelper.cs | 4 ++-- Hangfire.AzureDocumentDB/Helper/QueryHelper.cs | 4 ++-- Hangfire.AzureDocumentDB/Helper/TimeHelper.cs | 6 +++--- 6 files changed, 21 insertions(+), 18 deletions(-) diff --git a/Hangfire.AzureDocumentDB/DocumentDbStorage.cs b/Hangfire.AzureDocumentDB/DocumentDbStorage.cs index 8738f9d..0cac1d0 100644 --- a/Hangfire.AzureDocumentDB/DocumentDbStorage.cs +++ b/Hangfire.AzureDocumentDB/DocumentDbStorage.cs @@ -10,10 +10,11 @@ using Hangfire.Logging; using Newtonsoft.Json; using Microsoft.Azure.Documents; +using Newtonsoft.Json.Serialization; using Microsoft.Azure.Documents.Client; using Hangfire.Azure.Queue; -using Newtonsoft.Json.Serialization; +using Hangfire.Azure.Helper; namespace Hangfire.Azure { @@ -70,6 +71,8 @@ public DocumentDbStorage(string url, string authSecret, string database, string Task continueTask = task.ContinueWith(t => Initialize(), TaskContinuationOptions.OnlyOnRanToCompletion); continueTask.Wait(); + StoredprocedureHelper.Setup(database, collection); + JobQueueProvider provider = new JobQueueProvider(this); QueueProviders = new PersistentJobQueueProviderCollection(provider); } diff --git a/Hangfire.AzureDocumentDB/Entities/Queue.cs b/Hangfire.AzureDocumentDB/Entities/Queue.cs index 69b70f0..e5719d2 100644 --- a/Hangfire.AzureDocumentDB/Entities/Queue.cs +++ b/Hangfire.AzureDocumentDB/Entities/Queue.cs @@ -5,7 +5,7 @@ // ReSharper disable once CheckNamespace namespace Hangfire.Azure.Documents { - class Queue : DocumentBase + internal class Queue : DocumentBase { [JsonProperty("name")] public string Name { get; set; } diff --git a/Hangfire.AzureDocumentDB/Helper/ClientHelper.cs b/Hangfire.AzureDocumentDB/Helper/ClientHelper.cs index 7990c92..20cf778 100644 --- a/Hangfire.AzureDocumentDB/Helper/ClientHelper.cs +++ b/Hangfire.AzureDocumentDB/Helper/ClientHelper.cs @@ -8,7 +8,7 @@ namespace Hangfire.Azure.Helper { - public static class ClientHelper + internal static class ClientHelper { /// /// Creates a document as an asynchronous operation in the Azure Cosmos DB service. @@ -20,7 +20,7 @@ public static class ClientHelper /// Disables the automatic id generation, will throw an exception if id is missing. /// (Optional) representing request cancellation. /// - public static Task> CreateDocumentWithRetriesAsync(this DocumentClient client, Uri documentCollectionUri, object document, RequestOptions options = null, bool disableAutomaticIdGeneration = false, CancellationToken cancellationToken = default(CancellationToken)) + internal static Task> CreateDocumentWithRetriesAsync(this DocumentClient client, Uri documentCollectionUri, object document, RequestOptions options = null, bool disableAutomaticIdGeneration = false, CancellationToken cancellationToken = default(CancellationToken)) { return Task.Run(async () => await client.ExecuteWithRetries(() => client.CreateDocumentAsync(documentCollectionUri, document, options, disableAutomaticIdGeneration, cancellationToken)), cancellationToken); } @@ -34,7 +34,7 @@ public static class ClientHelper /// The request options for the request. /// (Optional) representing request cancellation. /// - public static Task> ReadDocumentWithRetriesAsync(this DocumentClient client, Uri documentUri, RequestOptions options = null, CancellationToken cancellationToken = default(CancellationToken)) + internal static Task> ReadDocumentWithRetriesAsync(this DocumentClient client, Uri documentUri, RequestOptions options = null, CancellationToken cancellationToken = default(CancellationToken)) { return Task.Run(async () => await client.ExecuteWithRetries(() => client.ReadDocumentAsync(documentUri, options, cancellationToken)), cancellationToken); } @@ -48,7 +48,7 @@ public static class ClientHelper /// The request options for the request. /// Disables the automatic id generation, will throw an exception if id is missing. /// (Optional) representing request cancellation. - public static Task> UpsertDocumentWithRetriesAsync(this DocumentClient client, Uri documentCollectionUri, object document, RequestOptions options = null, bool disableAutomaticIdGeneration = false, CancellationToken cancellationToken = default(CancellationToken)) + internal static Task> UpsertDocumentWithRetriesAsync(this DocumentClient client, Uri documentCollectionUri, object document, RequestOptions options = null, bool disableAutomaticIdGeneration = false, CancellationToken cancellationToken = default(CancellationToken)) { return Task.Run(async () => await client.ExecuteWithRetries(() => client.UpsertDocumentAsync(documentCollectionUri, document, options, disableAutomaticIdGeneration, cancellationToken)), cancellationToken); } @@ -60,7 +60,7 @@ public static class ClientHelper /// the URI of the document to delete. /// The request options for the request. /// (Optional) representing request cancellation. - public static Task> DeleteDocumentWithRetriesAsync(this DocumentClient client, Uri documentUri, RequestOptions options = null, CancellationToken cancellationToken = default(CancellationToken)) + internal static Task> DeleteDocumentWithRetriesAsync(this DocumentClient client, Uri documentUri, RequestOptions options = null, CancellationToken cancellationToken = default(CancellationToken)) { return Task.Run(async () => await client.ExecuteWithRetries(() => client.DeleteDocumentAsync(documentUri, options, cancellationToken)), cancellationToken); } @@ -74,7 +74,7 @@ public static class ClientHelper /// The request options for the request. /// (Optional) representing request cancellation. /// - public static Task> ReplaceDocumentWithRetriesAsync(this DocumentClient client, Uri documentUri, object document, RequestOptions options = null, CancellationToken cancellationToken = default(CancellationToken)) + internal static Task> ReplaceDocumentWithRetriesAsync(this DocumentClient client, Uri documentUri, object document, RequestOptions options = null, CancellationToken cancellationToken = default(CancellationToken)) { return Task.Run(async () => await client.ExecuteWithRetries(() => client.ReplaceDocumentAsync(documentUri, document, options, cancellationToken)), cancellationToken); } @@ -87,7 +87,7 @@ public static class ClientHelper /// the URI of the stored procedure to be executed. /// the parameters for the stored procedure execution. /// - public static Task> ExecuteStoredProcedureWithRetriesAsync(this DocumentClient client, Uri storedProcedureUri, params object[] procedureParams) + internal static Task> ExecuteStoredProcedureWithRetriesAsync(this DocumentClient client, Uri storedProcedureUri, params object[] procedureParams) { return Task.Run(async () => await client.ExecuteWithRetries(() => client.ExecuteStoredProcedureAsync(storedProcedureUri, procedureParams))); } @@ -95,7 +95,7 @@ public static Task> ExecuteStoredProcedureWithRetries /// /// Execute the function with retries on throttle /// - public static async Task> ExecuteNextWithRetriesAsync(this IDocumentQuery query) + internal static async Task> ExecuteNextWithRetriesAsync(this IDocumentQuery query) { while (true) { @@ -121,7 +121,7 @@ public static async Task> ExecuteNextWithRetriesAsync(this ID /// /// Execute the function with retries on throttle /// - public static async Task ExecuteWithRetries(this DocumentClient client, Func> function) + internal static async Task ExecuteWithRetries(this DocumentClient client, Func> function) { while (true) { diff --git a/Hangfire.AzureDocumentDB/Helper/HashHelper.cs b/Hangfire.AzureDocumentDB/Helper/HashHelper.cs index 407fba5..808c218 100644 --- a/Hangfire.AzureDocumentDB/Helper/HashHelper.cs +++ b/Hangfire.AzureDocumentDB/Helper/HashHelper.cs @@ -3,9 +3,9 @@ namespace Hangfire.Azure.Helper { - public static class HashHelper + internal static class HashHelper { - public static string GenerateHash(this string input) + internal static string GenerateHash(this string input) { using (System.Security.Cryptography.MD5 md5 = System.Security.Cryptography.MD5.Create()) { diff --git a/Hangfire.AzureDocumentDB/Helper/QueryHelper.cs b/Hangfire.AzureDocumentDB/Helper/QueryHelper.cs index f8c9efa..593f443 100644 --- a/Hangfire.AzureDocumentDB/Helper/QueryHelper.cs +++ b/Hangfire.AzureDocumentDB/Helper/QueryHelper.cs @@ -7,9 +7,9 @@ namespace Hangfire.Azure.Helper { - public static class QueryHelper + internal static class QueryHelper { - public static List ToQueryResult(this IQueryable source) + internal static List ToQueryResult(this IQueryable source) { IDocumentQuery query = source.AsDocumentQuery(); List results = new List(); diff --git a/Hangfire.AzureDocumentDB/Helper/TimeHelper.cs b/Hangfire.AzureDocumentDB/Helper/TimeHelper.cs index da3ea6a..6d38130 100644 --- a/Hangfire.AzureDocumentDB/Helper/TimeHelper.cs +++ b/Hangfire.AzureDocumentDB/Helper/TimeHelper.cs @@ -8,16 +8,16 @@ internal static class TimeHelper { private static readonly DateTime epochDateTime = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc); - public static int ToEpoch(this DateTime date) + internal static int ToEpoch(this DateTime date) { if (date.Equals(DateTime.MinValue)) return int.MinValue; TimeSpan epochTimeSpan = date - epochDateTime; return (int)epochTimeSpan.TotalSeconds; } - public static DateTime ToDateTime(this int totalSeconds) => epochDateTime.AddSeconds(totalSeconds); + internal static DateTime ToDateTime(this int totalSeconds) => epochDateTime.AddSeconds(totalSeconds); - public static string TryParseToEpoch(this string s) + internal static string TryParseToEpoch(this string s) { return DateTime.TryParse(s, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind, out DateTime date) ? date.ToEpoch().ToString(CultureInfo.InvariantCulture) From 1e6938326d35c4ae7f1e10c10451cf64001a300a Mon Sep 17 00:00:00 2001 From: imranmomin Date: Wed, 26 Dec 2018 09:08:57 -0500 Subject: [PATCH 3/6] fixed the equal-to operator --- Hangfire.AzureDocumentDB/DocumentDbWriteOnlyTransaction.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Hangfire.AzureDocumentDB/DocumentDbWriteOnlyTransaction.cs b/Hangfire.AzureDocumentDB/DocumentDbWriteOnlyTransaction.cs index ceddd32..db31789 100644 --- a/Hangfire.AzureDocumentDB/DocumentDbWriteOnlyTransaction.cs +++ b/Hangfire.AzureDocumentDB/DocumentDbWriteOnlyTransaction.cs @@ -305,7 +305,7 @@ public override void RemoveSet(string key) QueueCommand(() => { - string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Set} AND doc.key == '{key}'"; + string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Set} AND doc.key = '{key}'"; connection.Storage.Client.ExecuteDeleteDocuments(query); }); } @@ -320,7 +320,7 @@ public override void RemoveHash(string key) QueueCommand(() => { - string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Hash} AND doc.key == '{key}'"; + string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Hash} AND doc.key = '{key}'"; connection.Storage.Client.ExecuteDeleteDocuments(query); }); } From 3173233cfbbdddc59ecbfbe1e8a601912817183b Mon Sep 17 00:00:00 2001 From: imranmomin Date: Wed, 26 Dec 2018 09:09:30 -0500 Subject: [PATCH 4/6] use of Ternary operator on Select --- Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs b/Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs index 6312c36..4784953 100644 --- a/Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs +++ b/Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs @@ -91,10 +91,10 @@ public IEnumerable GetFetchedJobIds(string queue, int from, int perPage) { (int EnqueuedCount, int FetchedCount) result = storage.Client.CreateDocumentQuery(storage.CollectionUri) .Where(q => q.DocumentType == DocumentTypes.Queue && q.Name == queue) + .Select(q => new { q.Name, EnqueuedCount = q.FetchedAt.IsDefined() ? 0 : 1, FetchedCount = q.FetchedAt.IsDefined() ? 1 : 0 }) .ToQueryResult() - .Select(q => new { q.Name, q.FetchedAt }) .GroupBy(q => q.Name) - .Select(v => (EnqueuedCount: v.Sum(q => q.FetchedAt.HasValue ? 0 : 1), FetchedCount: v.Sum(q => q.FetchedAt.HasValue ? 1 : 0))) + .Select(v => (EnqueuedCount: v.Sum(q => q.EnqueuedCount), FetchedCount: v.Sum(q => q.FetchedCount))) .FirstOrDefault(); return result; From e00801bc23dd7c4f73f2c0fa2f61f65d40b6e7ce Mon Sep 17 00:00:00 2001 From: imranmomin Date: Wed, 26 Dec 2018 09:09:57 -0500 Subject: [PATCH 5/6] fixed the paging on recurring page --- Hangfire.AzureDocumentDB/DocumentDbConnection.cs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Hangfire.AzureDocumentDB/DocumentDbConnection.cs b/Hangfire.AzureDocumentDB/DocumentDbConnection.cs index fa64288..ec50db9 100644 --- a/Hangfire.AzureDocumentDB/DocumentDbConnection.cs +++ b/Hangfire.AzureDocumentDB/DocumentDbConnection.cs @@ -214,7 +214,9 @@ public override List GetRangeFromSet(string key, int startingFrom, int e { if (key == null) throw new ArgumentNullException(nameof(key)); - return Storage.Client.CreateDocumentQuery(Storage.CollectionUri) + FeedOptions feedOptions = new FeedOptions { MaxItemCount = endingAt + 1 }; + + return Storage.Client.CreateDocumentQuery(Storage.CollectionUri, feedOptions) .Where(s => s.DocumentType == DocumentTypes.Set && s.Key == key) .OrderBy(s => s.Score) .ToQueryResult() @@ -484,7 +486,9 @@ public override List GetRangeFromList(string key, int startingFrom, int { if (key == null) throw new ArgumentNullException(nameof(key)); - return Storage.Client.CreateDocumentQuery(Storage.CollectionUri) + FeedOptions feedOptions = new FeedOptions { MaxItemCount = endingAt + 1 }; + + return Storage.Client.CreateDocumentQuery(Storage.CollectionUri, feedOptions) .Where(l => l.DocumentType == DocumentTypes.List && l.Key == key) .OrderByDescending(l => l.CreatedOn) .Select(l => l.Value) From 78a5eec6b0c7e2616440ec0d394932cb1fbb350b Mon Sep 17 00:00:00 2001 From: imranmomin Date: Wed, 26 Dec 2018 12:01:25 -0500 Subject: [PATCH 6/6] improvement on paging LINQ --- Hangfire.AzureDocumentDB/DocumentDbConnection.cs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Hangfire.AzureDocumentDB/DocumentDbConnection.cs b/Hangfire.AzureDocumentDB/DocumentDbConnection.cs index ec50db9..977d2d7 100644 --- a/Hangfire.AzureDocumentDB/DocumentDbConnection.cs +++ b/Hangfire.AzureDocumentDB/DocumentDbConnection.cs @@ -215,15 +215,15 @@ public override List GetRangeFromSet(string key, int startingFrom, int e if (key == null) throw new ArgumentNullException(nameof(key)); FeedOptions feedOptions = new FeedOptions { MaxItemCount = endingAt + 1 }; + endingAt += 1 - startingFrom; return Storage.Client.CreateDocumentQuery(Storage.CollectionUri, feedOptions) .Where(s => s.DocumentType == DocumentTypes.Set && s.Key == key) - .OrderBy(s => s.Score) - .ToQueryResult() .OrderBy(s => s.CreatedOn) - .Select((s, i) => new { s.Value, Index = i }) - .Where(s => s.Index >= startingFrom && s.Index <= endingAt) .Select(s => s.Value) + .ToQueryResult() + .Skip(startingFrom) + .Take(endingAt) .ToList(); } @@ -487,15 +487,15 @@ public override List GetRangeFromList(string key, int startingFrom, int if (key == null) throw new ArgumentNullException(nameof(key)); FeedOptions feedOptions = new FeedOptions { MaxItemCount = endingAt + 1 }; + endingAt += 1 - startingFrom; return Storage.Client.CreateDocumentQuery(Storage.CollectionUri, feedOptions) .Where(l => l.DocumentType == DocumentTypes.List && l.Key == key) .OrderByDescending(l => l.CreatedOn) .Select(l => l.Value) .ToQueryResult() - .Select((l, i) => new { Value = l, Index = i }) - .Where(l => l.Index >= startingFrom && l.Index <= endingAt) - .Select(l => l.Value) + .Skip(startingFrom) + .Take(endingAt) .ToList(); }