Scheduled Task Requirements
Sample project repository: whuanle/HangfireDemo
There are two core requirements:
- Scheduled tasks must support second-level granularity;
- Writing a scheduled task should be simple for developers, with no unnecessary complexity;
In a microservices architecture, scheduled tasks are one of the most commonly used infrastructure components. The community offers many scheduled task libraries and platforms, such as Quartz.NET and xxl-job, and they differ significantly in usage. xxl-job, for instance, is built around HTTP requests: each scheduled task is configured to call a specific HTTP interface that the service must implement, which can be quite cumbersome.
A microservice already integrates many components, and if each integration is cumbersome, the service code bloats and bugs creep in. Taking xxl-job as an example, a project with N scheduled tasks needs N HTTP interfaces for xxl-job to call back. That means not only a large number of HTTP interfaces but also more places where bugs can occur.
A recent project needed scheduled tasks. My approach was to combine the Hangfire framework with C# language features and wrap them in a few helper methods, so that developers can use scheduled tasks with very little ceremony.
Here is a usage example that uses the MediatR framework to define a CQRS Command. This Command will be triggered by a scheduled task:
// The result type is spelled ExecteTasResult, matching the rest of the sample code.
public class MyTestRequest : HangfireRequest, IRequest<ExecteTasResult>
{
}

/// <summary>
/// Code to be executed by the scheduled task.
/// </summary>
public class MyTestHandler : IRequestHandler<MyTestRequest, ExecteTasResult>
{
    public async Task<ExecteTasResult> Handle(MyTestRequest request, CancellationToken cancellationToken)
    {
        // Logic
        return new ExecteTasResult
        {
            // Return true to stop all subsequent runs of this task
            CancelTask = false
        };
    }
}
To start a scheduled task, you just need to:
private readonly SendHangfireService _hangfireService;
public SendTaskController(SendHangfireService hangfireService)
{
_hangfireService = hangfireService;
}
[HttpGet("aaa")]
public async Task<string> SendAsync()
{
await _hangfireService.Send(new MyTestRequest
{
CreateTime = DateTimeOffset.Now,
CronExpression = "* * * * * *",
TaskId = Guid.NewGuid().ToString(),
});
return "aaa";
}
With this approach, developers can implement a scheduled task with very little code, without worrying about the details or defining extra HTTP interfaces. And because the business code does not depend on the underlying scheduling framework, the implementation can be swapped out easily.
Core Logic
The structure of the sample project is as follows:
HangfireServer is the implementation of the scheduled task service. It only needs to expose two interfaces, addtask and cancel, which add and cancel scheduled tasks respectively. Every business service adds its tasks through the addtask interface.
DemoApi is the business service, which only needs to expose one execute interface through which scheduled tasks are triggered.
The basic logic is as follows:
graph LR
subgraph DemoApi
A[Define Command] -- Serialize parameters Command --> AA[Send Scheduled Task]
E[DemoApi: execute interface] --> F[DemoApi: Execute Command]
end
subgraph Hangfire
B[addtask] --> C[Hangfire: Store Task]
C --> D[Hangfire: Execute Task]
D --> DD[Initiate Request]
end
%% Establish necessary connections simultaneously
AA -- Add Scheduled Task --> B
DD -- Request --> E
Since the project uses the MediatR framework to implement the CQRS pattern, dynamically invoking scheduled task code is straightforward. To schedule work, you only need to send the Command that should be executed as a scheduled task.
For example, you have the following Commands that need to be executed by the scheduled task:
ACommand
BCommand
CCommand
These commands will first be serialized into JSON and sent to the HangfireServer service. At the appropriate time, HangfireServer will push the parameters unchanged to the DemoApi service. The DemoApi service will receive these parameters, deserialize them into the corresponding types, and then send the command via MediatR, thereby achieving dynamic invocation of any command as a scheduled task.
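The round trip described above can be sketched in a few lines. This is a minimal illustration, not the sample project's code: `RoundTripDemo`, `Pack`, `Unpack`, and the two record types are hypothetical names, and the type registry assumes that short type names are unique.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical commands, standing in for ACommand / BCommand in the text.
public record ACommand(string Message);
public record BCommand(int Count);

public static class RoundTripDemo
{
    // Type registry keyed by short type name (assumption: names are unique).
    private static readonly Dictionary<string, Type> Types = new()
    {
        [nameof(ACommand)] = typeof(ACommand),
        [nameof(BCommand)] = typeof(BCommand),
    };

    // Sender side: keep the type name next to the JSON body.
    public static (string CommandType, string CommandBody) Pack<T>(T command) =>
        (typeof(T).Name, JsonSerializer.Serialize(command));

    // Receiver side: find the Type by name, then deserialize into it.
    // Returns null when the type name is unknown.
    public static object? Unpack(string commandType, string commandBody) =>
        Types.TryGetValue(commandType, out var type)
            ? JsonSerializer.Deserialize(commandBody, type)
            : null;

    public static void Main()
    {
        var (type, body) = Pack(new ACommand("hello"));
        var restored = Unpack(type, body);
        Console.WriteLine($"{type}: {body} -> {restored}");
    }
}
```

The scheduler in the middle never needs to know the command types; it only stores and forwards the two strings.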
Next, we will implement the HangfireServer and DemoApi services.
Add the following files to the Shared project.
The content of TaskRequest is shown below; for the other files, refer to the sample project.
public class TaskRequest
{
/// <summary>
/// Task ID.
/// </summary>
public string TaskId { get; set; } = "";
/// <summary>
/// The service address or service name that the scheduled task needs to request.
/// </summary>
public string ServiceName { get; set; } = "";
/// <summary>
/// The name of the type of parameters.
/// </summary>
public string CommandType { get; set; } = "";
/// <summary>
/// The content of the request parameters, serialized as a JSON string.
/// </summary>
public string CommandBody { get; set; } = "";
/// <summary>
/// Cron expression.
/// </summary>
public string CronExpression { get; set; } = "";
/// <summary>
/// Creation time.
/// </summary>
public string CreateTime { get; set; } = "";
}
Using Redis to Implement Second-Level Scheduled Tasks
Hangfire's configuration can be quite complex, and in a distributed deployment it puts heavy load on its storage, so keeping the data in a relational database such as MySQL or SQL Server could create significant pressure. For this reason, and given the need for second-level scheduled tasks, a NoSQL store is a better fit. Here, I use Redis to store task data.
The structure of the HangfireServer project is as follows:
![image-20250418094109409]()
The design of HangfireServer is divided into the following steps:
- Make Hangfire support container (dependency injection) management;
- Configure Hangfire;
- Define RecurringJobHandler to execute tasks and send HTTP requests to the business system;
- Define HTTP interfaces to receive scheduled tasks;
The following packages are required:
<PackageReference Include="Hangfire.AspNetCore" Version="1.8.18" />
<PackageReference Include="Hangfire.Redis.StackExchange" Version="1.12.0" />
First, regarding the configuration of Hangfire itself: most modern designs are based on dependency injection rather than static types. Therefore, job instances should be created by the container, so that each scheduled task execution runs in a new scope with its own context.
Step One
Create the HangfireJobActivatorScope and HangfireActivator files to allow Hangfire to support container context.
/// <summary>
/// Task container.
/// </summary>
public class HangfireJobActivatorScope : JobActivatorScope
{
private readonly IServiceScope _serviceScope;
private readonly string _jobId;
/// <summary>
/// Initializes a new instance of the <see cref="HangfireJobActivatorScope"/> class.
/// </summary>
/// <param name="serviceScope"></param>
/// <param name="jobId"></param>
public HangfireJobActivatorScope([NotNull] IServiceScope serviceScope, string jobId)
{
_serviceScope = serviceScope ?? throw new ArgumentNullException(nameof(serviceScope));
_jobId = jobId;
}
/// <inheritdoc/>
public override object Resolve(Type type)
{
var res = ActivatorUtilities.GetServiceOrCreateInstance(_serviceScope.ServiceProvider, type);
return res;
}
/// <inheritdoc/>
public override void DisposeScope()
{
_serviceScope.Dispose();
}
}
/// <summary>
/// JobActivator.
/// </summary>
public class HangfireActivator : JobActivator
{
private readonly IServiceScopeFactory _serviceScopeFactory;
/// <summary>
/// Initializes a new instance of the <see cref="HangfireActivator"/> class.
/// </summary>
/// <param name="serviceScopeFactory"></param>
public HangfireActivator(IServiceScopeFactory serviceScopeFactory)
{
_serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
}
/// <inheritdoc/>
public override JobActivatorScope BeginScope(JobActivatorContext context)
{
return new HangfireJobActivatorScope(_serviceScopeFactory.CreateScope(), context.BackgroundJob.Id);
}
}
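To see why a scope per job matters, here is a minimal sketch that uses only Microsoft.Extensions.DependencyInjection (the NuGet package is assumed to be referenced). `JobContext`, `ScopeDemo`, and `RunJob` are hypothetical names; `RunJob` plays the role that BeginScope and DisposeScope play in the activator above.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical scoped service; each instance gets its own id.
public class JobContext
{
    public Guid Id { get; } = Guid.NewGuid();
}

public static class ScopeDemo
{
    public static ServiceProvider BuildProvider() =>
        new ServiceCollection()
            .AddScoped<JobContext>()
            .BuildServiceProvider();

    // Resolves JobContext inside a fresh scope, the way a per-job scope would.
    public static Guid RunJob(IServiceScopeFactory factory)
    {
        using var scope = factory.CreateScope();
        var first = scope.ServiceProvider.GetRequiredService<JobContext>();
        var second = scope.ServiceProvider.GetRequiredService<JobContext>();

        // Within one scope, a scoped service resolves to the same instance.
        if (!ReferenceEquals(first, second))
        {
            throw new InvalidOperationException("Scoped service was not reused.");
        }

        return first.Id;
    }

    public static void Main()
    {
        using var provider = BuildProvider();
        var factory = provider.GetRequiredService<IServiceScopeFactory>();

        // Two jobs run in two scopes, so they see two different instances.
        Console.WriteLine(RunJob(factory) != RunJob(factory));
    }
}
```

Scoped services such as database contexts are therefore never shared between two executions of a task, and they are disposed when the scope ends.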
Step Two
Configure the Hangfire service to support Redis and configure various parameters.
private void ConfigureHangfire(IServiceCollection services)
{
    var redisOptions =
        new RedisStorageOptions
        {
            // Redis key prefix; each task instance creates keys under it
            Prefix = "aaa:aaa:hangfire",
        };

    // Use the overload that exposes IServiceProvider, so that no second
    // container has to be built with services.BuildServiceProvider()
    services.AddHangfire(
        (provider, config) =>
        {
            config.UseRedisStorage("{redis connection string}", redisOptions)
                .SetDataCompatibilityLevel(CompatibilityLevel.Version_180)
                .UseSimpleAssemblyNameTypeSerializer()
                .UseRecommendedSerializerSettings();
            config.UseActivator(new HangfireActivator(provider.GetRequiredService<IServiceScopeFactory>()));
        });

    services.AddHangfireServer(options =>
    {
        // Note: the polling interval must be small (the default is 15 seconds),
        // otherwise second-level schedules cannot fire on time
        options.SchedulePollingInterval = TimeSpan.FromSeconds(1);

        // Increase this if many tasks are expected to run concurrently
        options.WorkerCount = 50;
    });
}
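The cron expressions used in this article have six fields rather than the classic five. The sketch below labels each field, assuming the seconds-first layout (second, minute, hour, day of month, month, day of week) that Hangfire accepts for six-field expressions. `CronFieldsDemo` and `Describe` are hypothetical helpers for illustration only; they do not validate the expression and are not part of Hangfire.

```csharp
using System;
using System.Linq;

public static class CronFieldsDemo
{
    // Field order of a six-field expression (assumption: seconds come first).
    private static readonly string[] Names =
        { "second", "minute", "hour", "day-of-month", "month", "day-of-week" };

    // Splits the expression on spaces and pairs each part with its field name.
    public static string Describe(string cron)
    {
        var parts = cron.Split(' ', StringSplitOptions.RemoveEmptyEntries);
        if (parts.Length != Names.Length)
        {
            throw new FormatException($"Expected {Names.Length} fields, got {parts.Length}.");
        }

        return string.Join(", ", parts.Select((part, i) => $"{Names[i]}={part}"));
    }

    public static void Main()
    {
        Console.WriteLine(Describe("* * * * * *"));   // fires every second
        Console.WriteLine(Describe("*/5 * * * * *")); // fires every 5 seconds
    }
}
```

An expression can only fire as often as the server checks its schedule, which is why SchedulePollingInterval is set to one second above.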
Step Three
Implement the RecurringJobHandler to execute scheduled tasks and initiate HTTP requests to the business system.
The called party needs to return an ExecteTasResult. The reason is that if the called party no longer needs the scheduled task to continue, it can return CancelTask = true, and the scheduled task service cancels subsequent runs directly, without the called party having to call the cancel interface itself.
public class RecurringJobHandler
{
private readonly IServiceProvider _serviceProvider;
public RecurringJobHandler(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
/// <summary>
/// Executes the task.
/// </summary>
/// <param name="taskRequest"></param>
/// <returns>Task.</returns>
public async Task Handler(TaskRequest taskRequest)
{
var ioc = _serviceProvider;
var recurringJobManager = ioc.GetRequiredService<IRecurringJobManager>();
var httpClientFactory = ioc.GetRequiredService<IHttpClientFactory>();
var logger = ioc.GetRequiredService<ILogger<RecurringJobHandler>>();
using var httpClient = httpClientFactory.CreateClient(taskRequest.ServiceName);
// Regardless of whether the request is successful, this instance of the task is considered complete
try
{
// Request the sub-system's interface
var response = await httpClient.PostAsJsonAsync(taskRequest.ServiceName, taskRequest);
var executeResult = await response.Content.ReadFromJsonAsync<ExecteTasResult>();
// The called party requests to cancel the task
if (executeResult != null && executeResult.CancelTask)
{
recurringJobManager.RemoveIfExists(taskRequest.TaskId);
}
}
catch (Exception ex)
{
logger.LogError(ex, "Task error.");
}
}
}
Step Four
With Hangfire configured, the next question is how to receive tasks and schedule them. First, define an HTTP interface (a gRPC interface would work as well).
[ApiController]
[Route("/execute")]
public class HangfireController : ControllerBase
{
private readonly IRecurringJobManager _recurringJobManager;
public HangfireController(IRecurringJobManager recurringJobManager)
{
_recurringJobManager = recurringJobManager;
}
[HttpPost("addtask")]
public async Task<TaskResponse> AddTask(TaskRequest value)
{
await Task.CompletedTask;
_recurringJobManager.AddOrUpdate<RecurringJobHandler>(
value.TaskId,
task => task.Handler(value),
cronExpression: value.CronExpression,
options: new RecurringJobOptions
{
});
return new TaskResponse { };
}
[HttpPost("cancel")]
public async Task<TaskResponse> Cancel(CancelTaskRequest value)
{
await Task.CompletedTask;
_recurringJobManager.RemoveIfExists(value.TaskId);
return new TaskResponse
{
};
}
}
Dynamic Code Implementation for Business Services
The business service only needs to expose an execute interface to the HangfireServer. The DemoApi serializes the Command and wraps it as a request parameter to send to the HangfireServer, and then the HangfireServer forwards the parameters directly to the execute interface.
![image-20250418095553964]()
The main design process for DemoApi is as follows:
- Define the SendHangfireService service, which wraps Command data and some scheduled task parameters and sends them to the HangfireServer via HTTP;
- Define the ExecuteTaskHandler, which implements deserialization of parameters and sends the Command using MediatR when the API is triggered, achieving dynamic execution;
- Define the ExecuteController interface, which receives requests from the HangfireServer and calls the ExecuteTaskHandler to process those requests;
The libraries imported by DemoApi are as follows:
<PackageReference Include="Maomi.Core" Version="2.2.0" />
<PackageReference Include="MediatR" Version="12.5.0" />
Maomi.Core is a modular and automatic service registration framework.
Step One
Define the SendHangfireService service, which wraps Command data and some scheduled task parameters and sends them to the HangfireServer via HTTP.
When receiving requests from the HangfireServer, it is necessary to look up the Type using a string, which requires the DemoApi to automatically scan the assemblies and cache the corresponding types upon startup.
In order to differentiate scheduled task commands from other Commands, a unified abstraction needs to be defined. Of course, it is also possible to handle this using attribute annotations.
/// <summary>
/// Abstract parameter for scheduled tasks.
/// </summary>
public abstract class HangfireRequest : IRequest<ExecteTasResult>
{
    /// <summary>
    /// ID of the scheduled task.
    /// </summary>
    public string TaskId { get; init; } = string.Empty;

    /// <summary>
    /// Creation time of the task.
    /// </summary>
    public DateTimeOffset CreateTime { get; init; }

    /// <summary>
    /// Cron expression of the task; read by SendHangfireService when the task is sent.
    /// </summary>
    public string CronExpression { get; init; } = string.Empty;
}
Define HangfireTypeFactory to enable quick lookup of Type by string.
/// <summary>
/// Records command types in CQRS for quick lookup of Type by string.
/// </summary>
public class HangfireTypeFactory
{
private readonly ConcurrentDictionary<string, Type> _typeDictionary;
public HangfireTypeFactory()
{
_typeDictionary = new ConcurrentDictionary<string, Type>();
}
public void Add(Type type)
{
// TryAdd is atomic; the first type registered under a short name wins
_typeDictionary.TryAdd(type.Name, type);
}
public Type? Get(string typeName)
{
if (_typeDictionary.TryGetValue(typeName, out var type))
{
return type;
}
return _typeDictionary.FirstOrDefault(x => x.Value.FullName == typeName).Value;
}
}
Finally, implement SendHangfireService, which wraps the parameters and sends them to the HangfireServer.
Of course, the same logic could also be implemented as a CQRS command handler instead of a service.
/// <summary>
/// Scheduled task service for sending scheduled task requests.
/// </summary>
[InjectOnScoped]
public class SendHangfireService
{
private static readonly JsonSerializerOptions JsonOptions = new JsonSerializerOptions
{
AllowTrailingCommas = true,
PropertyNameCaseInsensitive = true,
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
ReadCommentHandling = JsonCommentHandling.Skip
};
private readonly IHttpClientFactory _httpClientFactory;
public SendHangfireService(IHttpClientFactory httpClientFactory)
{
_httpClientFactory = httpClientFactory;
}
/// <summary>
/// Sends a scheduled task request.
/// </summary>
/// <typeparam name="TCommand">Type of the command to schedule.</typeparam>
/// <param name="request">The command, including its task ID and cron expression.</param>
/// <returns>A task that completes once the request has been sent.</returns>
/// <exception cref="TypeLoadException"></exception>
public async Task Send<TCommand>(TCommand request)
where TCommand : HangfireRequest
{
using var httpClient = _httpClientFactory.CreateClient();
var taskRequest = new TaskRequest
{
TaskId = request.TaskId,
CommandBody = JsonSerializer.Serialize(request, JsonOptions),
ServiceName = "http://127.0.0.1:5000/hangfire/execute",
CommandType = typeof(TCommand).Name ?? throw new TypeLoadException(typeof(TCommand).Name),
CreateTime = request.CreateTime.ToUnixTimeMilliseconds().ToString(),
CronExpression = request.CronExpression,
};
_ = await httpClient.PostAsJsonAsync("http://127.0.0.1:5001/execute/addtask", taskRequest);
}
/// <summary>
/// Cancels a scheduled task.
/// </summary>
/// <param name="taskId"></param>
/// <returns></returns>
public async Task Cancel(string taskId)
{
using var httpClient = _httpClientFactory.CreateClient();
// The cancel interface lives under the same /execute route as addtask on HangfireServer
_ = await httpClient.PostAsJsonAsync("http://127.0.0.1:5001/execute/cancel", new CancelTaskRequest
{
TaskId = taskId
});
}
}
Step Two
Dynamically executing a Command by Type is relatively simple and does not require complicated techniques such as expression trees.
My implementation idea is as follows: define a generic class, ExecuteTaskHandler, that sends the Command in a strongly typed manner. To keep the generic type parameter out of the calling code, abstract an interface, IHangfireTaskHandler, that hides the generic.
/// <summary>
/// Defines an abstraction for executing tasks to facilitate ignoring generic processing.
/// </summary>
public interface IHangfireTaskHandler
{
/// <summary>
/// Executes the task.
/// </summary>
/// <param name="taskRequest"></param>
/// <returns></returns>
Task<ExecteTasResult> Handler(TaskRequest taskRequest);
}
/// <summary>
/// Used for deserializing parameters and sending the Command.
/// </summary>
/// <typeparam name="TCommand">Command.</typeparam>
public class ExecuteTaskHandler<TCommand> : IHangfireTaskHandler
where TCommand : HangfireRequest, IRequest<ExecteTasResult>
{
private readonly IMediator _mediator;
/// <summary>
/// Initializes a new instance of the <see cref="ExecuteTaskHandler{TCommand}"/> class.
/// </summary>
/// <param name="mediator"></param>
public ExecuteTaskHandler(IMediator mediator)
{
_mediator = mediator;
}
private static readonly JsonSerializerOptions JsonSerializerOptions = new JsonSerializerOptions
{
AllowTrailingCommas = true,
PropertyNameCaseInsensitive = true,
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
ReadCommentHandling = JsonCommentHandling.Skip
};
/// <inheritdoc/>
public async Task<ExecteTasResult> Handler(TaskRequest taskRequest)
{
// Deserialize returns null for a JSON "null" body, so check before sending
var command = JsonSerializer.Deserialize<TCommand>(taskRequest.CommandBody, JsonSerializerOptions);
if (command == null)
{
throw new InvalidOperationException("Failed to parse command parameters");
}
// Logic to process the command
var response = await _mediator.Send(command);
return response;
}
}
Step Three
Implement the scheduled task execute trigger interface, then forward the parameters to the ExecuteTaskHandler, where the strong typing issues are resolved via dependency injection.
/// <summary>
/// Entry point for triggering scheduled tasks.
/// </summary>
[ApiController]
[Route("/hangfire")]
public class ExecuteController : ControllerBase
{
private readonly IServiceProvider _serviceProvider;
private readonly HangfireTypeFactory _hangireTypeFactory;
public ExecuteController(IServiceProvider serviceProvider, HangfireTypeFactory hangireTypeFactory)
{
_serviceProvider = serviceProvider;
_hangireTypeFactory = hangireTypeFactory;
}
[HttpPost("execute")]
public async Task<ExecteTasResult> ExecuteTask([FromBody] TaskRequest request)
{
var commandType = _hangireTypeFactory.Get(request.CommandType);
// If the command type is not found, tell HangfireServer to cancel subsequent runs
if (commandType == null)
{
return new ExecteTasResult
{
CancelTask = true
};
}
var commandTypeHandler = typeof(ExecuteTaskHandler<>).MakeGenericType(commandType);
var handler = _serviceProvider.GetService(commandTypeHandler) as IHangfireTaskHandler;
if(handler == null)
{
return new ExecteTasResult
{
CancelTask = true
};
}
return await handler.Handler(request);
}
}
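The combination of an open generic registration, MakeGenericType, and a non-generic interface can be tried in isolation. The following is a minimal sketch assuming only Microsoft.Extensions.DependencyInjection; `IRunner`, `Runner<T>`, `PingCommand`, and `OpenGenericDemo` are hypothetical stand-ins for IHangfireTaskHandler, ExecuteTaskHandler<TCommand>, a command type, and the controller.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Non-generic facade, analogous to IHangfireTaskHandler.
public interface IRunner
{
    string Run(string body);
}

// Generic implementation, analogous to ExecuteTaskHandler<TCommand>.
public class Runner<T> : IRunner
{
    public string Run(string body) => $"{typeof(T).Name} handled '{body}'";
}

// Hypothetical command type that is only known at runtime by the caller.
public class PingCommand
{
}

public static class OpenGenericDemo
{
    public static string Dispatch(IServiceProvider provider, Type commandType, string body)
    {
        // Close Runner<> over the runtime type, then let the container build it.
        var closedType = typeof(Runner<>).MakeGenericType(commandType);
        var runner = (IRunner)provider.GetRequiredService(closedType);

        // From here on the caller only sees the non-generic interface.
        return runner.Run(body);
    }

    public static void Main()
    {
        // Register the open generic once; the container closes it on demand.
        using var provider = new ServiceCollection()
            .AddTransient(typeof(Runner<>))
            .BuildServiceProvider();

        Console.WriteLine(Dispatch(provider, typeof(PingCommand), "{}"));
    }
}
```

Because the container constructs the closed generic type, any constructor dependencies (IMediator in the real handler) are injected as usual, and no reflection-based method invocation is needed.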
Step Four
With the code encapsulated, the last step is environment configuration and service registration. Since I use the Maomi.Core framework, service registration and assembly scanning are very simple and can be done through the interfaces that Maomi.Core provides.
public class ApiModule : Maomi.ModuleCore, IModule
{
private readonly HangfireTypeFactory _hangireTypeFactory;
public ApiModule()
{
_hangireTypeFactory = new HangfireTypeFactory();
}
public override void ConfigureServices(ServiceContext context)
{
context.Services.AddTransient(typeof(ExecuteTaskHandler<>));
context.Services.AddSingleton(_hangireTypeFactory);
context.Services.AddHttpClient();
context.Services.AddMediatR(o =>
{
o.RegisterServicesFromAssemblies(context.Modules.Select(x => x.Assembly).ToArray());
});
}
public override void TypeFilter(Type type)
{
if (!type.IsClass || type.IsAbstract)
{
return;
}
if (type.IsAssignableTo(typeof(HangfireRequest)))
{
_hangireTypeFactory.Add(type);
}
}
}
Step Five
Developers can write scheduled task commands and executors like this, and then trigger the scheduled tasks through the interface.
public class MyTestRequest : HangfireRequest, IRequest<ExecteTasResult>
{
}
/// <summary>
/// Code to be executed by the scheduled task.
/// </summary>
public class MyTestHandler : IRequestHandler<MyTestRequest, ExecteTasResult>
{
private static volatile int _count;
private static DateTimeOffset _lastTime;
public async Task<ExecteTasResult> Handle(MyTestRequest request, CancellationToken cancellationToken)
{
_count++;
if (_lastTime == default)
{
_lastTime = DateTimeOffset.Now;
}
Console.WriteLine($@"
Execution Time: {DateTimeOffset.Now.ToString("HH:mm:ss.ffff")}
Execution Frequency (every 10s): {(_count / (DateTimeOffset.Now - _lastTime).TotalSeconds * 10)}
");
return new ExecteTasResult
{
CancelTask = false
};
}
}
[ApiController]
[Route("/test")]
public class SendTaskController : ControllerBase
{
private readonly SendHangfireService _hangfireService;
public SendTaskController(SendHangfireService hangfireService)
{
_hangfireService = hangfireService;
}
[HttpGet("aaa")]
public async Task<string> SendAsync()
{
await _hangfireService.Send(new MyTestRequest
{
CreateTime = DateTimeOffset.Now,
CronExpression = "* * * * * *",
TaskId = Guid.NewGuid().ToString(),
});
return "aaa";
}
}
Finally
Start the project to test the code and record the execution frequency and time intervals.
![image-20250418103509714]()
![Animation]()