diff --git a/Hangfire.AzureDocumentDB.sln b/Hangfire.AzureDocumentDB.sln
index 0e69caf..a65aa10 100644
--- a/Hangfire.AzureDocumentDB.sln
+++ b/Hangfire.AzureDocumentDB.sln
@@ -1,9 +1,9 @@
 Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio 14
-VisualStudioVersion = 14.0.25123.0
+# Visual Studio Version 16
+VisualStudioVersion = 16.0.29806.167
 MinimumVisualStudioVersion = 10.0.40219.1
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Hangfire.AzureDocumentDB", "Hangfire.AzureDocumentDB\Hangfire.AzureDocumentDB.csproj", "{E0AD3801-5504-49A7-80C6-8B4373DDE0E5}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Hangfire.AzureDocumentDB", "src\Hangfire.AzureDocumentDB.csproj", "{E0AD3801-5504-49A7-80C6-8B4373DDE0E5}"
 EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -19,4 +19,7 @@ Global
 	GlobalSection(SolutionProperties) = preSolution
 		HideSolutionNode = FALSE
 	EndGlobalSection
+	GlobalSection(ExtensibilityGlobals) = postSolution
+		SolutionGuid = {C15DCC20-26DF-41C7-8C83-0A8D63706B22}
+	EndGlobalSection
 EndGlobal
diff --git a/Hangfire.AzureDocumentDB/Helper/QueryHelper.cs b/Hangfire.AzureDocumentDB/Helper/QueryHelper.cs
deleted file mode 100644
index 593f443..0000000
--- a/Hangfire.AzureDocumentDB/Helper/QueryHelper.cs
+++ /dev/null
@@ -1,27 +0,0 @@
-using System.Linq;
-using System.Threading.Tasks;
-using System.Collections.Generic;
-
-using Microsoft.Azure.Documents.Linq;
-using Microsoft.Azure.Documents.Client;
-
-namespace Hangfire.Azure.Helper
-{
-    internal static class QueryHelper
-    {
-        internal static List<T> ToQueryResult<T>(this IQueryable<T> source)
-        {
-            IDocumentQuery<T> query = source.AsDocumentQuery();
-            List<T> results = new List<T>();
-
-            while (query.HasMoreResults)
-            {
-                Task<FeedResponse<T>> task = Task.Run(async () => await query.ExecuteNextWithRetriesAsync<T>());
-                task.Wait();
-                results.AddRange(task.Result);
-            }
-
-            return results;
-        }
-    }
-}
\ No newline at end of file
diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/setJobState.js b/Hangfire.AzureDocumentDB/StoredProcedure/setJobState.js
deleted file mode 100644
index 92f1c5b..0000000
--- a/Hangfire.AzureDocumentDB/StoredProcedure/setJobState.js
+++ /dev/null
@@ -1,53 +0,0 @@
-function setJobState(id, state) {
-    let context = getContext();
-    let collection = context.getCollection();
-    let response = getContext().getResponse();
-    let collectionLink = collection.getAltLink();
-    let documentLink = `${collectionLink}/docs/${id}`;
-    const keys = Object.keys(state.data);
-    for (const key of keys) {
-        const newKey = camelCaseToPascalCase(key);
-        if (key !== newKey) {
-            state.data[newKey] = state.data[key];
-            delete state.data[key];
-        }
-    }
-    response.setBody(false);
-    let isAccepted = collection.readDocument(documentLink, (error, job) => {
-        if (error) {
-            throw error;
-        }
-        createState(state, (doc) => {
-            job.state_id = doc.id;
-            job.state_name = doc.name;
-            let options = { etag: job._etag };
-            let success = collection.replaceDocument(job._self, job, options, (err) => {
-                if (err) {
-                    throw err;
-                }
-                response.setBody(true);
-            });
-            if (!success) {
-                throw new Error("The call was not accepted");
-            }
-        });
-    });
-    function createState(doc, callback) {
-        let success = collection.createDocument(collectionLink, doc, (error, document) => {
-            if (error) {
-                throw error;
-            }
-            callback(document);
-        });
-        if (!success) {
-            throw new Error("The call was not accepted");
-        }
-    }
-    function camelCaseToPascalCase(input) {
-        return input.replace(/([A-Z])/g, '$1')
-            .replace(/^./, (match) => match.toUpperCase());
-    }
-    if (!isAccepted) {
-        throw new Error("The call was not accepted");
-    }
-}
diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/setJobState.ts b/Hangfire.AzureDocumentDB/StoredProcedure/setJobState.ts
deleted file mode 100644
index 9474759..0000000
--- a/Hangfire.AzureDocumentDB/StoredProcedure/setJobState.ts
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Set the Job state data
- * @param {string} id - the job id
- * @param {IState} state - the state document
- */
-function setJobState(id: string, state: IState) {
-    let context: IContext = getContext();
-    let collection: ICollection = context.getCollection();
-    let response: IResponse = getContext().getResponse();
-    let collectionLink: string = collection.getAltLink();
-    let documentLink: string = `${collectionLink}/docs/${id}`;
-
-    // convert the case for the data
-    const keys: Array<string> = Object.keys(state.data);
-    for (const key of keys) {
-        const newKey = camelCaseToPascalCase(key);
-        if (key !== newKey) {
-            state.data[newKey] = state.data[key];
-            delete state.data[key];
-        }
-    }
-
-    // default response
-    response.setBody(false);
-
-    let isAccepted: boolean = collection.readDocument(documentLink, (error: IRequestCallbackError, job: IJob) => {
-        if (error) {
-            throw error;
-        }
-
-        // now create the state document
-        // on callback replace the job documented with state_id, state_name
-        createState(state, (doc: IState): void => {
-            job.state_id = doc.id;
-            job.state_name = doc.name;
-            let options: IReplaceOptions = { etag: job._etag };
-
-            let success: boolean = collection.replaceDocument(job._self, job, options, (err: IRequestCallbackError) => {
-                if (err) {
-                    throw err;
-                }
-                response.setBody(true);
-            });
-
-            if (!success) {
-                throw new Error("The call was not accepted");
-            }
-        });
-    });
-
-    /**
-     * Creates a new state
-     * @param {Object} doc - information for the job
-     * @param {function} callback - return the newly create state document
-     */
-    function createState(doc: IState, callback: (doc: IState) => void) {
-        let success: boolean = collection.createDocument(collectionLink, doc, (error: IRequestCallbackError, document: IState) => {
-            if (error) {
-                throw error;
-            }
-            callback(document);
-        });
-
-        if (!success) {
-            throw new Error("The call was not accepted");
-        }
-    }
-
-    /**
-     * Convert the camel case to pascal
-     * @param input - The text which needs to be converted
-     */
-    function camelCaseToPascalCase(input: string): string {
-        return input.replace(/([A-Z])/g, '$1')
-            .replace(/^./, (match) => match.toUpperCase());
-    }
-
-    if (!isAccepted) {
-        throw new Error("The call was not accepted");
-    }
-}
\ No newline at end of file
diff --git a/README.md b/README.md
index 724ee62..b3e017d 100644
--- a/README.md
+++ b/README.md
@@ -4,17 +4,28 @@
 [![Latest version](https://img.shields.io/nuget/v/Hangfire.AzureDocumentDB.svg)](https://www.nuget.org/packages/Hangfire.AzureDocumentDB)
 [![Build status](https://ci.appveyor.com/api/projects/status/uvxh94dhxcokga47?svg=true)](https://ci.appveyor.com/project/imranmomin/hangfire-azuredocumentdb)
 
-This repo will add a [Microsoft Azure DocumentDB](https://azure.microsoft.com/en-ca/services/documentdb) storage support to [Hangfire](http://hangfire.io) - fire-and-forget, delayed and recurring tasks runner for .NET. Scalable and reliable background job runner. Supports multiple servers, CPU and I/O intensive, long-running and short-running jobs.
+This repo adds [Microsoft Azure Cosmos DB](https://azure.microsoft.com/en-ca/services/cosmos-db) storage support to [Hangfire](http://hangfire.io) - a fire-and-forget, delayed and recurring task runner for .NET. Scalable and reliable background job runner. Supports multiple servers, CPU- and I/O-intensive, long-running and short-running jobs.
 
 ## Installation
 
-[Hangfire.AzureDocumentDB](https://www.nuget.org/packages/Hangfire.AzureDocumentDB) is available as a NuGet package. Install it using the NuGet Package Console window:
+[Hangfire.AzureDocumentDB](https://www.nuget.org/packages/Hangfire.AzureDocumentDB) is available on NuGet.
+
+Package Manager
 ```powershell
 PM> Install-Package Hangfire.AzureDocumentDB
 ```
+.NET CLI
+```
+> dotnet add package Hangfire.AzureDocumentDB
+```
+
+PackageReference
+```xml
+<PackageReference Include="Hangfire.AzureDocumentDB" Version="x.x.x" />
+```
 
 ## Usage
 
@@ -36,7 +47,8 @@ Hangfire.Azure.DocumentDbStorageOptions options = new Hangfire.Azure.DocumentDbS
     CountersAggregateInterval = TimeSpan.FromMinutes(2),
     QueuePollInterval = TimeSpan.FromSeconds(15),
     ConnectionMode = ConnectionMode.Direct,
-    ConnectionProtocol = Protocol.Tcp
+    ConnectionProtocol = Protocol.Tcp,
+    EnablePartition = false // default is false; set to true to partition the collection on /type
 };
 
 GlobalConfiguration.Configuration.UseAzureDocumentDbStorage("", "", "", "", options);
@@ -47,6 +59,12 @@ Hangfire.Azure.DocumentDbStorage storage = new Hangfire.Azure.DocumentDbStorage(
 GlobalConfiguration.Configuration.UseStorage(storage);
 ```
 
+## Recommendations
+- Keep a separate database/collection for Hangfire. (You can now [enable the free tier](https://docs.microsoft.com/en-us/azure/cosmos-db/optimize-dev-test#azure-cosmos-db-free-tier) on Azure)
+- Enable partitioning by ```/type```
+
+## SDK Support
+This package only supports the [Microsoft.Azure.DocumentDB](https://www.nuget.org/packages/Microsoft.Azure.DocumentDB/) SDK (v2). If you want support for the latest v3 SDK, [Microsoft.Azure.Cosmos](https://www.nuget.org/packages/Microsoft.Azure.Cosmos), you will have to use [Hangfire.AzureCosmosDb](https://github.com/imranmomin/Hangfire.AzureCosmosDb).
 
 ## Questions? Problems?
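The `EnablePartition` option added to the README above is the switch behind the `PartitionKey` plumbing that runs through the rest of this diff. As a rough sketch of a partition-enabled setup (assuming the Hangfire collection was created with `/type` as its partition key, per the recommendation above; the `<url>`/`<authSecret>`/`<databaseName>`/`<collectionName>` values are placeholders, not real settings):

```csharp
// requires: using System; using Hangfire; using Microsoft.Azure.Documents.Client;

// storage options taken from the README snippet above, with partitioning turned on
Hangfire.Azure.DocumentDbStorageOptions options = new Hangfire.Azure.DocumentDbStorageOptions
{
    CountersAggregateInterval = TimeSpan.FromMinutes(2),
    QueuePollInterval = TimeSpan.FromSeconds(15),
    ConnectionMode = ConnectionMode.Direct,
    ConnectionProtocol = Protocol.Tcp,
    EnablePartition = true // documents will be partitioned on /type
};

// placeholder values are illustrative; substitute your Cosmos DB account settings
GlobalConfiguration.Configuration.UseAzureDocumentDbStorage("<url>", "<authSecret>", "<databaseName>", "<collectionName>", options);
```

When the flag is on, each document type maps to its own partition key value, which is what the `new PartitionKey((int)DocumentTypes.Xxx)` arguments added to the `RequestOptions`/`FeedOptions` calls in the changes below supply.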
diff --git a/Hangfire.AzureDocumentDB/CountersAggregator.cs b/src/CountersAggregator.cs similarity index 90% rename from Hangfire.AzureDocumentDB/CountersAggregator.cs rename to src/CountersAggregator.cs index d9c5622..3f1a1bf 100644 --- a/Hangfire.AzureDocumentDB/CountersAggregator.cs +++ b/src/CountersAggregator.cs @@ -1,18 +1,18 @@ using System; -using System.Net; +using System.Collections.Generic; using System.Linq; +using System.Net; using System.Threading; using System.Threading.Tasks; -using System.Collections.Generic; -using Hangfire.Server; +using Hangfire.Azure.Documents; +using Hangfire.Azure.Helper; using Hangfire.Logging; +using Hangfire.Server; + using Microsoft.Azure.Documents; using Microsoft.Azure.Documents.Client; -using Hangfire.Azure.Helper; -using Hangfire.Azure.Documents; - namespace Hangfire.Azure { #pragma warning disable 618 @@ -23,6 +23,7 @@ internal class CountersAggregator : IServerComponent private const string DISTRIBUTED_LOCK_KEY = "locks:counters:aggragator"; private readonly TimeSpan defaultLockTimeout; private readonly DocumentDbStorage storage; + private readonly PartitionKey partitionKey = new PartitionKey((int)DocumentTypes.Counter); public CountersAggregator(DocumentDbStorage storage) { @@ -36,9 +37,10 @@ public void Execute(CancellationToken cancellationToken) { logger.Trace("Aggregating records in 'Counter' table."); - List rawCounters = storage.Client.CreateDocumentQuery(storage.CollectionUri) + List rawCounters = storage.Client.CreateDocumentQueryAsync(storage.CollectionUri, new FeedOptions { PartitionKey = partitionKey }) .Where(c => c.DocumentType == DocumentTypes.Counter && c.Type == CounterTypes.Raw) - .ToQueryResult(); + .ToQueryResult() + .ToList(); Dictionary Counters)> counters = rawCounters.GroupBy(c => c.Key) .ToDictionary(k => k.Key, v => (Value: v.Sum(c => c.Value), ExpireOn: v.Max(c => c.ExpireOn), Counters: v.ToList())); @@ -55,7 +57,7 @@ public void Execute(CancellationToken cancellationToken) try { - Task> readTask = storage.Client.ReadDocumentWithRetriesAsync(uri, cancellationToken: cancellationToken); + Task> readTask = storage.Client.ReadDocumentWithRetriesAsync(uri, new RequestOptions { PartitionKey = partitionKey }, cancellationToken: cancellationToken); readTask.Wait(cancellationToken); if (readTask.Result.StatusCode == HttpStatusCode.OK) @@ -90,7 +92,7 @@ public void Execute(CancellationToken cancellationToken) } } - Task> task = storage.Client.UpsertDocumentWithRetriesAsync(storage.CollectionUri, aggregated, cancellationToken: cancellationToken); + Task> task = storage.Client.UpsertDocumentWithRetriesAsync(storage.CollectionUri, aggregated, new RequestOptions { PartitionKey = partitionKey }, cancellationToken: cancellationToken); Task continueTask = task.ContinueWith(t => { if (t.Result.StatusCode == HttpStatusCode.Created || t.Result.StatusCode == HttpStatusCode.OK) @@ -98,7 +100,7 @@ public void Execute(CancellationToken cancellationToken) string ids = string.Join(",", data.Counters.Select(c => $"'{c.Id}'").ToArray()); string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Counter} AND doc.counter_type = {(int)CounterTypes.Raw} AND doc.id IN ({ids})"; - int deleted = storage.Client.ExecuteDeleteDocuments(query); + int deleted = storage.Client.ExecuteDeleteDocuments(query, new RequestOptions { PartitionKey = partitionKey }, cancellationToken); logger.Trace($"Total {deleted} records from the 'Counter:{aggregated.Key}' were aggregated."); } diff --git a/Hangfire.AzureDocumentDB/DocumentDbConnection.cs 
b/src/DocumentDbConnection.cs similarity index 82% rename from Hangfire.AzureDocumentDB/DocumentDbConnection.cs rename to src/DocumentDbConnection.cs index 11accc7..eb3a424 100644 --- a/Hangfire.AzureDocumentDB/DocumentDbConnection.cs +++ b/src/DocumentDbConnection.cs @@ -1,545 +1,541 @@ -using System; -using System.Net; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using System.Collections.Generic; - -using Hangfire.Common; -using Hangfire.Server; -using Hangfire.Storage; -using Microsoft.Azure.Documents; -using Microsoft.Azure.Documents.Client; - -using Hangfire.Azure.Queue; -using Hangfire.Azure.Helper; -using Hangfire.Azure.Documents; -using Hangfire.Azure.Documents.Helper; - -namespace Hangfire.Azure -{ - internal sealed class DocumentDbConnection : JobStorageConnection - { - public DocumentDbStorage Storage { get; } - public PersistentJobQueueProviderCollection QueueProviders { get; } - - public DocumentDbConnection(DocumentDbStorage storage) - { - Storage = storage; - QueueProviders = storage.QueueProviders; - } - - public override IDisposable AcquireDistributedLock(string resource, TimeSpan timeout) => new DocumentDbDistributedLock(resource, timeout, Storage); - public override IWriteOnlyTransaction CreateWriteTransaction() => new DocumentDbWriteOnlyTransaction(this); - - #region Job - - public override string CreateExpiredJob(Common.Job job, IDictionary parameters, DateTime createdAt, TimeSpan expireIn) - { - if (job == null) throw new ArgumentNullException(nameof(job)); - if (parameters == null) throw new ArgumentNullException(nameof(parameters)); - - InvocationData invocationData = InvocationData.SerializeJob(job); - Documents.Job entityJob = new Documents.Job - { - InvocationData = invocationData, - Arguments = invocationData.Arguments, - CreatedOn = createdAt, - ExpireOn = createdAt.Add(expireIn), - - Parameters = parameters.Select(p => new Parameter - { - Name = p.Key, - Value = p.Value - }).ToArray() - }; - - Task> task = Storage.Client.CreateDocumentWithRetriesAsync(Storage.CollectionUri, entityJob); - task.Wait(); - - if (task.Result.StatusCode == HttpStatusCode.Created || task.Result.StatusCode == HttpStatusCode.OK) - { - return entityJob.Id; - } - - return string.Empty; - } - - public override IFetchedJob FetchNextJob(string[] queues, CancellationToken cancellationToken) - { - if (queues == null || queues.Length == 0) throw new ArgumentNullException(nameof(queues)); - - IPersistentJobQueueProvider[] providers = queues.Select(q => QueueProviders.GetProvider(q)) - .Distinct() - .ToArray(); - - if (providers.Length != 1) - { - throw new InvalidOperationException($"Multiple provider instances registered for queues: {string.Join(", ", queues)}. 
You should choose only one type of persistent queues per server instance."); - } - - IPersistentJobQueue persistentQueue = providers.Single().GetJobQueue(); - IFetchedJob queue = persistentQueue.Dequeue(queues, cancellationToken); - return queue; - } - - public override JobData GetJobData(string jobId) - { - if (jobId == null) throw new ArgumentNullException(nameof(jobId)); - - Uri uri = UriFactory.CreateDocumentUri(Storage.Options.DatabaseName, Storage.Options.CollectionName, jobId); - Task> task = Storage.Client.ReadDocumentWithRetriesAsync(uri); - task.Wait(); - - if (task.Result.Document != null) - { - Documents.Job data = task.Result; - InvocationData invocationData = data.InvocationData; - invocationData.Arguments = data.Arguments; - - Common.Job job = null; - JobLoadException loadException = null; - - try - { - job = invocationData.DeserializeJob(); - } - catch (JobLoadException ex) - { - loadException = ex; - } - - return new JobData - { - Job = job, - State = data.StateName, - CreatedAt = data.CreatedOn, - LoadException = loadException - }; - } - - return null; - } - - public override StateData GetStateData(string jobId) - { - if (jobId == null) throw new ArgumentNullException(nameof(jobId)); - - Uri uri = UriFactory.CreateDocumentUri(Storage.Options.DatabaseName, Storage.Options.CollectionName, jobId); - Task> task = Storage.Client.ReadDocumentWithRetriesAsync(uri); - task.Wait(); - - if (task.Result.Document != null) - { - Documents.Job job = task.Result; - - // get the state document - uri = UriFactory.CreateDocumentUri(Storage.Options.DatabaseName, Storage.Options.CollectionName, job.StateId); - Task> stateTask = Storage.Client.ReadDocumentWithRetriesAsync(uri); - stateTask.Wait(); - - if (stateTask.Result.Document != null) - { - State state = stateTask.Result; - return new StateData - { - Name = state.Name, - Reason = state.Reason, - Data = state.Data - }; - } - } - - return null; - } - - #endregion - - #region Parameter - - public override string GetJobParameter(string id, string name) - { - if (id == null) throw new ArgumentNullException(nameof(id)); - if (name == null) throw new ArgumentNullException(nameof(name)); - - Uri uri = UriFactory.CreateDocumentUri(Storage.Options.DatabaseName, Storage.Options.CollectionName, id); - Task> task = Storage.Client.ReadDocumentWithRetriesAsync(uri); - Documents.Job data = task.Result; - - return data?.Parameters.Where(p => p.Name == name).Select(p => p.Value).FirstOrDefault(); - } - - public override void SetJobParameter(string id, string name, string value) - { - if (id == null) throw new ArgumentNullException(nameof(id)); - if (name == null) throw new ArgumentNullException(nameof(name)); - - Parameter parameter = new Parameter - { - Value = value, - Name = name - }; - - Uri spSetJobParameterUri = UriFactory.CreateStoredProcedureUri(Storage.Options.DatabaseName, Storage.Options.CollectionName, "setJobParameter"); - Task> task = Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spSetJobParameterUri, id, parameter); - task.Wait(); - } - - #endregion - - #region Set - - public override TimeSpan GetSetTtl(string key) - { - if (key == null) throw new ArgumentNullException(nameof(key)); - - SqlQuerySpec sql = new SqlQuerySpec - { - QueryText = "SELECT TOP 1 VALUE MIN(doc['expire_on']) FROM doc WHERE doc.type = @type AND doc.key = @key", - Parameters = new SqlParameterCollection - { - new SqlParameter("@key", key), - new SqlParameter("@type", (int)DocumentTypes.Set) - } - }; - - int? 
expireOn = Storage.Client.CreateDocumentQuery(Storage.CollectionUri, sql) - .ToQueryResult() - .FirstOrDefault(); - - return expireOn.HasValue ? expireOn.Value.ToDateTime() - DateTime.UtcNow : TimeSpan.FromSeconds(-1); - } - - public override List GetRangeFromSet(string key, int startingFrom, int endingAt) - { - if (key == null) throw new ArgumentNullException(nameof(key)); - - FeedOptions feedOptions = new FeedOptions - { - EnableCrossPartitionQuery = true - }; - endingAt += 1 - startingFrom; - - return Storage.Client.CreateDocumentQuery(Storage.CollectionUri, feedOptions) - .Where(s => s.DocumentType == DocumentTypes.Set && s.Key == key) - .OrderBy(s => s.CreatedOn) - .Skip(startingFrom).Take(endingAt) - .Select(s => s.Value) - .ToQueryResult(); - } - - public override long GetCounter(string key) - { - if (key == null) throw new ArgumentNullException(nameof(key)); - - SqlQuerySpec sql = new SqlQuerySpec - { - QueryText = "SELECT TOP 1 VALUE SUM(doc['value']) FROM doc WHERE doc.type = @type AND doc.key = @key", - Parameters = new SqlParameterCollection - { - new SqlParameter("@key", key), - new SqlParameter("@type", (int)DocumentTypes.Counter) - } - }; - - return Storage.Client.CreateDocumentQuery(Storage.CollectionUri, sql) - .ToQueryResult() - .FirstOrDefault(); - } - - public override long GetSetCount(string key) - { - if (key == null) throw new ArgumentNullException(nameof(key)); - - SqlQuerySpec sql = new SqlQuerySpec - { - QueryText = "SELECT TOP 1 VALUE COUNT(1) FROM doc WHERE doc.type = @type AND doc.key = @key", - Parameters = new SqlParameterCollection - { - new SqlParameter("@key", key), - new SqlParameter("@type",(int)DocumentTypes.Set) - } - }; - - return Storage.Client.CreateDocumentQuery(Storage.CollectionUri, sql) - .ToQueryResult() - .FirstOrDefault(); - } - - public override HashSet GetAllItemsFromSet(string key) - { - if (key == null) throw new ArgumentNullException(nameof(key)); - - IEnumerable sets = Storage.Client.CreateDocumentQuery(Storage.CollectionUri) - .Where(s => s.DocumentType == DocumentTypes.Set && s.Key == key) - .Select(s => s.Value) - .ToQueryResult(); - - return new HashSet(sets); - } - - public override string GetFirstByLowestScoreFromSet(string key, double fromScore, double toScore) - { - return GetFirstByLowestScoreFromSet(key, fromScore, toScore, 1).FirstOrDefault(); - } - - public override List GetFirstByLowestScoreFromSet(string key, double fromScore, double toScore, int count) - { - if (key == null) throw new ArgumentNullException(nameof(key)); - if (count <= 0) throw new ArgumentException("The value must be a positive number", nameof(count)); - if (toScore < fromScore) throw new ArgumentException("The `toScore` value must be higher or equal to the `fromScore` value."); - - return Storage.Client.CreateDocumentQuery(Storage.CollectionUri) - .Where(s => s.DocumentType == DocumentTypes.Set && s.Key == key && s.Score >= fromScore && s.Score <= toScore) - .OrderBy(s => s.Score) - .Take(count) - .Select(s => s.Value) - .ToQueryResult() - .ToList(); - } - - #endregion - - #region Server - - public override void AnnounceServer(string serverId, ServerContext context) - { - if (serverId == null) throw new ArgumentNullException(nameof(serverId)); - if (context == null) throw new ArgumentNullException(nameof(context)); - - Documents.Server server = new Documents.Server - { - Id = $"{serverId}:{DocumentTypes.Server}".GenerateHash(), - ServerId = serverId, - Workers = context.WorkerCount, - Queues = context.Queues, - CreatedOn = DateTime.UtcNow, - 
LastHeartbeat = DateTime.UtcNow - }; - Task> task = Storage.Client.UpsertDocumentWithRetriesAsync(Storage.CollectionUri, server); - task.Wait(); - } - - public override void Heartbeat(string serverId) - { - if (serverId == null) throw new ArgumentNullException(nameof(serverId)); - string id = $"{serverId}:{DocumentTypes.Server}".GenerateHash(); - - Uri spHeartbeatServerUri = UriFactory.CreateStoredProcedureUri(Storage.Options.DatabaseName, Storage.Options.CollectionName, "heartbeatServer"); - Task> task = Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spHeartbeatServerUri, id, DateTime.UtcNow.ToEpoch()); - task.Wait(); - } - - public override void RemoveServer(string serverId) - { - if (serverId == null) throw new ArgumentNullException(nameof(serverId)); - string id = $"{serverId}:{DocumentTypes.Server}".GenerateHash(); - - Uri documentUri = UriFactory.CreateDocumentUri(Storage.Options.DatabaseName, Storage.Options.CollectionName, id); - Task> task = Storage.Client.DeleteDocumentWithRetriesAsync(documentUri); - task.Wait(); - } - - public override int RemoveTimedOutServers(TimeSpan timeOut) - { - if (timeOut.Duration() != timeOut) - { - throw new ArgumentException(@"invalid timeout", nameof(timeOut)); - } - - int lastHeartbeat = DateTime.UtcNow.Add(timeOut.Negate()).ToEpoch(); - string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Server} AND IS_DEFINED(doc.last_heartbeat) " + - $"AND doc.last_heartbeat <= {lastHeartbeat}"; - - return Storage.Client.ExecuteDeleteDocuments(query); - } - - #endregion - - #region Hash - - public override Dictionary GetAllEntriesFromHash(string key) - { - if (key == null) throw new ArgumentNullException(nameof(key)); - - return Storage.Client.CreateDocumentQuery(Storage.CollectionUri) - .Where(h => h.DocumentType == DocumentTypes.Hash && h.Key == key) - .Select(h => new { h.Field, h.Value }) - .ToQueryResult() - .ToDictionary(h => h.Field, h => h.Value); - } - - public override void SetRangeInHash(string key, IEnumerable> keyValuePairs) - { - if (key == null) throw new ArgumentNullException(nameof(key)); - if (keyValuePairs == null) throw new ArgumentNullException(nameof(keyValuePairs)); - - Data data = new Data(); - - List hashes = Storage.Client.CreateDocumentQuery(Storage.CollectionUri) - .Where(h => h.DocumentType == DocumentTypes.Hash && h.Key == key) - .ToQueryResult(); - - Hash[] sources = keyValuePairs.Select(k => new Hash - { - Key = key, - Field = k.Key, - Value = k.Value.TryParseToEpoch() - }).ToArray(); - - foreach (Hash source in sources) - { - Hash hash = hashes.SingleOrDefault(h => h.Field == source.Field); - if (hash == null) - { - data.Items.Add(source); - } - else if (!string.Equals(hash.Value, source.Value, StringComparison.InvariantCultureIgnoreCase)) - { - hash.Value = source.Value; - data.Items.Add(hash); - } - } - - Storage.Client.ExecuteUpsertDocuments(data); - } - - public override long GetHashCount(string key) - { - if (key == null) throw new ArgumentNullException(nameof(key)); - - SqlQuerySpec sql = new SqlQuerySpec - { - QueryText = "SELECT TOP 1 VALUE COUNT(1) FROM doc WHERE doc.type = @type AND doc.key = @key", - Parameters = new SqlParameterCollection - { - new SqlParameter("@key", key), - new SqlParameter("@type", (int)DocumentTypes.Hash) - } - }; - - return Storage.Client.CreateDocumentQuery(Storage.CollectionUri, sql) - .ToQueryResult() - .FirstOrDefault(); - } - - public override string GetValueFromHash(string key, string name) - { - if (key == null) throw new 
ArgumentNullException(nameof(key)); - if (name == null) throw new ArgumentNullException(nameof(name)); - - SqlQuerySpec sql = new SqlQuerySpec - { - QueryText = "SELECT TOP 1 VALUE doc['value'] FROM doc WHERE doc.type = @type AND doc.key = @key AND doc.field = @field", - Parameters = new SqlParameterCollection - { - new SqlParameter("@key", key), - new SqlParameter("@field", name), - new SqlParameter("@type", (int)DocumentTypes.Hash) - } - }; - - return Storage.Client.CreateDocumentQuery(Storage.CollectionUri, sql) - .ToQueryResult() - .FirstOrDefault(); - } - - public override TimeSpan GetHashTtl(string key) - { - if (key == null) throw new ArgumentNullException(nameof(key)); - - SqlQuerySpec sql = new SqlQuerySpec - { - QueryText = "SELECT TOP 1 VALUE MIN(doc['expire_on']) FROM doc WHERE doc.type = @type AND doc.key = @key ", - Parameters = new SqlParameterCollection - { - new SqlParameter("@key", key), - new SqlParameter("@type", (int)DocumentTypes.Hash) - } - }; - - int? expireOn = Storage.Client.CreateDocumentQuery(Storage.CollectionUri, sql) - .ToQueryResult() - .FirstOrDefault(); - - return expireOn.HasValue ? expireOn.Value.ToDateTime() - DateTime.UtcNow : TimeSpan.FromSeconds(-1); - } - - #endregion - - #region List - - public override List GetAllItemsFromList(string key) - { - if (key == null) throw new ArgumentNullException(nameof(key)); - - return Storage.Client.CreateDocumentQuery(Storage.CollectionUri) - .Where(l => l.DocumentType == DocumentTypes.List && l.Key == key) - .OrderByDescending(l => l.CreatedOn) - .Select(l => l.Value) - .ToQueryResult(); - } - - public override List GetRangeFromList(string key, int startingFrom, int endingAt) - { - if (key == null) throw new ArgumentNullException(nameof(key)); - - FeedOptions feedOptions = new FeedOptions - { - EnableCrossPartitionQuery = true - }; - endingAt += 1 - startingFrom; - - return Storage.Client.CreateDocumentQuery(Storage.CollectionUri, feedOptions) - .Where(l => l.DocumentType == DocumentTypes.List && l.Key == key) - .OrderByDescending(l => l.CreatedOn) - .Skip(startingFrom).Take(endingAt) - .Select(l => l.Value) - .ToQueryResult(); - } - - public override TimeSpan GetListTtl(string key) - { - if (key == null) throw new ArgumentNullException(nameof(key)); - - - SqlQuerySpec sql = new SqlQuerySpec - { - QueryText = "SELECT TOP 1 VALUE MIN(doc['expire_on']) FROM doc WHERE doc.type = @type AND doc.key = @key", - Parameters = new SqlParameterCollection - { - new SqlParameter("@key", key), - new SqlParameter("@type", (int)DocumentTypes.List) - } - }; - - int? expireOn = Storage.Client.CreateDocumentQuery(Storage.CollectionUri, sql) - .ToQueryResult() - .FirstOrDefault(); - - return expireOn.HasValue ? 
expireOn.Value.ToDateTime() - DateTime.UtcNow : TimeSpan.FromSeconds(-1); - } - - public override long GetListCount(string key) - { - if (key == null) throw new ArgumentNullException(nameof(key)); - - SqlQuerySpec sql = new SqlQuerySpec - { - QueryText = "SELECT TOP 1 VALUE COUNT(1) FROM doc WHERE doc.type = @type AND doc.key = @key", - Parameters = new SqlParameterCollection - { - new SqlParameter("@key", key), - new SqlParameter("@type",(int)DocumentTypes.List) - } - }; - - return Storage.Client.CreateDocumentQuery(Storage.CollectionUri, sql) - .ToQueryResult() - .FirstOrDefault(); - } - - #endregion - - } +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Threading; +using System.Threading.Tasks; + +using Hangfire.Azure.Documents; +using Hangfire.Azure.Documents.Helper; +using Hangfire.Azure.Helper; +using Hangfire.Azure.Queue; +using Hangfire.Common; +using Hangfire.Server; +using Hangfire.Storage; + +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; + +namespace Hangfire.Azure +{ + internal sealed class DocumentDbConnection : JobStorageConnection + { + public DocumentDbStorage Storage { get; } + public PersistentJobQueueProviderCollection QueueProviders { get; } + + public DocumentDbConnection(DocumentDbStorage storage) + { + Storage = storage; + QueueProviders = storage.QueueProviders; + } + + public override IDisposable AcquireDistributedLock(string resource, TimeSpan timeout) => new DocumentDbDistributedLock(resource, timeout, Storage); + public override IWriteOnlyTransaction CreateWriteTransaction() => new DocumentDbWriteOnlyTransaction(this); + + #region Job + + public override string CreateExpiredJob(Common.Job job, IDictionary parameters, DateTime createdAt, TimeSpan expireIn) + { + if (job == null) throw new ArgumentNullException(nameof(job)); + if (parameters == null) throw new ArgumentNullException(nameof(parameters)); + + InvocationData invocationData = InvocationData.SerializeJob(job); + Documents.Job entityJob = new Documents.Job + { + InvocationData = invocationData, + Arguments = invocationData.Arguments, + CreatedOn = createdAt, + ExpireOn = createdAt.Add(expireIn), + + Parameters = parameters.Select(p => new Parameter + { + Name = p.Key, + Value = p.Value + }).ToArray() + }; + + Task> task = Storage.Client.CreateDocumentWithRetriesAsync(Storage.CollectionUri, entityJob, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Job) }); + task.Wait(); + + if (task.Result.StatusCode == HttpStatusCode.Created || task.Result.StatusCode == HttpStatusCode.OK) + { + return entityJob.Id; + } + + return string.Empty; + } + + public override IFetchedJob FetchNextJob(string[] queues, CancellationToken cancellationToken) + { + if (queues == null || queues.Length == 0) throw new ArgumentNullException(nameof(queues)); + + IPersistentJobQueueProvider[] providers = queues.Select(q => QueueProviders.GetProvider(q)) + .Distinct() + .ToArray(); + + if (providers.Length != 1) + { + throw new InvalidOperationException($"Multiple provider instances registered for queues: {string.Join(", ", queues)}. 
You should choose only one type of persistent queues per server instance."); + } + + IPersistentJobQueue persistentQueue = providers.Single().GetJobQueue(); + IFetchedJob queue = persistentQueue.Dequeue(queues, cancellationToken); + return queue; + } + + public override JobData GetJobData(string jobId) + { + if (jobId == null) throw new ArgumentNullException(nameof(jobId)); + + Uri uri = UriFactory.CreateDocumentUri(Storage.Options.DatabaseName, Storage.Options.CollectionName, jobId); + Task> task = Storage.Client.ReadDocumentWithRetriesAsync(uri, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Job) }); + task.Wait(); + + if (task.Result.Document != null) + { + Documents.Job data = task.Result; + InvocationData invocationData = data.InvocationData; + invocationData.Arguments = data.Arguments; + + Common.Job job = null; + JobLoadException loadException = null; + + try + { + job = invocationData.DeserializeJob(); + } + catch (JobLoadException ex) + { + loadException = ex; + } + + return new JobData + { + Job = job, + State = data.StateName, + CreatedAt = data.CreatedOn, + LoadException = loadException + }; + } + + return null; + } + + public override StateData GetStateData(string jobId) + { + if (jobId == null) throw new ArgumentNullException(nameof(jobId)); + + Uri uri = UriFactory.CreateDocumentUri(Storage.Options.DatabaseName, Storage.Options.CollectionName, jobId); + Task> task = Storage.Client.ReadDocumentWithRetriesAsync(uri, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Job) }); + task.Wait(); + + if (task.Result.Document != null) + { + Documents.Job job = task.Result; + + // get the state document + uri = UriFactory.CreateDocumentUri(Storage.Options.DatabaseName, Storage.Options.CollectionName, job.StateId); + Task> stateTask = Storage.Client.ReadDocumentWithRetriesAsync(uri, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.State) }); + stateTask.Wait(); + + if (stateTask.Result.Document != null) + { + State state = stateTask.Result; + return new StateData + { + Name = state.Name, + Reason = state.Reason, + Data = state.Data + }; + } + } + + return null; + } + + #endregion + + #region Parameter + + public override string GetJobParameter(string id, string name) + { + if (id == null) throw new ArgumentNullException(nameof(id)); + if (name == null) throw new ArgumentNullException(nameof(name)); + + Uri uri = UriFactory.CreateDocumentUri(Storage.Options.DatabaseName, Storage.Options.CollectionName, id); + Task> task = Storage.Client.ReadDocumentWithRetriesAsync(uri, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Job) }); + Documents.Job data = task.Result; + + return data?.Parameters.Where(p => p.Name == name).Select(p => p.Value).FirstOrDefault(); + } + + public override void SetJobParameter(string id, string name, string value) + { + if (id == null) throw new ArgumentNullException(nameof(id)); + if (name == null) throw new ArgumentNullException(nameof(name)); + + Parameter parameter = new Parameter + { + Value = value, + Name = name + }; + + Uri spSetJobParameterUri = UriFactory.CreateStoredProcedureUri(Storage.Options.DatabaseName, Storage.Options.CollectionName, "setJobParameter"); + Task> task = Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spSetJobParameterUri, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Job) }, default, id, parameter); + task.Wait(); + } + + #endregion + + #region Set + + public override TimeSpan GetSetTtl(string key) + { + if (key 
== null) throw new ArgumentNullException(nameof(key)); + + SqlQuerySpec sql = new SqlQuerySpec + { + QueryText = "SELECT TOP 1 VALUE MIN(doc['expire_on']) FROM doc WHERE doc.type = @type AND doc.key = @key", + Parameters = new SqlParameterCollection + { + new SqlParameter("@key", key), + new SqlParameter("@type", (int)DocumentTypes.Set) + } + }; + + int? expireOn = Storage.Client.CreateDocumentQueryAsync(Storage.CollectionUri, sql, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Set) }) + .ToQueryResult() + .FirstOrDefault(); + + return expireOn.HasValue ? expireOn.Value.ToDateTime() - DateTime.UtcNow : TimeSpan.FromSeconds(-1); + } + + public override List GetRangeFromSet(string key, int startingFrom, int endingAt) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + endingAt += 1 - startingFrom; + + return Storage.Client.CreateDocumentQueryAsync(Storage.CollectionUri, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Set) }) + .Where(s => s.DocumentType == DocumentTypes.Set && s.Key == key) + .OrderBy(s => s.CreatedOn) + .Skip(startingFrom).Take(endingAt) + .Select(s => s.Value) + .ToQueryResult() + .ToList(); + } + + public override long GetCounter(string key) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + SqlQuerySpec sql = new SqlQuerySpec + { + QueryText = "SELECT TOP 1 VALUE SUM(doc['value']) FROM doc WHERE doc.type = @type AND doc.key = @key", + Parameters = new SqlParameterCollection + { + new SqlParameter("@key", key), + new SqlParameter("@type", (int)DocumentTypes.Counter) + } + }; + + return Storage.Client.CreateDocumentQueryAsync(Storage.CollectionUri, sql, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Counter) }) + .ToQueryResult() + .FirstOrDefault(); + } + + public override long GetSetCount(string key) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + SqlQuerySpec sql = new SqlQuerySpec + { + QueryText = "SELECT TOP 1 VALUE COUNT(1) FROM doc WHERE doc.type = @type AND doc.key = @key", + Parameters = new SqlParameterCollection + { + new SqlParameter("@key", key), + new SqlParameter("@type",(int)DocumentTypes.Set) + } + }; + + return Storage.Client.CreateDocumentQueryAsync(Storage.CollectionUri, sql, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Set) }) + .ToQueryResult() + .FirstOrDefault(); + } + + public override HashSet GetAllItemsFromSet(string key) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + IEnumerable sets = Storage.Client.CreateDocumentQueryAsync(Storage.CollectionUri, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Set) }) + .Where(s => s.DocumentType == DocumentTypes.Set && s.Key == key) + .Select(s => s.Value) + .ToQueryResult(); + + return new HashSet(sets); + } + + public override string GetFirstByLowestScoreFromSet(string key, double fromScore, double toScore) + { + return GetFirstByLowestScoreFromSet(key, fromScore, toScore, 1).FirstOrDefault(); + } + + public override List GetFirstByLowestScoreFromSet(string key, double fromScore, double toScore, int count) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + if (count <= 0) throw new ArgumentException("The value must be a positive number", nameof(count)); + if (toScore < fromScore) throw new ArgumentException("The `toScore` value must be higher or equal to the `fromScore` value."); + + return Storage.Client.CreateDocumentQueryAsync(Storage.CollectionUri, new FeedOptions { PartitionKey = 
new PartitionKey((int)DocumentTypes.Set) }) + .Where(s => s.DocumentType == DocumentTypes.Set && s.Key == key && s.Score >= fromScore && s.Score <= toScore) + .OrderBy(s => s.Score) + .Take(count) + .Select(s => s.Value) + .ToQueryResult() + .ToList(); + } + + #endregion + + #region Server + + public override void AnnounceServer(string serverId, ServerContext context) + { + if (serverId == null) throw new ArgumentNullException(nameof(serverId)); + if (context == null) throw new ArgumentNullException(nameof(context)); + + Documents.Server server = new Documents.Server + { + Id = $"{serverId}:{DocumentTypes.Server}".GenerateHash(), + ServerId = serverId, + Workers = context.WorkerCount, + Queues = context.Queues, + CreatedOn = DateTime.UtcNow, + LastHeartbeat = DateTime.UtcNow + }; + Task> task = Storage.Client.UpsertDocumentWithRetriesAsync(Storage.CollectionUri, server, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Server) }); + task.Wait(); + } + + public override void Heartbeat(string serverId) + { + if (serverId == null) throw new ArgumentNullException(nameof(serverId)); + string id = $"{serverId}:{DocumentTypes.Server}".GenerateHash(); + + Uri spHeartbeatServerUri = UriFactory.CreateStoredProcedureUri(Storage.Options.DatabaseName, Storage.Options.CollectionName, "heartbeatServer"); + Task> task = Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spHeartbeatServerUri, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Server) }, default, id, DateTime.UtcNow.ToEpoch()); + task.Wait(); + } + + public override void RemoveServer(string serverId) + { + if (serverId == null) throw new ArgumentNullException(nameof(serverId)); + string id = $"{serverId}:{DocumentTypes.Server}".GenerateHash(); + + Uri documentUri = UriFactory.CreateDocumentUri(Storage.Options.DatabaseName, Storage.Options.CollectionName, id); + Task> task = Storage.Client.DeleteDocumentWithRetriesAsync(documentUri, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Server) }); + task.Wait(); + } + + public override int RemoveTimedOutServers(TimeSpan timeOut) + { + if (timeOut.Duration() != timeOut) + { + throw new ArgumentException(@"invalid timeout", nameof(timeOut)); + } + + int lastHeartbeat = DateTime.UtcNow.Add(timeOut.Negate()).ToEpoch(); + string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Server} AND IS_DEFINED(doc.last_heartbeat) " + + $"AND doc.last_heartbeat <= {lastHeartbeat}"; + + return Storage.Client.ExecuteDeleteDocuments(query, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Server) }); + } + + #endregion + + #region Hash + + public override Dictionary GetAllEntriesFromHash(string key) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + return Storage.Client.CreateDocumentQueryAsync(Storage.CollectionUri, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Hash) }) + .Where(h => h.DocumentType == DocumentTypes.Hash && h.Key == key) + .Select(h => new { h.Field, h.Value }) + .ToQueryResult() + .ToDictionary(h => h.Field, h => h.Value); + } + + public override void SetRangeInHash(string key, IEnumerable> keyValuePairs) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + if (keyValuePairs == null) throw new ArgumentNullException(nameof(keyValuePairs)); + + Data data = new Data(); + + PartitionKey partitionKey = new PartitionKey((int)DocumentTypes.Hash); + List hashes = Storage.Client.CreateDocumentQueryAsync(Storage.CollectionUri, new 
FeedOptions { PartitionKey = partitionKey }) + .Where(h => h.DocumentType == DocumentTypes.Hash && h.Key == key) + .ToQueryResult() + .ToList(); + + Hash[] sources = keyValuePairs.Select(k => new Hash + { + Key = key, + Field = k.Key, + Value = k.Value.TryParseToEpoch() + }).ToArray(); + + foreach (Hash source in sources) + { + Hash hash = hashes.SingleOrDefault(h => h.Field == source.Field); + if (hash == null) + { + data.Items.Add(source); + } + else if (!string.Equals(hash.Value, source.Value, StringComparison.InvariantCultureIgnoreCase)) + { + hash.Value = source.Value; + data.Items.Add(hash); + } + } + + Storage.Client.ExecuteUpsertDocuments(data, new RequestOptions { PartitionKey = partitionKey }); + } + + public override long GetHashCount(string key) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + SqlQuerySpec sql = new SqlQuerySpec + { + QueryText = "SELECT TOP 1 VALUE COUNT(1) FROM doc WHERE doc.type = @type AND doc.key = @key", + Parameters = new SqlParameterCollection + { + new SqlParameter("@key", key), + new SqlParameter("@type", (int)DocumentTypes.Hash) + } + }; + + return Storage.Client.CreateDocumentQueryAsync(Storage.CollectionUri, sql, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Hash) }) + .ToQueryResult() + .FirstOrDefault(); + } + + public override string GetValueFromHash(string key, string name) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + if (name == null) throw new ArgumentNullException(nameof(name)); + + SqlQuerySpec sql = new SqlQuerySpec + { + QueryText = "SELECT TOP 1 VALUE doc['value'] FROM doc WHERE doc.type = @type AND doc.key = @key AND doc.field = @field", + Parameters = new SqlParameterCollection + { + new SqlParameter("@key", key), + new SqlParameter("@field", name), + new SqlParameter("@type", (int)DocumentTypes.Hash) + } + }; + + return Storage.Client.CreateDocumentQueryAsync(Storage.CollectionUri, sql, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Hash) }) + .ToQueryResult() + .FirstOrDefault(); + } + + public override TimeSpan GetHashTtl(string key) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + SqlQuerySpec sql = new SqlQuerySpec + { + QueryText = "SELECT TOP 1 VALUE MIN(doc['expire_on']) FROM doc WHERE doc.type = @type AND doc.key = @key ", + Parameters = new SqlParameterCollection + { + new SqlParameter("@key", key), + new SqlParameter("@type", (int)DocumentTypes.Hash) + } + }; + + int? expireOn = Storage.Client.CreateDocumentQueryAsync(Storage.CollectionUri, sql, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Hash) }) + .ToQueryResult() + .FirstOrDefault(); + + return expireOn.HasValue ? 
expireOn.Value.ToDateTime() - DateTime.UtcNow : TimeSpan.FromSeconds(-1); + } + + #endregion + + #region List + + public override List GetAllItemsFromList(string key) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + return Storage.Client.CreateDocumentQueryAsync(Storage.CollectionUri, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.List) }) + .Where(l => l.DocumentType == DocumentTypes.List && l.Key == key) + .OrderByDescending(l => l.CreatedOn) + .Select(l => l.Value) + .ToQueryResult() + .ToList(); + } + + public override List GetRangeFromList(string key, int startingFrom, int endingAt) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + endingAt += 1 - startingFrom; + + return Storage.Client.CreateDocumentQueryAsync(Storage.CollectionUri, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.List) }) + .Where(l => l.DocumentType == DocumentTypes.List && l.Key == key) + .OrderByDescending(l => l.CreatedOn) + .Skip(startingFrom).Take(endingAt) + .Select(l => l.Value) + .ToQueryResult() + .ToList(); + } + + public override TimeSpan GetListTtl(string key) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + SqlQuerySpec sql = new SqlQuerySpec + { + QueryText = "SELECT TOP 1 VALUE MIN(doc['expire_on']) FROM doc WHERE doc.type = @type AND doc.key = @key", + Parameters = new SqlParameterCollection + { + new SqlParameter("@key", key), + new SqlParameter("@type", (int)DocumentTypes.List) + } + }; + + int? expireOn = Storage.Client.CreateDocumentQueryAsync(Storage.CollectionUri, sql, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.List) }) + .ToQueryResult() + .FirstOrDefault(); + + return expireOn.HasValue ? expireOn.Value.ToDateTime() - DateTime.UtcNow : TimeSpan.FromSeconds(-1); + } + + public override long GetListCount(string key) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + SqlQuerySpec sql = new SqlQuerySpec + { + QueryText = "SELECT TOP 1 VALUE COUNT(1) FROM doc WHERE doc.type = @type AND doc.key = @key", + Parameters = new SqlParameterCollection + { + new SqlParameter("@key", key), + new SqlParameter("@type",(int)DocumentTypes.List) + } + }; + + return Storage.Client.CreateDocumentQueryAsync(Storage.CollectionUri, sql, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.List) }) + .ToQueryResult() + .FirstOrDefault(); + } + + #endregion + + } } \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/DocumentDbDistributedLock.cs b/src/DocumentDbDistributedLock.cs similarity index 91% rename from Hangfire.AzureDocumentDB/DocumentDbDistributedLock.cs rename to src/DocumentDbDistributedLock.cs index 1ec3aa5..62e79d4 100644 --- a/Hangfire.AzureDocumentDB/DocumentDbDistributedLock.cs +++ b/src/DocumentDbDistributedLock.cs @@ -2,13 +2,13 @@ using System.Net; using System.Threading.Tasks; +using Hangfire.Azure.Documents; +using Hangfire.Azure.Helper; using Hangfire.Logging; + using Microsoft.Azure.Documents; using Microsoft.Azure.Documents.Client; -using Hangfire.Azure.Helper; -using Hangfire.Azure.Documents; - namespace Hangfire.Azure { internal class DocumentDbDistributedLock : IDisposable @@ -17,6 +17,7 @@ internal class DocumentDbDistributedLock : IDisposable private readonly string resource; private readonly DocumentDbStorage storage; private string resourceId; + private readonly PartitionKey partitionKey = new PartitionKey((int)DocumentTypes.Lock); public DocumentDbDistributedLock(string resource, TimeSpan timeout, 
DocumentDbStorage storage) { @@ -30,7 +31,7 @@ public void Dispose() if (!string.IsNullOrEmpty(resourceId)) { Uri uri = UriFactory.CreateDocumentUri(storage.Options.DatabaseName, storage.Options.CollectionName, resourceId); - Task task = storage.Client.DeleteDocumentWithRetriesAsync(uri).ContinueWith(t => + Task task = storage.Client.DeleteDocumentWithRetriesAsync(uri, new RequestOptions { PartitionKey = partitionKey }).ContinueWith(t => { resourceId = string.Empty; logger.Trace($"Lock released for {resource}"); @@ -56,7 +57,7 @@ private void Acquire(TimeSpan timeout) try { - Task> readTask = storage.Client.ReadDocumentWithRetriesAsync(uri); + Task> readTask = storage.Client.ReadDocumentWithRetriesAsync(uri, new RequestOptions { PartitionKey = partitionKey }); readTask.Wait(); if (readTask.Result.Document != null) @@ -65,7 +66,7 @@ private void Acquire(TimeSpan timeout) @lock.ExpireOn = DateTime.UtcNow.Add(timeout); @lock.TimeToLive = (int)ttl.TotalSeconds; - Task> updateTask = storage.Client.UpsertDocumentWithRetriesAsync(storage.CollectionUri, @lock); + Task> updateTask = storage.Client.UpsertDocumentWithRetriesAsync(storage.CollectionUri, @lock, new RequestOptions { PartitionKey = partitionKey }); updateTask.Wait(); if (updateTask.Result.StatusCode == HttpStatusCode.OK) @@ -85,7 +86,7 @@ private void Acquire(TimeSpan timeout) TimeToLive = (int)ttl.TotalSeconds }; - Task> createTask = storage.Client.UpsertDocumentWithRetriesAsync(storage.CollectionUri, @lock); + Task> createTask = storage.Client.UpsertDocumentWithRetriesAsync(storage.CollectionUri, @lock, new RequestOptions { PartitionKey = partitionKey }); createTask.Wait(); if (createTask.Result.StatusCode == HttpStatusCode.OK || createTask.Result.StatusCode == HttpStatusCode.Created) diff --git a/Hangfire.AzureDocumentDB/DocumentDbDistributedLockException.cs b/src/DocumentDbDistributedLockException.cs similarity index 100% rename from Hangfire.AzureDocumentDB/DocumentDbDistributedLockException.cs rename to src/DocumentDbDistributedLockException.cs diff --git a/Hangfire.AzureDocumentDB/DocumentDbMonitoringApi.cs b/src/DocumentDbMonitoringApi.cs similarity index 85% rename from Hangfire.AzureDocumentDB/DocumentDbMonitoringApi.cs rename to src/DocumentDbMonitoringApi.cs index a69d00a..3515ae3 100644 --- a/Hangfire.AzureDocumentDB/DocumentDbMonitoringApi.cs +++ b/src/DocumentDbMonitoringApi.cs @@ -1,484 +1,477 @@ -using System; -using System.Linq; -using System.Threading.Tasks; -using System.Collections.Generic; - -using Hangfire.Common; -using Hangfire.States; -using Hangfire.Storage; -using Microsoft.Azure.Documents; -using Hangfire.Storage.Monitoring; -using Microsoft.Azure.Documents.Client; - -using Hangfire.Azure.Queue; -using Hangfire.Azure.Helper; -using Hangfire.Azure.Documents; - -namespace Hangfire.Azure -{ - internal sealed class DocumentDbMonitoringApi : IMonitoringApi - { - private readonly DocumentDbStorage storage; - private readonly object cacheLock = new object(); - private static readonly TimeSpan cacheTimeout = TimeSpan.FromSeconds(2); - private static DateTime cacheUpdated; - private static StatisticsDto cacheStatisticsDto; - - public DocumentDbMonitoringApi(DocumentDbStorage storage) => this.storage = storage; - - public IList Queues() - { - List queueJobs = new List(); - - var tuples = storage.QueueProviders - .Select(x => x.GetJobQueueMonitoringApi()) - .SelectMany(x => x.GetQueues(), (monitoring, queue) => new { Monitoring = monitoring, Queue = queue }) - .OrderBy(x => x.Queue) - .ToArray(); - - foreach (var tuple 
in tuples) - { - (int? EnqueuedCount, int? FetchedCount) counters = tuple.Monitoring.GetEnqueuedAndFetchedCount(tuple.Queue); - JobList jobs = EnqueuedJobs(tuple.Queue, 0, 5); - - queueJobs.Add(new QueueWithTopEnqueuedJobsDto - { - Length = counters.EnqueuedCount ?? 0, - Fetched = counters.FetchedCount ?? 0, - Name = tuple.Queue, - FirstJobs = jobs - }); - } - - return queueJobs; - } - - public IList Servers() - { - return storage.Client.CreateDocumentQuery(storage.CollectionUri) - .Where(s => s.DocumentType == DocumentTypes.Server) - .OrderByDescending(s => s.CreatedOn) - .ToQueryResult() - .Select(server => new ServerDto - { - Name = server.ServerId, - Heartbeat = server.LastHeartbeat, - Queues = server.Queues, - StartedAt = server.CreatedOn, - WorkersCount = server.Workers - }).ToList(); - } - - public JobDetailsDto JobDetails(string jobId) - { - if (string.IsNullOrEmpty(jobId)) throw new ArgumentNullException(nameof(jobId)); - - Uri uri = UriFactory.CreateDocumentUri(storage.Options.DatabaseName, storage.Options.CollectionName, jobId); - Task> task = storage.Client.ReadDocumentWithRetriesAsync(uri); - task.Wait(); - - if (task.Result.Document != null) - { - Documents.Job job = task.Result; - InvocationData invocationData = job.InvocationData; - invocationData.Arguments = job.Arguments; - - List states = storage.Client.CreateDocumentQuery(storage.CollectionUri) - .Where(s => s.DocumentType == DocumentTypes.State && s.JobId == jobId) - .OrderByDescending(s => s.CreatedOn) - .ToQueryResult() - .Select(s => new StateHistoryDto - { - Data = s.Data, - CreatedAt = s.CreatedOn, - Reason = s.Reason, - StateName = s.Name - }).ToList(); - - return new JobDetailsDto - { - Job = invocationData.DeserializeJob(), - CreatedAt = job.CreatedOn, - ExpireAt = job.ExpireOn, - Properties = job.Parameters.ToDictionary(p => p.Name, p => p.Value), - History = states - }; - } - - return null; - } - - public StatisticsDto GetStatistics() - { - lock (cacheLock) - { - if (cacheStatisticsDto == null || cacheUpdated.Add(cacheTimeout) < DateTime.UtcNow) - { - Dictionary results = new Dictionary(); - - // get counts of jobs on state - string[] keys = { EnqueuedState.StateName, FailedState.StateName, ProcessingState.StateName, ScheduledState.StateName }; - foreach (string state in keys) - { - long states = GetNumberOfJobsByStateName(state); - results.Add(state.ToLower(), states); - } - - // get counts of servers - SqlQuerySpec sql = new SqlQuerySpec - { - QueryText = "SELECT TOP 1 VALUE COUNT(1) FROM doc WHERE doc.type = @type", - Parameters = new SqlParameterCollection - { - new SqlParameter("@type", (int)DocumentTypes.Server) - } - }; - - long servers = storage.Client.CreateDocumentQuery(storage.CollectionUri, sql) - .ToQueryResult() - .FirstOrDefault(); - - results.Add("servers", servers); - - // get sum of stats:succeeded / stats:deleted counters - keys = new[] { "stats:succeeded", "stats:deleted" }; - foreach (string key in keys) - { - sql = new SqlQuerySpec - { - QueryText = "SELECT TOP 1 VALUE SUM(doc['value']) FROM doc WHERE doc.type = @type AND doc.key = @key", - Parameters = new SqlParameterCollection - { - new SqlParameter("@key", key), - new SqlParameter("@type", (int)DocumentTypes.Counter) - } - }; - - long counters = storage.Client.CreateDocumentQuery(storage.CollectionUri, sql) - .ToQueryResult() - .FirstOrDefault(); - - results.Add(key, counters); - } - - sql = new SqlQuerySpec - { - QueryText = "SELECT TOP 1 VALUE COUNT(1) FROM doc WHERE doc.type = @type AND doc.key = @key", - Parameters = new 
SqlParameterCollection - { - new SqlParameter("@key", "recurring-jobs"), - new SqlParameter("@type", (int)DocumentTypes.Set) - } - }; - - long jobs = storage.Client.CreateDocumentQuery(storage.CollectionUri, sql) - .ToQueryResult() - .FirstOrDefault(); - - results.Add("recurring-jobs", jobs); - - long getValueOrDefault(string key) => results.TryGetValue(key, out long value) ? value : default(long); - - // ReSharper disable once UseObjectOrCollectionInitializer - cacheStatisticsDto = new StatisticsDto - { - Enqueued = getValueOrDefault("enqueued"), - Failed = getValueOrDefault("failed"), - Processing = getValueOrDefault("processing"), - Scheduled = getValueOrDefault("scheduled"), - Succeeded = getValueOrDefault("stats:succeeded"), - Deleted = getValueOrDefault("stats:deleted"), - Recurring = getValueOrDefault("recurring-jobs"), - Servers = getValueOrDefault("servers"), - }; - - cacheStatisticsDto.Queues = storage.QueueProviders - .SelectMany(x => x.GetJobQueueMonitoringApi().GetQueues()) - .Count(); - - cacheUpdated = DateTime.UtcNow; - } - - return cacheStatisticsDto; - } - } - - #region Job List - - public JobList EnqueuedJobs(string queue, int from, int perPage) - { - string queryText = $"SELECT * FROM doc WHERE doc.type = @type AND doc.name = @name AND NOT IS_DEFINED(doc.fetched_at) ORDER BY doc.created_on OFFSET {from} LIMIT {perPage}"; - return GetJobsOnQueue(queryText, queue, (state, job, fetchedAt) => new EnqueuedJobDto - { - Job = job, - State = state.Name, - InEnqueuedState = EnqueuedState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase), - EnqueuedAt = EnqueuedState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase) - ? JobHelper.DeserializeNullableDateTime(state.Data["EnqueuedAt"]) - : null - }); - } - - public JobList FetchedJobs(string queue, int from, int perPage) - { - string queryText = $"SELECT * FROM doc WHERE doc.type = @type AND doc.name = @name AND IS_DEFINED(doc.fetched_at) ORDER BY doc.created_on OFFSET {from} LIMIT {perPage}"; - return GetJobsOnQueue(queryText, queue, (state, job, fetchedAt) => new FetchedJobDto - { - Job = job, - State = state.Name, - FetchedAt = fetchedAt - }); - } - - public JobList ProcessingJobs(int from, int count) - { - return GetJobsOnState(ProcessingState.StateName, from, count, (state, job) => new ProcessingJobDto - { - Job = job, - InProcessingState = ProcessingState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase), - ServerId = state.Data.ContainsKey("ServerId") ? state.Data["ServerId"] : state.Data["ServerName"], - StartedAt = JobHelper.DeserializeDateTime(state.Data["StartedAt"]) - }); - } - - public JobList ScheduledJobs(int from, int count) - { - return GetJobsOnState(ScheduledState.StateName, from, count, (state, job) => new ScheduledJobDto - { - Job = job, - InScheduledState = ScheduledState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase), - EnqueueAt = JobHelper.DeserializeDateTime(state.Data["EnqueueAt"]), - ScheduledAt = JobHelper.DeserializeDateTime(state.Data["ScheduledAt"]) - }); - } - - public JobList SucceededJobs(int from, int count) - { - return GetJobsOnState(SucceededState.StateName, from, count, (state, job) => new SucceededJobDto - { - Job = job, - InSucceededState = SucceededState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase), - Result = state.Data.ContainsKey("Result") ? state.Data["Result"] : null, - TotalDuration = state.Data.ContainsKey("PerformanceDuration") && state.Data.ContainsKey("Latency") - ? 
(long?)long.Parse(state.Data["PerformanceDuration"]) + long.Parse(state.Data["Latency"]) - : null, - SucceededAt = JobHelper.DeserializeNullableDateTime(state.Data["SucceededAt"]) - }); - } - - public JobList FailedJobs(int from, int count) - { - return GetJobsOnState(FailedState.StateName, from, count, (state, job) => new FailedJobDto - { - Job = job, - InFailedState = FailedState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase), - Reason = state.Reason, - FailedAt = JobHelper.DeserializeNullableDateTime(state.Data["FailedAt"]), - ExceptionDetails = state.Data["ExceptionDetails"], - ExceptionMessage = state.Data["ExceptionMessage"], - ExceptionType = state.Data["ExceptionType"] - }); - } - - public JobList DeletedJobs(int from, int count) - { - return GetJobsOnState(DeletedState.StateName, from, count, (state, job) => new DeletedJobDto - { - Job = job, - InDeletedState = DeletedState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase), - DeletedAt = JobHelper.DeserializeNullableDateTime(state.Data["DeletedAt"]) - }); - } - - private JobList GetJobsOnState(string stateName, int from, int count, Func selector) - { - List> jobs = new List>(); - FeedOptions feedOptions = new FeedOptions - { - EnableCrossPartitionQuery = true - }; - - List filterJobs = storage.Client.CreateDocumentQuery(storage.CollectionUri, feedOptions) - .Where(j => j.DocumentType == DocumentTypes.Job && j.StateName == stateName) - .OrderByDescending(j => j.CreatedOn) - .Skip(from).Take(count) - .ToQueryResult(); - - filterJobs.ForEach(job => - { - Uri uri = UriFactory.CreateDocumentUri(storage.Options.DatabaseName, storage.Options.CollectionName, job.StateId); - Task> task = storage.Client.ReadDocumentWithRetriesAsync(uri); - task.Wait(); - - if (task.Result.Document != null) - { - State state = task.Result; - InvocationData invocationData = job.InvocationData; - invocationData.Arguments = job.Arguments; - - T data = selector(state, invocationData.DeserializeJob()); - jobs.Add(new KeyValuePair(job.Id, data)); - } - }); - - return new JobList(jobs); - } - - private JobList GetJobsOnQueue(string queryText, string queue, Func selector) - { - if (string.IsNullOrEmpty(queue)) throw new ArgumentNullException(nameof(queue)); - List> jobs = new List>(); - - SqlQuerySpec sql = new SqlQuerySpec - { - QueryText = queryText, - Parameters = new SqlParameterCollection - { - new SqlParameter("@type", (int)DocumentTypes.Queue), - new SqlParameter("@name", queue) - } - }; - - FeedOptions feedOptions = new FeedOptions - { - EnableCrossPartitionQuery = true - }; - - List queues = storage.Client.CreateDocumentQuery(storage.CollectionUri, sql, feedOptions) - .ToList(); - - queues.ForEach(queueItem => - { - Uri uri = UriFactory.CreateDocumentUri(storage.Options.DatabaseName, storage.Options.CollectionName, queueItem.JobId); - Task> task = storage.Client.ReadDocumentWithRetriesAsync(uri); - task.Wait(); - - if (task.Result != null) - { - Documents.Job job = task.Result; - InvocationData invocationData = job.InvocationData; - invocationData.Arguments = job.Arguments; - - uri = UriFactory.CreateDocumentUri(storage.Options.DatabaseName, storage.Options.CollectionName, job.StateId); - Task> stateTask = storage.Client.ReadDocumentWithRetriesAsync(uri); - - T data = selector(stateTask.Result, invocationData.DeserializeJob(), queueItem.FetchedAt); - jobs.Add(new KeyValuePair(job.Id, data)); - } - }); - - return new JobList(jobs); - } - - #endregion - - #region Counts - - public long EnqueuedCount(string queue) - { - if 
(string.IsNullOrEmpty(queue)) throw new ArgumentNullException(nameof(queue)); - - IPersistentJobQueueProvider provider = storage.QueueProviders.GetProvider(queue); - IPersistentJobQueueMonitoringApi monitoringApi = provider.GetJobQueueMonitoringApi(); - (int? EnqueuedCount, int? FetchedCount) counters = monitoringApi.GetEnqueuedAndFetchedCount(queue); - return counters.EnqueuedCount ?? 0; - } - - public long FetchedCount(string queue) - { - if (string.IsNullOrEmpty(queue)) throw new ArgumentNullException(nameof(queue)); - - IPersistentJobQueueProvider provider = storage.QueueProviders.GetProvider(queue); - IPersistentJobQueueMonitoringApi monitoringApi = provider.GetJobQueueMonitoringApi(); - (int? EnqueuedCount, int? FetchedCount) counters = monitoringApi.GetEnqueuedAndFetchedCount(queue); - return counters.FetchedCount ?? 0; - } - - public long ScheduledCount() => GetNumberOfJobsByStateName(ScheduledState.StateName); - - public long FailedCount() => GetNumberOfJobsByStateName(FailedState.StateName); - - public long ProcessingCount() => GetNumberOfJobsByStateName(ProcessingState.StateName); - - public long SucceededListCount() => GetNumberOfJobsByStateName(SucceededState.StateName); - - public long DeletedListCount() => GetNumberOfJobsByStateName(DeletedState.StateName); - - private long GetNumberOfJobsByStateName(string state) - { - SqlQuerySpec sql = new SqlQuerySpec - { - QueryText = "SELECT TOP 1 VALUE COUNT(1) FROM doc WHERE doc.type = @type AND IS_DEFINED(doc.state_name) AND doc.state_name = @state", - Parameters = new SqlParameterCollection - { - new SqlParameter("@state", state), - new SqlParameter("@type", (int)DocumentTypes.Job) - } - }; - - return storage.Client.CreateDocumentQuery(storage.CollectionUri, sql) - .ToQueryResult() - .FirstOrDefault(); - } - - public IDictionary SucceededByDatesCount() => GetDatesTimelineStats("succeeded"); - - public IDictionary FailedByDatesCount() => GetDatesTimelineStats("failed"); - - public IDictionary HourlySucceededJobs() => GetHourlyTimelineStats("succeeded"); - - public IDictionary HourlyFailedJobs() => GetHourlyTimelineStats("failed"); - - private Dictionary GetHourlyTimelineStats(string type) - { - DateTime endDate = DateTime.UtcNow; - List dates = new List(); - for (int i = 0; i < 24; i++) - { - dates.Add(endDate); - endDate = endDate.AddHours(-1); - } - - Dictionary keys = dates.ToDictionary(x => $"stats:{type}:{x:yyyy-MM-dd-HH}", x => x); - return GetTimelineStats(keys); - } - - private Dictionary GetDatesTimelineStats(string type) - { - DateTime endDate = DateTime.UtcNow.Date; - List dates = new List(); - for (int i = 0; i < 7; i++) - { - dates.Add(endDate); - endDate = endDate.AddDays(-1); - } - - Dictionary keys = dates.ToDictionary(x => $"stats:{type}:{x:yyyy-MM-dd}", x => x); - return GetTimelineStats(keys); - } - - private Dictionary GetTimelineStats(Dictionary keys) - { - Dictionary result = keys.ToDictionary(k => k.Value, v => default(long)); - string[] filter = keys.Keys.ToArray(); - - Dictionary data = storage.Client.CreateDocumentQuery(storage.CollectionUri) - .Where(c => c.Type == CounterTypes.Aggregate && c.DocumentType == DocumentTypes.Counter) - .Where(c => filter.Contains(c.Key)) - .Select(c => new { c.Key, c.Value }) - .ToQueryResult() - .GroupBy(c => c.Key) - .ToDictionary(k => k.Key, k => k.Sum(c => c.Value)); - - foreach (string key in keys.Keys) - { - DateTime date = keys.Where(k => k.Key == key).Select(k => k.Value).First(); - result[date] = data.TryGetValue(key, out int value) ? 
value : 0; - } - - return result; - } - - #endregion - } -} +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +using Hangfire.Azure.Documents; +using Hangfire.Azure.Helper; +using Hangfire.Azure.Queue; +using Hangfire.Common; +using Hangfire.States; +using Hangfire.Storage; +using Hangfire.Storage.Monitoring; + +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; +using Microsoft.Azure.Documents.SystemFunctions; + +namespace Hangfire.Azure +{ + internal sealed class DocumentDbMonitoringApi : IMonitoringApi + { + private readonly DocumentDbStorage storage; + private readonly object cacheLock = new object(); + private static readonly TimeSpan cacheTimeout = TimeSpan.FromSeconds(2); + private static DateTime cacheUpdated; + private static StatisticsDto cacheStatisticsDto; + + public DocumentDbMonitoringApi(DocumentDbStorage storage) => this.storage = storage; + + public IList Queues() + { + List queueJobs = new List(); + + var tuples = storage.QueueProviders + .Select(x => x.GetJobQueueMonitoringApi()) + .SelectMany(x => x.GetQueues(), (monitoring, queue) => new { Monitoring = monitoring, Queue = queue }) + .OrderBy(x => x.Queue) + .ToArray(); + + foreach (var tuple in tuples) + { + (int? EnqueuedCount, int? FetchedCount) counters = tuple.Monitoring.GetEnqueuedAndFetchedCount(tuple.Queue); + JobList jobs = EnqueuedJobs(tuple.Queue, 0, 5); + + queueJobs.Add(new QueueWithTopEnqueuedJobsDto + { + Length = counters.EnqueuedCount ?? 0, + Fetched = counters.FetchedCount ?? 0, + Name = tuple.Queue, + FirstJobs = jobs + }); + } + + return queueJobs; + } + + public IList Servers() + { + return storage.Client.CreateDocumentQueryAsync(storage.CollectionUri, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Server) }) + .Where(s => s.DocumentType == DocumentTypes.Server) + .OrderByDescending(s => s.CreatedOn) + .ToQueryResult() + .Select(server => new ServerDto + { + Name = server.ServerId, + Heartbeat = server.LastHeartbeat, + Queues = server.Queues, + StartedAt = server.CreatedOn, + WorkersCount = server.Workers + }).ToList(); + } + + public JobDetailsDto JobDetails(string jobId) + { + if (string.IsNullOrEmpty(jobId)) throw new ArgumentNullException(nameof(jobId)); + + Uri uri = UriFactory.CreateDocumentUri(storage.Options.DatabaseName, storage.Options.CollectionName, jobId); + Task> task = storage.Client.ReadDocumentWithRetriesAsync(uri, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Job) }); + task.Wait(); + + if (task.Result.Document != null) + { + Documents.Job job = task.Result; + InvocationData invocationData = job.InvocationData; + invocationData.Arguments = job.Arguments; + + List states = storage.Client.CreateDocumentQueryAsync(storage.CollectionUri, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.State) }) + .Where(s => s.DocumentType == DocumentTypes.State && s.JobId == jobId) + .OrderByDescending(s => s.CreatedOn) + .ToQueryResult() + .Select(s => new StateHistoryDto + { + Data = s.Data, + CreatedAt = s.CreatedOn, + Reason = s.Reason, + StateName = s.Name + }).ToList(); + + return new JobDetailsDto + { + Job = invocationData.DeserializeJob(), + CreatedAt = job.CreatedOn, + ExpireAt = job.ExpireOn, + Properties = job.Parameters.ToDictionary(p => p.Name, p => p.Value), + History = states + }; + } + + return null; + } + + public StatisticsDto GetStatistics() + { + lock (cacheLock) + { + if (cacheStatisticsDto == null || cacheUpdated.Add(cacheTimeout) < 
DateTime.UtcNow) + { + Dictionary results = new Dictionary(); + + // get counts of jobs on state + string[] keys = { EnqueuedState.StateName, FailedState.StateName, ProcessingState.StateName, ScheduledState.StateName, SucceededState.StateName, AwaitingState.StateName }; + List> states = storage.Client.CreateDocumentQueryAsync(storage.CollectionUri, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Job) }) + .Where(x => x.DocumentType == DocumentTypes.Job && x.StateName.IsDefined() && keys.Contains(x.StateName)) + .Select(x => x.StateName) + .ToQueryResult() + .GroupBy(x => x) + .ToList(); + + foreach (IGrouping state in states) + { + results.Add(state.Key.ToLower(), state.LongCount()); + } + + // get counts of servers + SqlQuerySpec sql = new SqlQuerySpec + { + QueryText = "SELECT TOP 1 VALUE COUNT(1) FROM doc WHERE doc.type = @type", + Parameters = new SqlParameterCollection + { + new SqlParameter("@type", (int)DocumentTypes.Server) + } + }; + + long servers = storage.Client.CreateDocumentQueryAsync(storage.CollectionUri, sql, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Server) }) + .ToQueryResult() + .FirstOrDefault(); + + results.Add("servers", servers); + + // get sum of stats:succeeded / stats:deleted counters + keys = new[] { "stats:succeeded", "stats:deleted" }; + List> counters = storage.Client.CreateDocumentQueryAsync(storage.CollectionUri, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Counter) }) + .Where(x => x.DocumentType == DocumentTypes.Counter && keys.Contains(x.Key)) + .ToQueryResult() + .GroupBy(x => x.Key) + .ToList(); + + foreach (IGrouping counter in counters) + { + long total = counter.Sum(x => x.Value); + results.Add(counter.Key.ToLower(), total); + } + + sql = new SqlQuerySpec + { + QueryText = "SELECT TOP 1 VALUE COUNT(1) FROM doc WHERE doc.type = @type AND doc.key = @key", + Parameters = new SqlParameterCollection + { + new SqlParameter("@key", "recurring-jobs"), + new SqlParameter("@type", (int)DocumentTypes.Set) + } + }; + + long jobs = storage.Client.CreateDocumentQueryAsync(storage.CollectionUri, sql, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Set) }) + .ToQueryResult() + .FirstOrDefault(); + + results.Add("recurring-jobs", jobs); + + long getValueOrDefault(string key) => results.TryGetValue(key, out long value) ? 
value : default; + + // ReSharper disable once UseObjectOrCollectionInitializer + cacheStatisticsDto = new StatisticsDto + { + Enqueued = getValueOrDefault("enqueued"), + Failed = getValueOrDefault("failed"), + Processing = getValueOrDefault("processing"), + Scheduled = getValueOrDefault("scheduled"), + Succeeded = getValueOrDefault("stats:succeeded"), + Deleted = getValueOrDefault("stats:deleted"), + Recurring = getValueOrDefault("recurring-jobs"), + Servers = getValueOrDefault("servers"), + }; + + cacheStatisticsDto.Queues = storage.QueueProviders + .SelectMany(x => x.GetJobQueueMonitoringApi().GetQueues()) + .Count(); + + cacheUpdated = DateTime.UtcNow; + } + + return cacheStatisticsDto; + } + } + + #region Job List + + public JobList EnqueuedJobs(string queue, int from, int perPage) + { + string queryText = $"SELECT * FROM doc WHERE doc.type = @type AND doc.name = @name AND NOT IS_DEFINED(doc.fetched_at) ORDER BY doc.created_on OFFSET {from} LIMIT {perPage}"; + return GetJobsOnQueue(queryText, queue, (state, job, fetchedAt) => new EnqueuedJobDto + { + Job = job, + State = state.Name, + InEnqueuedState = EnqueuedState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase), + EnqueuedAt = EnqueuedState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase) + ? JobHelper.DeserializeNullableDateTime(state.Data["EnqueuedAt"]) + : null + }); + } + + public JobList FetchedJobs(string queue, int from, int perPage) + { + string queryText = $"SELECT * FROM doc WHERE doc.type = @type AND doc.name = @name AND IS_DEFINED(doc.fetched_at) ORDER BY doc.created_on OFFSET {from} LIMIT {perPage}"; + return GetJobsOnQueue(queryText, queue, (state, job, fetchedAt) => new FetchedJobDto + { + Job = job, + State = state.Name, + FetchedAt = fetchedAt + }); + } + + public JobList ProcessingJobs(int from, int count) + { + return GetJobsOnState(ProcessingState.StateName, from, count, (state, job) => new ProcessingJobDto + { + Job = job, + InProcessingState = ProcessingState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase), + ServerId = state.Data.ContainsKey("ServerId") ? state.Data["ServerId"] : state.Data["ServerName"], + StartedAt = JobHelper.DeserializeDateTime(state.Data["StartedAt"]) + }); + } + + public JobList ScheduledJobs(int from, int count) + { + return GetJobsOnState(ScheduledState.StateName, from, count, (state, job) => new ScheduledJobDto + { + Job = job, + InScheduledState = ScheduledState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase), + EnqueueAt = JobHelper.DeserializeDateTime(state.Data["EnqueueAt"]), + ScheduledAt = JobHelper.DeserializeDateTime(state.Data["ScheduledAt"]) + }); + } + + public JobList SucceededJobs(int from, int count) + { + return GetJobsOnState(SucceededState.StateName, from, count, (state, job) => new SucceededJobDto + { + Job = job, + InSucceededState = SucceededState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase), + Result = state.Data.ContainsKey("Result") ? state.Data["Result"] : null, + TotalDuration = state.Data.ContainsKey("PerformanceDuration") && state.Data.ContainsKey("Latency") + ? 
(long?)long.Parse(state.Data["PerformanceDuration"]) + long.Parse(state.Data["Latency"]) + : null, + SucceededAt = JobHelper.DeserializeNullableDateTime(state.Data["SucceededAt"]) + }); + } + + public JobList FailedJobs(int from, int count) + { + return GetJobsOnState(FailedState.StateName, from, count, (state, job) => new FailedJobDto + { + Job = job, + InFailedState = FailedState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase), + Reason = state.Reason, + FailedAt = JobHelper.DeserializeNullableDateTime(state.Data["FailedAt"]), + ExceptionDetails = state.Data["ExceptionDetails"], + ExceptionMessage = state.Data["ExceptionMessage"], + ExceptionType = state.Data["ExceptionType"] + }); + } + + public JobList DeletedJobs(int from, int count) + { + return GetJobsOnState(DeletedState.StateName, from, count, (state, job) => new DeletedJobDto + { + Job = job, + InDeletedState = DeletedState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase), + DeletedAt = JobHelper.DeserializeNullableDateTime(state.Data["DeletedAt"]) + }); + } + + private JobList GetJobsOnState(string stateName, int from, int count, Func selector) + { + List> jobs = new List>(); + + List filterJobs = storage.Client.CreateDocumentQueryAsync(storage.CollectionUri, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Job) }) + .Where(j => j.DocumentType == DocumentTypes.Job && j.StateName == stateName) + .OrderByDescending(j => j.CreatedOn) + .Skip(from).Take(count) + .ToQueryResult() + .ToList(); + + filterJobs.ForEach(job => + { + Uri uri = UriFactory.CreateDocumentUri(storage.Options.DatabaseName, storage.Options.CollectionName, job.StateId); + Task> task = storage.Client.ReadDocumentWithRetriesAsync(uri, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.State) }); + task.Wait(); + + if (task.Result.Document != null) + { + State state = task.Result; + InvocationData invocationData = job.InvocationData; + invocationData.Arguments = job.Arguments; + + T data = selector(state, invocationData.DeserializeJob()); + jobs.Add(new KeyValuePair(job.Id, data)); + } + }); + + return new JobList(jobs); + } + + private JobList GetJobsOnQueue(string queryText, string queue, Func selector) + { + if (string.IsNullOrEmpty(queue)) throw new ArgumentNullException(nameof(queue)); + List> jobs = new List>(); + + SqlQuerySpec sql = new SqlQuerySpec + { + QueryText = queryText, + Parameters = new SqlParameterCollection + { + new SqlParameter("@type", (int)DocumentTypes.Queue), + new SqlParameter("@name", queue) + } + }; + + List queues = storage.Client.CreateDocumentQueryAsync(storage.CollectionUri, sql, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Queue) }) + .ToQueryResult() + .ToList(); + + queues.ForEach(queueItem => + { + Uri uri = UriFactory.CreateDocumentUri(storage.Options.DatabaseName, storage.Options.CollectionName, queueItem.JobId); + Task> task = storage.Client.ReadDocumentWithRetriesAsync(uri, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Job) }); + task.Wait(); + + if (task.Result != null) + { + Documents.Job job = task.Result; + InvocationData invocationData = job.InvocationData; + invocationData.Arguments = job.Arguments; + + uri = UriFactory.CreateDocumentUri(storage.Options.DatabaseName, storage.Options.CollectionName, job.StateId); + Task> stateTask = storage.Client.ReadDocumentWithRetriesAsync(uri, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.State) }); + + T data = 
selector(stateTask.Result, invocationData.DeserializeJob(), queueItem.FetchedAt); + jobs.Add(new KeyValuePair(job.Id, data)); + } + }); + + return new JobList(jobs); + } + + #endregion + + #region Counts + + public long EnqueuedCount(string queue) + { + if (string.IsNullOrEmpty(queue)) throw new ArgumentNullException(nameof(queue)); + + IPersistentJobQueueProvider provider = storage.QueueProviders.GetProvider(queue); + IPersistentJobQueueMonitoringApi monitoringApi = provider.GetJobQueueMonitoringApi(); + (int? EnqueuedCount, int? FetchedCount) counters = monitoringApi.GetEnqueuedAndFetchedCount(queue); + return counters.EnqueuedCount ?? 0; + } + + public long FetchedCount(string queue) + { + if (string.IsNullOrEmpty(queue)) throw new ArgumentNullException(nameof(queue)); + + IPersistentJobQueueProvider provider = storage.QueueProviders.GetProvider(queue); + IPersistentJobQueueMonitoringApi monitoringApi = provider.GetJobQueueMonitoringApi(); + (int? EnqueuedCount, int? FetchedCount) counters = monitoringApi.GetEnqueuedAndFetchedCount(queue); + return counters.FetchedCount ?? 0; + } + + public long ScheduledCount() => GetNumberOfJobsByStateName(ScheduledState.StateName); + + public long FailedCount() => GetNumberOfJobsByStateName(FailedState.StateName); + + public long ProcessingCount() => GetNumberOfJobsByStateName(ProcessingState.StateName); + + public long SucceededListCount() => GetNumberOfJobsByStateName(SucceededState.StateName); + + public long DeletedListCount() => GetNumberOfJobsByStateName(DeletedState.StateName); + + private long GetNumberOfJobsByStateName(string state) + { + SqlQuerySpec sql = new SqlQuerySpec + { + QueryText = "SELECT TOP 1 VALUE COUNT(1) FROM doc WHERE doc.type = @type AND IS_DEFINED(doc.state_name) AND doc.state_name = @state", + Parameters = new SqlParameterCollection + { + new SqlParameter("@state", state), + new SqlParameter("@type", (int)DocumentTypes.Job) + } + }; + + return storage.Client.CreateDocumentQueryAsync(storage.CollectionUri, sql, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Job) }) + .ToQueryResult() + .FirstOrDefault(); + } + + public IDictionary SucceededByDatesCount() => GetDatesTimelineStats("succeeded"); + + public IDictionary FailedByDatesCount() => GetDatesTimelineStats("failed"); + + public IDictionary HourlySucceededJobs() => GetHourlyTimelineStats("succeeded"); + + public IDictionary HourlyFailedJobs() => GetHourlyTimelineStats("failed"); + + private Dictionary GetHourlyTimelineStats(string type) + { + DateTime endDate = DateTime.UtcNow; + List dates = new List(); + for (int i = 0; i < 24; i++) + { + dates.Add(endDate); + endDate = endDate.AddHours(-1); + } + + Dictionary keys = dates.ToDictionary(x => $"stats:{type}:{x:yyyy-MM-dd-HH}", x => x); + return GetTimelineStats(keys); + } + + private Dictionary GetDatesTimelineStats(string type) + { + DateTime endDate = DateTime.UtcNow.Date; + List dates = new List(); + for (int i = 0; i < 7; i++) + { + dates.Add(endDate); + endDate = endDate.AddDays(-1); + } + + Dictionary keys = dates.ToDictionary(x => $"stats:{type}:{x:yyyy-MM-dd}", x => x); + return GetTimelineStats(keys); + } + + private Dictionary GetTimelineStats(Dictionary keys) + { + Dictionary result = keys.ToDictionary(k => k.Value, v => default(long)); + string[] filter = keys.Keys.ToArray(); + + Dictionary data = storage.Client.CreateDocumentQueryAsync(storage.CollectionUri, new FeedOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Counter) }) + .Where(c => c.DocumentType == 
DocumentTypes.Counter)
+                .Where(c => filter.Contains(c.Key))
+                .Select(c => new { c.Key, c.Value })
+                .ToQueryResult()
+                .GroupBy(c => c.Key)
+                .ToDictionary(k => k.Key, k => k.Sum(c => c.Value));
+
+            foreach (string key in keys.Keys)
+            {
+                DateTime date = keys.Where(k => k.Key == key).Select(k => k.Value).First();
+                result[date] = data.TryGetValue(key, out int value) ? value : 0;
+            }
+
+            return result;
+        }
+
+        #endregion
+    }
+}
diff --git a/Hangfire.AzureDocumentDB/DocumentDbStorage.cs b/src/DocumentDbStorage.cs
similarity index 82%
rename from Hangfire.AzureDocumentDB/DocumentDbStorage.cs
rename to src/DocumentDbStorage.cs
index 54a352d..c51faa1 100644
--- a/Hangfire.AzureDocumentDB/DocumentDbStorage.cs
+++ b/src/DocumentDbStorage.cs
@@ -1,20 +1,21 @@
 using System;
+using System.Collections.Generic;
 using System.IO;
 using System.Linq;
 using System.Text;
 using System.Threading.Tasks;
-using System.Collections.Generic;
+using Hangfire.Azure.Documents.Json;
+using Hangfire.Azure.Helper;
+using Hangfire.Azure.Queue;
+using Hangfire.Logging;
 using Hangfire.Server;
 using Hangfire.Storage;
-using Hangfire.Logging;
-using Newtonsoft.Json;
+
 using Microsoft.Azure.Documents;
 using Microsoft.Azure.Documents.Client;
-using Hangfire.Azure.Queue;
-using Hangfire.Azure.Helper;
-using Hangfire.Azure.Documents.Json;
+using Newtonsoft.Json;

 namespace Hangfire.Azure
@@ -47,6 +48,9 @@ public DocumentDbStorage(string url, string authSecret, string database, string
             Options.DatabaseName = database;
             Options.CollectionName = collection;

+            // set the partitioning flag on the client helper
+            ClientHelper.enablePartition = Options.EnablePartition;
+
             JsonSerializerSettings settings = new JsonSerializerSettings
             {
                 NullValueHandling = NullValueHandling.Ignore,
@@ -111,13 +115,14 @@ public override void WriteOptionsToLog(ILog logger)
             logger.Info($"     Counter Aggregate Interval: {Options.CountersAggregateInterval.TotalSeconds} seconds");
             logger.Info($"     Queue Poll Interval: {Options.QueuePollInterval.TotalSeconds} seconds");
             logger.Info($"     Expiration Check Interval: {Options.ExpirationCheckInterval.TotalSeconds} seconds");
+            logger.Info($"     Partition Enabled: {Options.EnablePartition}");
         }

         /// <summary>
         /// Return the name of the database
         /// </summary>
         /// <returns></returns>
-        public override string ToString() => $"DoucmentDb Database : {Options.DatabaseName}";
+        public override string ToString() => $"DocumentDb : {Options.DatabaseName}";

         private void Initialize()
         {
@@ -132,7 +137,15 @@ private void Initialize()
             {
                 logger.Info($"Creating document collection : {t.Result.Resource.Id}");
                 Uri databaseUri = UriFactory.CreateDatabaseUri(t.Result.Resource.Id);
-                return Client.CreateDocumentCollectionIfNotExistsAsync(databaseUri, new DocumentCollection { Id = Options.CollectionName });
+                DocumentCollection documentCollection = new DocumentCollection { Id = Options.CollectionName };
+
+                // if the partition option is enabled
+                if (Options.EnablePartition)
+                {
+                    documentCollection.PartitionKey = new PartitionKeyDefinition { Paths = new System.Collections.ObjectModel.Collection<string> { "/type" } };
+                }
+
+                return Client.CreateDocumentCollectionIfNotExistsAsync(databaseUri, documentCollection);
             }, TaskContinuationOptions.OnlyOnRanToCompletion).Unwrap();

             // create stored procedures
@@ -155,7 +168,18 @@ private void Initialize()
                 .Split(new[] { '.' 
}, StringSplitOptions.RemoveEmptyEntries) .Last() }; - Client.UpsertStoredProcedureAsync(CollectionUri, sp).Wait(); + + Uri storedProcedureUri = UriFactory.CreateStoredProcedureUri(Options.DatabaseName, t.Result.Resource.Id, sp.Id); + Task> spTask = Client.ReplaceStoredProcedureAsync(storedProcedureUri, sp); + spTask.ContinueWith(x => + { + if (x.Status == TaskStatus.Faulted && x.Exception.InnerException is DocumentClientException ex && ex.StatusCode == System.Net.HttpStatusCode.NotFound) + { + return Client.CreateStoredProcedureAsync(CollectionUri, sp); + } + + return Task.FromResult(x.Result); + }).Unwrap().Wait(); } stream?.Close(); } diff --git a/Hangfire.AzureDocumentDB/DocumentDbStorageExtensions.cs b/src/DocumentDbStorageExtensions.cs similarity index 97% rename from Hangfire.AzureDocumentDB/DocumentDbStorageExtensions.cs rename to src/DocumentDbStorageExtensions.cs index 283d439..1dd6fa6 100644 --- a/Hangfire.AzureDocumentDB/DocumentDbStorageExtensions.cs +++ b/src/DocumentDbStorageExtensions.cs @@ -1,4 +1,5 @@ using System; + using Hangfire.Azure; // ReSharper disable UnusedMember.Global @@ -10,7 +11,7 @@ namespace Hangfire /// public static class DocumentDbStorageExtensions { - /// + /// /// Enables to attach Azure DocumentDb to Hangfire /// /// The IGlobalConfiguration object @@ -25,7 +26,7 @@ public static IGlobalConfiguration UseAzureDocumentDbStorage( if (configuration == null) throw new ArgumentNullException(nameof(configuration)); if (string.IsNullOrEmpty(url)) throw new ArgumentNullException(nameof(url)); if (string.IsNullOrEmpty(authSecret)) throw new ArgumentNullException(nameof(authSecret)); - + DocumentDbStorage storage = new DocumentDbStorage(url, authSecret, database, collection, options); return configuration.UseStorage(storage); } diff --git a/Hangfire.AzureDocumentDB/DocumentDbStorageOptions.cs b/src/DocumentDbStorageOptions.cs similarity index 90% rename from Hangfire.AzureDocumentDB/DocumentDbStorageOptions.cs rename to src/DocumentDbStorageOptions.cs index 7b87f78..8832f93 100644 --- a/Hangfire.AzureDocumentDB/DocumentDbStorageOptions.cs +++ b/src/DocumentDbStorageOptions.cs @@ -1,4 +1,5 @@ using System; + using Microsoft.Azure.Documents.Client; // ReSharper disable MemberCanBePrivate.Global @@ -44,6 +45,11 @@ public class DocumentDbStorageOptions /// public Protocol ConnectionProtocol { get; set; } + /// + /// Gets or sets the partitioning of the document based on /type. 
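For context, opting in to this option from user code looks roughly like the following; a minimal sketch assuming the UseAzureDocumentDbStorage extension shown earlier in this diff, with placeholder endpoint, key, database, and collection names:

```csharp
using Hangfire;
using Hangfire.Azure;

public static class PartitionedStorageSetup
{
    public static void Configure()
    {
        // EnablePartition makes Initialize() create the collection with a
        // PartitionKeyDefinition on /type and makes every helper pass an
        // explicit PartitionKey per DocumentTypes value.
        GlobalConfiguration.Configuration.UseAzureDocumentDbStorage(
            "https://<account>.documents.azure.com:443/", // placeholder endpoint
            "<auth-secret>",                              // placeholder key
            "hangfire-db",                                // placeholder database
            "hangfire",                                   // placeholder collection
            new DocumentDbStorageOptions { EnablePartition = true });
    }
}
```

Note that the flag only takes effect when the collection is created; Cosmos DB does not allow adding a partition key definition to an existing collection.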
Default value is false + /// + public bool EnablePartition { get; set; } + /// /// Create an instance of AzureDocumentDB Storage option with default values /// @@ -55,6 +61,7 @@ public DocumentDbStorageOptions() QueuePollInterval = TimeSpan.FromSeconds(15); ConnectionMode = ConnectionMode.Direct; ConnectionProtocol = Protocol.Tcp; + EnablePartition = false; } } } diff --git a/Hangfire.AzureDocumentDB/DocumentDbWriteOnlyTransaction.cs b/src/DocumentDbWriteOnlyTransaction.cs similarity index 82% rename from Hangfire.AzureDocumentDB/DocumentDbWriteOnlyTransaction.cs rename to src/DocumentDbWriteOnlyTransaction.cs index db31789..0f4d464 100644 --- a/Hangfire.AzureDocumentDB/DocumentDbWriteOnlyTransaction.cs +++ b/src/DocumentDbWriteOnlyTransaction.cs @@ -1,18 +1,18 @@ using System; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -using System.Collections.Generic; +using Hangfire.Azure.Documents; +using Hangfire.Azure.Documents.Helper; +using Hangfire.Azure.Helper; +using Hangfire.Azure.Queue; using Hangfire.States; using Hangfire.Storage; + using Microsoft.Azure.Documents; using Microsoft.Azure.Documents.Client; -using Hangfire.Azure.Queue; -using Hangfire.Azure.Helper; -using Hangfire.Azure.Documents; -using Hangfire.Azure.Documents.Helper; - namespace Hangfire.Azure { internal class DocumentDbWriteOnlyTransaction : JobStorageTransaction @@ -52,10 +52,10 @@ public override void DecrementCounter(string key) { Key = key, Type = CounterTypes.Raw, - Value = -1 + Value = -1, }; - Task> task = connection.Storage.Client.CreateDocumentWithRetriesAsync(connection.Storage.CollectionUri, data); + Task> task = connection.Storage.Client.CreateDocumentWithRetriesAsync(connection.Storage.CollectionUri, data, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Counter) }); task.Wait(); }); } @@ -75,7 +75,7 @@ public override void DecrementCounter(string key, TimeSpan expireIn) ExpireOn = DateTime.UtcNow.Add(expireIn) }; - Task> task = connection.Storage.Client.CreateDocumentWithRetriesAsync(connection.Storage.CollectionUri, data); + Task> task = connection.Storage.Client.CreateDocumentWithRetriesAsync(connection.Storage.CollectionUri, data, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Counter) }); task.Wait(); }); } @@ -93,7 +93,7 @@ public override void IncrementCounter(string key) Value = 1 }; - Task> task = connection.Storage.Client.CreateDocumentWithRetriesAsync(connection.Storage.CollectionUri, data); + Task> task = connection.Storage.Client.CreateDocumentWithRetriesAsync(connection.Storage.CollectionUri, data, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Counter) }); task.Wait(); }); } @@ -113,7 +113,7 @@ public override void IncrementCounter(string key, TimeSpan expireIn) ExpireOn = DateTime.UtcNow.Add(expireIn) }; - Task> task = connection.Storage.Client.CreateDocumentWithRetriesAsync(connection.Storage.CollectionUri, data); + Task> task = connection.Storage.Client.CreateDocumentWithRetriesAsync(connection.Storage.CollectionUri, data, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Counter) }); task.Wait(); }); } @@ -131,7 +131,7 @@ public override void ExpireJob(string jobId, TimeSpan expireIn) { int epoch = DateTime.UtcNow.Add(expireIn).ToEpoch(); string query = $"SELECT * FROM doc WHERE doc.type = {(int)DocumentTypes.Job} AND doc.id = '{jobId}'"; - connection.Storage.Client.ExecuteExpireDocuments(query, epoch); + 
connection.Storage.Client.ExecuteExpireDocuments(query, epoch, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Job) }); }); } @@ -142,7 +142,7 @@ public override void PersistJob(string jobId) QueueCommand(() => { string query = $"SELECT * FROM doc WHERE doc.type = {(int)DocumentTypes.Job} AND doc.id = '{jobId}'"; - connection.Storage.Client.ExecutePersistDocuments(query); + connection.Storage.Client.ExecutePersistDocuments(query, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Job) }); }); } @@ -166,9 +166,13 @@ public override void SetJobState(string jobId, IState state) Data = state.SerializeData() }; - Uri spSetJobStateUri = UriFactory.CreateStoredProcedureUri(connection.Storage.Options.DatabaseName, connection.Storage.Options.CollectionName, "setJobState"); - Task> task = connection.Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spSetJobStateUri, jobId, data); - task.Wait(); + Task> task = connection.Storage.Client.CreateDocumentWithRetriesAsync(connection.Storage.CollectionUri, data, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.State) }); + Task> sptask = task.ContinueWith(x => + { + Uri spSetJobStateUri = UriFactory.CreateStoredProcedureUri(connection.Storage.Options.DatabaseName, connection.Storage.Options.CollectionName, "setJobState"); + return connection.Storage.Client.ExecuteStoredProcedureWithRetriesAsync(spSetJobStateUri, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Job) }, default, jobId, data); + }).Unwrap(); + sptask.Wait(); }); } @@ -188,7 +192,7 @@ public override void AddJobState(string jobId, IState state) Data = state.SerializeData() }; - Task> task = connection.Storage.Client.CreateDocumentWithRetriesAsync(connection.Storage.CollectionUri, data); + Task> task = connection.Storage.Client.CreateDocumentWithRetriesAsync(connection.Storage.CollectionUri, data, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.State) }); task.Wait(); }); } @@ -204,7 +208,8 @@ public override void RemoveFromSet(string key, string value) QueueCommand(() => { - string[] sets = connection.Storage.Client.CreateDocumentQuery(connection.Storage.CollectionUri) + PartitionKey partitionKey = new PartitionKey((int)DocumentTypes.Set); + string[] sets = connection.Storage.Client.CreateDocumentQueryAsync(connection.Storage.CollectionUri, new FeedOptions { PartitionKey = partitionKey }) .Where(s => s.DocumentType == DocumentTypes.Set && s.Key == key) .Select(s => new { s.Id, s.Value }) .ToQueryResult() @@ -216,7 +221,7 @@ public override void RemoveFromSet(string key, string value) string ids = string.Join(",", sets.Select(s => $"'{s}'")); string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Set} AND doc.id IN ({ids})"; - connection.Storage.Client.ExecuteDeleteDocuments(query); + connection.Storage.Client.ExecuteDeleteDocuments(query, new RequestOptions { PartitionKey = partitionKey }); }); } @@ -229,7 +234,8 @@ public override void AddToSet(string key, string value, double score) QueueCommand(() => { - List sets = connection.Storage.Client.CreateDocumentQuery(connection.Storage.CollectionUri) + PartitionKey partitionKey = new PartitionKey((int)DocumentTypes.Set); + List sets = connection.Storage.Client.CreateDocumentQueryAsync(connection.Storage.CollectionUri, new FeedOptions { PartitionKey = partitionKey }) .Where(s => s.DocumentType == DocumentTypes.Set && s.Key == key) .ToQueryResult() .Where(s => s.Value == value) // value may contain json 
string.. which interfere with query @@ -252,7 +258,7 @@ public override void AddToSet(string key, string value, double score) } Data data = new Data(sets); - connection.Storage.Client.ExecuteUpsertDocuments(data); + connection.Storage.Client.ExecuteUpsertDocuments(data, new RequestOptions { PartitionKey = partitionKey }); }); } @@ -263,7 +269,7 @@ public override void PersistSet(string key) QueueCommand(() => { string query = $"SELECT * FROM doc WHERE doc.type = {(int)DocumentTypes.Set} AND doc.key = '{key}'"; - connection.Storage.Client.ExecutePersistDocuments(query); + connection.Storage.Client.ExecutePersistDocuments(query, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Set) }); }); } @@ -275,7 +281,7 @@ public override void ExpireSet(string key, TimeSpan expireIn) { string query = $"SELECT * FROM doc WHERE doc.type = {(int)DocumentTypes.Set} AND doc.key = '{key}'"; int epoch = DateTime.UtcNow.Add(expireIn).ToEpoch(); - connection.Storage.Client.ExecuteExpireDocuments(query, epoch); + connection.Storage.Client.ExecuteExpireDocuments(query, epoch, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Set) }); }); } @@ -295,7 +301,7 @@ public override void AddRangeToSet(string key, IList items) }).ToList(); Data data = new Data(sets); - connection.Storage.Client.ExecuteUpsertDocuments(data); + connection.Storage.Client.ExecuteUpsertDocuments(data, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Set) }); }); } @@ -306,7 +312,7 @@ public override void RemoveSet(string key) QueueCommand(() => { string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Set} AND doc.key = '{key}'"; - connection.Storage.Client.ExecuteDeleteDocuments(query); + connection.Storage.Client.ExecuteDeleteDocuments(query, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Set) }); }); } @@ -321,7 +327,7 @@ public override void RemoveHash(string key) QueueCommand(() => { string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.Hash} AND doc.key = '{key}'"; - connection.Storage.Client.ExecuteDeleteDocuments(query); + connection.Storage.Client.ExecuteDeleteDocuments(query, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Hash) }); }); } @@ -334,9 +340,11 @@ public override void SetRangeInHash(string key, IEnumerable data = new Data(); - List hashes = connection.Storage.Client.CreateDocumentQuery(connection.Storage.CollectionUri) + PartitionKey partitionKey = new PartitionKey((int)DocumentTypes.Hash); + List hashes = connection.Storage.Client.CreateDocumentQueryAsync(connection.Storage.CollectionUri, new FeedOptions { PartitionKey = partitionKey }) .Where(h => h.DocumentType == DocumentTypes.Hash && h.Key == key) - .ToQueryResult(); + .ToQueryResult() + .ToList(); Hash[] sources = keyValuePairs.Select(k => new Hash { @@ -359,7 +367,8 @@ public override void SetRangeInHash(string key, IEnumerable { string query = $"SELECT * FROM doc WHERE doc.type = {(int)DocumentTypes.Hash} AND doc.key = '{key}'"; - connection.Storage.Client.ExecutePersistDocuments(query); + connection.Storage.Client.ExecutePersistDocuments(query, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Hash) }); }); } @@ -404,7 +413,7 @@ public override void InsertToList(string key, string value) CreatedOn = DateTime.UtcNow }; - Task> task = connection.Storage.Client.CreateDocumentWithRetriesAsync(connection.Storage.CollectionUri, data); + Task> task = 
connection.Storage.Client.CreateDocumentWithRetriesAsync(connection.Storage.CollectionUri, data, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.List) }); task.Wait(); }); } @@ -416,7 +425,7 @@ public override void RemoveFromList(string key, string value) QueueCommand(() => { - string[] lists = connection.Storage.Client.CreateDocumentQuery(connection.Storage.CollectionUri) + string[] lists = connection.Storage.Client.CreateDocumentQueryAsync(connection.Storage.CollectionUri) .Where(l => l.DocumentType == DocumentTypes.List && l.Key == key) .Select(l => new { l.Id, l.Value }) .ToQueryResult() @@ -428,7 +437,7 @@ public override void RemoveFromList(string key, string value) string ids = string.Join(",", lists.Select(l => $"'{l}'")); string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.List} AND doc.id IN ({ids})"; - connection.Storage.Client.ExecuteDeleteDocuments(query); + connection.Storage.Client.ExecuteDeleteDocuments(query, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.List) }); }); } @@ -438,7 +447,8 @@ public override void TrimList(string key, int keepStartingFrom, int keepEndingAt QueueCommand(() => { - string[] lists = connection.Storage.Client.CreateDocumentQuery(connection.Storage.CollectionUri) + PartitionKey partitionKey = new PartitionKey((int)DocumentTypes.List); + string[] lists = connection.Storage.Client.CreateDocumentQueryAsync(connection.Storage.CollectionUri, new FeedOptions { PartitionKey = partitionKey }) .Where(l => l.DocumentType == DocumentTypes.List && l.Key == key) .OrderByDescending(l => l.CreatedOn) .Select(l => l.Id) @@ -452,7 +462,7 @@ public override void TrimList(string key, int keepStartingFrom, int keepEndingAt string ids = string.Join(",", lists.Select(l => $"'{l}'")); string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)DocumentTypes.List} AND doc.id IN ({ids})"; - connection.Storage.Client.ExecuteDeleteDocuments(query); + connection.Storage.Client.ExecuteDeleteDocuments(query, new RequestOptions { PartitionKey = partitionKey }); }); } @@ -464,7 +474,7 @@ public override void ExpireList(string key, TimeSpan expireIn) { string query = $"SELECT * FROM doc WHERE doc.type = {(int)DocumentTypes.List} AND doc.key = '{key}'"; int epoch = DateTime.UtcNow.Add(expireIn).ToEpoch(); - connection.Storage.Client.ExecuteExpireDocuments(query, epoch); + connection.Storage.Client.ExecuteExpireDocuments(query, epoch, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.List) }); }); } @@ -475,7 +485,7 @@ public override void PersistList(string key) QueueCommand(() => { string query = $"SELECT * FROM doc WHERE doc.type = {(int)DocumentTypes.List} AND doc.key = '{key}'"; - connection.Storage.Client.ExecutePersistDocuments(query); + connection.Storage.Client.ExecutePersistDocuments(query, new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.List) }); }); } diff --git a/Hangfire.AzureDocumentDB/Entities/Counter.cs b/src/Entities/Counter.cs similarity index 100% rename from Hangfire.AzureDocumentDB/Entities/Counter.cs rename to src/Entities/Counter.cs diff --git a/Hangfire.AzureDocumentDB/Entities/DocumentBase.cs b/src/Entities/DocumentBase.cs similarity index 90% rename from Hangfire.AzureDocumentDB/Entities/DocumentBase.cs rename to src/Entities/DocumentBase.cs index 2f9a6fd..9102aae 100644 --- a/Hangfire.AzureDocumentDB/Entities/DocumentBase.cs +++ b/src/Entities/DocumentBase.cs @@ -1,9 +1,10 @@ using System; -using Newtonsoft.Json; using 
System.Collections.Generic; using Microsoft.Azure.Documents; +using Newtonsoft.Json; + // ReSharper disable once CheckNamespace namespace Hangfire.Azure.Documents { @@ -19,6 +20,9 @@ internal abstract class DocumentBase [JsonConverter(typeof(UnixDateTimeConverter))] public DateTime? ExpireOn { get; set; } + [JsonProperty(PropertyName = "ttl", NullValueHandling = NullValueHandling.Ignore)] + public int? TimeToLive { get; set; } + [JsonProperty("type")] public abstract DocumentTypes DocumentType { get; } } diff --git a/Hangfire.AzureDocumentDB/Entities/Hash.cs b/src/Entities/Hash.cs similarity index 100% rename from Hangfire.AzureDocumentDB/Entities/Hash.cs rename to src/Entities/Hash.cs diff --git a/Hangfire.AzureDocumentDB/Entities/Job.cs b/src/Entities/Job.cs similarity index 99% rename from Hangfire.AzureDocumentDB/Entities/Job.cs rename to src/Entities/Job.cs index 3014cc0..e9313d6 100644 --- a/Hangfire.AzureDocumentDB/Entities/Job.cs +++ b/src/Entities/Job.cs @@ -1,9 +1,11 @@ using System; -using Newtonsoft.Json; using Hangfire.Storage; + using Microsoft.Azure.Documents; +using Newtonsoft.Json; + // ReSharper disable once CheckNamespace namespace Hangfire.Azure.Documents { diff --git a/Hangfire.AzureDocumentDB/Entities/List.cs b/src/Entities/List.cs similarity index 99% rename from Hangfire.AzureDocumentDB/Entities/List.cs rename to src/Entities/List.cs index 74f258a..c422188 100644 --- a/Hangfire.AzureDocumentDB/Entities/List.cs +++ b/src/Entities/List.cs @@ -1,7 +1,9 @@ using System; -using Newtonsoft.Json; + using Microsoft.Azure.Documents; +using Newtonsoft.Json; + // ReSharper disable once CheckNamespace namespace Hangfire.Azure.Documents { diff --git a/Hangfire.AzureDocumentDB/Entities/Lock.cs b/src/Entities/Lock.cs similarity index 69% rename from Hangfire.AzureDocumentDB/Entities/Lock.cs rename to src/Entities/Lock.cs index 14e4405..a8ceee5 100644 --- a/Hangfire.AzureDocumentDB/Entities/Lock.cs +++ b/src/Entities/Lock.cs @@ -8,9 +8,6 @@ internal class Lock : DocumentBase [JsonProperty("name")] public string Name { get; set; } - [JsonProperty(PropertyName = "ttl", NullValueHandling = NullValueHandling.Ignore)] - public int? 
TimeToLive { get; set; } - public override DocumentTypes DocumentType => DocumentTypes.Lock; } } diff --git a/Hangfire.AzureDocumentDB/Entities/Parameter.cs b/src/Entities/Parameter.cs similarity index 100% rename from Hangfire.AzureDocumentDB/Entities/Parameter.cs rename to src/Entities/Parameter.cs diff --git a/Hangfire.AzureDocumentDB/Entities/Queue.cs b/src/Entities/Queue.cs similarity index 99% rename from Hangfire.AzureDocumentDB/Entities/Queue.cs rename to src/Entities/Queue.cs index e5719d2..32a5406 100644 --- a/Hangfire.AzureDocumentDB/Entities/Queue.cs +++ b/src/Entities/Queue.cs @@ -1,7 +1,9 @@ using System; -using Newtonsoft.Json; + using Microsoft.Azure.Documents; +using Newtonsoft.Json; + // ReSharper disable once CheckNamespace namespace Hangfire.Azure.Documents { diff --git a/Hangfire.AzureDocumentDB/Entities/Server.cs b/src/Entities/Server.cs similarity index 99% rename from Hangfire.AzureDocumentDB/Entities/Server.cs rename to src/Entities/Server.cs index 8f14819..bebc646 100644 --- a/Hangfire.AzureDocumentDB/Entities/Server.cs +++ b/src/Entities/Server.cs @@ -1,8 +1,9 @@ using System; -using Newtonsoft.Json; using Microsoft.Azure.Documents; +using Newtonsoft.Json; + // ReSharper disable once CheckNamespace namespace Hangfire.Azure.Documents { diff --git a/Hangfire.AzureDocumentDB/Entities/Set.cs b/src/Entities/Set.cs similarity index 97% rename from Hangfire.AzureDocumentDB/Entities/Set.cs rename to src/Entities/Set.cs index f1f42bc..bf3c1c9 100644 --- a/Hangfire.AzureDocumentDB/Entities/Set.cs +++ b/src/Entities/Set.cs @@ -1,7 +1,9 @@ using System; -using Newtonsoft.Json; + using Microsoft.Azure.Documents; +using Newtonsoft.Json; + // ReSharper disable once CheckNamespace namespace Hangfire.Azure.Documents { @@ -15,11 +17,11 @@ internal class Set : DocumentBase [JsonProperty("score")] public double? 
Score { get; set; } - + [JsonProperty("created_on")] [JsonConverter(typeof(UnixDateTimeConverter))] public DateTime CreatedOn { get; set; } - + public override DocumentTypes DocumentType => DocumentTypes.Set; } } \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/Entities/State.cs b/src/Entities/State.cs similarity index 99% rename from Hangfire.AzureDocumentDB/Entities/State.cs rename to src/Entities/State.cs index a06bfe2..e012f4e 100644 --- a/Hangfire.AzureDocumentDB/Entities/State.cs +++ b/src/Entities/State.cs @@ -1,9 +1,10 @@ using System; using System.Collections.Generic; -using Newtonsoft.Json; using Microsoft.Azure.Documents; +using Newtonsoft.Json; + // ReSharper disable once CheckNamespace namespace Hangfire.Azure.Documents { diff --git a/Hangfire.AzureDocumentDB/ExpirationManager.cs b/src/ExpirationManager.cs similarity index 93% rename from Hangfire.AzureDocumentDB/ExpirationManager.cs rename to src/ExpirationManager.cs index e8d1ce7..f2734e6 100644 --- a/Hangfire.AzureDocumentDB/ExpirationManager.cs +++ b/src/ExpirationManager.cs @@ -1,12 +1,14 @@ using System; using System.Threading; -using Hangfire.Server; -using Hangfire.Logging; - -using Hangfire.Azure.Helper; using Hangfire.Azure.Documents; using Hangfire.Azure.Documents.Helper; +using Hangfire.Azure.Helper; +using Hangfire.Logging; +using Hangfire.Server; + +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; namespace Hangfire.Azure { @@ -46,8 +48,8 @@ public void Execute(CancellationToken cancellationToken) query += $" AND doc.counter_type = {(int)CounterTypes.Aggregate}"; } - int deleted = storage.Client.ExecuteDeleteDocuments(query); - + int deleted = storage.Client.ExecuteDeleteDocuments(query, new RequestOptions { PartitionKey = new PartitionKey((int)type) }); + logger.Trace($"Outdated {deleted} records removed from the '{type}' document."); } } diff --git a/Hangfire.AzureDocumentDB/Hangfire.AzureDocumentDB.csproj b/src/Hangfire.AzureDocumentDB.csproj similarity index 95% rename from Hangfire.AzureDocumentDB/Hangfire.AzureDocumentDB.csproj rename to src/Hangfire.AzureDocumentDB.csproj index 0325608..12ac280 100644 --- a/Hangfire.AzureDocumentDB/Hangfire.AzureDocumentDB.csproj +++ b/src/Hangfire.AzureDocumentDB.csproj @@ -1,78 +1,78 @@ - - - net461;netstandard2.0 - Hangfire.AzureDocumentDB - Hangfire.AzureDocumentDB - Hangfire.AzureDocumentDB - Imran Momin - https://github.com/imranmomin/hangfire.azuredocumentdb/blob/master/LICENSE - https://github.com/imranmomin/hangfire.azuredocumentdb - Copyright 2018 - Hangfire Azure DocumentDB Cosmos - This package adds support to Microsoft Azure DocumentDB for Hangfire - false - 1.0.1 - 2.0.0 - 2.0.0.0 - true - Hangfire.Azure - Latest - true - - - NETCORE;NETSTANDARD;NETSTANDARD2_0 - - - NET461;NETFULL - - - bin\Release\net461\Hangfire.AzureDocumentDB.xml - - - bin\Release\netstandard2.0\Hangfire.AzureDocumentDB.xml - - - - - - - - heartbeatServer.ts - - - setJobParameter.ts - - - setJobState.ts - - - persistDocuments.ts - - - expireDocuments.ts - - - deleteDocuments.ts - - - upsertDocuments.ts - - - document.ts - - - - - - - - - - - - - - - - + + + net461;netstandard2.0 + Hangfire.AzureDocumentDB + Hangfire.AzureDocumentDB + Hangfire.AzureDocumentDB + Imran Momin + https://github.com/imranmomin/hangfire.azuredocumentdb/blob/master/LICENSE + https://github.com/imranmomin/hangfire.azuredocumentdb + Copyright 2018 + Hangfire Azure DocumentDB Cosmos + This package adds support to Microsoft Azure DocumentDB for Hangfire + false + 1.0.1 + 2.0.0 
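The ExpirationManager hunk above shows the recurring pattern in this PR: because a stored procedure executes inside a single partition, the expiration sweep now runs once per document type, passing a matching PartitionKey each time. A sketch of that pattern, assuming internal access to the ExecuteDeleteDocuments helper and the DocumentTypes enum; the doc.expire_on field name is an assumption here, not taken from this diff:

```csharp
using Hangfire.Azure.Documents;
using Hangfire.Azure.Helper;

using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;

internal static class ExpirationSweepSketch
{
    internal static void Sweep(DocumentClient client, int epoch)
    {
        // one stored-procedure call per /type partition
        DocumentTypes[] types = { DocumentTypes.Job, DocumentTypes.Set, DocumentTypes.List, DocumentTypes.Hash, DocumentTypes.Counter };

        foreach (DocumentTypes type in types)
        {
            string query = $"SELECT doc._self FROM doc WHERE doc.type = {(int)type} AND IS_DEFINED(doc.expire_on) AND doc.expire_on < {epoch}";
            client.ExecuteDeleteDocuments(query, new RequestOptions { PartitionKey = new PartitionKey((int)type) });
        }
    }
}
```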
+ 2.0.0.0 + true + Hangfire.Azure + Latest + true + + + NETCORE;NETSTANDARD;NETSTANDARD2_0 + + + NET461;NETFULL + + + bin\Release\net461\Hangfire.AzureDocumentDB.xml + + + bin\Release\netstandard2.0\Hangfire.AzureDocumentDB.xml + + + + + + + + heartbeatServer.ts + + + setJobParameter.ts + + + setJobState.ts + + + persistDocuments.ts + + + expireDocuments.ts + + + deleteDocuments.ts + + + upsertDocuments.ts + + + document.ts + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/Helper/ClientHelper.cs b/src/Helper/ClientHelper.cs similarity index 63% rename from Hangfire.AzureDocumentDB/Helper/ClientHelper.cs rename to src/Helper/ClientHelper.cs index 20cf778..a278e0f 100644 --- a/Hangfire.AzureDocumentDB/Helper/ClientHelper.cs +++ b/src/Helper/ClientHelper.cs @@ -1,15 +1,54 @@ using System; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Documents; -using Microsoft.Azure.Documents.Linq; using Microsoft.Azure.Documents.Client; namespace Hangfire.Azure.Helper { internal static class ClientHelper { + public static bool enablePartition; + + /// + /// Extension method to create a query for documents in the Azure Cosmos DB service + /// + /// the type of object to query + /// + /// The URI of the document collection. + /// The options for processing the query results feed + /// + public static IOrderedQueryable CreateDocumentQueryAsync(this DocumentClient client, Uri documentCollectionUri, FeedOptions feedOptions = null) + { + if (enablePartition == false && feedOptions != null) + { + feedOptions.PartitionKey = null; + } + + return client.CreateDocumentQuery(documentCollectionUri, feedOptions); + } + + /// + /// Extension method to create a query for documents in the Azure Cosmos DB service + /// + /// the type of object to query + /// + /// The URI of the document collection. + /// The options for processing the query results feed + /// + public static IQueryable CreateDocumentQueryAsync(this DocumentClient client, Uri documentCollectionOrDatabaseUri, SqlQuerySpec querySpec, FeedOptions feedOptions = null) + { + if (enablePartition == false && feedOptions != null) + { + feedOptions.PartitionKey = null; + } + + return client.CreateDocumentQuery(documentCollectionOrDatabaseUri, querySpec, feedOptions); + } + + /// /// Creates a document as an asynchronous operation in the Azure Cosmos DB service. /// @@ -20,9 +59,14 @@ internal static class ClientHelper /// Disables the automatic id generation, will throw an exception if id is missing. /// (Optional) representing request cancellation. 
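The enablePartition flag threaded through these helpers exists so call sites can always supply a PartitionKey: when EnablePartition is false the collection has no partition key definition, and a partition-scoped request against it can be rejected by the service, so the helpers strip the key in one place instead of branching at every call site. A rough standalone equivalent of that guard:

```csharp
using Microsoft.Azure.Documents.Client;

internal static class PartitionGuardSketch
{
    internal static FeedOptions Normalize(FeedOptions feedOptions, bool enablePartition)
    {
        if (!enablePartition && feedOptions != null)
        {
            // degrade to a plain, non-partitioned query
            feedOptions.PartitionKey = null;
        }

        return feedOptions;
    }
}
```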
/// - internal static Task> CreateDocumentWithRetriesAsync(this DocumentClient client, Uri documentCollectionUri, object document, RequestOptions options = null, bool disableAutomaticIdGeneration = false, CancellationToken cancellationToken = default(CancellationToken)) + internal static Task> CreateDocumentWithRetriesAsync(this DocumentClient client, Uri documentCollectionUri, object document, RequestOptions options = null, bool disableAutomaticIdGeneration = true, CancellationToken cancellationToken = default) { - return Task.Run(async () => await client.ExecuteWithRetries(() => client.CreateDocumentAsync(documentCollectionUri, document, options, disableAutomaticIdGeneration, cancellationToken)), cancellationToken); + if (enablePartition == false && options != null) + { + options.PartitionKey = null; + } + + return Task.Run(async () => await client.ExecuteWithRetries(x => x.CreateDocumentAsync(documentCollectionUri, document, options, disableAutomaticIdGeneration, cancellationToken)), cancellationToken); } /// @@ -34,9 +78,14 @@ internal static class ClientHelper /// The request options for the request. /// (Optional) representing request cancellation. /// - internal static Task> ReadDocumentWithRetriesAsync(this DocumentClient client, Uri documentUri, RequestOptions options = null, CancellationToken cancellationToken = default(CancellationToken)) + internal static Task> ReadDocumentWithRetriesAsync(this DocumentClient client, Uri documentUri, RequestOptions options = null, CancellationToken cancellationToken = default) { - return Task.Run(async () => await client.ExecuteWithRetries(() => client.ReadDocumentAsync(documentUri, options, cancellationToken)), cancellationToken); + if (enablePartition == false && options != null) + { + options.PartitionKey = null; + } + + return Task.Run(async () => await client.ExecuteWithRetries(x => x.ReadDocumentAsync(documentUri, options, cancellationToken)), cancellationToken); } /// @@ -48,9 +97,14 @@ internal static class ClientHelper /// The request options for the request. /// Disables the automatic id generation, will throw an exception if id is missing. /// (Optional) representing request cancellation. - internal static Task> UpsertDocumentWithRetriesAsync(this DocumentClient client, Uri documentCollectionUri, object document, RequestOptions options = null, bool disableAutomaticIdGeneration = false, CancellationToken cancellationToken = default(CancellationToken)) + internal static Task> UpsertDocumentWithRetriesAsync(this DocumentClient client, Uri documentCollectionUri, object document, RequestOptions options = null, bool disableAutomaticIdGeneration = false, CancellationToken cancellationToken = default) { - return Task.Run(async () => await client.ExecuteWithRetries(() => client.UpsertDocumentAsync(documentCollectionUri, document, options, disableAutomaticIdGeneration, cancellationToken)), cancellationToken); + if (enablePartition == false && options != null) + { + options.PartitionKey = null; + } + + return Task.Run(async () => await client.ExecuteWithRetries(x => x.UpsertDocumentAsync(documentCollectionUri, document, options, disableAutomaticIdGeneration, cancellationToken)), cancellationToken); } /// @@ -60,9 +114,14 @@ internal static class ClientHelper /// the URI of the document to delete. /// The request options for the request. /// (Optional) representing request cancellation. 
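Each of these *WithRetriesAsync wrappers delegates to ExecuteWithRetries, which retries only on HTTP 429 (request rate too large) and waits for the RetryAfter interval returned by the service. A condensed sketch of that loop; the full helper, shown further down, also unwraps a DocumentClientException nested inside an AggregateException:

```csharp
using System;
using System.Threading.Tasks;

using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;

internal static class ThrottleRetrySketch
{
    internal static async Task<T> ExecuteAsync<T>(DocumentClient client, Func<DocumentClient, Task<T>> function)
    {
        while (true)
        {
            TimeSpan delay;

            try
            {
                return await function(client);
            }
            catch (DocumentClientException ex) when (ex.StatusCode != null && (int)ex.StatusCode == 429)
            {
                // back-off interval suggested by the service
                delay = ex.RetryAfter;
            }

            await Task.Delay(delay);
        }
    }
}
```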
- internal static Task> DeleteDocumentWithRetriesAsync(this DocumentClient client, Uri documentUri, RequestOptions options = null, CancellationToken cancellationToken = default(CancellationToken)) + internal static Task> DeleteDocumentWithRetriesAsync(this DocumentClient client, Uri documentUri, RequestOptions options = null, CancellationToken cancellationToken = default) { - return Task.Run(async () => await client.ExecuteWithRetries(() => client.DeleteDocumentAsync(documentUri, options, cancellationToken)), cancellationToken); + if (enablePartition == false && options != null) + { + options.PartitionKey = null; + } + + return Task.Run(async () => await client.ExecuteWithRetries(x => x.DeleteDocumentAsync(documentUri, options, cancellationToken)), cancellationToken); } /// @@ -74,9 +133,14 @@ internal static class ClientHelper /// The request options for the request. /// (Optional) representing request cancellation. /// - internal static Task> ReplaceDocumentWithRetriesAsync(this DocumentClient client, Uri documentUri, object document, RequestOptions options = null, CancellationToken cancellationToken = default(CancellationToken)) + internal static Task> ReplaceDocumentWithRetriesAsync(this DocumentClient client, Uri documentUri, object document, RequestOptions options = null, CancellationToken cancellationToken = default) { - return Task.Run(async () => await client.ExecuteWithRetries(() => client.ReplaceDocumentAsync(documentUri, document, options, cancellationToken)), cancellationToken); + if (enablePartition == false && options != null) + { + options.PartitionKey = null; + } + + return Task.Run(async () => await client.ExecuteWithRetries(x => x.ReplaceDocumentAsync(documentUri, document, options, cancellationToken)), cancellationToken); } /// @@ -87,41 +151,20 @@ internal static class ClientHelper /// the URI of the stored procedure to be executed. /// the parameters for the stored procedure execution. 
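The reworked ExecuteStoredProcedureWithRetriesAsync signature puts the RequestOptions (which carries the PartitionKey) and the CancellationToken ahead of the params array. As a usage sketch, this is roughly how the setJobState stored procedure gets invoked under the new signature; the database and collection names are placeholders, and the StoredProcedureResponse<bool> result type is an assumption since the generic parameters are not visible here:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

using Hangfire.Azure.Documents;
using Hangfire.Azure.Helper;

using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;

internal static class StoredProcedureCallSketch
{
    internal static Task<StoredProcedureResponse<bool>> SetJobStateAsync(DocumentClient client, string jobId, object stateDocument)
    {
        Uri uri = UriFactory.CreateStoredProcedureUri("hangfire-db", "hangfire", "setJobState");

        // options and token come first; the sproc parameters trail as params object[]
        RequestOptions options = new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Job) };
        return client.ExecuteStoredProcedureWithRetriesAsync<bool>(uri, options, default(CancellationToken), jobId, stateDocument);
    }
}
```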
/// - internal static Task> ExecuteStoredProcedureWithRetriesAsync(this DocumentClient client, Uri storedProcedureUri, params object[] procedureParams) + internal static Task> ExecuteStoredProcedureWithRetriesAsync(this DocumentClient client, Uri storedProcedureUri, RequestOptions options = null, CancellationToken cancellationToken = default, params object[] procedureParams) { - return Task.Run(async () => await client.ExecuteWithRetries(() => client.ExecuteStoredProcedureAsync(storedProcedureUri, procedureParams))); - } - - /// - /// Execute the function with retries on throttle - /// - internal static async Task> ExecuteNextWithRetriesAsync(this IDocumentQuery query) - { - while (true) + if (enablePartition == false && options != null) { - TimeSpan timeSpan; - - try - { - return await query.ExecuteNextAsync(); - } - catch (DocumentClientException ex) when (ex.StatusCode != null && (int)ex.StatusCode == 429) - { - timeSpan = ex.RetryAfter; - } - catch (AggregateException ex) when (ex.InnerException is DocumentClientException de && de.StatusCode != null && (int)de.StatusCode == 429) - { - timeSpan = de.RetryAfter; - } - - await Task.Delay(timeSpan); + options.PartitionKey = null; } + + return Task.Run(async () => await client.ExecuteWithRetries(x => x.ExecuteStoredProcedureAsync(storedProcedureUri, options, cancellationToken, procedureParams))); } /// /// Execute the function with retries on throttle /// - internal static async Task ExecuteWithRetries(this DocumentClient client, Func> function) + internal static async Task ExecuteWithRetries(this DocumentClient client, Func> function) { while (true) { @@ -129,7 +172,7 @@ internal static async Task ExecuteWithRetries(this DocumentClient client, try { - return await function(); + return await function(client); } catch (DocumentClientException ex) when (ex.StatusCode != null && (int)ex.StatusCode == 429) { diff --git a/Hangfire.AzureDocumentDB/Helper/HashHelper.cs b/src/Helper/HashHelper.cs similarity index 100% rename from Hangfire.AzureDocumentDB/Helper/HashHelper.cs rename to src/Helper/HashHelper.cs diff --git a/src/Helper/QueryHelper.cs b/src/Helper/QueryHelper.cs new file mode 100644 index 0000000..5c5be16 --- /dev/null +++ b/src/Helper/QueryHelper.cs @@ -0,0 +1,60 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; +using Microsoft.Azure.Documents.Linq; + +namespace Hangfire.Azure.Helper +{ + internal static class QueryHelper + { + internal static IEnumerable ToQueryResult(this IDocumentQuery query) + { + while (query.HasMoreResults) + { + Task> task = Task.Run(async () => await query.ExecuteNextWithRetriesAsync()); + task.Wait(); + foreach (T item in task.Result) + { + yield return item; + } + } + } + + internal static IEnumerable ToQueryResult(this IQueryable queryable) + { + IDocumentQuery query = queryable.AsDocumentQuery(); + return query.ToQueryResult(); + } + + + /// + /// Execute the function with retries on throttle + /// + internal static async Task> ExecuteNextWithRetriesAsync(this IDocumentQuery query) + { + while (true) + { + TimeSpan timeSpan; + + try + { + return await query.ExecuteNextAsync(); + } + catch (DocumentClientException ex) when (ex.StatusCode != null && (int)ex.StatusCode == 429) + { + timeSpan = ex.RetryAfter; + } + catch (AggregateException ex) when (ex.InnerException is DocumentClientException de && de.StatusCode != null && (int)de.StatusCode == 429) + { + timeSpan = 
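Reviewer note on the retry plumbing above: the callback now receives the DocumentClient, so each retry re-issues the call through the client instead of a captured closure. A minimal sketch of that contract, assuming the same SDK usings as ClientHelper; RetryOn429Sketch is an illustrative name, not part of this PR:

    // Minimal sketch of the throttle-aware retry used by every helper above.
    // Assumes: using System; using System.Threading.Tasks;
    //          using Microsoft.Azure.Documents; using Microsoft.Azure.Documents.Client;
    internal static async Task<T> RetryOn429Sketch<T>(DocumentClient client, Func<DocumentClient, Task<T>> function)
    {
        while (true)
        {
            try
            {
                return await function(client);
            }
            catch (DocumentClientException ex) when ((int?)ex.StatusCode == 429)
            {
                // 429 = request rate too large; wait the server-suggested interval, then retry.
                await Task.Delay(ex.RetryAfter);
            }
        }
    }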
diff --git a/src/Helper/QueryHelper.cs b/src/Helper/QueryHelper.cs
new file mode 100644
index 0000000..5c5be16
--- /dev/null
+++ b/src/Helper/QueryHelper.cs
@@ -0,0 +1,60 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+
+using Microsoft.Azure.Documents;
+using Microsoft.Azure.Documents.Client;
+using Microsoft.Azure.Documents.Linq;
+
+namespace Hangfire.Azure.Helper
+{
+    internal static class QueryHelper
+    {
+        internal static IEnumerable<T> ToQueryResult<T>(this IDocumentQuery<T> query)
+        {
+            while (query.HasMoreResults)
+            {
+                Task<FeedResponse<T>> task = Task.Run(async () => await query.ExecuteNextWithRetriesAsync());
+                task.Wait();
+                foreach (T item in task.Result)
+                {
+                    yield return item;
+                }
+            }
+        }
+
+        internal static IEnumerable<T> ToQueryResult<T>(this IQueryable<T> queryable)
+        {
+            IDocumentQuery<T> query = queryable.AsDocumentQuery();
+            return query.ToQueryResult();
+        }
+
+
+        ///
+        /// Execute the function with retries on throttle
+        ///
+        internal static async Task<FeedResponse<T>> ExecuteNextWithRetriesAsync<T>(this IDocumentQuery<T> query)
+        {
+            while (true)
+            {
+                TimeSpan timeSpan;
+
+                try
+                {
+                    return await query.ExecuteNextAsync<T>();
+                }
+                catch (DocumentClientException ex) when (ex.StatusCode != null && (int)ex.StatusCode == 429)
+                {
+                    timeSpan = ex.RetryAfter;
+                }
+                catch (AggregateException ex) when (ex.InnerException is DocumentClientException de && de.StatusCode != null && (int)de.StatusCode == 429)
+                {
+                    timeSpan = de.RetryAfter;
+                }
+
+                await Task.Delay(timeSpan);
+            }
+        }
+    }
+}
\ No newline at end of file
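The new ToQueryResult<T> above streams results page by page (each page fetched through the 429-aware ExecuteNextWithRetriesAsync) instead of buffering everything into a List<T> first. A hedged usage sketch; the SQL text is illustrative, while storage.Client and storage.CollectionUri are the names used elsewhere in this diff:

    // Sketch: enumerate a query lazily; pages are only fetched as items are consumed.
    IEnumerable<string> names = storage.Client
        .CreateDocumentQuery<string>(storage.CollectionUri,
            new SqlQuerySpec("SELECT VALUE doc['name'] FROM doc"))
        .ToQueryResult();

    foreach (string name in names.Take(10)) // at most the pages covering 10 items are read
    {
        Console.WriteLine(name);
    }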
diff --git a/Hangfire.AzureDocumentDB/Helper/StoredprocedureHelper.cs b/src/Helper/StoredprocedureHelper.cs
similarity index 79%
rename from Hangfire.AzureDocumentDB/Helper/StoredprocedureHelper.cs
rename to src/Helper/StoredprocedureHelper.cs
index 3fd24a7..a4694c5 100644
--- a/Hangfire.AzureDocumentDB/Helper/StoredprocedureHelper.cs
+++ b/src/Helper/StoredprocedureHelper.cs
@@ -1,11 +1,12 @@
 using System;
 using System.Linq;
+using System.Threading;
 using System.Threading.Tasks;
 
-using Microsoft.Azure.Documents.Client;
-
 using Hangfire.Azure.Documents;
 
+using Microsoft.Azure.Documents.Client;
+
 namespace Hangfire.Azure.Helper
 {
     internal static class StoredprocedureHelper
@@ -23,27 +24,27 @@ internal static void Setup(string databaseName, string collectionName)
             spUpsertDocumentsUri = UriFactory.CreateStoredProcedureUri(databaseName, collectionName, "upsertDocuments");
         }
 
-        internal static int ExecuteUpsertDocuments<T>(this DocumentClient client, Data<T> data)
+        internal static int ExecuteUpsertDocuments<T>(this DocumentClient client, Data<T> data, RequestOptions options = null, CancellationToken cancellationToken = default)
         {
             int affected = 0;
             Data<T> records = new Data<T>(data.Items);
             do
             {
                 records.Items = data.Items.Skip(affected).ToList();
-                Task<StoredProcedureResponse<int>> task = client.ExecuteStoredProcedureWithRetriesAsync<int>(spUpsertDocumentsUri, records);
+                Task<StoredProcedureResponse<int>> task = client.ExecuteStoredProcedureWithRetriesAsync<int>(spUpsertDocumentsUri, options, cancellationToken, records);
                 task.Wait();
                 affected += task.Result;
             } while (affected < data.Items.Count);
             return affected;
         }
 
-        internal static int ExecuteDeleteDocuments(this DocumentClient client, string query)
+        internal static int ExecuteDeleteDocuments(this DocumentClient client, string query, RequestOptions options = null, CancellationToken cancellationToken = default)
         {
             int affected = 0;
             ProcedureResponse response;
             do
             {
-                Task<StoredProcedureResponse<ProcedureResponse>> task = client.ExecuteStoredProcedureWithRetriesAsync<ProcedureResponse>(spDeleteDocumentsUri, query);
+                Task<StoredProcedureResponse<ProcedureResponse>> task = client.ExecuteStoredProcedureWithRetriesAsync<ProcedureResponse>(spDeleteDocumentsUri, options, cancellationToken, query);
                 task.Wait();
                 response = task.Result;
                 affected += response.Affected;
@@ -51,23 +52,23 @@ internal static int ExecuteDeleteDocuments(this DocumentClient client, string qu
             return affected;
         }
 
-        internal static void ExecutePersistDocuments(this DocumentClient client, string query)
+        internal static void ExecutePersistDocuments(this DocumentClient client, string query, RequestOptions options = null, CancellationToken cancellationToken = default)
        {
             ProcedureResponse response;
             do
             {
-                Task<StoredProcedureResponse<ProcedureResponse>> task = client.ExecuteStoredProcedureWithRetriesAsync<ProcedureResponse>(spPersistDocumentsUri, query);
+                Task<StoredProcedureResponse<ProcedureResponse>> task = client.ExecuteStoredProcedureWithRetriesAsync<ProcedureResponse>(spPersistDocumentsUri, options, cancellationToken, query);
                 task.Wait();
                 response = task.Result;
             } while (response.Continuation);
         }
 
-        internal static void ExecuteExpireDocuments(this DocumentClient client, string query, int epoch)
+        internal static void ExecuteExpireDocuments(this DocumentClient client, string query, int epoch, RequestOptions options = null, CancellationToken cancellationToken = default)
         {
             ProcedureResponse response;
             do
             {
-                Task<StoredProcedureResponse<ProcedureResponse>> task = client.ExecuteStoredProcedureWithRetriesAsync<ProcedureResponse>(spExpireDocumentsUri, query, epoch);
+                Task<StoredProcedureResponse<ProcedureResponse>> task = client.ExecuteStoredProcedureWithRetriesAsync<ProcedureResponse>(spExpireDocumentsUri, options, cancellationToken, query, epoch);
                 task.Wait();
                 response = task.Result;
             } while (response.Continuation);
diff --git a/Hangfire.AzureDocumentDB/Helper/TimeHelper.cs b/src/Helper/TimeHelper.cs
similarity index 100%
rename from Hangfire.AzureDocumentDB/Helper/TimeHelper.cs
rename to src/Helper/TimeHelper.cs
diff --git a/Hangfire.AzureDocumentDB/Json/DocumentContractResolver.cs b/src/Json/DocumentContractResolver.cs
similarity index 98%
rename from Hangfire.AzureDocumentDB/Json/DocumentContractResolver.cs
rename to src/Json/DocumentContractResolver.cs
index 9af4ac7..359abde 100644
--- a/Hangfire.AzureDocumentDB/Json/DocumentContractResolver.cs
+++ b/src/Json/DocumentContractResolver.cs
@@ -4,8 +4,6 @@
 using Newtonsoft.Json.Linq;
 using Newtonsoft.Json.Serialization;
 
-using Hangfire.Azure.Documents;
-
 namespace Hangfire.Azure.Documents.Json
 {
     internal class DocumentContractResolver : CamelCasePropertyNamesContractResolver
diff --git a/Hangfire.AzureDocumentDB/Queue/Entities/FetchedJob.cs b/src/Queue/Entities/FetchedJob.cs
similarity index 86%
rename from Hangfire.AzureDocumentDB/Queue/Entities/FetchedJob.cs
rename to src/Queue/Entities/FetchedJob.cs
index 8598489..227c00f 100644
--- a/Hangfire.AzureDocumentDB/Queue/Entities/FetchedJob.cs
+++ b/src/Queue/Entities/FetchedJob.cs
@@ -3,13 +3,14 @@
 using System.Threading;
 using System.Threading.Tasks;
 
+using Hangfire.Azure.Documents;
+using Hangfire.Azure.Helper;
 using Hangfire.Logging;
 using Hangfire.Storage;
+
 using Microsoft.Azure.Documents;
 using Microsoft.Azure.Documents.Client;
 
-using Hangfire.Azure.Helper;
-
 // ReSharper disable once CheckNamespace
 namespace Hangfire.Azure.Queue
 {
@@ -23,6 +24,7 @@ internal class FetchedJob : IFetchedJob
         private bool disposed;
         private bool removedFromQueue;
         private bool reQueued;
+        private readonly PartitionKey partitionKey = new PartitionKey((int)DocumentTypes.Queue);
 
         public FetchedJob(DocumentDbStorage storage, Documents.Queue data)
         {
@@ -58,7 +60,7 @@ public void RemoveFromQueue()
             try
             {
                 Uri deleteUri = new Uri(data.SelfLink, UriKind.Relative);
-                Task<ResourceResponse<Document>> task = storage.Client.DeleteDocumentWithRetriesAsync(deleteUri);
+                Task<ResourceResponse<Document>> task = storage.Client.DeleteDocumentWithRetriesAsync(deleteUri, new RequestOptions { PartitionKey = partitionKey });
                 task.Wait();
             }
             finally
@@ -79,7 +81,7 @@ public void Requeue()
             try
             {
                 Uri replaceUri = new Uri(data.SelfLink, UriKind.Relative);
-                Task<ResourceResponse<Document>> task = storage.Client.ReplaceDocumentWithRetriesAsync(replaceUri, data);
+                Task<ResourceResponse<Document>> task = storage.Client.ReplaceDocumentWithRetriesAsync(replaceUri, data, new RequestOptions { PartitionKey = partitionKey });
                 task.Wait();
             }
             catch (DocumentClientException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
@@ -88,7 +90,7 @@ public void Requeue()
                 data.SelfLink = null;
                 Uri collectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, storage.Options.CollectionName);
-                Task<ResourceResponse<Document>> task = storage.Client.CreateDocumentWithRetriesAsync(collectionUri, data);
+                Task<ResourceResponse<Document>> task = storage.Client.CreateDocumentWithRetriesAsync(collectionUri, data, new RequestOptions { PartitionKey = partitionKey });
                 task.Wait();
             }
             finally
@@ -110,7 +112,7 @@ private void KeepAliveJobCallback(object obj)
             queue.FetchedAt = DateTime.UtcNow;
 
             Uri replaceUri = new Uri(queue.SelfLink, UriKind.Relative);
-            Task<ResourceResponse<Document>> task = storage.Client.ReplaceDocumentWithRetriesAsync(replaceUri, queue);
+            Task<ResourceResponse<Document>> task = storage.Client.ReplaceDocumentWithRetriesAsync(replaceUri, queue, new RequestOptions { PartitionKey = partitionKey });
             task.Wait();
 
             logger.Trace($"Keep-alive query for job: {queue.Id} sent");
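Every queue-document write in FetchedJob is now pinned to the queue partition. The pattern in isolation (names taken from the diff; when enablePartition is off, the ClientHelper methods null the key back out before the call):

    // Sketch: all queue documents share the synthetic partition key (int)DocumentTypes.Queue,
    // so point operations route to a single logical partition.
    PartitionKey partitionKey = new PartitionKey((int)DocumentTypes.Queue);
    RequestOptions options = new RequestOptions { PartitionKey = partitionKey };

    Task<ResourceResponse<Document>> task = storage.Client.DeleteDocumentWithRetriesAsync(
        new Uri(data.SelfLink, UriKind.Relative), options);
    task.Wait();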
diff --git a/Hangfire.AzureDocumentDB/Queue/Interface/IPersistentJobQueue.cs b/src/Queue/Interface/IPersistentJobQueue.cs
similarity index 83%
rename from Hangfire.AzureDocumentDB/Queue/Interface/IPersistentJobQueue.cs
rename to src/Queue/Interface/IPersistentJobQueue.cs
index ca5cbdc..b1c2f37 100644
--- a/Hangfire.AzureDocumentDB/Queue/Interface/IPersistentJobQueue.cs
+++ b/src/Queue/Interface/IPersistentJobQueue.cs
@@ -1,5 +1,6 @@
-using Hangfire.Storage;
-using System.Threading;
+using System.Threading;
+
+using Hangfire.Storage;
 
 // ReSharper disable once CheckNamespace
 namespace Hangfire.Azure.Queue
diff --git a/Hangfire.AzureDocumentDB/Queue/Interface/IPersistentJobQueueMonitoringApi.cs b/src/Queue/Interface/IPersistentJobQueueMonitoringApi.cs
similarity index 100%
rename from Hangfire.AzureDocumentDB/Queue/Interface/IPersistentJobQueueMonitoringApi.cs
rename to src/Queue/Interface/IPersistentJobQueueMonitoringApi.cs
diff --git a/Hangfire.AzureDocumentDB/Queue/Interface/IPersistentJobQueueProvider.cs b/src/Queue/Interface/IPersistentJobQueueProvider.cs
similarity index 100%
rename from Hangfire.AzureDocumentDB/Queue/Interface/IPersistentJobQueueProvider.cs
rename to src/Queue/Interface/IPersistentJobQueueProvider.cs
diff --git a/Hangfire.AzureDocumentDB/Queue/JobQueue.cs b/src/Queue/JobQueue.cs
similarity index 92%
rename from Hangfire.AzureDocumentDB/Queue/JobQueue.cs
rename to src/Queue/JobQueue.cs
index 1567c43..8a1e9a1 100644
--- a/Hangfire.AzureDocumentDB/Queue/JobQueue.cs
+++ b/src/Queue/JobQueue.cs
@@ -1,17 +1,18 @@
 using System;
+using System.Collections.Generic;
 using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
-using System.Collections.Generic;
 
+using Hangfire.Azure.Documents;
+using Hangfire.Azure.Documents.Helper;
+using Hangfire.Azure.Helper;
 using Hangfire.Logging;
 using Hangfire.Storage;
+
 using Microsoft.Azure.Documents;
 using Microsoft.Azure.Documents.Client;
 
-using Hangfire.Azure.Helper;
-using Hangfire.Azure.Documents.Helper;
-
 namespace Hangfire.Azure.Queue
 {
     internal class JobQueue : IPersistentJobQueue
@@ -22,6 +23,7 @@ internal class JobQueue : IPersistentJobQueue
         private readonly TimeSpan defaultLockTimeout;
         private readonly TimeSpan invisibilityTimeout = TimeSpan.FromMinutes(15);
         private readonly object syncLock = new object();
+        private readonly PartitionKey partitionKey = new PartitionKey((int)DocumentTypes.Queue);
 
         public JobQueue(DocumentDbStorage storage)
         {
@@ -60,7 +62,7 @@ public IFetchedJob Dequeue(string[] queues, CancellationToken cancellationToken)
                     };
                     sql.Parameters.Add(new SqlParameter("@timeout", invisibilityTimeoutEpoch));
 
-                    Documents.Queue data = storage.Client.CreateDocumentQuery<Documents.Queue>(storage.CollectionUri, sql)
+                    Documents.Queue data = storage.Client.CreateDocumentQueryAsync<Documents.Queue>(storage.CollectionUri, sql, new FeedOptions { PartitionKey = partitionKey })
                         .ToQueryResult()
                         .FirstOrDefault();
 
@@ -70,7 +72,7 @@ public IFetchedJob Dequeue(string[] queues, CancellationToken cancellationToken)
                         data.FetchedAt = DateTime.UtcNow;
 
                         Uri replaceUri = new Uri(data.SelfLink, UriKind.Relative);
-                        Task<ResourceResponse<Document>> task = storage.Client.ReplaceDocumentWithRetriesAsync(replaceUri, data, cancellationToken: cancellationToken);
+                        Task<ResourceResponse<Document>> task = storage.Client.ReplaceDocumentWithRetriesAsync(replaceUri, data, new RequestOptions { PartitionKey = partitionKey }, cancellationToken: cancellationToken);
                         task.Wait(cancellationToken);
 
                         logger.Trace($"Found job {data.JobId} from the queue : {data.Name}");
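For reviewers tracing the @timeout parameter in the dequeue above: a job is claimable when its fetched_at is unset or older than the invisibility window. A sketch of the cut-off, assuming the ToEpoch extension from the renamed TimeHelper converts a DateTime to a Unix epoch (an assumption; the helper body is not in this diff):

    // Sketch: anything fetched before this epoch counts as abandoned and may be re-dequeued.
    int invisibilityTimeoutEpoch = DateTime.UtcNow.Add(invisibilityTimeout.Negate()).ToEpoch();
    sql.Parameters.Add(new SqlParameter("@timeout", invisibilityTimeoutEpoch));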
diff --git a/Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs b/src/Queue/JobQueueMonitoringApi.cs
similarity index 82%
rename from Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs
rename to src/Queue/JobQueueMonitoringApi.cs
index 7400bab..8bba435 100644
--- a/Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs
+++ b/src/Queue/JobQueueMonitoringApi.cs
@@ -1,113 +1,105 @@
-using System;
-using System.Linq;
-using System.Collections.Generic;
-
-using Hangfire.Azure.Documents;
-using Microsoft.Azure.Documents;
-using Microsoft.Azure.Documents.Client;
-using Microsoft.Azure.Documents.SystemFunctions;
-
-using Hangfire.Azure.Helper;
-
-namespace Hangfire.Azure.Queue
-{
-    internal class JobQueueMonitoringApi : IPersistentJobQueueMonitoringApi
-    {
-        private readonly DocumentDbStorage storage;
-        private readonly List<string> queuesCache = new List<string>();
-        private DateTime cacheUpdated;
-        private readonly object cacheLock = new object();
-        private static readonly TimeSpan queuesCacheTimeout = TimeSpan.FromSeconds(5);
-
-        public JobQueueMonitoringApi(DocumentDbStorage storage) => this.storage = storage;
-
-        public IEnumerable<string> GetQueues()
-        {
-            lock (cacheLock)
-            {
-                if (queuesCache.Count == 0 || cacheUpdated.Add(queuesCacheTimeout) < DateTime.UtcNow)
-                {
-                    SqlQuerySpec sql = new SqlQuerySpec
-                    {
-                        QueryText = "SELECT VALUE doc['name'] FROM doc WHERE doc.type = @type",
-                        Parameters = new SqlParameterCollection
-                        {
-                            new SqlParameter("@type", (int)DocumentTypes.Queue)
-                        }
-                    };
-
-                    IEnumerable<string> result = storage.Client.CreateDocumentQuery<string>(storage.CollectionUri, sql)
-                        .ToQueryResult()
-                        .Distinct();
-
-                    queuesCache.Clear();
-                    queuesCache.AddRange(result);
-                    cacheUpdated = DateTime.UtcNow;
-                }
-
-                return queuesCache.ToList();
-            }
-        }
-
-        public int GetEnqueuedCount(string queue)
-        {
-            SqlQuerySpec sql = new SqlQuerySpec
-            {
-                QueryText = "SELECT TOP 1 VALUE COUNT(1) FROM doc WHERE doc.type = @type AND doc.name = @name",
-                Parameters = new SqlParameterCollection
-                {
-                    new SqlParameter("@name", queue),
-                    new SqlParameter("@type", (int)DocumentTypes.Queue)
-                }
-            };
-
-            return storage.Client.CreateDocumentQuery<int>(storage.CollectionUri, sql)
-                .ToQueryResult()
-                .FirstOrDefault();
-        }
-
-        public IEnumerable<string> GetEnqueuedJobIds(string queue, int from, int perPage)
-        {
-            FeedOptions feedOptions = new FeedOptions
-            {
-                EnableCrossPartitionQuery = true
-            };
-
-            return storage.Client.CreateDocumentQuery<Documents.Queue>(storage.CollectionUri, feedOptions)
-                .Where(q => q.DocumentType == DocumentTypes.Queue && q.Name == queue && q.FetchedAt.IsDefined() == false)
-                .OrderBy(q => q.CreatedOn)
-                .Skip(from).Take(perPage)
-                .Select(q => q.JobId)
-                .ToQueryResult();
-        }
-
-        public IEnumerable<string> GetFetchedJobIds(string queue, int from, int perPage)
-        {
-            FeedOptions feedOptions = new FeedOptions
-            {
-                EnableCrossPartitionQuery = true
-            };
-
-            return storage.Client.CreateDocumentQuery<Documents.Queue>(storage.CollectionUri, feedOptions)
-                .Where(q => q.DocumentType == DocumentTypes.Queue && q.Name == queue && q.FetchedAt.IsDefined())
-                .OrderBy(q => q.CreatedOn)
-                .Skip(from).Take(perPage)
-                .Select(q => q.JobId)
-                .ToQueryResult();
-        }
-
-        public (int? EnqueuedCount, int? FetchedCount) GetEnqueuedAndFetchedCount(string queue)
-        {
-            (int EnqueuedCount, int FetchedCount) result = storage.Client.CreateDocumentQuery<Documents.Queue>(storage.CollectionUri)
-                .Where(q => q.DocumentType == DocumentTypes.Queue && q.Name == queue)
-                .Select(q => new { q.Name, EnqueuedCount = q.FetchedAt.IsDefined() ? 0 : 1, FetchedCount = q.FetchedAt.IsDefined() ? 1 : 0 })
-                .ToQueryResult()
-                .GroupBy(q => q.Name)
-                .Select(v => (EnqueuedCount: v.Sum(q => q.EnqueuedCount), FetchedCount: v.Sum(q => q.FetchedCount)))
-                .FirstOrDefault();
-
-            return result;
-        }
-
-    }
+using System;
+using System.Collections.Generic;
+using System.Linq;
+
+using Hangfire.Azure.Documents;
+using Hangfire.Azure.Helper;
+
+using Microsoft.Azure.Documents;
+using Microsoft.Azure.Documents.Client;
+using Microsoft.Azure.Documents.SystemFunctions;
+
+namespace Hangfire.Azure.Queue
+{
+    internal class JobQueueMonitoringApi : IPersistentJobQueueMonitoringApi
+    {
+        private readonly DocumentDbStorage storage;
+        private readonly List<string> queuesCache = new List<string>();
+        private DateTime cacheUpdated;
+        private readonly object cacheLock = new object();
+        private static readonly TimeSpan queuesCacheTimeout = TimeSpan.FromSeconds(5);
+        private readonly PartitionKey partitionKey = new PartitionKey((int)DocumentTypes.Queue);
+
+        public JobQueueMonitoringApi(DocumentDbStorage storage) => this.storage = storage;
+
+        public IEnumerable<string> GetQueues()
+        {
+            lock (cacheLock)
+            {
+                if (queuesCache.Count == 0 || cacheUpdated.Add(queuesCacheTimeout) < DateTime.UtcNow)
+                {
+                    SqlQuerySpec sql = new SqlQuerySpec
+                    {
+                        QueryText = "SELECT VALUE doc['name'] FROM doc WHERE doc.type = @type",
+                        Parameters = new SqlParameterCollection
+                        {
+                            new SqlParameter("@type", (int)DocumentTypes.Queue)
+                        }
+                    };
+
+                    IEnumerable<string> result = storage.Client.CreateDocumentQueryAsync<string>(storage.CollectionUri, sql, new FeedOptions { PartitionKey = partitionKey })
+                        .ToQueryResult()
+                        .Distinct();
+
+                    queuesCache.Clear();
+                    queuesCache.AddRange(result);
+                    cacheUpdated = DateTime.UtcNow;
+                }
+
+                return queuesCache.ToList();
+            }
+        }
+
+        public int GetEnqueuedCount(string queue)
+        {
+            SqlQuerySpec sql = new SqlQuerySpec
+            {
+                QueryText = "SELECT TOP 1 VALUE COUNT(1) FROM doc WHERE doc.type = @type AND doc.name = @name",
+                Parameters = new SqlParameterCollection
+                {
+                    new SqlParameter("@name", queue),
+                    new SqlParameter("@type", (int)DocumentTypes.Queue)
+                }
+            };
+
+            return storage.Client.CreateDocumentQueryAsync<int>(storage.CollectionUri, sql, new FeedOptions { PartitionKey = partitionKey })
+                .ToQueryResult()
+                .FirstOrDefault();
+        }
+
+        public IEnumerable<string> GetEnqueuedJobIds(string queue, int from, int perPage)
+        {
+            return storage.Client.CreateDocumentQueryAsync<Documents.Queue>(storage.CollectionUri, new FeedOptions { PartitionKey = partitionKey })
+                .Where(q => q.DocumentType == DocumentTypes.Queue && q.Name == queue && q.FetchedAt.IsDefined() == false)
+                .OrderBy(q => q.CreatedOn)
+                .Skip(from).Take(perPage)
+                .Select(q => q.JobId)
+                .ToQueryResult();
+        }
+
+        public IEnumerable<string> GetFetchedJobIds(string queue, int from, int perPage)
+        {
+            return storage.Client.CreateDocumentQueryAsync<Documents.Queue>(storage.CollectionUri, new FeedOptions { PartitionKey = partitionKey })
+                .Where(q => q.DocumentType == DocumentTypes.Queue && q.Name == queue && q.FetchedAt.IsDefined())
+                .OrderBy(q => q.CreatedOn)
+                .Skip(from).Take(perPage)
+                .Select(q => q.JobId)
+                .ToQueryResult();
+        }
+
+        public (int? EnqueuedCount, int? FetchedCount) GetEnqueuedAndFetchedCount(string queue)
+        {
+
+            (int EnqueuedCount, int FetchedCount) result = storage.Client.CreateDocumentQueryAsync<Documents.Queue>(storage.CollectionUri, new FeedOptions { PartitionKey = partitionKey })
+                .Where(q => q.DocumentType == DocumentTypes.Queue && q.Name == queue)
+                .Select(q => new { q.Name, EnqueuedCount = q.FetchedAt.IsDefined() ? 0 : 1, FetchedCount = q.FetchedAt.IsDefined() ? 1 : 0 })
+                .ToQueryResult()
+                .GroupBy(q => q.Name)
+                .Select(v => (EnqueuedCount: v.Sum(q => q.EnqueuedCount), FetchedCount: v.Sum(q => q.FetchedCount)))
+                .FirstOrDefault();
+
+            return result;
+        }
+
+    }
 }
\ No newline at end of file
diff --git a/Hangfire.AzureDocumentDB/Queue/JobQueueProvider.cs b/src/Queue/JobQueueProvider.cs
similarity index 100%
rename from Hangfire.AzureDocumentDB/Queue/JobQueueProvider.cs
rename to src/Queue/JobQueueProvider.cs
diff --git a/Hangfire.AzureDocumentDB/Queue/PersistentJobQueueProviderCollection.cs b/src/Queue/PersistentJobQueueProviderCollection.cs
similarity index 100%
rename from Hangfire.AzureDocumentDB/Queue/PersistentJobQueueProviderCollection.cs
rename to src/Queue/PersistentJobQueueProviderCollection.cs
diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/deleteDocuments.js b/src/StoredProcedure/deleteDocuments.js
similarity index 100%
rename from Hangfire.AzureDocumentDB/StoredProcedure/deleteDocuments.js
rename to src/StoredProcedure/deleteDocuments.js
diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/deleteDocuments.ts b/src/StoredProcedure/deleteDocuments.ts
similarity index 100%
rename from Hangfire.AzureDocumentDB/StoredProcedure/deleteDocuments.ts
rename to src/StoredProcedure/deleteDocuments.ts
diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/document.js b/src/StoredProcedure/document.js
similarity index 100%
rename from Hangfire.AzureDocumentDB/StoredProcedure/document.js
rename to src/StoredProcedure/document.js
diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/document.ts b/src/StoredProcedure/document.ts
similarity index 98%
rename from Hangfire.AzureDocumentDB/StoredProcedure/document.ts
rename to src/StoredProcedure/document.ts
index bd41ee5..020e312 100644
--- a/Hangfire.AzureDocumentDB/StoredProcedure/document.ts
+++ b/src/StoredProcedure/document.ts
@@ -6,6 +6,7 @@ interface IDocumentBase extends IDocumentMeta {
     type: number;
     // ReSharper disable once InconsistentNaming
     expire_on: number;
+    ttl?: number;
 }
 
 interface IProcedureResponse {
diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/expireDocuments.js b/src/StoredProcedure/expireDocuments.js
similarity index 100%
rename from Hangfire.AzureDocumentDB/StoredProcedure/expireDocuments.js
rename to src/StoredProcedure/expireDocuments.js
diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/expireDocuments.ts b/src/StoredProcedure/expireDocuments.ts
similarity index 100%
rename from Hangfire.AzureDocumentDB/StoredProcedure/expireDocuments.ts
rename to src/StoredProcedure/expireDocuments.ts
diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/heartbeatServer.js b/src/StoredProcedure/heartbeatServer.js
similarity index 100%
rename from Hangfire.AzureDocumentDB/StoredProcedure/heartbeatServer.js
rename to src/StoredProcedure/heartbeatServer.js
diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/heartbeatServer.ts b/src/StoredProcedure/heartbeatServer.ts
similarity index 100%
rename from Hangfire.AzureDocumentDB/StoredProcedure/heartbeatServer.ts
rename to src/StoredProcedure/heartbeatServer.ts
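The optional ttl added to IDocumentBase (document.ts above) lets Cosmos DB expire documents natively alongside the library's own expire_on sweep, which is why the persist scripts below now strip both fields. A hedged C# sketch of what a TTL-bearing payload looks like; the class name and the 300-second value are illustrative, not from this PR:

    using Newtonsoft.Json;

    // Sketch: with TTL enabled on the collection, Cosmos DB deletes this document
    // automatically ~300 seconds after its last write; a null ttl means "never expire".
    internal class ExpiringDocument
    {
        [JsonProperty("id")]
        public string Id { get; set; }

        [JsonProperty("ttl", NullValueHandling = NullValueHandling.Ignore)]
        public int? TimeToLive { get; set; } = 300;
    }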
diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/persistDocuments.js b/src/StoredProcedure/persistDocuments.js
similarity index 98%
rename from Hangfire.AzureDocumentDB/StoredProcedure/persistDocuments.js
rename to src/StoredProcedure/persistDocuments.js
index b3a955a..cb46627 100644
--- a/Hangfire.AzureDocumentDB/StoredProcedure/persistDocuments.js
+++ b/src/StoredProcedure/persistDocuments.js
@@ -40,6 +40,7 @@ function persistDocument(query) {
     function tryUpdate(documents) {
         if (documents.length > 0) {
             let doc = documents[0];
+            delete doc.ttl;
             delete doc.expire_on;
             let option = {
                 etag: doc._etag
diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/persistDocuments.ts b/src/StoredProcedure/persistDocuments.ts
similarity index 99%
rename from Hangfire.AzureDocumentDB/StoredProcedure/persistDocuments.ts
rename to src/StoredProcedure/persistDocuments.ts
index f23d752..aa8234e 100644
--- a/Hangfire.AzureDocumentDB/StoredProcedure/persistDocuments.ts
+++ b/src/StoredProcedure/persistDocuments.ts
@@ -63,6 +63,7 @@ function persistDocument(query: string) {
 
         if (documents.length > 0) {
             let doc: IDocumentBase = documents[0];
+            delete doc.ttl;
             delete doc.expire_on;
 
             let option: IReplaceOptions = {
diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/setJobParameter.js b/src/StoredProcedure/setJobParameter.js
similarity index 100%
rename from Hangfire.AzureDocumentDB/StoredProcedure/setJobParameter.js
rename to src/StoredProcedure/setJobParameter.js
diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/setJobParameter.ts b/src/StoredProcedure/setJobParameter.ts
similarity index 100%
rename from Hangfire.AzureDocumentDB/StoredProcedure/setJobParameter.ts
rename to src/StoredProcedure/setJobParameter.ts
diff --git a/src/StoredProcedure/setJobState.js b/src/StoredProcedure/setJobState.js
new file mode 100644
index 0000000..07da5bc
--- /dev/null
+++ b/src/StoredProcedure/setJobState.js
@@ -0,0 +1,28 @@
+function setJobState(id, state) {
+    let context = getContext();
+    let collection = context.getCollection();
+    let response = getContext().getResponse();
+    let collectionLink = collection.getAltLink();
+    let documentLink = `${collectionLink}/docs/${id}`;
+    response.setBody(false);
+    let isAccepted = collection.readDocument(documentLink, (error, job) => {
+        if (error) {
+            throw error;
+        }
+        job.state_id = state.id;
+        job.state_name = state.name;
+        let options = { etag: job._etag };
+        let success = collection.replaceDocument(job._self, job, options, (err) => {
+            if (err) {
+                throw err;
+            }
+            response.setBody(true);
+        });
+        if (!success) {
+            throw new Error("The call was not accepted");
+        }
+    });
+    if (!isAccepted) {
+        throw new Error("The call was not accepted");
+    }
+}
diff --git a/src/StoredProcedure/setJobState.ts b/src/StoredProcedure/setJobState.ts
new file mode 100644
index 0000000..a8cf63f
--- /dev/null
+++ b/src/StoredProcedure/setJobState.ts
@@ -0,0 +1,40 @@
+/**
+ * Set the Job state data
+ * @param {string} id - the job id
+ * @param {IState} state - the state document
+ */
+function setJobState(id: string, state: IState) {
+    let context: IContext = getContext();
+    let collection: ICollection = context.getCollection();
+    let response: IResponse = getContext().getResponse();
+    let collectionLink: string = collection.getAltLink();
+    let documentLink: string = `${collectionLink}/docs/${id}`;
+
+    // default response
+    response.setBody(false);
+
+    let isAccepted: boolean = collection.readDocument(documentLink, (error: IRequestCallbackError, job: IJob) => {
+        if (error) {
+            throw error;
+        }
+
+        job.state_id = state.id;
+        job.state_name = state.name;
+        let options: IReplaceOptions = { etag: job._etag };
+
+        let success: boolean = collection.replaceDocument(job._self, job, options, (err: IRequestCallbackError) => {
+            if (err) {
+                throw err;
+            }
+            response.setBody(true);
+        });
+
+        if (!success) {
+            throw new Error("The call was not accepted");
+        }
+    });
+
+    if (!isAccepted) {
+        throw new Error("The call was not accepted");
+    }
+}
\ No newline at end of file
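The rewritten setJobState no longer creates the state document server-side; the caller persists the state first, and the sproc only stamps state_id/state_name onto the job. A sketch of a matching call site; the URI setup mirrors StoredprocedureHelper, but the job-partition key and the DocumentTypes.Job value are assumptions, not confirmed by this diff:

    // Sketch: invoke setJobState(id, state) after the state document has been created.
    Uri spSetJobStateUri = UriFactory.CreateStoredProcedureUri(databaseName, collectionName, "setJobState");
    Task<StoredProcedureResponse<bool>> task = client.ExecuteStoredProcedureWithRetriesAsync<bool>(
        spSetJobStateUri,
        new RequestOptions { PartitionKey = new PartitionKey((int)DocumentTypes.Job) }, // assumed partition
        CancellationToken.None,
        jobId, stateDocument); // the script returns true once the job was re-stamped
    task.Wait();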
diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/upsertDocuments.js b/src/StoredProcedure/upsertDocuments.js
similarity index 100%
rename from Hangfire.AzureDocumentDB/StoredProcedure/upsertDocuments.js
rename to src/StoredProcedure/upsertDocuments.js
diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/upsertDocuments.ts b/src/StoredProcedure/upsertDocuments.ts
similarity index 97%
rename from Hangfire.AzureDocumentDB/StoredProcedure/upsertDocuments.ts
rename to src/StoredProcedure/upsertDocuments.ts
index 8ef6f40..751e0fe 100644
--- a/Hangfire.AzureDocumentDB/StoredProcedure/upsertDocuments.ts
+++ b/src/StoredProcedure/upsertDocuments.ts
@@ -1,58 +1,58 @@
-/**
- * Upsert documents
- * @param {IData} data - the array of documents
- */
-function upsertDocuments(data: IData) {
-    let context: IContext = getContext();
-    let collection: ICollection = context.getCollection();
-    let response: IResponse = getContext().getResponse();
-    let collectionLink: string = collection.getSelfLink();
-
-    if (data.items.length === 0) {
-        response.setBody(0);
-        return;
-    }
-
-    let count: number = 0;
-    let docs: Array<IDocumentBase> = data.items;
-    let docsLength: number = docs.length;
-
-    // Call the CRUD API to upsert a document.
-    tryUpsert(docs[count]);
-
-    // Note that there are 2 exit conditions:
-    // 1) The upsertDocument request was not accepted. In this case the callback will not be called, we just call setBody and we are done.
-    // 2) The callback was called docs.length times. In this case all documents were created/updated and we don't need to call tryUpsert anymore.
-    //    Just call setBody and we are done.
-    function tryUpsert(doc: IDocumentBase) {
-        let isAccepted: boolean = collection.upsertDocument(collectionLink, doc, callback);
-
-        // If the request was accepted, callback will be called.
-        // Otherwise report current count back to the client,
-        // which will call the script again with remaining set of docs.
-        // This condition will happen when this stored procedure has been running too long
-        // and is about to get cancelled by the server. This will allow the calling client
-        // to resume this batch from the point we got to before isAccepted was set to false
-        if (!isAccepted) {
-            response.setBody(count);
-        }
-    }
-
-    // This is called when collection.upsertDocument is done and the document has been persisted.
-    function callback(err: IRequestCallbackError) {
-        if (err) {
-            throw err;
-        }
-
-        // One more document has been inserted/updated, increment the count.
-        count++;
-
-        if (count >= docsLength) {
-            // If we have inserted/updated all documents, we are done. Just set the response.
-            response.setBody(count);
-        } else {
-            // upsert next document.
-            tryUpsert(docs[count]);
-        }
-    }
+/**
+ * Upsert documents
+ * @param {IData} data - the array of documents
+ */
+function upsertDocuments(data: IData) {
+    let context: IContext = getContext();
+    let collection: ICollection = context.getCollection();
+    let response: IResponse = getContext().getResponse();
+    let collectionLink: string = collection.getSelfLink();
+
+    if (data.items.length === 0) {
+        response.setBody(0);
+        return;
+    }
+
+    let count: number = 0;
+    let docs: Array<IDocumentBase> = data.items;
+    let docsLength: number = docs.length;
+
+    // Call the CRUD API to upsert a document.
+    tryUpsert(docs[count]);
+
+    // Note that there are 2 exit conditions:
+    // 1) The upsertDocument request was not accepted. In this case the callback will not be called, we just call setBody and we are done.
+    // 2) The callback was called docs.length times. In this case all documents were created/updated and we don't need to call tryUpsert anymore.
+    //    Just call setBody and we are done.
+    function tryUpsert(doc: IDocumentBase) {
+        let isAccepted: boolean = collection.upsertDocument(collectionLink, doc, callback);
+
+        // If the request was accepted, callback will be called.
+        // Otherwise report current count back to the client,
+        // which will call the script again with remaining set of docs.
+        // This condition will happen when this stored procedure has been running too long
+        // and is about to get cancelled by the server. This will allow the calling client
+        // to resume this batch from the point we got to before isAccepted was set to false
+        if (!isAccepted) {
+            response.setBody(count);
+        }
+    }
+
+    // This is called when collection.upsertDocument is done and the document has been persisted.
+    function callback(err: IRequestCallbackError) {
+        if (err) {
+            throw err;
+        }
+
+        // One more document has been inserted/updated, increment the count.
+        count++;
+
+        if (count >= docsLength) {
+            // If we have inserted/updated all documents, we are done. Just set the response.
+            response.setBody(count);
+        } else {
+            // upsert next document.
+            tryUpsert(docs[count]);
+        }
+    }
 }
\ No newline at end of file
diff --git a/Hangfire.AzureDocumentDB/package-lock.json b/src/package-lock.json
similarity index 100%
rename from Hangfire.AzureDocumentDB/package-lock.json
rename to src/package-lock.json
diff --git a/Hangfire.AzureDocumentDB/package.json b/src/package.json
similarity index 100%
rename from Hangfire.AzureDocumentDB/package.json
rename to src/package.json
diff --git a/Hangfire.AzureDocumentDB/tsconfig.json b/src/tsconfig.json
similarity index 100%
rename from Hangfire.AzureDocumentDB/tsconfig.json
rename to src/tsconfig.json
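The count handshake in upsertDocuments pairs with ExecuteUpsertDocuments in StoredprocedureHelper: the script reports how many items it processed before the server cut it off, and the client re-invokes with the remainder. The client side of that protocol, condensed from the diff:

    // Sketch: resume the batch from the last acknowledged document until all items are upserted.
    int affected = 0;
    do
    {
        records.Items = data.Items.Skip(affected).ToList();
        Task<StoredProcedureResponse<int>> task =
            client.ExecuteStoredProcedureWithRetriesAsync<int>(spUpsertDocumentsUri, options, cancellationToken, records);
        task.Wait();
        affected += task.Result; // StoredProcedureResponse<int> converts implicitly to int
    } while (affected < data.Items.Count);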