Skip to content

Commit

Permalink
Merge pull request #849 from dco123/CustomQueueName
Browse files Browse the repository at this point in the history
Add support for using a custom hangfire queue name
  • Loading branch information
rasmus authored Aug 31, 2021
2 parents e5d9c56 + c165149 commit 25b3019
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 10 deletions.
5 changes: 4 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
### New in 0.83 (not released yet)

* _Nothing yet_
* New: Queue name used by HangfireJobScheduler can be overridden:
```csharp
eventFlowOptions.UseHangfireJobScheduler(o => o.UseQueueName("myqueue"))
```

### New in 0.82.4684 (released 2021-08-31)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,17 @@ public static IEventFlowOptions UseHangfireJobScheduler(
sr.Register<IHangfireJobRunner, HangfireJobRunner>();
sr.Register<IJobDisplayNameBuilder, JobDisplayNameBuilder>();
sr.Register<IBackgroundJobClient>(r => new BackgroundJobClient());
sr.Register<IQueueNameProvider>(r => new QueueNameProvider(null));
});
}

public static IEventFlowOptions UseHangfireJobScheduler(
this IEventFlowOptions eventFlowOptions,
Action<IEventFlowHangfireOptions> configurationAction)
{
var options = eventFlowOptions.UseHangfireJobScheduler();
configurationAction(new EventFlowHangfireOptions(options));
return options;
}
}
}
47 changes: 47 additions & 0 deletions Source/EventFlow.Hangfire/Integration/EventFlowHangfireOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// The MIT License (MIT)
//
// Copyright (c) 2015-2021 Rasmus Mikkelsen
// Copyright (c) 2015-2021 eBay Software Foundation
// https://github.com/eventflow/EventFlow
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace EventFlow.Hangfire.Integration
{
internal class EventFlowHangfireOptions : IEventFlowHangfireOptions
{
private readonly IEventFlowOptions _eventFlowOptions;

public EventFlowHangfireOptions(IEventFlowOptions eventFlowOptions)
{
_eventFlowOptions = eventFlowOptions;
}

public IEventFlowHangfireOptions UseQueueName(string queueName)
{
_eventFlowOptions.RegisterServices(sr => sr.Register<IQueueNameProvider>(r => new QueueNameProvider(queueName)));
return this;
}
}
}
9 changes: 7 additions & 2 deletions Source/EventFlow.Hangfire/Integration/HangfireJobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,16 @@ public HangfireJobRunner(
IJobRunner jobRunner)
{
_jobRunner = jobRunner;
}

public Task ExecuteAsync(string displayName, string jobName, int version, string job)
{
return _jobRunner.ExecuteAsync(jobName, version, job, CancellationToken.None);
}

public Task ExecuteAsync(string displayName, string jobName, int version, string job)
public Task ExecuteAsync(string displayName, string jobName, int version, string job, string queueName)
{
return _jobRunner.ExecuteAsync(jobName, version, job, CancellationToken.None);
}
}
}
}
17 changes: 10 additions & 7 deletions Source/EventFlow.Hangfire/Integration/HangfireJobScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,47 +34,50 @@ namespace EventFlow.Hangfire.Integration
public class HangfireJobScheduler : IJobScheduler
{
private readonly IBackgroundJobClient _backgroundJobClient;
private readonly IJobDefinitionService _jobDefinitionService;
private readonly IJobDefinitionService _jobDefinitionService;
private readonly IJsonSerializer _jsonSerializer;
private readonly ILog _log;
private readonly IJobDisplayNameBuilder _jobDisplayNameBuilder;
private readonly IJobDisplayNameBuilder _jobDisplayNameBuilder;
private readonly string _queueName;

public HangfireJobScheduler(
ILog log,
IJobDisplayNameBuilder jobDisplayNameBuilder,
IJsonSerializer jsonSerializer,
IBackgroundJobClient backgroundJobClient,
IJobDefinitionService jobDefinitionService)
IJobDefinitionService jobDefinitionService,
IQueueNameProvider queueNameProvider)
{
_log = log;
_jobDisplayNameBuilder = jobDisplayNameBuilder;
_jsonSerializer = jsonSerializer;
_backgroundJobClient = backgroundJobClient;
_jobDefinitionService = jobDefinitionService;
_jobDefinitionService = jobDefinitionService;
_queueName = queueNameProvider.QueueName;
}

