文档说明
作者:痴者工良
仓库地址:https://github.com/whuanle/Maomi.MQ
作者博客:
导读
Maomi.MQ 是一个消息通讯模型项目,目前只支持了 RabbitMQ。
Maomi.MQ.RabbitMQ 是一个用于专为 RabbitMQ 设计的发布者和消费者通讯模型,大大简化了发布和消息的代码,并提供一系列简便和实用的功能,开发者可以通过框架提供的消费模型实现高性能消费、事件编排,框架还支持发布者确认机制、自定义重试机制、补偿机制、死信队列、延迟队列、连接通道复用等一系列的便利功能。开发者可以把更多的精力放到业务逻辑中,通过 Maomi.MQ.RabbitMQ 框架简化跨进程消息通讯模式,使得跨进程消息传递更加简单和可靠。
此外,框架通过 runtime 内置的 api 支持了分布式可观测性,可以通过进一步使用 OpenTelemetry 等框架进一步收集可观测性信息,推送到基础设施平台中。
快速开始
本文将快速介绍 Maomi.MQ.RabbitMQ 的使用方法。
引入 Maomi.MQ.RabbitMQ 包,在 Web 配置中注入服务:
builder.Services.AddSwaggerGen();
builder.Services.AddLogging();
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]);
var app = builder.Build();
- WorkId: 指定用于生成分布式雪花 id 的节点 id,默认为 0。
- AppName:用于标识消息的生产者,以及在日志和链路追踪中标识消息的生产者或消费者。
- Rabbit:RabbitMQ 客户端配置,请参考 ConnectionFactory。
如果是控制台项目,则需要引入 Microsoft.Extensions.Hosting 包。
var host = new HostBuilder()
.ConfigureLogging(options =>
{
options.AddConsole();
options.AddDebug();
})
.ConfigureServices(services =>
{
services.AddMaomiMQ(options =>
{
options.WorkId = 1;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, new System.Reflection.Assembly[] { typeof(Program).Assembly });
// Your services.
services.AddHostedService<MyPublishAsync>();
}).Build();
await host.RunAsync();
定义消息模型类,该模型类将会被序列化为二进制内容传递到 RabbitMQ 服务器中。
public class TestEvent
{
public int Id { get; set; }
public override string ToString()
{
return Id.ToString();
}
}
定义消费者,消费者需要实现 IConsumer<TEvent>
接口,以及使用 [Consumer]
特性注解配置消费者属性。
[Consumer("test", Qos = 1, RetryFaildRequeue = true)]
public class MyConsumer : IConsumer<TestEvent>
{
private static int _retryCount = 0;
// 消费
public async Task ExecuteAsync(EventBody<TestEvent> message)
{
Console.WriteLine($"事件 id: {message.Id} {DateTime.Now}");
await Task.CompletedTask;
}
// 每次消费失败时执行
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;
// 补偿
public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true);
}
然后注入 IMessagePublisher 服务发布消息:
[ApiController]
[Route("[controller]")]
public class IndexController : ControllerBase
{
private readonly IMessagePublisher _messagePublisher;
public IndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}
[HttpGet("publish")]
public async Task<string> Publisher()
{
// 发布消息
await _messagePublisher.PublishAsync(queue: "test", message: new TestEvent
{
Id = i
});
return "ok";
}
}
消息发布者
消息发布者用于推送消息到 RabbitMQ 服务器中。
通过注入 IMessagePublisher 接口即可向 RabbitMQ 推送消息,示例项目请参考 PublisherWeb。
定义一个事件模型类:
public class TestEvent
{
public int Id { get; set; }
public override string ToString()
{
return Id.ToString();
}
}
注入 IMessagePublisher 服务后发布消息:
[ApiController]
[Route("[controller]")]
public class IndexController : ControllerBase
{
private readonly IMessagePublisher _messagePublisher;
public IndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}
[HttpGet("publish")]
public async Task<string> Publisher()
{
for (var i = 0; i < 100; i++)
{
await _messagePublisher.PublishAsync(queue: "PublisherWeb", message: new TestEvent
{
Id = i
});
}
return "ok";
}
}
IMessagePublisher
IMessagePublisher 定义比较简单,只有三个方法和一个属性:
public ConnectionPool ConnectionPool { get; }
Task PublishAsync<TEvent>(string queue, TEvent message, Action<IBasicProperties>? properties = null)
where TEvent : class;
Task PublishAsync<TEvent>(string queue, TEvent message, IBasicProperties properties);
// 不建议直接使用该接口。
Task CustomPublishAsync<TEvent>(string queue, EventBody<TEvent> message, BasicProperties properties);
三个 PublishAsync 方法用于发布事件,ConnectionPool 属性用于获取 RabbitMQ.Client.IConnection 对象。
由于直接公开了 BasicProperties ,因此开发者完全自由配置 RabbitMQ 原生的消息属性,所以 Maomi.MQ.RabbitMQ 没必要过度设计,只提供了简单的功能接口。
例如,可以通过 BasicProperties 配置单条消息的过期时间:
await _messagePublisher.PublishAsync(queue: "RetryWeb", message: new TestEvent
{
Id = i
}, (BasicProperties p) =>
{
p.Expiration = "1000";
});
当发布一条消息时,实际上框架传递的是 EventBody<T>
类型,EventBody<T>
中包含了一些重要的附加消息属性,这些属性会给消息处理和故障诊断带来很大的方便。
public class EventBody<TEvent>
{
// 事件唯一 id.
public long Id { get; init; }
// Queue.
public string Queue { get; init; } = null!;
// App name.
public string Publisher { get; init; } = null!;
// 事件创建时间.
public DateTimeOffset CreationTime { get; init; }
// 事件体.
public TEvent Body { get; init; } = default!;
}
Maomi.MQ 通过 DefaultMessagePublisher 类型实现了 IMessagePublisher,DefaultMessagePublisher 默认生命周期是 Singleton:
services.AddSingleton<IMessagePublisher, DefaultMessagePublisher>();
生命周期不重要,如果需要修改默认的生命周期,可以手动修改替换。
services.AddScoped<IMessagePublisher, DefaultMessagePublisher>();
开发者也可以自行实现 IMessagePublisher 接口,具体示例请参考 DefaultMessagePublisher 类型。
连接池
为了复用 RabbitMQ.Client.IConnection ,Maomi.MQ.RabbitMQ 内部实现了 ConnectionPool 类型,通过对象池维护复用的 RabbitMQ.Client.IConnection 对象。
默认对象池中的 RabbitMQ.Client.IConnection 数量为 0,只有当连接被真正使用时才会从对象池委托中创建,连接对象会随着程序并发量而自动增加,但是,默认最大连接对象数量为 Environment.ProcessorCount * 2
。
除了 IMessagePublisher 接口提供的 PublishAsync 方法可以发布事件,开发者还可以从 ConnectionPool 获取连接对象,请务必在使用完毕后通过 ConnectionPool.Return()
方法将其归还到连接对象池。
通过连接池直接使用 IConnection 对象发布消息:
[HttpGet("publish")]
public async Task<string> Publisher()
{
for (var i = 0; i < 100; i++)
{
var connectionPool = _messagePublisher.ConnectionPool;
var connection = connectionPool.Get();
try
{
connection.Channel.BasicPublishAsync(
exchange: string.Empty,
routingKey: "queue",
basicProperties: properties,
body: _jsonSerializer.Serializer(message),
mandatory: true);
}
finally
{
connectionPool.Return(connection);
}
}
return "ok";
}
你也可以绕开 IMessagePublisher ,直接注入 ConnectionPool 使用 RabbitMQ 连接对象,但是不建议这样使用。
private readonly ConnectionPool _connectionPool;
public DefaultMessagePublisher(ConnectionPool connectionPool)
{
_connectionPool = connectionPool;
}
public async Task MyPublshAsync()
{
var connection = _connectionPool.Get();
try
{
await connection.Channel.BasicPublishAsync(...);
}
finally
{
_connectionPool.Return(connection);
}
}
为了更加简便地管理连接对象,可以使用 CreateAutoReturn()
函数创建连接管理对象,该对象被释放时会自动将 IConnection 返还给连接池。
using var poolObject = _messagePublisher.ConnectionPool.CreateAutoReturn();
poolObject.Channel.BasicPublishAsync(
exchange: string.Empty,
routingKey: "queue",
basicProperties: properties,
body: _jsonSerializer.Serializer(message),
mandatory: true);
如果你自行使用 ConnectionPool 推送消息到 RabbitMQ,请务必通过序列化 EventBody<TEvent>
事件对象,这样 Maomi.MQ.RabbitMQ 消费者才能正常工作。同时,Moami.MQ 对可观测性做了支持,如果自行使用 ConnectionPool 获取连接对象推送消息,可能会导致可观测性信息缺失。
正常情况下,RabbitMQ.Client 中包含了可观测性的功能,但是 Maomi.MQ.RabbitMQ 附加的可观测性信息有助于诊断故障问题。
请注意:
-
Maomi.MQ.RqbbitMQ 通过
EventBody<TEvent>
泛型对象发布和接收事件。 -
DefaultMessagePublisher 包含了链路追踪等可观测性代码。
消息过期
IMessagePublisher 对外开放了 BasicProperties 或 BasicProperties,可以自由配置消息属性。
例如为消息配置过期时间:
[HttpGet("publish")]
public async Task<string> Publisher()
{
for (var i = 0; i < 1; i++)
{
await _messagePublisher.PublishAsync(queue: "test", message: new TestEvent
{
Id = i
}, properties =>
{
properties.Expiration = "6000";
});
}
return "ok";
}
如果此时为 test
绑定死信队列,那么该消息长时间没有被消费时,会被移动到另一个队列,请参考 死信队列。
还可以通过配置消息属性实现更多的功能,请参考 IBasicProperties。
事务
RabbitMQ 支持事务,不过据 RabbitMQ 官方文档显示,事务会使吞吐量减少 250 倍。
RabbitMQ 事务使用上比较简单,可以保证发布的消息已经被推送到 RabbitMQ 服务器,只有当提交事务时,提交的消息才会被 RabbitMQ 存储并推送给消费者。
使用示例:
[HttpGet("publish_tran")]
public async Task<string> Publisher_Tran()
{
using var tranPublisher = await _messagePublisher.TxSelectAsync();
try
{
await tranPublisher.PublishAsync(queue: "publish_tran", message: new TestEvent
{
Id = 666
});
await tranPublisher.TxCommitAsync();
}
catch
{
await tranPublisher.TxRollbackAsync();
throw;
}
return "ok";
}
或者手动开启事务:
[HttpGet("publish_tran")]
public async Task<string> Publisher_Tran()
{
using var tranPublisher = _messagePublisher.CreateTransaction();
try
{
await tranPublisher.TxSelectAsync();
await tranPublisher.PublishAsync(queue: "publish_tran", message: new TestEvent
{
Id = 666
});
await tranPublisher.TxCommitAsync();
}
catch
{
await tranPublisher.TxRollbackAsync();
throw;
}
return "ok";
}
注意,在该种模式之下,创建 TransactionPublisher 对象时,会从对象池中取出一个连接对象,因为开启事务模式可能会污染当前连接通道,因此 TransactionPublisher 不会向连接池归还连接对象,而是直接释放。
发送方确认模式
虽然事务模式可以保证消息会被推送到 RabbitMQ 服务器中,但是由于事务模式会导致吞吐量降低 250 倍,因此不是一个好的选择。为了解决这个问题, RabbitMQ 引入了一种确认机制,这种机制就像滑动窗口,能够保证消息推送到服务器中,并且具备高性能的特性。
使用示例:
[HttpGet("publish_confirm")]
public async Task<string> Publisher_Confirm()
{
using var confirmPublisher = await _messagePublisher.ConfirmSelectAsync();
for (var i = 0; i < 5; i++)
{
await confirmPublisher.PublishAsync(queue: "publish_confirm1", message: new TestEvent
{
Id = 666
});
var result = await confirmPublisher.WaitForConfirmsAsync();
// 如果在超时内没有接收到 nacks,则为 True,否则为 false。
Console.WriteLine($"发布 {i},{result}");
}
return "ok";
}
WaitForConfirmsAsync
方法会返回一个值,如果正常被服务器确认了消息已经传达,则结果为 true,如果超时没有被服务器确认,则返回 false。
此外,还有一个 WaitForConfirmsOrDieAsync
方法,它会一直等待该频道上的所有已发布消息都得到确认,使用示例:
using var confirmPublisher = await _messagePublisher.ConfirmSelectAsync();
for (var i = 0; i < 5; i++)
{
await confirmPublisher.PublishAsync(queue: "publish_confirm1", message: new TestEvent
{
Id = 666
});
Console.WriteLine($"发布 {i}");
}
await confirmPublisher.WaitForConfirmsOrDieAsync();
注意,在该种模式之下,创建 ConfirmPublisher 对象时,会从对象池中取出一个连接对象,因为开启事务模式可能会污染当前连接通道,因此 ConfirmPublisher 不会向连接池归还连接对象,而是直接释放。
注意,同一个通道不能同时使用事务和发送方确认模式。
独占模式
默认情况下,每次使用 IMessagePublisher.PublishAsync()
发布消息时,都会从连接池中取出连接对象,然后使用该连接通道发布消息,发布完毕后就会归还连接对象给连接池。
如果需要在短时间内大批量发布消息,则需要每次都要重复获取和返还连接对象。
使用独占模式时可以在一段时间内独占一个连接对象,超出作用域后,连接对象会自动放回连接池。这种模式对于需要大量发布消息的场景提高吞吐量非常有帮助。为了能够将连接通道归还连接池,请务必使用 using
关键字修饰变量,或者手动调用 Dispose
函数。
使用示例:
// 创建独占模式
using var singlePublisher = _messagePublisher.CreateSingle();
for (var i = 0; i < 500; i++)
{
await singlePublisher.PublishAsync(queue: "publish_single", message: new TestEvent
{
Id = 666
});
}
消费者
Maomi.MQ.RabbitMQ 中,有两种消费模式,一种是消费者模式,一种是事件模式(事件总线模式)。
下面简单了解这两种模式的使用方法。
消费者模式
消费者服务需要实现 IConsumer<TEvent>
接口,并且配置 [Consumer("queue")]
特性绑定队列名称,通过消费者对象来控制消费行为。
消费者模式有具有失败通知和补偿能力,使用上也比较简单。
public class TestEvent
{
public int Id { get; set; }
}
[Consumer("PublisherWeb", Qos = 1, RetryFaildRequeue = true)]
public class MyConsumer : IConsumer<TestEvent>
{
private static int _retryCount = 0;
// 消费或重试
public async Task ExecuteAsync(EventBody<TestEvent> message)
{
_retryCount++;
Console.WriteLine($"执行次数:{_retryCount} 事件 id: {message.Id} {DateTime.Now}");
await Task.CompletedTask;
}
// 失败
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;
// 补偿
public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true);
}
事件模式
事件模式是通过事件总线的方式实现的,以事件模型为中心,通过事件来控制消费行为。
[EventTopic("web2", Qos = 1, RetryFaildRequeue = true)]
public class TestEvent
{
public string Message { get; set; }
}
然后使用 [EventOrder]
特性编排事件执行顺序。
// 编排事件消费顺序
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id},事件 1 已被执行");
}
}
[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id},事件 2 已被执行");
}
}
当然,事件模式也可以通过创建中间件增加补偿功能,通过中间件还可以将所有排序事件放到同一个事务中,一起成功或失败,避免事件执行时出现程序退出导致的一致性问题。
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public TestEventMiddleware(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
{
using (var transaction = _bloggingContext.Database.BeginTransaction())
{
await next(@event, CancellationToken.None);
await transaction.CommitAsync();
}
}
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
return Task.CompletedTask;
}
public Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
return Task.FromResult(true);
}
}
分组
消费者模式和事件模式都可以设置分组,在特性上设置了 Group
属性,具有同一个分组的事件会被放到一个连接通道(RabbitMQ.Client.IConnection
)中,对于消费频率不高的事件,复用连接通道可以有效较低资源消耗。
消费者模式分组示例:
[Consumer("ConsumerWeb_group_1", Qos = 1, Group = "group")]
public class Group_1_Consumer : IConsumer<GroupEvent>
{
public Task ExecuteAsync(EventBody<GroupEvent> message) => Task.CompletedTask;
public Task FaildAsync(Exception ex, int retryCount, EventBody<GroupEvent>? message) => Task.CompletedTask;
public Task<bool> FallbackAsync(EventBody<GroupEvent>? message) => Task.FromResult(true);
}
[Consumer("ConsumerWeb_group_2", Qos = 1, Group = "group")]
public class Group_2_Consumer : IConsumer<GroupEvent>
{
public Task ExecuteAsync(EventBody<GroupEvent> message) => Task.CompletedTask;
public Task FaildAsync(Exception ex, int retryCount, EventBody<GroupEvent>? message) => Task.CompletedTask;
public Task<bool> FallbackAsync(EventBody<GroupEvent>? message) => Task.FromResult(true);
}
事件总线模式分组示例:
[EventTopic("web1", Qos = 1, RetryFaildRequeue = true, Group = "group")]
public class Test1Event
{
public string Message { get; set; }
}
[EventTopic("web2", Qos = 1, RetryFaildRequeue = true, Group = "group")]
public class Test2Event
{
public string Message { get; set; }
}
消费者模式
消费者模式要求服务实现 IConsumer<TEvent>
接口,并添加 [Connsumer]
特性。
IConsumer<TEvent>
接口比较简单,其定义如下:
public interface IConsumer<TEvent>
where TEvent : class
{
// 消息处理.
public Task ExecuteAsync(EventBody<TEvent> message);
// ExecuteAsync 异常后立即执行此代码.
public Task FaildAsync(Exception ex, int retryCount, EventBody<TEvent>? message);
// 最后一次重试失败时执行,用于补偿.
public Task<bool> FallbackAsync(EventBody<TEvent>? message);
}
使用消费者模式时,需要先定义一个模型类,用于发布者和消费者之间传递消息,事件模型类只要是类即可,能够正常序列化和反序列化,没有其它要求。
public class TestEvent
{
public int Id { get; set; }
public override string ToString()
{
return Id.ToString();
}
}
然后继承 IConsumer<TEvent>
接口实现消费者功能:
[Consumer("web1", Qos = 1)]
public class MyConsumer : IConsumer<TestEvent>
{
// 消费
public async Task ExecuteAsync(EventBody<TestEvent> message)
{
Console.WriteLine(message.Body.Id);
}
// 每次失败时被执行
public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
Console.WriteLine($"重试 {message.Body.Id},次数 {retryCount}");
await Task.CompletedTask;
}
// 最后一次失败时执行
public async Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
Console.WriteLine($"最后一次 {message.Body.Id}");
// 如果返回 true,说明补偿成功。
return true;
}
}
特性配置的说明请参考 消费者配置 。
消费、重试和补偿
消费者收到服务器推送的消息时,ExecuteAsync
方法会被自动执行。当 ExecuteAsync
执行异常时,FaildAsync
方法会马上被触发,开发者可以利用 FaildAsync
记录相关日志信息。
// 每次失败时被执行
public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
// 当 retryCount == -1 时,错误并非是 ExecuteAsync 方法导致的
if (retryCount == -1)
{
_logger.LogError(ex, "Consumer error,event id: {Id}", message?.Id);
// 可以在此处添加告警通知代码
await Task.Delay(1000);
}
else
{
_logger.LogError(ex, "Consumer exception,event id: {Id},retry count: {retryCount}", message!.Id, retryCount);
}
}
如果 FaildAsync
方法也出现异常时,不会影响整体流程,框架会等待到达间隔时间后继续重试 ExecuteAsync
方法。
建议 FaildAsync
使用 try{}cathc{}
套住代码,不要对外抛出异常,FaildAsync
的逻辑不要包含太多逻辑,并且 FaildAsync
只应记录日志或进行告警使用。
FaildAsync
被执行有一个额外情况,就是在消费消息之前就已经发生错误,例如一个事件模型类有构造函数导致不能被反序列化,这个时候 FaildAsync
会被立即执行,且 retryCount = -1
。
当 ExecuteAsync
方法执行异常时,框架会自动重试,默认会重试五次,如果五次都失败,则会执行 FallbackAsync
方法进行补偿。
重试间隔时间会逐渐增大,请参考 重试。
当重试五次之后,就会立即启动补偿机制。
// 最后一次失败时执行
public async Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
return true;
}
FallbackAsync
方法需要返回 bool,如果返回 true
,表示虽然 ExecuteAsync
出现异常,但是 FallbackAsync
补偿后已经正常,该消息会被正常消费掉。如果返回 false
,则说补偿失败,该消息按照消费失败处理。
只有 ExecuteAsync
异常时,才会触发 FaildAsync
和 FallbackAsync
,如果是在处理消息之前的异常,会直接失败。
消费失败
当 ExecuteAsync
失败次数达到阈值时,并且 FallbackAsync
返回 false
,则该条消息消费失败,或者由于序列化等错误时直接失败。
在 [Consumer]
特性中有三个很重要的配置:
public class ConsumerAttribute : Attribute
{
// 消费失败次数达到条件时,是否放回队列.
public bool RetryFaildRequeue { get; set; }
// 现异常时是否放回队列,例如序列化错误等原因导致的,而不是消费时发生异常导致的.
public bool ExecptionRequeue { get; set; } = true;
// 绑定死信队列.
public string? DeadQueue { get; set; }
}
当 ExecuteAsync
失败次数达到阈值时,并且 FallbackAsync
返回 false
,则该条消息消费失败。
如果 RetryFaildRequeue == false
,那么该条消息会被 RabbitMQ 丢弃。
如果绑定了死信队列,则会先推送到死信队列,接着再丢弃。
如果 RetryFaildRequeue == true
,那么该条消息会被返回 RabbbitMQ 队列中,等待下一次消费。
由于消息失败后会被放回队列,因此绑定的死信队列不会收到该消息。
当序列化异常或者其它问题导致错误而不能进入 ExecuteAsync
方法时,FaildAsync
方法会首先被触发一次,此时 retryCount 参数值为 -1
。
出现此种问题时,一般是开发者 bug 导致的,不会进行补偿等操作,开发者可以在 FaildAsync
中处理该事件,记录相关日志信息。
// 每次失败时被执行,或者出现无法进入 ExecuteAsync 的异常
public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
// 当 retryCount == -1 时,错误并非是 ExecuteAsync 方法导致的
if (retryCount == -1)
{
_logger.LogError(ex, "Consumer error,event id: {Id}", message?.Id);
// 可以在此处添加告警通知代码
await Task.Delay(1000);
}
else
{
_logger.LogError(ex, "Consumer exception,event id: {Id},retry count: {retryCount}", message!.Id, retryCount);
}
}
由于这种情况不妥善处理,会导致消息丢失,因此框架默认将 ExecptionRequeue
设置为 true
,也就是说出现这种异常时,消息会被放回队列。如果问题一致没有得到解决,则会出现循环:调用 FaildAsync
、放回队列、调用 FaildAsync
、放回队列... ...
所以应该在 FaildAsync
中添加代码通知开发者相关信息,并且设置间隔时间,避免重试太频繁。
自动创建队列
框架默认会自动创建队列,如果需要关闭自动创建功能,把 AutoQueueDeclare
设置为 false
即可。
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AppName = "myapp";
options.AutoQueueDeclare = false;
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]);
当然还可以单独为消费者配置是否自动创建队列:
[Consumer("ConsumerWeb_create", AutoQueueDeclare = AutoQueueDeclare.Enable)]
默认情况下,关闭了全局自动创建,则不会自动创建队列。
如果关闭全局自动创建,但是消费者配置了 AutoQueueDeclare = AutoQueueDeclare.Enable
,则还是会自动创建队列。
如果消费者配置了 AutoQueueDeclare = AutoQueueDeclare.Disable
,则会忽略全局配置,不会创建队列。
Qos
让程序需要严格根据顺序消费时,可以使用 Qos = 1
,框架会严格保证逐条消费,如果程序不需要顺序消费,希望可以快速处理所有消息,则可以将 Qos 设置大一些。由于 Qos 和重试、补偿机制组合使用会有多种情况,因此请参考 重试。
Qos 是通过特性来配置的:
[Consumer("ConsumerWeb", Qos = 1)]
可以通过调高 Qos 值,让程序在可以并发消息,提高并发量。
延迟队列
延迟队列有两种,一种设置消息过期时间,一种是设置队列过期时间。
设置消息过期时间,那么该消息在一定时间没有被消费时,会被丢弃或移动到死信队列中,该配置只对单个消息有效,请参考 消息过期。
队列设置过期后,当消息在一定时间内没有被消费时,会被丢弃或移动到死信队列中,该配置只对所有消息有效。基于这一点,我们可以实现延迟队列。
首先创建消费者,继承 EmptyConsumer,那么该队列会在程序启动时被创建,但是不会创建 IConnection 进行消费。然后设置队列消息过期时间以及绑定死信队列,绑定的死信队列既可以使用消费者模式实现,也可以使用事件模式实现。
[Consumer("ConsumerWeb_dead_2", Expiration = 6000, DeadQueue = "ConsumerWeb_dead_queue_2")]
public class EmptyDeadConsumer : EmptyConsumer<DeadEvent>
{
}
// ConsumerWeb_dead 消费失败的消息会被此消费者消费。
[Consumer("ConsumerWeb_dead_queue_2", Qos = 1)]
public class Dead_2_QueueConsumer : IConsumer<DeadQueueEvent>
{
// 消费
public Task ExecuteAsync(EventBody<DeadQueueEvent> message)
{
Console.WriteLine($"死信队列,事件 id:{message.Id}");
return Task.CompletedTask;
}
// 每次失败时被执行
public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadQueueEvent>? message) => Task.CompletedTask;
// 最后一次失败时执行
public Task<bool> FallbackAsync(EventBody<DeadQueueEvent>? message) => Task.FromResult(false);
}
空消费者
当识别到空消费者时,框架只会创建队列,而不会启动消费者消费消息。
可以结合延迟队列一起使用,该队列不会有任何消费者,当该队列的消息过期时,都由死信队列直接消费,示例如下:
[Consumer("ConsumerWeb_empty", Expiration = 6000, DeadQueue = "ConsumerWeb_empty_dead")]
public class MyEmptyConsumer : EmptyConsumer<TestEvent> { }
[Consumer("ConsumerWeb_empty_dead", Qos = 10)]
public class MyDeadConsumer : IConsumer<TestEvent>
{
public Task ExecuteAsync(EventBody<TestEvent> message) => Task.CompletedTask;
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;
public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true);
}
对于跨进程的队列,A 服务不消费只发布,B 服务负责消费,A 服务中可以加一个空消费者,保证 A 服务启动时该队列一定存在,另一方面,消费者服务不应该关注队列的定义,也不太应该创建队列。
分组
通过配置 Group
属性将多个消费者放到同一个连接通道中执行,对于那些并发量不高的队列,复用连接通道可以降低资源消耗。
示例:
[Consumer("ConsumerWeb_group_1", Qos = 1, Group = "group")]
public class Group_1_Consumer : IConsumer<GroupEvent>
{
}
[Consumer("ConsumerWeb_group_2", Qos = 1, Group = "group")]
public class Group_2_Consumer : IConsumer<GroupEvent>
{
}
事件总线模式
Maomi.MQ 内部设计了一个事件总线,可以帮助开发者实现事件编排、实现本地事务、正向执行和补偿。
首先定义一个事件类型,该事件绑定一个 topic 或队列,事件需要使用 [EventTopic]
标识,并设置该事件对于的队列名称。
[EventTopic]
特性拥有与 [Consumer]
相同的特性,可参考 [Consumer]
的使用配置事件,请参考 消费者配置。
[EventTopic("EventWeb")]
public class TestEvent
{
public string Message { get; set; }
public override string ToString()
{
return Message;
}
}
然后编排事件执行器,每个执行器都需要继承 IEventHandler<T>
接口,然后使用 [EventOrder]
特性标记执行顺序。
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id},事件 1 已被执行");
}
}
[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id},事件 2 已被执行");
}
}
每个事件执行器都必须实现 IEventHandler<T>
接口,并且设置 [EventOrder]
特性以便确认事件的执行顺序,框架会按顺序执行 IEventHandler<T>
的 ExecuteAsync
方法,当 ExecuteAsync
出现异常时,则反向按顺序调用 CancelAsync
。
由于程序可能随时挂掉,因此通过 CancelAsync
实现补偿是不太可能的,CancelAsync
主要作为记录相关信息而使用。
中间件
中间件的作用是便于开发者拦截事件、记录信息、实现本地事务等,如果开发者不配置,则框架会自动创建 DefaultEventMiddleware<TEvent>
类型作为该事件的中间件服务。
自定义事件中间件示例代码:
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
public async Task HandleAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
{
await next(@event, CancellationToken.None);
}
}
next
委托是框架构建的事件执行链路,在中间件中可以拦截事件、决定是否执行事件链路。
在中间件中调用 next()
委托时,框架开始按顺序执行事件,即前面提到的 My1EventEventHandler
、My2EventEventHandler
。
当一个事件有多个执行器时,由于程序可能会在任何时刻挂掉,因此本地事务必不可少。
例如,在中间件中注入数据库上下文,然后启动事务执行数据库操作,当其中一个 EventHandler 执行失败时,执行链路会回滚,同时不会提交事务。
可以参考 消费者模式 实现中间件的重试和补偿方法。
示例如下:
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public TestEventMiddleware(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task HandleAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
{
using (var transaction = _bloggingContext.Database.BeginTransaction())
{
await next(@event, CancellationToken.None);
await transaction.CommitAsync();
}
}
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
return Task.CompletedTask;
}
public Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
return Task.FromResult(true);
}
}
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public My1EventEventHandler(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id} 被补偿,[1]");
}
public async Task HandlerAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
await _bloggingContext.Posts.AddAsync(new Post
{
Title = "鲁滨逊漂流记",
Content = "随便写写就对了"
});
await _bloggingContext.SaveChangesAsync();
}
}
[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public My2EventEventHandler(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id} 被补偿,[2]");
}
public async Task HandlerAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
await _bloggingContext.Posts.AddAsync(new Post
{
Title = "红楼梦",
Content = "贾宝玉初试云雨情"
});
await _bloggingContext.SaveChangesAsync();
throw new OperationCanceledException("故意报错");
}
}
事件执行时,如果出现异常,也是会被重试的,中间件 TestEventMiddleware 的 FaildAsync、FallbackAsync 会被依次执行。
分组消费
事件分组消费主要是利用同一个 IConnection 同时处理多个消息队列,提高通道利用率。
示例:
[EventTopic("EventGroup_1", Group = "aaa")]
public class Test1Event
{
public string Message { get; set; }
public override string ToString()
{
return Message;
}
}
[EventTopic("EventGroup_2", Group = "aaa")]
public class Test2Event
{
public string Message { get; set; }
public override string ToString()
{
return Message;
}
}
Maomi.MQ 的 IConsumer<T>
是一个消费者(一个队列)使用一个 IConnection,默认情况下事件总线也是。
对于哪些并发量不大或利用率较低的队列,可以通过事件分组将其合并到同一个 IConnection 中进行处理。
使用方法很简单,只需要在定义事件时,配置 [EventTopic]
特性的 Group
方法即可。
由于不同队列被放到一个 IConnection 中消费,如果事件都设置了 Qos,那么框架会默认计算平均值,例如:
[EventTopic("web3_1", Group = "aaa", Qos = 10)]
public class Test1Event
[EventTopic("web3_2", Group = "aaa", Qos = 6)]
public class Test2Event
此时框架会设置 Qos 为 8
。
配置
在引入 Maomi.MQ 框架时,可以配置相关属性,示例和说明如下:
// this.
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
// 当前程序节点,用于配置分布式雪花 id
options.WorkId = 1;
// 是否自动创建队列
options.AutoQueueDeclare = true;
// 当前应用名称,用于标识消息的发布者和消费者程序
options.AppName = "myapp";
// RabbitMQ 配置
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]); // 要被扫描的程序集
消费者配置
消费者模式 [Consumer]
和事件总线模式 [EventTopic]
具有相同的属性配置,其配置说明如下:
名称 | 类型 | 默认值 | 说明 |
---|---|---|---|
Queue | string | 队列名称 | |
DeadQueue | string? | 绑定死信队列名称 | |
ExecptionRequeue | bool | true | 出现异常时是否放回队列,例如序列化错误等原因导致的,而不是消费时发生异常导致的 |
Expiration | int | 0 | 队列消息过期时间,单位毫秒 |
Qos | ushort | 1 | Qos |
RetryFaildRequeue | bool | false | 消费失败次数达到条件时,是否放回队列 |
Group | string? | 分组名称 | |
AutoQueueDeclare | AutoQueueDeclare | AutoQueueDeclare.None | 是否自动创建队列 |
环境隔离
目前还在考虑要不要支持多租户模式。
在开发中,往往需要在本地调试,本地程序启动后会连接到开发服务器上,一个队列收到消息时,会向其中一个消费者推送消息。那么我本地调试时,发布一个消息后,可能本地程序收不到该消息,而是被开发环境中的程序消费掉了。
这个时候,我们希望可以将本地调试环境跟开发环境隔离开来,可以使用 RabbitMQ 提供的 VirtualHost 功能。
首先通过 put 请求创建一个新的 VirtualHost,请参考文档:https://www.rabbitmq.com/docs/vhosts#using-http-api
然后在代码中配置 VirtualHost:
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AutoQueueDeclare = true;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
#if DEBUG
options.VirtualHost = "debug";
#endif
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]);
雪花 id 配置
Maomi.MQ.RabbitMQ 使用了 IdGenerator 生成雪花 id,使得每个事件在集群中都有一个唯一 id。
框架通过 IIdFactory 接口创建雪花 id,你可以通过替换 IIdFactory
接口配置雪花 id 生成规则。
services.AddSingleton<IIdFactory>(new DefaultIdFactory((ushort)optionsBuilder.WorkId));
示例:
public class DefaultIdFactory : IIdFactory
{
/// <summary>
/// Initializes a new instance of the <see cref="DefaultIdFactory"/> class.
/// </summary>
/// <param name="workId"></param>
public DefaultIdFactory(ushort workId)
{
var options = new IdGeneratorOptions(workId) { SeqBitLength = 10 };
YitIdHelper.SetIdGenerator(options);
}
/// <inheritdoc />
public long NextId() => YitIdHelper.NextId();
}
IdGenerator 框架生成雪花 id 配置请参考:
https://github.com/yitter/IdGenerator/tree/master/C%23
Qos 并发和顺序
基于消费者模式和基于事件模式都是通过特性来配置消费属性,Qos 是其中一个重要的属性。
Qos 场景
对于消费者模式和事件总线模式,在没有使用 Group
属性配置消费行为时,每个队列都会独占一个 IConnection 以及 Host service。
对于消费频率很高但是不能并发的队列,最好不要设置 Group
属性,以及务必设置 Qos = 1
。这样依赖,该消费者会独占资源进行消费,在保证顺序的情况下,独占资源有助于提高消费能力。
[Consumer("web1", Qos = 1)]
public class MyConsumer : IConsumer<TestEvent>
{
}
当需要需要提高消费吞吐量,而且不需要顺序消费时,可以将 Qos 设置高一些,RabbitMQ Client 框架会通过预取等方式提高吞吐量,并且多条消息可以并发消费。
如果判断一些消费者的消费频率不会很高时,可以将这些消费者放到一个分组中。
当多个消费者或事件配置共用一个分组时,那么这些事件的 Qos 应当一致,否则按照平均值来算。
示例:
[Consumer("web1", Qos = 10, Group = "group")]
public class My1Consumer : IConsumer<TestEvent>
{
}
[Consumer("web2", Qos = 6, Group = "group")]
public class My2Consumer : IConsumer<TestEvent>
{
}
由于两个消费者使用相同的分组,因此复用通道的 Qos 会被设置为 8。
如果消费频率不高,但是需要顺序消费时,可以将这些消费者放到同一个分组中,并且 Qos 设置为 1。
[Consumer("web1", Qos = 1, Group = "group1")]
public class My1Consumer : IConsumer<TestEvent>
{
}
[Consumer("web2", Qos = 1, Group = "group1")]
public class My2Consumer : IConsumer<TestEvent>
{
}
并发和异常处理
第一次情况,Qos 为 1 时,不设置 ExecptionRequeue 、RetryFaildRequeue。
第二种情况,Qos 为 1 时,设置 ExecptionRequeue 、RetryFaildRequeue。
Qos 为 1 时,会保证严格顺序消费,ExecptionRequeue 、RetryFaildRequeue 会影响失败的消息是否会被放回队列,如果放回队列,下一次消费会继续消费之前失败的消息。如果错误(如 bug)得不到解决,则会出现消费、失败、放回队列、重新消费这样的循环。
第三次情况,Qos > 1 时,不设置 ExecptionRequeue 、RetryFaildRequeue。
第四种情况,Qos > 1 时,设置 ExecptionRequeue 、RetryFaildRequeue。
当 Qos 大于 1 时,如果设置了 RetryFaildRequeue = true
,那么消费失败的消息会被放回队列中,但是不一定下一次会立即重新消费该条消息。
重试
重试时间
当消费者 ExecuteAsync
方法异常时,框架会进行重试,默认会重试五次,按照 2 作为指数设置重试时间间隔。
第一次失败后,间隔 2 秒重试,第二次失败后,间隔 4 秒,接着分别是 8、16、32 秒。
Maomi.MQ.RabbitMQ 使用了 Polly 框架做重试策略管理器,默认通过 DefaultRetryPolicyFactory 服务生成重试间隔策略。
DefaultRetryPolicyFactory 代码示例如下:
/// <summary>
/// Default retry policy.<br />
/// 默认的策略提供器.
/// </summary>
public class DefaultRetryPolicyFactory : IRetryPolicyFactory
{
/// <inheritdoc/>
public virtual Task<AsyncRetryPolicy> CreatePolicy(string queue, long id)
{
// Create a retry policy.
// 创建重试策略.
var retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(
retryCount: 5,
sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
onRetry: async (exception, timeSpan, retryCount, context) =>
{
_logger.LogDebug("Retry execution event,queue [{Queue}],retry count [{RetryCount}],timespan [{TimeSpan}]", queue, retryCount, timeSpan);
await FaildAsync(queue, exception, timeSpan, retryCount, context);
});
return Task.FromResult(retryPolicy);
}
public virtual Task FaildAsync(string queue, Exception ex, TimeSpan timeSpan, int retryCount, Context context)
{
return Task.CompletedTask;
}
}
你可以通过实现 IRetryPolicyFactory 接口,替换默认的重试策略服务服务。
services.AddSingleton<IRetryPolicyFactory, DefaultRetryPolicyFactory>();
重试机制
设定消费者代码如下:
[Consumer("web1", Qos = 1 , RetryFaildRequeue = true)]
public class MyConsumer : IConsumer<TestEvent>
{
private int _retryCount = 0;
// 消费
public async Task ExecuteAsync(EventBody<TestEvent> message)
{
Console.WriteLine($"执行 {message.Body.Id} 第几次:{_retryCount} {DateTime.Now}");
_retryCount++;
throw new Exception("1");
}
// 每次失败时被执行
public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
Console.WriteLine($"重试 {message.Body.Id} 第几次:{retryCount} {DateTime.Now}");
await Task.CompletedTask;
}
// 最后一次失败时执行
public async Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
Console.WriteLine($"执行 {message.Body.Id} 补偿 {DateTime.Now}");
return true;
}
}
}
首先会执行 IConsumer<TEvent>.ExecuteAsync()
或 IEventMiddleware<TEvent>.ExecuteAsync()
消费消息,此时 ExecuteAsync()
执行失败,立即触发 FaildAsync()
函数。
然后等待一段时间间隔后,接着会重新执行 ExecuteAsync()
方法。
比如默认重试机制是重试五次,那么最终 IConsumer<TEvent>.ExecuteAsync()
或 IEventMiddleware<TEvent>.ExecuteAsync()
都会被执行 6次,一次正常消费和五次重试消费。
FallbackAsync()
方法会在最后一次重试失败后被调用,该函数要返回一个 bool 类型。
当多次重试失败后,框架会调用 FallbackAsync 方法,如果该方法放回 true,那么框架会认为虽然 ExecuteAsync()
执行失败,但是通过 FallbackAsync()
已经补偿好了,该消息会被当做正常完成消费,框架会向 RabbitMQ 服务器发送 ACK,接着消费下一条消息。
如果 FallbackAsync()
返回 false,框架会认为该消息彻底失败,如果设置了 RetryFaildRequeue = true
,那么该条消息会被放回消息队列,等待下一次消费。否则该条消息会被直接丢弃。
持久化剩余重试次数
当消费者处理消息失败时,默认消费者会重试 5 次,如果已经重试了 3 次,此时程序重启,那么下一次消费该消息时,依然是继续重试五次。
需要记忆重试次数,在程序重启时,能够按照剩余次数进行重试。
引入 Maomi.MQ.RedisRetry 包。
配置示例:
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AutoQueueDeclare = true;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]);
builder.Services.AddMaomiMQRedisRetry((s) =>
{
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.3.248");
IDatabase db = redis.GetDatabase();
return db;
});
默认 key 只会保留 5 分钟。也就是说,如果五分钟之后程序才重新消费该消息,那么就会剩余重试次数就会重置。
死信队列
死信队列
可以给一个消费者或事件绑定死信队列,当该队列的消息失败后并且不会放回队列时,该消息会被推送到死信队列中,示例:
[Consumer("ConsumerWeb_dead", Qos = 1, DeadQueue = "ConsumerWeb_dead_queue", RetryFaildRequeue = false)]
public class DeadConsumer : IConsumer<DeadEvent>
{
// 消费
public Task ExecuteAsync(EventBody<DeadEvent> message)
{
Console.WriteLine($"事件 id:{message.Id}");
throw new OperationCanceledException();
}
// 每次失败时被执行
public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadEvent>? message) => Task.CompletedTask;
// 最后一次失败时执行
public Task<bool> FallbackAsync(EventBody<DeadEvent>? message) => Task.FromResult(false);
}
// ConsumerWeb_dead 消费失败的消息会被此消费者消费。
[Consumer("ConsumerWeb_dead_queue", Qos = 1)]
public class DeadQueueConsumer : IConsumer<DeadQueueEvent>
{
// 消费
public Task ExecuteAsync(EventBody<DeadQueueEvent> message)
{
Console.WriteLine($"死信队列,事件 id:{message.Id}");
return Task.CompletedTask;
}
// 每次失败时被执行
public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadQueueEvent>? message) => Task.CompletedTask;
// 最后一次失败时执行
public Task<bool> FallbackAsync(EventBody<DeadQueueEvent>? message) => Task.FromResult(false);
}
如果使用死信队列,则务必将 RetryFaildRequeue
设置为 false,那么消费者会在重试多次失败后,向 RabbitMQ 发送 nack 信号,RabbitMQ 就会将该消息转发到绑定的死信队列中。
延迟队列
创建一个消费者,继承 EmptyConsumer,那么该队列会在程序启动时被创建,但是不会创建 IConnection 进行消费。然后设置队列消息过期时间以及绑定死信队列,绑定的死信队列既可以使用消费者模式实现,也可以使用事件模式实现。
[Consumer("ConsumerWeb_dead_2", Expiration = 6000, DeadQueue = "ConsumerWeb_dead_queue_2")]
public class EmptyDeadConsumer : EmptyConsumer<DeadEvent>
{
}
// ConsumerWeb_dead 消费失败的消息会被此消费者消费。
[Consumer("ConsumerWeb_dead_queue_2", Qos = 1)]
public class Dead_2_QueueConsumer : IConsumer<DeadQueueEvent>
{
// 消费
public Task ExecuteAsync(EventBody<DeadQueueEvent> message)
{
Console.WriteLine($"事件 id:{message.Id} 已到期");
return Task.CompletedTask;
}
// 每次失败时被执行
public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadQueueEvent>? message) => Task.CompletedTask;
// 最后一次失败时执行
public Task<bool> FallbackAsync(EventBody<DeadQueueEvent>? message) => Task.FromResult(false);
}
例如,用户下单之后,如果 15 分钟之内没有付款,那么消息到期时,自动取消订单。
可观测性
功能还在继续完善中。请参考 ActivitySourceApi 示例。
为了快速部署可观测性平台,可以使用 OpenTelemetry 官方提供的示例包快速部署相关的服务。
下载示例仓库源码:
git clone https://github.com/open-telemetry/opentelemetry-demo.git
由于示例中会包含大量的 demo 微服务,因此我们需要打开 docker-compose.yml 文件,将 services 节点的 Core Demo Services
和 Dependent Services
服务直接删除,只保留可观测性组件。或者直接点击下载笔者已经修改好的版本: docker-compose.yml
执行命令部署可观测性服务:
docker-compose up -d
opentelemetry-collector-contrib 用于收集链路追踪的可观测性信息,有 grpc 和 http 两种,监听端口如下:
Port | Protocol | Endpoint | Function |
---|---|---|---|
4317 | gRPC | n/a | Accepts traces in OpenTelemetry OTLP format (Protobuf). |
4318 | HTTP | /v1/traces |
Accepts traces in OpenTelemetry OTLP format (Protobuf and JSON). |
经过容器端口映射后,对外端口可能不是 4317、4318 了。
引入 Maomi.MQ.Instrumentation 包,以及其它相关 OpenTelemetry 包。
<PackageReference Include="Maomi.MQ.Instrumentation " Version="1.1.0" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.8.1" />
然后注入服务:
const string serviceName = "myapp";
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AutoQueueDeclare = true;
options.AppName = serviceName;
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]);
builder.Services.AddOpenTelemetry()
.ConfigureResource(resource => resource.AddService(serviceName))
.WithTracing(tracing =>
{
tracing.AddMaomiMQInstrumentation(options =>
{
options.Sources.AddRange(MaomiMQDiagnostic.Sources);
options.RecordException = true;
})
.AddAspNetCoreInstrumentation()
.AddOtlpExporter(options =>
{
options.Endpoint = new Uri("http://127.0.0.1:32772/v1/traces");
options.Protocol = OtlpExportProtocol.HttpProtobuf;
});
});
启动服务后,进行发布、消费,链路追踪信息会被自动推送到 OpenTelemetry Collector 中,通过 Jaeger 、Skywalking 等组件可以读取出来。
由于 publish、consumer 属于兄弟 trace 而不是同一个 trace,因此需要通过 Tags 查询相关联的 trace,格式 event.id=xxx
。
文章评论