内容目录
在 Mysql 中,批量插入自增列,是不能批量返回自增后的 Id,为了解决批量插入的问题,利用 Redis 的原子操作,实现无锁原子分配 自增 Id。
核心是在 Redis 中,保存表的最大 Id。
每次插入前,检查缓存 CacheId 跟数据库 MaxId 相比,如果 CacheId > MaxId,说明 CacheId 可以使用。这个阶段需要保证原子性。
插入前,需要向 Redis 申请获取一个范围的 Id,然后插入到数据库中。
定义接口:
/// <summary>
/// 自增列批量插入服务
/// </summary>
/// <typeparam name="TEntity"></typeparam>
/// <typeparam name="TId"></typeparam>
public interface IAutoIncrement<TEntity, TId> where TEntity : class, new()
where TId : struct
{
/// <summary>
/// 批量插入实体并设置 Id
/// </summary>
/// <param name="entities"></param>
/// <returns></returns>
Task<bool> InsertRangeAsync(IEnumerable<TEntity> entities);
}
// 非竞争自增id批量处理的实现
public class AutoIncrementService<TEntity, TId> : IAutoIncrement<TEntity, TId>
where TEntity : class, new()
where TId : struct
{
private readonly RedisClient _redisClient;
private readonly BasicRepository<TEntity> _repository;
private readonly Expression<Func<TEntity, TId>> _expression;
private const string BaseKey = "{0}:auto_increment:{1}";
private readonly string Key;
private readonly PropertyInfo _propertyInfo;
/// <summary>
///
/// </summary>
/// <param name="redisClient"></param>
/// <param name="repository"></param>
/// <param name="expression"></param>
public AutoIncrementService(RedisClient redisClient, BasicRepository<TEntity> repository, Expression<Func<TEntity, TId>> expression)
{
_redisClient = redisClient;
_repository = repository;
_expression = expression;
// {应用名称}:auto_increment:{实体名称}
Key = string.Format(BaseKey, AutoIncrementExtension.BasePrefix, nameof(TEntity).ToLower());
MemberExpression member = (expression.Body as MemberExpression)!;
_propertyInfo = (member.Member as PropertyInfo)!;
}
// Redis 缓存检查
private async Task CheckIdAsync()
{
var typeCode = Type.GetTypeCode(typeof(TId));
// 查询 Redis ,是否存在此表的自增 ID
var result = await _redisClient.GetAsync(Key);
// 获取在数据库中当前实体自增列的最大值
var maxId = await _repository.Context.Queryable<TEntity>().MaxAsync(_expression);
object nextId = GetValue();
// 已存在缓存,Redis 缓存与 数据库自增列一致性检查
if (!string.IsNullOrEmpty(result) && long.TryParse(result, out var cacheId))
{
// 一致
if (cacheId > Unsafe.As<TId, Int64>(ref maxId))
{
return;
}
// 不一致,删除它
else
{
await _redisClient.DelAsync(Key);
}
}
// 设置缓存,此缓存就算被其他线程抢占也没关系
await _redisClient.SetAsync(Key, nextId, TimeSpan.FromHours(10), RedisExistence.Nx);
object GetValue()
{
switch (typeCode)
{
case TypeCode.Int32: return Unsafe.As<TId, Int32>(ref maxId) + 1;
case TypeCode.UInt32: return Unsafe.As<TId, UInt32>(ref maxId) + 1;;
case TypeCode.Int64: return Unsafe.As<TId, Int64>(ref maxId) + 1;
case TypeCode.UInt64: return Unsafe.As<TId, UInt64>(ref maxId) + 1;
default: throw new NotSupportedException($"不支持此类型做自增列:{typeof(TId)}");
}
}
}
/// <inheritdoc/>
public async Task<bool> InsertRangeAsync(IEnumerable<TEntity> entities)
{
// 检查器
await CheckIdAsync();
// 要插入的实体数量
var count = entities.Count();
// 申请 count 个 Id,将 id 范围记录下来
// 原来 0,申请 5+1, end = 6, start = 1
// 原来 6,申请 5+1, end = 12, start = 7
var endIndex = await _redisClient.IncrByAsync(Key, count + 1);
var startIndex = endIndex - count;
// 为每个实体生成自增 id
long index = startIndex;
foreach (var item in entities)
{
var tid = Unsafe.As<long, TId>(ref index);
_propertyInfo.SetValue(item, tid);
index++;
}
try
{
var result = await _repository.InsertRangeAsync(entities.ToArray());
if (result == false)
{
ResetId();
}
return result;
}
catch
{
ResetId();
throw;
}
// 插入失败,重置 id 为 0
void ResetId()
{
foreach (var item in entities)
{
var tid = default(TId);
_propertyInfo.SetValue(item, tid);
}
}
}
}
为了方便使用,通过工厂模式创建:
public static class AutoIncrementExtension
{
internal static string BasePrefix = "";
/// <summary>
/// 注入自增服务
/// </summary>
/// <param name="services"></param>
/// <param name="basePrefix">缓存前缀</param>
/// <returns></returns>
public static IServiceCollection AddAutoIncrement(this IServiceCollection services, string basePrefix)
{
BasePrefix = basePrefix.ToLower();
return services.AddScoped<AutoIncrementFactory>(s => new AutoIncrementFactory(s));
}
}
/// <summary>
/// 实体自增插入创建工厂
/// </summary>
public class AutoIncrementFactory
{
private readonly IServiceProvider _serviceProvider;
internal AutoIncrementFactory(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
/// <summary>
/// 创建自增工厂
/// </summary>
/// <typeparam name="TEntity"></typeparam>
/// <typeparam name="TId"></typeparam>
/// <param name="expression"></param>
/// <returns></returns>
public AutoIncrementService<TEntity, TId> Create<TEntity, TId>(Expression<Func<TEntity, TId>> expression) where TEntity : class, new() where TId : struct
{
var redisClient = _serviceProvider.GetRequiredService<RedisClient>();
var repository = _serviceProvider.GetRequiredService<BasicRepository<TEntity>>();
return new AutoIncrementService<TEntity, TId>(redisClient, repository, expression);
}
}
文章评论