In MySQL, batch inserting auto-increment columns cannot return the auto-incremented IDs in bulk. To solve the problem of batch insertion, we use atomic operations in Redis to implement a lock-free atomic allocation of auto-incremented IDs.
The core idea is to store the maximum ID of the table in Redis. Before each insert, we check if the cached CacheId
is greater than the database MaxId
. If CacheId > MaxId
, it indicates that CacheId
can be used. This stage must ensure atomicity.
Before inserting, a range of IDs needs to be requested from Redis, which will then be inserted into the database.
Define the interface:
/// <summary>
/// Auto-increment column batch insertion service
/// </summary>
/// <typeparam name="TEntity"></typeparam>
/// <typeparam name="TId"></typeparam>
public interface IAutoIncrement<TEntity, TId> where TEntity : class, new()
where TId : struct
{
/// <summary>
/// Batch insert entities and set Id
/// </summary>
/// <param name="entities"></param>
/// <returns></returns>
Task<bool> InsertRangeAsync(IEnumerable<TEntity> entities);
}
// Implementation of non-competitive auto-increment ID batch processing
public class AutoIncrementService<TEntity, TId> : IAutoIncrement<TEntity, TId>
where TEntity : class, new()
where TId : struct
{
private readonly RedisClient _redisClient;
private readonly BasicRepository<TEntity> _repository;
private readonly Expression<Func<TEntity, TId>> _expression;
private const string BaseKey = "{0}:auto_increment:{1}";
private readonly string Key;
private readonly PropertyInfo _propertyInfo;
/// <summary>
///
/// </summary>
/// <param name="redisClient"></param>
/// <param name="repository"></param>
/// <param name="expression"></param>
public AutoIncrementService(RedisClient redisClient, BasicRepository<TEntity> repository, Expression<Func<TEntity, TId>> expression)
{
_redisClient = redisClient;
_repository = repository;
_expression = expression;
// {应用名称}:auto_increment:{实体名称}
Key = string.Format(BaseKey, AutoIncrementExtension.BasePrefix, nameof(TEntity).ToLower());
MemberExpression member = (expression.Body as MemberExpression)!;
_propertyInfo = (member.Member as PropertyInfo)!;
}
// Redis cache check
private async Task CheckIdAsync()
{
var typeCode = Type.GetTypeCode(typeof(TId));
// Query Redis to check if the auto-increment ID exists for this table
var result = await _redisClient.GetAsync(Key);
// Get the current maximum value of the auto-increment column in the database
var maxId = await _repository.Context.Queryable<TEntity>().MaxAsync(_expression);
object nextId = GetValue();
// If cache already exists, check consistency between Redis cache and database auto-increment column
if (!string.IsNullOrEmpty(result) && long.TryParse(result, out var cacheId))
{
// Consistent
if (cacheId > Unsafe.As<TId, Int64>(ref maxId))
{
return;
}
// Inconsistent, delete it
else
{
await _redisClient.DelAsync(Key);
}
}
// Set cache, this cache can be occupied by other threads without concern
await _redisClient.SetAsync(Key, nextId, TimeSpan.FromHours(10), RedisExistence.Nx);
object GetValue()
{
switch (typeCode)
{
case TypeCode.Int32: return Unsafe.As<TId, Int32>(ref maxId) + 1;
case TypeCode.UInt32: return Unsafe.As<TId, UInt32>(ref maxId) + 1;
case TypeCode.Int64: return Unsafe.As<TId, Int64>(ref maxId) + 1;
case TypeCode.UInt64: return Unsafe.As<TId, UInt64>(ref maxId) + 1;
default: throw new NotSupportedException($"Unsupported type for auto-increment column: {typeof(TId)}");
}
}
}
/// <inheritdoc/>
public async Task<bool> InsertRangeAsync(IEnumerable<TEntity> entities)
{
// Checker
await CheckIdAsync();
// Number of entities to be inserted
var count = entities.Count();
// Request count of IDs and record the ID range
// Initially 0, request 5+1, end = 6, start = 1
// Initially 6, request 5+1, end = 12, start = 7
var endIndex = await _redisClient.IncrByAsync(Key, count + 1);
var startIndex = endIndex - count;
// Generate an auto-increment ID for each entity
long index = startIndex;
foreach (var item in entities)
{
var tid = Unsafe.As<long, TId>(ref index);
_propertyInfo.SetValue(item, tid);
index++;
}
try
{
var result = await _repository.InsertRangeAsync(entities.ToArray());
if (result == false)
{
ResetId();
}
return result;
}
catch
{
ResetId();
throw;
}
// If insertion fails, reset ID to 0
void ResetId()
{
foreach (var item in entities)
{
var tid = default(TId);
_propertyInfo.SetValue(item, tid);
}
}
}
}
For convenience, create it through the factory pattern:
public static class AutoIncrementExtension
{
internal static string BasePrefix = "";
/// <summary>
/// Inject auto-increment service
/// </summary>
/// <param name="services"></param>
/// <param name="basePrefix">Cache prefix</param>
/// <returns></returns>
public static IServiceCollection AddAutoIncrement(this IServiceCollection services, string basePrefix)
{
BasePrefix = basePrefix.ToLower();
return services.AddScoped<AutoIncrementFactory>(s => new AutoIncrementFactory(s));
}
}
/// <summary>
/// Entity auto-increment insertion factory
/// </summary>
public class AutoIncrementFactory
{
private readonly IServiceProvider _serviceProvider;
internal AutoIncrementFactory(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
/// <summary>
/// Create auto-increment factory
/// </summary>
/// <typeparam name="TEntity"></typeparam>
/// <typeparam name="TId"></typeparam>
/// <param name="expression"></param>
/// <returns></returns>
public AutoIncrementService<TEntity, TId> Create<TEntity, TId>(Expression<Func<TEntity, TId>> expression) where TEntity : class, new() where TId : struct
{
var redisClient = _serviceProvider.GetRequiredService<RedisClient>();
var repository = _serviceProvider.GetRequiredService<BasicRepository<TEntity>>();
return new AutoIncrementService<TEntity, TId>(redisClient, repository, expression);
}
}
文章评论