In this article, we will see how we can prevent the concurrent execution of a Hangfire job. Hangfire is a wonderful tool for scheduling recurring background jobs. But, if we are not careful, we can encounter strange or intermittent errors caused by concurrent execution of a given job.
We will first discuss potential problems related to concurrent job execution and then present different solutions for addressing the issue.
So let’s get going.
Introduction to Hangfire
Hangfire is a popular library for processing background jobs in .NET and .NET Core applications. It can also schedule jobs or execute them on a recurring schedule.
In this article, we will not explain the library itself and how to install and configure it – we assume you are familiar with Hangfire but if you need a refresher, please check out our article Hangfire with ASP.NET Core.
It may also be beneficial to understand background processing options, a topic we cover in another article: Different Ways to Run Background Tasks in ASP.NET Core.
Once Hangfire is installed and configured, let’s create a simple service class to simulate some background work that takes a long time to complete:
    public class JobService
    {
        private const int OperationDurationInSeconds = 10 * 60;
        private readonly ILogger<JobService> _logger;

        public JobService(ILogger<JobService> logger)
        {
            _logger = logger;
        }

        public async Task RunJob1Async() =>
            await PerformLongRunningOperationAsync(nameof(RunJob1Async));

        private async Task PerformLongRunningOperationAsync(string jobName)
        {
            var guid = Guid.NewGuid();
            _logger.LogInformation("Starting job \"{JobName}\", operation: {Guid}", jobName, guid);
            await Task.Delay(TimeSpan.FromSeconds(OperationDurationInSeconds));
            _logger.LogInformation("Finished job \"{JobName}\" operation: {Guid}", jobName, guid);
        }
    }

The RunJob1Async() method starts our background processing and internally calls the PerformLongRunningOperationAsync() method. Of course, we need to register this service in our IoC container.
We start our jobs through API calls, so in the next step, let’s create a basic controller:
    [ApiController]
    [Route("api/jobs")]
    public class JobsController : ControllerBase
    {
        private const string Job1 = "job-1";
        private readonly IBackgroundJobClient _backgroundJobClient;
        private readonly ILogger<JobsController> _logger;

        public JobsController(IBackgroundJobClient backgroundJobClient, ILogger<JobsController> logger)
        {
            _backgroundJobClient = backgroundJobClient;
            _logger = logger;
        }

        [HttpGet("statistics")]
        [ProducesResponseType(StatusCodes.Status200OK)]
        public ActionResult<StatisticsDto> GetStatistics()
        {
            var monitoringApi = JobStorage.Current.GetMonitoringApi();
            var statistics = monitoringApi.GetStatistics();
            var statisticsDto = new StatisticsDto
            {
                Processing = statistics.Processing,
            };

            return Ok(statisticsDto);
        }

        [HttpPost("create-job-1")]
        [ProducesResponseType(StatusCodes.Status204NoContent)]
        public IActionResult CreateJob1()
        {
            _logger.LogInformation("Creating job '{JobName}'", Job1);
            var jobId = _backgroundJobClient.Enqueue<JobService>(jobService => jobService.RunJob1Async());
            _logger.LogInformation("Created job '{JobName}' with ID '{JobId}'", Job1, jobId);

            return NoContent();
        }
    }

Our API has two endpoints: /api/jobs/create-job-1 to create a new background job, and /api/jobs/statistics to read Hangfire statistics and report the number of jobs currently being processed.
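The controller returns a StatisticsDto, a small class of our own that the listing does not show. A minimal sketch, assuming it needs only the single property the controller sets (the exact shape in the sample project may differ):

```csharp
// Hypothetical response model for the statistics endpoint.
public class StatisticsDto
{
    // Hangfire's monitoring API exposes its counters as long values,
    // so we use the same type here to avoid a narrowing conversion.
    public long Processing { get; init; }
}
```

With the default ASP.NET Core JSON settings, this property is serialized in camel case as "processing".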
Understanding Job Concurrency
Looking at our create-job-1 endpoint, we see that each call enqueues a background job. If, for example, we call the /api/jobs/create-job-1 endpoint twice in a short period, we can end up with two instances of this job running concurrently.
Problems With Concurrent Job Execution
There is nothing inherently wrong with concurrency; quite the contrary. Sometimes it is exactly what we need, but too much of it can cause serious performance issues. Also, for some tasks, concurrency can lead to data inconsistency. For example, suppose we start an export or import task. While this operation runs in the background, we don’t want to start another concurrent execution.
Why could that be a problem?
Let’s say that our import task first reads some input files, processes them, and stores the final result in the database. If we are not careful, we could process the same file twice by two simultaneous executions. Similarly, while performing an export from a database to a file, we could be in a situation where two concurrent tasks try to overwrite the same file. That is precisely the problem we’re trying to solve in this article.
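The import scenario can be reproduced without Hangfire. The following sketch is a hypothetical in-memory simulation, not code from the sample project: ImportSimulation, its file names, and the 200 ms delay are all made up for illustration. Each run first lists the pending files and only removes them after the slow processing step, so two overlapping runs pick up the same files.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for an import job.
public class ImportSimulation
{
    private readonly ConcurrentDictionary<string, bool> _pendingFiles = new();
    private readonly ConcurrentBag<string> _importedRows = new();

    public ImportSimulation(params string[] files)
    {
        foreach (var file in files)
        {
            _pendingFiles[file] = true;
        }
    }

    public int ImportedRowCount => _importedRows.Count;

    public async Task ImportAsync(string runName)
    {
        // Each run takes its own snapshot of the pending files...
        var snapshot = _pendingFiles.Keys.ToList();

        // ...and only removes them after the (simulated) slow processing step.
        await Task.Delay(TimeSpan.FromMilliseconds(200));

        foreach (var file in snapshot)
        {
            _importedRows.Add($"{file} imported by {runName}");
            _pendingFiles.TryRemove(file, out _);
        }
    }
}

public static class ImportSimulationDemo
{
    public static async Task Main()
    {
        var simulation = new ImportSimulation("customers.csv", "orders.csv");

        // Start two overlapping runs, as two concurrent background jobs would.
        await Task.WhenAll(simulation.ImportAsync("run-1"), simulation.ImportAsync("run-2"));

        // Two files, but four imported rows: every file was processed twice.
        Console.WriteLine($"Imported rows: {simulation.ImportedRowCount}");
    }
}
```

Nothing in the simulation is thread-unsafe in the low-level sense. The duplication comes purely from two runs overlapping in time, which is why the fix has to happen at the job level.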
First, let’s demonstrate how our code behaves in its current form. We start our API and call the endpoint for creating a job twice:
    curl.exe -i -X POST "http://localhost:5000/api/jobs/create-job-1"
    curl.exe -i -X POST "http://localhost:5000/api/jobs/create-job-1"
We can look at logs to see what happened:
    info: HowToPreventAHangfireJobFromRunning.Controllers.JobsController[0]
          Creating job 'job-1'
    info: HowToPreventAHangfireJobFromRunning.Controllers.JobsController[0]
          Created job 'job-1' with ID '0d5f9046-b350-49c7-8547-a68c5fdeaba1'
    info: HowToPreventAHangfireJobFromRunning.Services.JobService[0]
          Starting job "RunJob1Async", operation: 0be2f3cc-2862-46f0-81fc-236c0c08e62c
    info: HowToPreventAHangfireJobFromRunning.Controllers.JobsController[0]
          Creating job 'job-1'
    info: HowToPreventAHangfireJobFromRunning.Controllers.JobsController[0]
          Created job 'job-1' with ID '21f23e47-6955-4e12-9970-16f806d0a38c'
    info: HowToPreventAHangfireJobFromRunning.Services.JobService[0]
          Starting job "RunJob1Async", operation: 0e266e8c-fede-46bd-92d6-2500f8aba570
To be 100% sure, we can verify this by calling our other endpoint (alternatively, we could use the built-in Hangfire dashboard):
    curl.exe -i "http://localhost:5000/api/jobs/statistics"
We can see that two jobs are running:
    HTTP/1.1 200 OK
    Connection: close
    Content-Type: application/json; charset=utf-8
    Date: Sun, 16 Jun 2024 07:00:22 GMT
    Server: Kestrel
    Transfer-Encoding: chunked

    {
      "processing": 2
    }
Strategies to Prevent Concurrent Execution of a Hangfire Job
We have introduced the idea of concurrent execution and shown that our jobs can run concurrently, but what if we don’t want that? Hangfire provides a couple of ways to prevent concurrent execution. The most popular one is to use the DisableConcurrentExecution attribute. Alternatively, we can create a custom solution using distributed locks. Lastly, if we use the paid version of the library, we can rely on the complete feature set provided by the Hangfire.Throttling package.
Implementing DisableConcurrentExecution Attribute to Prevent Concurrent Execution of a Hangfire Job
To use the DisableConcurrentExecution attribute, we add it to the method executed by the job:

    public class JobService
    {
        // ...

        [DisableConcurrentExecution(timeoutInSeconds: 60)]
        public async Task RunJob1Async() =>
            await PerformLongRunningOperationAsync(nameof(RunJob1Async));

        // ...
    }
In theory, this attribute gives us the effect we are looking for: if a second execution of the job starts while the first one is still running, Hangfire keeps the two from running at the same time. In practice, it depends on the storage provider. It works if we use Microsoft SQL Server, but it won’t work if we use the in-memory provider.
Another possible problem is that for a recurring job (a job running based on a defined schedule – for example, every minute), this attribute will not cancel the request, but rather queue the job for later. This may be exactly what we are looking for. If not, this could lead to a situation where we have many queued jobs and an ever-increasing queue length.
With that in mind, this attribute is recommended for cases with jobs that typically complete quickly, but may occasionally have longer execution times.
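If the growing backlog mentioned above is a concern, one option that is sometimes combined with this attribute is Hangfire’s AutomaticRetry attribute. The sketch below is our own illustration and not part of the sample project: ReportJobService is a hypothetical class, and the sketch assumes that a job which cannot obtain the lock within the timeout fails and would otherwise be retried. With Attempts = 0 and AttemptsExceededAction.Delete, such a job should be moved to the deleted state instead of being scheduled again.

```csharp
using System.Threading.Tasks;
using Hangfire;

// Hypothetical service used only to show the attribute combination.
public class ReportJobService
{
    // Wait up to 60 seconds for a running instance to release the lock.
    [DisableConcurrentExecution(timeoutInSeconds: 60)]
    // Assumption: a lock timeout surfaces as a failed job. Do not retry it;
    // delete it so blocked instances do not pile up.
    [AutomaticRetry(Attempts = 0, OnAttemptsExceeded = AttemptsExceededAction.Delete)]
    public Task RunAsync() => Task.CompletedTask;
}
```

The trade-off is that these retry settings apply to every failure of the job, not only to lock timeouts, so a job that fails for an unrelated reason is not retried either.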
Custom Solution Using Distributed Locks to Prevent Concurrent Execution of a Hangfire Job
If the DisableConcurrentExecution attribute doesn’t fit our situation, we have another option. We can implement a custom filter to prevent concurrent execution of jobs. Our solution is based on a gist file provided by the co-author of the Hangfire library.
To use our custom filter, we need to create the following:
- A marker attribute, SkipConcurrentExecution.
- An extension class with a helper method to check if a given job has this attribute applied and whether it is “our” job.
- A filter class that contains logic for checking and preventing the concurrent execution of the job with this attribute applied.
Then, we can register our filter, apply the attribute to the job, and test to verify that everything works as expected.
SkipConcurrentExecutionAttribute
First, let’s create an empty (marker) attribute, SkipConcurrentExecutionAttribute:
public class SkipConcurrentExecutionAttribute : JobFilterAttribute { }
JobExtensions
Although it is not strictly required, we can also create an extension method for Hangfire’s Job class:
    public static class JobExtensions
    {
        public static bool SkipConcurrentExecution(this Job job)
        {
            return job.Method.GetCustomAttributes(typeof(SkipConcurrentExecutionAttribute), false).Length > 0;
        }
    }

This newly created SkipConcurrentExecution() method checks whether the method associated with the job has our attribute applied.
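The attribute lookup itself is plain reflection, so it can be tried in isolation. In the following sketch, MarkerAttribute and DemoService are hypothetical stand-ins for our attribute and job service, and HasMarker() performs the same GetCustomAttributes() check on a MethodInfo directly:

```csharp
using System;
using System.Reflection;

// Hypothetical marker attribute standing in for SkipConcurrentExecutionAttribute.
[AttributeUsage(AttributeTargets.Method)]
public sealed class MarkerAttribute : Attribute
{
}

// Hypothetical service with one marked and one unmarked method.
public class DemoService
{
    [Marker]
    public void Marked() { }

    public void Unmarked() { }
}

public static class MarkerLookupDemo
{
    // Returns true when the method carries the marker attribute.
    public static bool HasMarker(MethodInfo method) =>
        method.GetCustomAttributes(typeof(MarkerAttribute), inherit: false).Length > 0;

    public static void Main()
    {
        var marked = typeof(DemoService).GetMethod(nameof(DemoService.Marked))!;
        var unmarked = typeof(DemoService).GetMethod(nameof(DemoService.Unmarked))!;

        Console.WriteLine(HasMarker(marked));   // True
        Console.WriteLine(HasMarker(unmarked)); // False
    }
}
```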
We also need a way to identify that a particular job is “our” job and to do that, we create some methods to produce its “fingerprint”:
    public static class JobExtensions
    {
        // ...

        public static string GetFingerprintLockKey(this Job job)
        {
            return $"{job.GetFingerprintKey()}:lock";
        }

        public static string GetFingerprintKey(this Job job)
        {
            return $"fingerprint:{job.GetFingerprint()}";
        }

        private static string GetFingerprint(this Job job)
        {
            if (job.Type == null || job.Method == null)
            {
                return string.Empty;
            }

            var parameters = string.Empty;
            if (job.Args is not null)
            {
                parameters = string.Join(".", job.Args);
            }

            return $"{job.Type.FullName}.{job.Method.Name}.{parameters}";
        }
    }

To create the job’s fingerprint, we combine the full name of the job’s type, the name of the method associated with it, and the arguments passed to that method.
The other methods, GetFingerprintKey() and GetFingerprintLockKey(), produce the specially named keys under which we store the fingerprint and its lock in Hangfire’s storage. Keep in mind that this is only an example; we might need to adjust the implementation to our needs.
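To make the key format concrete, the sketch below rebuilds the same strings outside Hangfire. BuildFingerprintKey() is a hypothetical stand-in for the extension methods above: it takes the type name, method name, and arguments directly, so it runs without a Job instance. The Demo.ImportService example is made up for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class FingerprintKeyDemo
{
    // Mirrors the "fingerprint:{type}.{method}.{arguments}" format used above.
    public static string BuildFingerprintKey(
        string typeFullName, string methodName, IEnumerable<object?> args)
    {
        var parameters = string.Join(".", args);
        return $"fingerprint:{typeFullName}.{methodName}.{parameters}";
    }

    public static void Main()
    {
        var key = BuildFingerprintKey(
            "HowToPreventAHangfireJobFromRunning.Services.JobService",
            "RunJob2Async",
            Array.Empty<object?>());

        // fingerprint:HowToPreventAHangfireJobFromRunning.Services.JobService.RunJob2Async.
        Console.WriteLine(key);

        // The lock key is the same string with a ":lock" suffix.
        Console.WriteLine($"{key}:lock");

        // fingerprint:Demo.ImportService.Import.customers.csv.42
        Console.WriteLine(BuildFingerprintKey(
            "Demo.ImportService", "Import", new object?[] { "customers.csv", 42 }));
    }
}
```

One consequence of this format is that the arguments are part of the key. Two jobs that call the same method with different arguments get different fingerprints and are therefore not treated as the same job.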
SkipConcurrentExecutionFilter
Now, we can start working on our filter class, SkipConcurrentExecutionFilter:

    public class SkipConcurrentExecutionFilter : IClientFilter, IServerFilter
    {
        public void OnCreating(CreatingContext filterContext) { }

        public void OnPerformed(PerformedContext filterContext) { }

        public void OnCreated(CreatedContext filterContext) { }

        public void OnPerforming(PerformingContext filterContext) { }
    }

Our class implements two interfaces, IClientFilter and IServerFilter. For the filter to work, we must define our logic in the OnCreating() method (called on the client while creating the job) and the OnPerformed() method (called on the server when the job execution is completed).
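The method bodies in the following sections use three fields, _logger, _lockTimeout, and _fingerprintTimeout, that the skeleton does not declare. A minimal sketch of how they might be declared is given here. The two timeout values are assumptions chosen for illustration, not values taken from the sample project; the logger is injected through the constructor.

```csharp
using System;
using Hangfire.Client;
using Hangfire.Server;
using Microsoft.Extensions.Logging;

public class SkipConcurrentExecutionFilter : IClientFilter, IServerFilter
{
    // Assumed value: how long to wait for the distributed lock
    // that guards reads and writes of the fingerprint.
    private readonly TimeSpan _lockTimeout = TimeSpan.FromSeconds(5);

    // Assumed value: after this long a fingerprint is treated as stale.
    // It should exceed the longest expected run of the job
    // (our simulated job takes 10 minutes).
    private readonly TimeSpan _fingerprintTimeout = TimeSpan.FromHours(1);

    private readonly ILogger<SkipConcurrentExecutionFilter> _logger;

    public SkipConcurrentExecutionFilter(ILogger<SkipConcurrentExecutionFilter> logger)
    {
        _logger = logger;
    }

    // Bodies are filled in step by step in the sections that follow.
    public void OnCreating(CreatingContext filterContext) { }

    public void OnCreated(CreatedContext filterContext) { }

    public void OnPerforming(PerformingContext filterContext) { }

    public void OnPerformed(PerformedContext filterContext) { }
}
```

The fingerprint timeout acts as a safety net: if a fingerprint is never removed, for example because the server stops mid-job, new jobs are blocked only until the fingerprint expires.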
OnCreating
Let’s add the implementation to the OnCreating() method:

    public void OnCreating(CreatingContext filterContext)
    {
        if (!filterContext.Job.SkipConcurrentExecution())
        {
            return;
        }

        _logger.LogInformation("Creating job '{JobName}'...", filterContext.Job.Method.Name);

        if (!AddFingerprintIfNotExists(filterContext.Connection, filterContext.Job))
        {
            _logger.LogWarning("Job '{JobName}' is already running, skipping...", filterContext.Job.Method.Name);
            filterContext.Canceled = true;
        }
    }

This method checks whether the job has our attribute applied to it. If it does, the method tries to add a fingerprint for the job by calling the AddFingerprintIfNotExists() method. If the fingerprint already exists, we cancel the creation of the job.
Let’s define our AddFingerprintIfNotExists() method:
    private bool AddFingerprintIfNotExists(IStorageConnection connection, Job job)
    {
        _logger.LogInformation("Adding fingerprint for job '{JobName}'...", job.Method.Name);

        using (connection.AcquireDistributedLock(job.GetFingerprintLockKey(), _lockTimeout))
        {
            var fingerprint = connection.GetAllEntriesFromHash(job.GetFingerprintKey());

            if (fingerprint is not null &&
                fingerprint.ContainsKey("Timestamp") &&
                DateTimeOffset.TryParseExact(
                    fingerprint["Timestamp"],
                    "o",
                    CultureInfo.InvariantCulture,
                    DateTimeStyles.RoundtripKind,
                    out var timestamp) &&
                DateTimeOffset.UtcNow <= timestamp.Add(_fingerprintTimeout))
            {
                _logger.LogInformation("Fingerprint for job '{JobName}' already exists, skipping...", job.Method.Name);
                return false;
            }

            _logger.LogInformation("Fingerprint for job '{JobName}' does not exist, adding...", job.Method.Name);

            connection.SetRangeInHash(job.GetFingerprintKey(), new Dictionary<string, string>
            {
                { "Timestamp", DateTimeOffset.UtcNow.ToString("o", CultureInfo.InvariantCulture) }
            });

            return true;
        }
    }
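This method uses the distributed locking mechanism built into Hangfire to hold a lock while it checks for, and possibly creates, the job’s fingerprint. It returns false when a fingerprint younger than _fingerprintTimeout already exists, which means we consider the job to be running. Otherwise, it stores a new fingerprint with the current timestamp and returns true.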
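The expiry check is the least obvious part of this method, so here it is in isolation. IsFingerprintActive() is a hypothetical helper that applies the same parsing and comparison to a stored timestamp; the one-hour timeout and the fixed “now” are assumptions that keep the example deterministic.

```csharp
using System;
using System.Globalization;

public static class FingerprintExpiryDemo
{
    // True when the stored timestamp parses and is still within the timeout.
    public static bool IsFingerprintActive(
        string? storedTimestamp, TimeSpan fingerprintTimeout, DateTimeOffset now)
    {
        return DateTimeOffset.TryParseExact(
                   storedTimestamp,
                   "o",
                   CultureInfo.InvariantCulture,
                   DateTimeStyles.RoundtripKind,
                   out var timestamp)
               && now <= timestamp.Add(fingerprintTimeout);
    }

    public static void Main()
    {
        var now = new DateTimeOffset(2024, 6, 16, 7, 0, 0, TimeSpan.Zero);
        var timeout = TimeSpan.FromHours(1);

        var fresh = now.AddMinutes(-10).ToString("o", CultureInfo.InvariantCulture);
        var stale = now.AddHours(-2).ToString("o", CultureInfo.InvariantCulture);

        Console.WriteLine(IsFingerprintActive(fresh, timeout, now)); // True: job counts as running
        Console.WriteLine(IsFingerprintActive(stale, timeout, now)); // False: fingerprint has expired
        Console.WriteLine(IsFingerprintActive(null, timeout, now));  // False: no fingerprint stored
    }
}
```

Only the first case blocks a new job. A stale or missing fingerprint lets the job be created and overwrites the stored timestamp.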
OnPerformed
We will also add our implementation to the OnPerformed() method:

    public void OnPerformed(PerformedContext filterContext)
    {
        if (!filterContext.BackgroundJob.Job.SkipConcurrentExecution())
        {
            return;
        }

        _logger.LogInformation("Performing job '{JobName}'...", filterContext.BackgroundJob.Job.Method.Name);

        RemoveFingerprint(filterContext.Connection, filterContext.BackgroundJob.Job);
    }

This method verifies whether we should apply our filter to the job, and if so, it removes the fingerprint with the help of the RemoveFingerprint() method:

    private void RemoveFingerprint(IStorageConnection connection, Job job)
    {
        _logger.LogInformation("Removing fingerprint for job '{JobName}'...", job.Method.Name);

        using (connection.AcquireDistributedLock(job.GetFingerprintLockKey(), _lockTimeout))
        using (var transaction = connection.CreateWriteTransaction())
        {
            transaction.RemoveHash(job.GetFingerprintKey());
            transaction.Commit();

            _logger.LogInformation("Fingerprint for job '{JobName}' removed...", job.Method.Name);
        }
    }
This new method acquires a distributed lock and then removes our previously stored job fingerprint.
Filter Registration
We need to register our filter before we can use it, so we modify our Program class:

    builder.Services.AddHangfire((provider, config) =>
    {
        config.SetDataCompatibilityLevel(CompatibilityLevel.Version_180);
        config.UseSimpleAssemblyNameTypeSerializer();
        config.UseRecommendedSerializerSettings();
        config.UseInMemoryStorage();
        config.UseFilter(new SkipConcurrentExecutionFilter(
            provider.GetRequiredService<ILogger<SkipConcurrentExecutionFilter>>()));
    });

Here, we call the UseFilter() method and pass it a new instance of our filter. Our final implementation gets its logger through DI, so we pass that in here, too.
Apply Custom Attribute
Now we can use our attribute.
Let’s add another method to our JobService:

    public class JobService
    {
        // ...

        [SkipConcurrentExecution]
        public async Task RunJob2Async() =>
            await PerformLongRunningOperationAsync(nameof(RunJob2Async));

        // ...
    }

Our new RunJob2Async() method is almost the same as the old one. The main difference is our use of the SkipConcurrentExecution attribute.
Controller
For completeness, and to demonstrate that this approach works as expected, let’s add a new endpoint, /api/jobs/create-job-2, to our controller:

    [ApiController]
    [Route("api/jobs")]
    public class JobsController : ControllerBase
    {
        // ...

        [HttpPost("create-job-2")]
        [ProducesResponseType(StatusCodes.Status204NoContent)]
        [ProducesResponseType(StatusCodes.Status400BadRequest)]
        public IActionResult CreateJob2()
        {
            _logger.LogInformation("Creating job '{JobName}'", Job2);
            var jobId = _backgroundJobClient.Enqueue<JobService>(jobService => jobService.RunJob2Async());

            if (string.IsNullOrWhiteSpace(jobId))
            {
                _logger.LogWarning("Unable to create job '{JobName}' probably it is already running", Job2);
                return NoContent();
            }

            _logger.LogInformation("Created job '{JobName}' with ID '{JobId}'", Job2, jobId);

            return NoContent();
        }
    }

There is nothing special here; the code is almost the same as for the other endpoint. The main difference is that we also check whether the returned jobId is null or empty. It will be null when the filter cancels the creation of the job because an instance is already running.
Testing
Now we can try to start our job twice:
    curl.exe -i -X POST "https://localhost:5001/api/jobs/create-job-2"
    curl.exe -i -X POST "https://localhost:5001/api/jobs/create-job-2"
And investigate the logs:
    info: HowToPreventAHangfireJobFromRunning.Controllers.JobsController[0]
          Creating job 'job-2'
    info: HowToPreventAHangfireJobFromRunning.Configurations.Filters.SkipConcurrentExecutionFilter[0]
          Creating job 'RunJob2Async'...
    info: HowToPreventAHangfireJobFromRunning.Configurations.Filters.SkipConcurrentExecutionFilter[0]
          Adding fingerprint for job 'RunJob2Async'...
    info: HowToPreventAHangfireJobFromRunning.Configurations.Filters.SkipConcurrentExecutionFilter[0]
          Fingerprint for job 'RunJob2Async' does not exist, adding...
    info: HowToPreventAHangfireJobFromRunning.Controllers.JobsController[0]
          Created job 'job-2' with ID '02929c25-ef7e-467d-af10-6bded246f199'
    info: HowToPreventAHangfireJobFromRunning.Services.JobService[0]
          Starting job "RunJob2Async", operation: 26b2eba8-5c83-44ad-a49d-b5c7a9de311a
    info: HowToPreventAHangfireJobFromRunning.Controllers.JobsController[0]
          Creating job 'job-2'
    info: HowToPreventAHangfireJobFromRunning.Configurations.Filters.SkipConcurrentExecutionFilter[0]
          Creating job 'RunJob2Async'...
    info: HowToPreventAHangfireJobFromRunning.Configurations.Filters.SkipConcurrentExecutionFilter[0]
          Adding fingerprint for job 'RunJob2Async'...
    info: HowToPreventAHangfireJobFromRunning.Configurations.Filters.SkipConcurrentExecutionFilter[0]
          Fingerprint for job 'RunJob2Async' already exists, skipping...
    warn: HowToPreventAHangfireJobFromRunning.Configurations.Filters.SkipConcurrentExecutionFilter[0]
          Job 'RunJob2Async' is already running, skipping...
    warn: HowToPreventAHangfireJobFromRunning.Controllers.JobsController[0]
          Unable to create job 'job-2' probably it is already running
We can see that we are using our custom filter, which was activated to prevent concurrent execution of our job. To be sure, let’s verify that:
    curl.exe -i "http://localhost:5000/api/jobs/statistics"
Here is the proof:
    HTTP/1.1 200 OK
    Connection: close
    Content-Type: application/json; charset=utf-8
    Date: Sun, 16 Jun 2024 07:07:12 GMT
    Server: Kestrel
    Transfer-Encoding: chunked

    {
      "processing": 1
    }
Conclusion
In this article, we explained how to prevent a Hangfire job from running while it is already active.
We have presented two methods to achieve our objective: using the built-in DisableConcurrentExecution attribute and creating a custom filter.