public Task<IJobId> ScheduleNowAsync(IJob job, CancellationToken cancellationToken)
{
return ScheduleAsync(
job,
cancellationToken,
(c, d, n, j) => _backgroundJobClient.Enqueue<IHangfireJobRunner>(r => r.ExecuteAsync(n, d.Name, d.Version, j)));
(c, d, n, j) => _backgroundJobClient.Enqueue<IHangfireJobRunner>(r => r.ExecuteAsync(n, d.Name, d.Version, j, _queueName)));
}

public Task<IJobId> ScheduleAsync(IJob job, DateTimeOffset runAt, CancellationToken cancellationToken)
{
return ScheduleAsync(
job,
cancellationToken,
(c, d, n, j) => _backgroundJobClient.Schedule<IHangfireJobRunner>(r => r.ExecuteAsync(n, d.Name, d.Version, j), runAt));
(c, d, n, j) => _backgroundJobClient.Schedule<IHangfireJobRunner>(r => r.ExecuteAsync(n, d.Name, d.Version, j, _queueName), runAt));
}

public Task<IJobId> ScheduleAsync(IJob job, TimeSpan delay, CancellationToken cancellationToken)
{
return ScheduleAsync(
job,
cancellationToken,
(c, d, n, j) => _backgroundJobClient.Schedule<IHangfireJobRunner>(r => r.ExecuteAsync(n, d.Name, d.Version, j), delay));
(c, d, n, j) => _backgroundJobClient.Schedule<IHangfireJobRunner>(r => r.ExecuteAsync(n, d.Name, d.Version, j, _queueName), delay));
}

private async Task<IJobId> ScheduleAsync(
Expand Down
36 changes: 36 additions & 0 deletions Source/EventFlow.Hangfire/Integration/IEventFlowHangfireOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// The MIT License (MIT)
//
// Copyright (c) 2015-2021 Rasmus Mikkelsen
// Copyright (c) 2015-2021 eBay Software Foundation
// https://github.com/eventflow/EventFlow
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace EventFlow.Hangfire.Integration
{
public interface IEventFlowHangfireOptions
{
IEventFlowHangfireOptions UseQueueName(string queueName);
}
}
3 changes: 3 additions & 0 deletions Source/EventFlow.Hangfire/Integration/IHangfireJobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,8 @@ public interface IHangfireJobRunner
{
[DisplayName("{0}")]
Task ExecuteAsync(string displayName, string jobName, int version, string job);

[DisplayName("{0}"), UseQueueFromParameter(4)]
Task ExecuteAsync(string displayName, string jobName, int version, string job, string queueName);
}
}
36 changes: 36 additions & 0 deletions Source/EventFlow.Hangfire/Integration/IQueueNameProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// The MIT License (MIT)
//
// Copyright (c) 2015-2021 Rasmus Mikkelsen
// Copyright (c) 2015-2021 eBay Software Foundation
// https://github.com/eventflow/EventFlow
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace EventFlow.Hangfire.Integration
{
public interface IQueueNameProvider
{
string QueueName { get; }
}
}
41 changes: 41 additions & 0 deletions Source/EventFlow.Hangfire/Integration/QueueNameProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// The MIT License (MIT)
//
// Copyright (c) 2015-2021 Rasmus Mikkelsen
// Copyright (c) 2015-2021 eBay Software Foundation
// https://github.com/eventflow/EventFlow
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace EventFlow.Hangfire.Integration
{
internal class QueueNameProvider : IQueueNameProvider
{
public string QueueName { get; }

public QueueNameProvider(string queueName)
{
QueueName = queueName;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// The MIT License (MIT)
//
// Copyright (c) 2015-2021 Rasmus Mikkelsen
// Copyright (c) 2015-2021 eBay Software Foundation
// https://github.com/eventflow/EventFlow
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using Hangfire.Common;
using Hangfire.States;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace EventFlow.Hangfire.Integration
{
internal class UseQueueFromParameterAttribute : JobFilterAttribute, IElectStateFilter
{
public UseQueueFromParameterAttribute(int parameterIndex)
{
if (parameterIndex < 0)
throw new InvalidOperationException("Invalid queue name parameter index");

ParameterIndex = parameterIndex;
}

public int ParameterIndex { get; }

public void OnStateElection(ElectStateContext context)
{
var enqueuedState = context.CandidateState as EnqueuedState;
if (enqueuedState != null)
{
if (ParameterIndex >= context.BackgroundJob.Job.Args.Count)
throw new InvalidOperationException("Invalid queue name parameter index");

var queueName = context.BackgroundJob.Job.Args[ParameterIndex] as string;

if (queueName != null)
enqueuedState.Queue = queueName;
}
}
}
}

0 comments on commit 25b3019

Please sign in to comment.