Instructions
Author: whuanle
Document Address: https://mmq.whuanle.cn
Repository Address: https://github.com/whuanle/Maomi.MQ
Author's Blog:
Introduction
Maomi.MQ is a communication framework that simplifies the use of message queues, currently supporting RabbitMQ.
Maomi.MQ.RabbitMQ is a communication model designed specifically for RabbitMQ publishers and consumers. It greatly simplifies publishing and consuming code while providing a range of convenient, practical features. Developers can achieve high-performance consumption and event orchestration through the consumption model provided by the framework. The framework also supports publisher confirmation, custom retry mechanisms, compensation mechanisms, dead-letter queues, delayed queues, connection and channel reuse, and other conveniences, so developers can focus on business logic and make cross-process message passing simpler and more reliable.
Additionally, the framework supports distributed observability through built-in APIs in the runtime, allowing further use of frameworks such as OpenTelemetry to collect observability information and push it to infrastructure platforms.
Getting Started
This tutorial will introduce how to use Maomi.MQ.RabbitMQ, allowing readers to quickly understand the usage and features of this framework.
Create a Web project (reference the WebDemo project), include the Maomi.MQ.RabbitMQ package, and inject services in the Web configuration:
// using Maomi.MQ;
// using RabbitMQ.Client;
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
options.Port = 5672;
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]);
var app = builder.Build();
- WorkId: Specifies the node ID used to generate distributed snowflake IDs; defaults to 0. A unique ID is generated for each message for tracking purposes. If WorkIds are not set, multiple instances of a distributed service running in parallel may generate duplicate IDs.
- AppName: Identifies the application, marking it in logs and trace links as the producer or consumer of messages.
- Rabbit: RabbitMQ client configuration; please refer to ConnectionFactory.
Define the message model class, which serves as the foundation for MQ communication, and this model class will be serialized into binary content for transmission to the RabbitMQ server.
public class TestEvent
{
public int Id { get; set; }
public override string ToString()
{
return Id.ToString();
}
}
Define a consumer. The consumer needs to implement the IConsumer<TEvent> interface and use the [Consumer] attribute to configure consumer properties. For example, [Consumer("test")] indicates that this consumer subscribes to the queue named test.
The IConsumer<TEvent> interface has three methods: ExecuteAsync handles messages; FaildAsync is executed immediately when ExecuteAsync throws an exception; and if the code continues to fail, FallbackAsync is called. The Maomi.MQ framework then uses the returned ConsumerState value to decide whether to re-queue the message for re-consumption or take other action.
[Consumer("test")]
public class MyConsumer : IConsumer<TestEvent>
{
// Consumption
public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
{
Console.WriteLine($"Event id: {message.Id} {DateTime.Now}");
await Task.CompletedTask;
}
// Executed on every consumption failure
public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
=> Task.CompletedTask;
// Compensation
public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
=> Task.FromResult(ConsumerState.Ack);
}
Maomi.MQ also supports various consumer modes, with different code implementations that will be detailed in subsequent sections.
To publish messages, simply inject the IMessagePublisher service.
[ApiController]
[Route("[controller]")]
public class IndexController : ControllerBase
{
private readonly IMessagePublisher _messagePublisher;
public IndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}
[HttpGet("publish")]
public async Task<string> Publisher()
{
// Publish message
await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: "test", message: new TestEvent
{
Id = 123
});
return "ok";
}
}
Start the web service and request the API interface on the Swagger page; the MyConsumer service will immediately receive the published message.
For console projects, you need to include the Microsoft.Extensions.Hosting package to allow the consumer to subscribe to the queue and consume messages in the background.
Reference the ConsoleDemo project.
using Maomi.MQ;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using System.Reflection;
var host = new HostBuilder()
.ConfigureLogging(options =>
{
options.AddConsole();
options.AddDebug();
})
.ConfigureServices(services =>
{
services.AddMaomiMQ(options =>
{
options.WorkId = 1;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
options.Port = 5672;
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, new System.Reflection.Assembly[] { typeof(Program).Assembly });
}).Build();
// Run in the background
var task = host.RunAsync();
Console.ReadLine();
Message Publisher
The message publisher is used to push messages to the RabbitMQ server. Maomi.MQ supports multiple message publisher modes, including RabbitMQ transaction modes. For example projects, please reference PublisherWeb.
Maomi.MQ provides developers with message-pushing services through IMessagePublisher.
Before publishing messages, you need to define an event model class to carry the message.
public class TestEvent
{
public int Id { get; set; }
public override string ToString()
{
return Id.ToString();
}
}
Then inject the IMessagePublisher service and publish messages:
[ApiController]
[Route("[controller]")]
public class IndexController : ControllerBase
{
private readonly IMessagePublisher _messagePublisher;
public IndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}
[HttpGet("publish")]
public async Task<string> Publisher()
{
for (var i = 0; i < 100; i++)
{
await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
{
Id = i
});
}
return "ok";
}
}
In general, a model class should only be used by one consumer, so the consumer can be uniquely found through events. This means finding the consumer's IConsumerOptions through the event type, allowing the framework to send messages using the corresponding configuration.
The TestMessageEvent model only has one consumer:
[Consumer("publish", Qos = 1, RetryFaildRequeue = true)]
public class TestEventConsumer : IConsumer<TestMessageEvent>
{
// ... ...
}
You can send events directly without specifying an exchange and routing key:
[HttpGet("publish_message")]
public async Task<string> PublisherMessage()
{
// If TestMessageEvent is designated with only one consumer in this project,
// then it will automatically search for the corresponding configuration through TestMessageEvent
for (var i = 0; i < 100; i++)
{
await _messagePublisher.PublishAsync(model: new TestMessageEvent
{
Id = i
});
}
return "ok";
}
IMessagePublisher
IMessagePublisher is the basic message publishing interface of Maomi.MQ, with the following methods:
// Message publisher.
public interface IMessagePublisher
{
Task PublishAsync<TMessage>(string exchange, // Exchange name.
string routingKey, // Queue/Routing key name.
TMessage message, // Event object.
Action<BasicProperties> properties,
CancellationToken cancellationToken = default)
where TMessage : class;
Task PublishAsync<TMessage>(string exchange,
string routingKey,
TMessage message,
BasicProperties? properties = default,
CancellationToken cancellationToken = default);
Task PublishAsync<TMessage>(TMessage message,
Action<BasicProperties>? properties = null,
CancellationToken cancellationToken = default)
where TMessage : class;
Task PublishAsync<TMessage>(TMessage model,
BasicProperties? properties = default,
CancellationToken cancellationToken = default);
Task CustomPublishAsync<TMessage>(string exchange,
string routingKey,
TMessage message,
BasicProperties? properties = default,
CancellationToken cancellationToken = default);
}
Maomi.MQ exposes only a few message publishing methods. Because BasicProperties is exposed directly, developers are free to configure RabbitMQ's native message properties, so the interface stays simple while remaining very flexible to use.
BasicProperties is the basic property object for messages in RabbitMQ, directly aimed at developers, making the publishing and consumption of messages flexible and rich in functionality. For example, you can configure the expiration time of a single message via BasicProperties:
await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
{
Id = i
}, (BasicProperties p) =>
{
p.Expiration = "1000";
});
Maomi.MQ implements IMessagePublisher through the DefaultMessagePublisher type, which has a default lifetime of Scoped:
services.AddScoped<IMessagePublisher, DefaultMessagePublisher>();
Developers can also implement their own message publishing model by implementing the IMessagePublisher interface. For specific examples, please refer to the DefaultMessagePublisher type.
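As a minimal illustrative sketch (not part of the framework itself), a decorator that logs every publish could implement the same interface and delegate to an inner IMessagePublisher; the LoggingMessagePublisher name is an assumption, and how it is wired into the container depends on your DI setup and is omitted here.
// using Maomi.MQ;
// using Microsoft.Extensions.Logging;
// using RabbitMQ.Client;
// Illustrative decorator around an existing IMessagePublisher.
public class LoggingMessagePublisher : IMessagePublisher
{
    private readonly IMessagePublisher _inner;
    private readonly ILogger<LoggingMessagePublisher> _logger;

    public LoggingMessagePublisher(IMessagePublisher inner, ILogger<LoggingMessagePublisher> logger)
    {
        _inner = inner;
        _logger = logger;
    }

    public Task PublishAsync<TMessage>(string exchange, string routingKey, TMessage message, Action<BasicProperties> properties, CancellationToken cancellationToken = default)
        where TMessage : class
    {
        // Log before delegating to the real publisher.
        _logger.LogInformation("Publishing {MessageType} to {Exchange}/{RoutingKey}", typeof(TMessage).Name, exchange, routingKey);
        return _inner.PublishAsync(exchange, routingKey, message, properties, cancellationToken);
    }

    public Task PublishAsync<TMessage>(string exchange, string routingKey, TMessage message, BasicProperties? properties = default, CancellationToken cancellationToken = default)
        => _inner.PublishAsync(exchange, routingKey, message, properties, cancellationToken);

    public Task PublishAsync<TMessage>(TMessage message, Action<BasicProperties>? properties = null, CancellationToken cancellationToken = default)
        where TMessage : class
        => _inner.PublishAsync(message, properties, cancellationToken);

    public Task PublishAsync<TMessage>(TMessage model, BasicProperties? properties = default, CancellationToken cancellationToken = default)
        => _inner.PublishAsync(model, properties, cancellationToken);

    public Task CustomPublishAsync<TMessage>(string exchange, string routingKey, TMessage message, BasicProperties? properties = default, CancellationToken cancellationToken = default)
        => _inner.CustomPublishAsync(exchange, routingKey, message, properties, cancellationToken);
}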
Native Channel
Developers can obtain raw connection objects through the ConnectionPool service and use RabbitMQ's native interfaces to publish messages directly on the IConnection:
private readonly ConnectionPool _connectionPool;
var connectionObject = _connectionPool.Get();
connectionObject.DefaultChannel.BasicPublishAsync(... ...);
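For illustration only, the queue name, JSON serialization, and property values below are assumptions; the complete call mirrors the BasicPublishAsync usage shown in the unroutable-message example later in this document. Keep in mind that publishing directly on the channel bypasses the framework's own publishing pipeline.
// Sketch: serialize a TestEvent and publish it on the pooled connection's default channel.
var connectionObject = _connectionPool.Get();
var body = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(new TestEvent { Id = 1 });
await connectionObject.DefaultChannel.BasicPublishAsync<BasicProperties>(
    "",            // default exchange
    "test",        // assumed queue/routing key
    true,          // mandatory
    new BasicProperties(),
    body);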
Persistent In-Memory Connection Objects
Maomi.MQ manages RabbitMQ connection objects through ConnectionPool. After injecting the ConnectionPool service, you can obtain the global default connection instance via .Get().
If developers have their own requirements, they can also create new connection objects through the .Create() interface:
using var newConnectionObject = _connectionPool.Create();
using var newConnection = newConnectionObject.Connection;
using var newChannel = await newConnection.CreateChannelAsync();
Please use connection objects properly, avoiding frequent creation and disposal, and do not forget to manage lifecycles to prevent memory leaks.
A single IConnection is sufficient for most scenarios, with throughput being adequate. The author has conducted multiple long-duration tests and found that one IConnection can meet the demand; multiple IConnections do not provide any advantage. Therefore, the old version of the connection pool has been removed, and now there will only be one global IConnection by default, while different consumers use IChannel for isolation.
With only one IConnection, four publishers publishing simultaneously still sustain a high per-second rate, even when the message content is very large; the limit depends mainly on bandwidth (in the author's test, each message was 478 KiB).
Message Expiration
The IMessagePublisher exposes BasicProperties, allowing developers to freely configure message properties.
For example, setting an expiration time for a message:
[HttpGet("publish")]
public async Task<string> Publisher()
{
for (var i = 0; i < 1; i++)
{
await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
{
Id = i
}, properties =>
{
properties.Expiration = "6000";
});
}
return "ok";
}
After setting an expiration time for the message, if the queue is bound to a dead-letter queue, the message will be moved to another queue if it is not consumed for a long time. Please refer to Dead Letter Queue.
More functionality can also be achieved by configuring message properties; refer to the IBasicProperties documentation.
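For instance, here is a sketch that marks a message as persistent and sets a per-message priority through the same callback; the values are illustrative, and priorities only take effect on a priority queue.
await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
{
    Id = 1
}, (BasicProperties p) =>
{
    p.Persistent = true;   // store the message on disk
    p.Priority = 5;        // per-message priority, only meaningful on a priority queue
});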
Transactions
RabbitMQ natively supports a transactional model, and the transactional communication protocol of RabbitMQ can be referenced at https://www.rabbitmq.com/docs/semantics.
According to the official RabbitMQ documentation, the transaction mode results in a throughput decrease of 250 times, primarily related to the transactional mechanism. The transaction mode not only ensures that messages are pushed to the Rabbit broker but also ensures synchronization across Rabbit brokers in multi-node partitions, so that messages have been fully synchronized when the Rabbit broker goes down. However, this strict mode is generally not necessary, so the sender acknowledgment mechanism mentioned in the next section can also be used.
Maomi.MQ's transaction interface is straightforward to use: an extension method creates an ITransactionPublisher directly, as shown below:
[HttpGet("publish_tran")]
public async Task<string> Publisher_Tran()
{
using var tranPublisher = _messagePublisher.CreateTransaction();
await tranPublisher.TxSelectAsync();
try
{
await tranPublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
{
Id = 666
});
await Task.Delay(5000);
await tranPublisher.TxCommitAsync();
}
catch
{
await tranPublisher.TxRollbackAsync();
throw;
}
return "ok";
}
Sender Acknowledgment Mode
The transaction mode can ensure that messages are pushed to the RabbitMQ server and have been synchronized among nodes, but due to the 250-fold reduction in throughput, RabbitMQ has introduced a confirmation mechanism. This mechanism acts like a sliding window, ensuring that messages are pushed to the server while maintaining high performance, with a throughput that is 100 times that of the transaction mode. Refer to the materials at:
https://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms
https://www.rabbitmq.com/docs/confirms
However, the new version of the .NET RabbitMQ.Client library has removed some API interfaces. Detailed information about the changes can be found at: Issue #1682, RabbitMQ tutorial - Reliable Publishing with Publisher Confirms.
Maomi.MQ has been simplified and adjusted for the new version. The usage is to create a message publisher backed by an independent channel, passing the confirmation settings through the channel creation parameters.
using var confirmPublisher = _messagePublisher.CreateSingle(
new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true));
for (var i = 0; i < 5; i++)
{
await confirmPublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
{
Id = 666
});
}
The transaction mode and acknowledgment mode publishers are mutually isolated; the creation of these two object types will automatically use the new IChannel by default, so there is no need to worry about conflicts.
If developers have their own requirements, they can also create a separate IChannel using CreateSingle and customize the CreateChannelOptions properties. For a description of CreateChannelOptions, please refer to:
https://rabbitmq.github.io/rabbitmq-dotnet-client/api/RabbitMQ.Client.CreateChannelOptions.html
Broadcast Mode
Broadcast mode is used to push a message to an exchange, allowing multiple bound queues to receive the same message. In simple terms, this mode involves pushing messages to the exchange, which then forwards the messages to all bound queues, enabling different queue consumers to receive the message simultaneously.
RabbitMQ has four exchange types, defined as constants:
public static class ExchangeType
{
public const string Direct = "direct";
public const string Fanout = "fanout";
public const string Headers = "headers";
public const string Topic = "topic";
private static readonly string[] s_all = { Fanout, Direct, Topic, Headers };
}
However, different exchange modes have different usage. Taking fanout as an example, when a queue is bound to a fanout type exchange, the Rabbit broker will ignore the RoutingKey and push the message to all bound queues.
Thus, we define two consumers bound to the same fanout type exchange:
[Consumer("fanout_1", BindExchange = "fanouttest", ExchangeType = "fanout")]
public class FanoutEvent_1_Consumer : IConsumer<FanoutEvent>
{
// Consume
public virtual async Task ExecuteAsync(MessageHeader messageHeader, FanoutEvent message)
{
Console.WriteLine($"【fanout_1】,事件 id: {message.Id} {DateTime.Now}");
await Task.CompletedTask;
}
// ... ...
}
[Consumer("fanout_2", BindExchange = "fanouttest", ExchangeType = "fanout")]
public class FanoutEvent_2_Consumer : IConsumer<FanoutEvent>
{
// Consume
public virtual async Task ExecuteAsync(MessageHeader messageHeader, FanoutEvent message)
{
Console.WriteLine($"【fanout_2】,事件 id: {message.Id} {DateTime.Now}");
await Task.CompletedTask;
}
// ... ...
}
When publishing messages, you only need to configure the exchange name, and both consumer services will receive the message simultaneously:
[HttpGet("publish_fanout")]
public async Task<string> Publisher_Fanout()
{
for (var i = 0; i < 5; i++)
{
await _messagePublisher.PublishAsync(exchange: "fanouttest", routingKey: string.Empty, message: new FanoutEvent
{
Id = 666
});
}
return "ok";
}
For Topic type exchanges and queues, the usage is also consistent. You can define two consumers:
[Consumer("red.yellow.#", BindExchange = "topictest", ExchangeType = "topic")]
public class TopicEvent_1_Consumer : IConsumer<TopicEvent>
{
// Consume
public virtual async Task ExecuteAsync(MessageHeader messageHeader, TopicEvent message)
{
Console.WriteLine($"【red.yellow.#】,事件 id: {message.Id} {DateTime.Now}");
await Task.CompletedTask;
}
// ... ...
}
[Consumer("red.#", BindExchange = "topictest", ExchangeType = "topic")]
public class TopicEvent_2_Consumer : IConsumer<TopicEvent>
{
// Consume
public virtual async Task ExecuteAsync(MessageHeader messageHeader, TopicEvent message)
{
Console.WriteLine($"【red.#】,事件 id: {message.Id} {DateTime.Now}");
await Task.CompletedTask;
}
// ... ...
}
Publishing messages:
[HttpGet("publish_topic")]
public async Task<string> Publisher_Topic()
{
for (var i = 0; i < 5; i++)
{
await _messagePublisher.PublishAsync(exchange: "topictest", routingKey: "red.a", message: new TopicEvent
{
Id = 666
});
await _messagePublisher.PublishAsync(exchange: "topictest", routingKey: "red.yellow.a", message: new TopicEvent
{
Id = 666
});
}
return "ok";
}
Unrouteable Messages
When a message is published but cannot be routed (e.g., the corresponding queue cannot be found), the IBreakdown.BasicReturnAsync interface is triggered; the BasicReturnEventArgs property contains the detailed error reason.
For network failures, RabbitMQ service outages, or missing exchange names, exceptions appear on the current thread, and the TCP connection will automatically reconnect.
Note, however, that RabbitMQ does not confirm pushes synchronously: a failed push raises no exception on the current thread, so the publish call alone cannot tell you whether the message was delivered.
For unrouteable messages, Maomi.MQ only provides simple interface notifications without any other processing mechanisms, so developers need to handle it themselves. A community MQ communication framework called EasyNetQ has a default mechanism that automatically creates a new queue to push the currently unrouteable messages for persistent storage.
Developers can implement this interface and register it to the container:
services.AddScoped<IBreakdown, MyDefaultBreakdown>();
For example, to push unrouteable messages to a new queue:
public class MyDefaultBreakdown : IBreakdown
{
private readonly ConnectionPool _connectionPool;
public MyDefaultBreakdown(ConnectionPool connectionPool)
{
_connectionPool = connectionPool;
}
/// <inheritdoc />
public async Task BasicReturnAsync(object sender, BasicReturnEventArgs @event)
{
var connectionObject = _connectionPool.Get();
await connectionObject.DefaultChannel.BasicPublishAsync<BasicProperties>(
@event.Exchange,
@event.RoutingKey + ".failed",
true,
new BasicProperties(@event.BasicProperties),
@event.Body);
}
/// <inheritdoc />
public Task NotFoundConsumerAsync(string queue, Type messageType, Type consumerType)
{
return Task.CompletedTask;
}
}
In practice, unroutable messages should not just be forwarded and stored; you also need to investigate the cause, such as a queue that was deleted by mistake or a mismatch between the routing key used when publishing and the queue name.
Consumers
In Maomi.MQ.RabbitMQ, there are three consumption modes: consumer mode, event mode (event bus mode), and dynamic consumer mode. The dynamic consumer mode also supports multiple consumption modes.
Below is a brief introduction to the usage of these three modes, with more detailed explanations to follow.
Consumer Mode
Consumer services need to implement the IConsumer<TEvent> interface and use the [Consumer("queue")] attribute to bind the queue name. Consumption behavior is controlled through the consumer object; the consumer mode provides failure notification and compensation capabilities and is relatively simple to use.
Configuration declared on the [Consumer] attribute can also be adjusted at startup through interceptors (see the Interceptors section).
public class TestEvent
{
public int Id { get; set; }
}
[Consumer("PublisherWeb", Qos = 1, RetryFaildRequeue = true)]
public class MyConsumer : IConsumer<TestEvent>
{
private static int _retryCount = 0;
// Consume
public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
{
_retryCount++;
Console.WriteLine($"Execution Count: {_retryCount} Event ID: {message.Id} {DateTime.Now}");
await Task.CompletedTask;
}
// Executed on every consumption failure
public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
=> Task.CompletedTask;
// Compensation
public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
=> Task.FromResult(ConsumerState.Ack);
}
Event Pattern
The event pattern is realized through an event bus to control consumption behavior around the event model.
[EventTopic("web2", Qos = 1, RetryFaildRequeue = true)]
public class TestEvent
{
public string Message { get; set; }
}
Then use the [EventOrder] attribute to orchestrate the order of event execution.
// Orchestrate event consumption order
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
public async Task CancelAsync(TestEvent @event, CancellationToken cancellationToken)
{
}
public async Task ExecuteAsync(TestEvent @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id}, Event 1 has been executed");
}
}
[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
public async Task CancelAsync(TestEvent @event, CancellationToken cancellationToken)
{
}
public async Task ExecuteAsync(TestEvent @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id}, Event 2 has been executed");
}
}
Event mode can also enhance compensation through middleware, placing all ordered events into one transaction so they succeed or fail together, thereby avoiding consistency issues caused by the program exiting during event execution.
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public TestEventMiddleware(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message, EventHandlerDelegate<TestEvent> next)
{
using (var transaction = _bloggingContext.Database.BeginTransaction())
{
await next(messageHeader, message, CancellationToken.None);
await transaction.CommitAsync();
}
}
public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent? message)
{
return Task.CompletedTask;
}
public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
{
return Task.FromResult(ConsumerState.Ack);
}
}
Both consumer and event bus modes can handle large messages; in the author's test, each message was nearly 500 KiB, with multiple queues consumed concurrently. If the message content is small, very high consumption speeds can be achieved.
Dynamic Consumers
Dynamic consumers can subscribe to queues at runtime and support three approaches: consumer type, event bus type, and function binding.
Inject IDynamicConsumer to use the dynamic consumer service.
<pre><code class="language-csharp">await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions("myqueue")
{
Qos = 10
});</code></pre>
<pre><code class="language-csharp">// Automatically corresponding consumer to event model
await _dynamicConsumer.ConsumerAsync<TestEvent>(new ConsumerOptions("myqueue")
{
Qos = 10
});</code></pre>
<pre><code class="language-csharp">// Function-based consumption
_dynamicConsumer.ConsumerAsync<TestEvent>(new ConsumerOptions("myqueue")
{
Qos = 10
}, async (header, message) =>
{
Console.WriteLine($"Event ID: {message.Id} {DateTime.Now}");
await Task.CompletedTask;
});</code></pre>
Consumer Registration Pattern
Maomi.MQ provides the ITypeFilter interface, which developers can implement to create a custom consumer registration pattern.
Maomi.MQ has three built-in ITypeFilter implementations:
- Consumer pattern: ConsumerTypeFilter
- Event bus pattern: EventBusTypeFilter
- Custom consumer pattern: CustomConsumerTypeFilter
By default, the framework registers the ConsumerTypeFilter and EventBusTypeFilter patterns, and developers can adjust which patterns to use.
<pre><code class="language-csharp">var consumerTypeFilter = new ConsumerTypeFilter();
// ...
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
// ... ...
},
[typeof(Program).Assembly], // Assembly to automatically scan
[new ConsumerTypeFilter(), new EventBusTypeFilter(), consumerTypeFilter]); // Configure consumer registration patterns to use</code></pre>
Consumer Pattern
The consumer pattern requires the service to implement the IConsumer<TEvent> interface; there are three ways to register consumer services:
- Use the [Consumer] attribute for automatic scanning and injection during application startup, with the option to modify the [Consumer] configuration dynamically.
- Do not set [Consumer]; manually register the consumer service and its configuration using CustomConsumerTypeFilter.
- Use IDynamicConsumer to bind consumers dynamically at runtime.
This example can reference the ConsumerWeb project.
The IConsumer<TEvent> interface is quite simple, defined as follows:
public interface IConsumer<TMessage>
where TMessage : class
{
public Task ExecuteAsync(MessageHeader messageHeader, TMessage message);
public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TMessage message);
public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TMessage? message, Exception? ex);
}
When using the consumer pattern, first define a model class for message exchange between the publisher and the consumer. The event model class can be any class that serializes and deserializes properly; there are no other requirements.
public class TestEvent
{
public int Id { get; set; }
public override string ToString()
{
return Id.ToString();
}
}
Then implement the IConsumer<TEvent> interface to provide the consumer functionality:
[Consumer("ConsumerWeb", Qos = 1)]
public class MyConsumer : IConsumer<TestEvent>
{
private readonly ILogger<MyConsumer> _logger;
public MyConsumer(ILogger<MyConsumer> logger)
{
_logger = logger;
}
// Consume
public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
{
Console.WriteLine($"Event ID: {message.Id}");
}
// Executed on each failure
public async Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
{
_logger.LogError(ex, "Consumer exception, event ID: {Id}, retry count: {retryCount}", message!.Id, retryCount);
}
// Executed on the last failure
public async Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
{
return ConsumerState.Ack;
}
}
For attribute configuration details, please refer to Consumer Configuration (3.configuration.md#消费者配置).
Manual Injection of Consumers
Developers can also manually register consumer services through CustomConsumerTypeFilter by configuring ConsumerOptions.
var consumerOptions = new ConsumerOptions("test-queue_2")
{
DeadExchange = "test-dead-exchange_2",
DeadRoutingKey = "test-dead-routing-key_2",
Expiration = 60000,
Qos = 10,
RetryFaildRequeue = true,
AutoQueueDeclare = AutoQueueDeclare.Enable,
BindExchange = "test-bind-exchange_2",
ExchangeType = "direct",
RoutingKey = "test-routing_2"
};
// Create custom consumer pattern
var consumerTypeFilter = new CustomConsumerTypeFilter();
var consumerType = typeof(TestConsumer);
consumerTypeFilter.AddConsumer(consumerType, consumerOptions);
When registering MQ services, add the custom consumer pattern:
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
// ... ...
},
[typeof(Program).Assembly],
[new ConsumerTypeFilter(), new EventBusTypeFilter(), consumerTypeFilter]); // Add custom consumer pattern
Dynamic Consumers
Inject IDynamicConsumer to use the dynamic consumer service; added consumers run automatically in the background.
var consumerTag = await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions("myqueue")
{
Qos = 10
});
If subscription cancellation is needed, it can be done via the consumer tag or the queue name.
await _dynamicConsumer.StopConsumerTagAsync(consumerTag);
await _dynamicConsumer.StopConsumerAsync(queueName);
Consumption, Retry, and Compensation
When a consumer receives a message pushed from the server, the ExecuteAsync method is executed automatically. If ExecuteAsync throws an exception, the FaildAsync method is triggered immediately, allowing developers to log the related information.
// Executed on each failure
public async Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
{
_logger.LogError(ex, "Consumer exception, event ID: {Id}, retry count: {retryCount}", message!.Id, retryCount);
}
By default, the framework retries at most three times, meaning the ExecuteAsync method is executed at most four times in total.
If the FaildAsync method itself throws an exception, it does not affect the overall process; the framework waits for the retry interval and then retries ExecuteAsync again.
It is recommended to wrap the logic in FaildAsync in a try...catch block and avoid letting exceptions escape; the logic in FaildAsync should stay simple and be used primarily for logging or alerting.
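A minimal sketch of that recommendation, assuming an injected ILogger as in the earlier example:
// Executed on each failure; exceptions are swallowed so that logging problems cannot disturb the retry flow.
public async Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
{
    try
    {
        _logger.LogError(ex, "Consumer exception, event ID: {Id}, retry count: {retryCount}", message!.Id, retryCount);
        // Alerting or metrics could go here.
    }
    catch (Exception logEx)
    {
        // Never let a logging failure escape from FaildAsync.
        Console.WriteLine(logEx);
    }

    await Task.CompletedTask;
}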
When an exception occurs in the ExecuteAsync method, the framework automatically retries, three times by default. If all retries fail, the FallbackAsync method is executed for compensation.
The retry interval increases gradually; refer to Retry for details.
After the retries are exhausted, the compensation mechanism is invoked immediately.
// Executed on the last failure
public async Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
{
return ConsumerState.Ack;
}
The FallbackAsync method returns a ConsumerState. Returning ConsumerState.Ack indicates that, although ExecuteAsync failed, compensation completed successfully and the message is treated as consumed normally. Returning a Nack state indicates that compensation failed and the message is treated as a consumption failure.
FaildAsync and FallbackAsync are only triggered when an exception occurs in ExecuteAsync.
Consumption Failure
When the number of ExecuteAsync failures reaches the threshold, consumption of that message fails; if serialization or other errors occur, the message fails directly and the final FallbackAsync is triggered.
In IConsumerOptions, there are three important configurations:
public class IConsumerOptions : Attribute
{
// Whether to re-queue the message if the consumption fails a certain number of times.
public bool RetryFaildRequeue { get; set; }
/// Binding dead letter exchange
public string? DeadExchange { get; set; }
/// Binding dead letter queue
public string? DeadRoutingKey { get; set; }
}
The return value of FallbackAsync is the ConsumerState enumeration, which is defined as follows:
/// After receiving RabbitMQ messages, determine whether to perform ACK, NACK, or requeue the messages through the state enumeration.
public enum ConsumerState
{
/// ACK.
Ack = 1,
/// Immediately NACK and use the default settings to decide whether to requeue the message.
Nack = 1 << 1,
/// Immediately NACK and requeue the message.
NackAndRequeue = 1 << 2,
/// Immediately NACK, and the message will be removed from the server queue.
NackAndNoRequeue = 1 << 3,
/// An exception occurred.
Exception = 1 << 4
}
There are several scenarios for consumption failure; the detailed logic is listed below:
- If deserialization fails, or FallbackAsync itself throws an exception, the result is ConsumerState.Exception; whether the message is requeued for the next retry is then determined by IConsumerOptions.RetryFaildRequeue.
- If FallbackAsync returns ConsumerState.Ack, the message is acknowledged despite the repeated consumption failures.
- If FallbackAsync returns ConsumerState.Nack, consumption has failed, and whether the message is requeued is determined by IConsumerOptions.RetryFaildRequeue.
- If FallbackAsync returns ConsumerState.NackAndRequeue, consumption fails immediately and the message is requeued.
- If FallbackAsync returns ConsumerState.NackAndNoRequeue, consumption fails immediately and the message is not requeued.
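As an illustrative sketch of how a consumer might use these return values (the exception-type checks are assumptions, not framework requirements):
// Compensation that decides the message's fate based on the failure.
public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
{
    // A null message usually means deserialization failed; retrying will never help.
    if (message == null)
    {
        return Task.FromResult(ConsumerState.NackAndNoRequeue);
    }

    // Transient infrastructure errors: put the message back on the queue.
    if (ex is TimeoutException)
    {
        return Task.FromResult(ConsumerState.NackAndRequeue);
    }

    // Otherwise treat compensation as successful and acknowledge the message.
    return Task.FromResult(ConsumerState.Ack);
}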
Automatic Queue Creation
The framework automatically creates queues by default. To disable this feature, set AutoQueueDeclare to false.
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AppName = "myapp";
options.AutoQueueDeclare = false;
options.Rabbit = (ConnectionFactory options) =>
{
// ... ...
};
}, [typeof(Program).Assembly]);
You can also individually configure whether to automatically create queues for consumers:
[Consumer("ConsumerWeb_create", AutoQueueDeclare = AutoQueueDeclare.Enable)]
By default, if global auto-creation is disabled, queues are not created automatically.
If global auto-creation is disabled but the consumer is configured with AutoQueueDeclare = AutoQueueDeclare.Enable, the queue is still created automatically.
If the consumer is configured with AutoQueueDeclare = AutoQueueDeclare.Disable, the global configuration is ignored and no queue is created.
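For example (the queue name is illustrative), a consumer that opts out of queue creation regardless of the global setting:
[Consumer("ConsumerWeb_noqueue", AutoQueueDeclare = AutoQueueDeclare.Disable)]
public class NoDeclareConsumer : IConsumer<TestEvent>
{
    // ... ...
}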
Qos
The default Qos is 100.
When strict, ordered consumption is required, use Qos = 1, which guarantees that messages are consumed one by one. If ordered consumption is not required and messages should be processed as quickly as possible, Qos can be set higher. Since Qos combined with the retry and compensation mechanisms can produce a variety of scenarios, please refer to Retry.
Qos is configured through the attribute:
[Consumer("ConsumerWeb", Qos = 1)]
Increasing the Qos value allows messages to be processed concurrently and increases throughput.
Set the Qos value according to the network environment, server performance, and the number of instances to effectively improve message processing speed; please refer to Qos.
Delayed Queue
There are two types of delayed queues: one sets the message expiration time, the other sets the queue expiration time.
Setting the message expiration time means that if a particular message is not consumed within a certain time, it is discarded or moved to the dead-letter queue; this configuration applies only to individual messages, see Message Expiration.
Setting the queue expiration time means that messages not consumed within that time are discarded or moved to the dead-letter queue; this configuration applies to all messages in the queue. Based on this, we can implement a delayed queue.
First, create a consumer that inherits EmptyConsumer; its queue is created at application startup, but no IConnection is established to consume messages. Then set the message expiration time for the queue and bind a dead-letter queue, which can be implemented in either consumer mode or event mode.
[Consumer("consumerWeb_dead", Expiration = 6000, DeadRoutingKey = "consumerWeb_dead_queue")]
public class EmptyDeadConsumer : EmptyConsumer<DeadEvent>
{
}
// Messages that expire in or fail consumption from the consumerWeb_dead queue will be consumed by this consumer.
[Consumer("consumerWeb_dead_queue", Qos = 1)]
public class Dead_QueueConsumer : IConsumer<DeadQueueEvent>
{
// Consume
public Task ExecuteAsync(MessageHeader messageHeader, DeadQueueEvent message)
{
Console.WriteLine($"Dead letter queue, event id:{message.Id}");
return Task.CompletedTask;
}
// Executed each time a failure occurs
public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, DeadQueueEvent message) => Task.CompletedTask;
// Executed when the last failure occurs
public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, DeadQueueEvent? message, Exception? ex)
=> Task.FromResult(ConsumerState.Ack);
}
Empty Consumer
When an empty consumer is detected, the framework will only create the queue without starting the consumer to consume messages.
It can be used in conjunction with delayed queues; this queue will have no consumers. When the messages in this queue expire, they will all be consumed directly by the dead letter queue. An example is shown below:
[Consumer("ConsumerWeb_empty", Expiration = 6000, DeadQueue = "ConsumerWeb_empty_dead")]
public class MyEmptyConsumer : EmptyConsumer<TestEvent> { }
[Consumer("ConsumerWeb_empty_dead", Qos = 10)]
public class MyDeadConsumer : IConsumer<TestEvent>
{
// ... ...
}
For cross-process queues, service A publishes but does not consume, while service B is responsible for consumption. Service A can declare an empty consumer so the queue is guaranteed to exist at startup; the consuming service, for its part, should not concern itself with defining the queue and should not create it.
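A minimal sketch of that arrangement; the queue name, event type, and class names here are assumptions:
// Shared event model (referenced by both services).
public class OrderCreatedEvent
{
    public int Id { get; set; }
}

// Service A (publisher): only guarantees that the queue exists; it never consumes from it.
[Consumer("order_created")]
public class OrderCreatedQueueDeclare : EmptyConsumer<OrderCreatedEvent> { }

// Service B (consumer): consumes the queue and leaves queue creation to the publisher side.
[Consumer("order_created", Qos = 10, AutoQueueDeclare = AutoQueueDeclare.Disable)]
public class OrderCreatedConsumer : IConsumer<OrderCreatedEvent>
{
    public Task ExecuteAsync(MessageHeader messageHeader, OrderCreatedEvent message) => Task.CompletedTask;
    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, OrderCreatedEvent message) => Task.CompletedTask;
    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, OrderCreatedEvent? message, Exception? ex)
        => Task.FromResult(ConsumerState.Ack);
}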
Broadcast Mode
In RabbitMQ, after setting up a Fanout or Topic exchange, multiple queues bound to that exchange will receive identical messages. In a microservices scenario, for example, after an employee leaves the company, a message needs to be published, and all systems subscribed to this message need to handle the related data.
Create two consumer queues; the queue names must differ, and both can be bound to the same exchange, whose name is arbitrary, e.g. exchange.
[Consumer("ConsumerWeb_exchange_1", BindExchange = "exchange")]
public class Exchange_1_Consumer : IConsumer<TestEvent>
{
/// ... ...
}
[Consumer("ConsumerWeb_exchange_2", BindExchange = "exchange")]
public class Exchange_2_Consumer : IConsumer<TestEvent>
{
// ... ...
}
When the publisher publishes a message, it needs to use the broadcasting publisher mode. Please refer to: Broadcast Mode.
Additionally, Maomi.MQ allows the customization of exchange types and exchange names.
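For example, a sketch using an assumed exchange name and a direct exchange type through the same attribute properties shown in the fanout example above; RoutingKey is additionally available on ConsumerOptions when registering manually:
// Bind the queue to a direct exchange instead of the default fanout shown earlier.
[Consumer("ConsumerWeb_direct", BindExchange = "my-direct-exchange", ExchangeType = "direct")]
public class DirectExchangeConsumer : IConsumer<TestEvent>
{
    // ... ...
}
// When registering manually, ConsumerOptions also exposes BindExchange, ExchangeType,
// and RoutingKey (see the manual injection example above).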
Based on Events
Maomi.MQ internally provides an event bus to help developers implement event orchestration, local transactions, forward execution, and compensation.
Maomi.MQ is not designed with local message tables for distributed transaction guarantees, based on the following considerations:
- Maomi.MQ is based on a message queue communication model, not specifically designed for distributed transactions, and has limited coordination capabilities for distributed transactions. For using distributed transaction orchestration, platforms similar to DTM, Seata, etc., are needed, as distributed transactions require a transaction center coordination platform.
- Maomi.MQ has designed retry and compensation strategies that can address exceptions to some extent.
- Maomi.MQ cannot guarantee the issues of idempotency or empty compensations; however, not all scenarios require strict assurance of consumption.
- Through the middleware functionality of the event mode, developers can easily handle issues like idempotency, empty compensation, and hangs.
Using Event Mode
First, define an event type bound to a topic or queue. The event should be marked with the [EventTopic] attribute and set to the corresponding queue name.
The [EventTopic] attribute supports the same properties as the [Consumer] attribute; for event configuration, please refer to Consumer Configuration.
[EventTopic("EventWeb")]
public class TestEvent
{
public string Message { get; set; }
public override string ToString()
{
return Message;
}
}
Then orchestrate the event executors. Each executor must implement the IEventHandler<T> interface and use the [EventOrder] attribute to mark the execution order.
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
public Task CancelAsync(TestEvent message, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public Task ExecuteAsync(TestEvent message, CancellationToken cancellationToken)
{
Console.WriteLine($"{message.Message}, event 1 has been executed");
return Task.CompletedTask;
}
}
[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
public Task CancelAsync(TestEvent message, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public Task ExecuteAsync(TestEvent message, CancellationToken cancellationToken)
{
Console.WriteLine($"{message.Message}, event 2 has been executed");
return Task.CompletedTask;
}
}
Each event executor must implement the IEventHandler<T> interface and set the [EventOrder] attribute to determine the execution order. The framework executes the ExecuteAsync method of each IEventHandler<T> in order; if any ExecuteAsync throws an exception, CancelAsync is called on the handlers in reverse order.
Because the program can fail at any moment, compensation through CancelAsync is not reliable; CancelAsync is mainly intended for recording related information.
Middleware
The purpose of middleware is to let developers intercept events, log message information, and implement local transactions. If no middleware is configured, the framework automatically uses DefaultEventMiddleware<TEvent> as the event's middleware service.
Example code for custom event middleware:
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message, EventHandlerDelegate<TestEvent> next)
{
await next(messageHeader, message, CancellationToken.None);
}
public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent? message) => Task.CompletedTask;
public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex) => Task.FromResult(ConsumerState.Ack);
}
The next delegate is the event execution chain constructed by the framework; within the middleware, you can intercept events and decide whether to execute the chain.
When next() is called in the middleware, the framework begins executing the events in order, that is, My1EventEventHandler, My2EventEventHandler, and so forth.
When an event has multiple executors, local transactions are essential for compensation, since the program may fail at any moment.
For example, inject a database context into the middleware and start a transaction for the database operations. If one of the EventHandler executions fails, the execution chain rolls back and the transaction is not committed.
For implementing retry and compensation in middleware, refer to Consumer Pattern.
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public TestEventMiddleware(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message, EventHandlerDelegate<TestEvent> next)
{
using (var transaction = _bloggingContext.Database.BeginTransaction())
{
await next(messageHeader, message, CancellationToken.None);
await transaction.CommitAsync();
}
}
public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent? message)
{
return Task.CompletedTask;
}
public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
{
return Task.FromResult(ConsumerState.Ack);
}
}
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public My1EventEventHandler(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task CancelAsync(TestEvent message, CancellationToken cancellationToken)
{
Console.WriteLine($"{message.Message} has been compensated,[1]");
}
public async Task ExecuteAsync(TestEvent message, CancellationToken cancellationToken)
{
await _bloggingContext.Posts.AddAsync(new Post
{
Title = "Robinson Crusoe",
Content = "Just writing casually"
});
await _bloggingContext.SaveChangesAsync();
}
}
[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public My2EventEventHandler(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task CancelAsync(TestEvent message, CancellationToken cancellationToken)
{
Console.WriteLine($"{message.Id} has been compensated,[2]");
}
public async Task ExecuteAsync(TestEvent message, CancellationToken cancellationToken)
{
await _bloggingContext.Posts.AddAsync(new Post
{
Title = "Dream of the Red Chamber",
Content = "Jia Baoyu's first experience of romance"
});
await _bloggingContext.SaveChangesAsync();
throw new OperationCanceledException("Deliberate error");
}
}
When an exception occurs during event execution, the message is also retried, and the middleware TestEventMiddleware's FaildAsync and FallbackAsync methods are executed in sequence.
You can refer to Consumer Pattern or Retry.
Idempotence, Empty Compensation, and Hanging
In microservices, a service may crash and restart at any time, leading to problems such as idempotence, empty compensation, and hanging.
Idempotence
For example, consumer A processes message 01 and writes the result to the database, but the program restarts before Maomi.MQ pushes the acknowledgment (ack) to RabbitMQ. Since message 01 has not been acknowledged, it is consumed again, and writing it to the database a second time would produce duplicate data. Developers therefore need to ensure that even if the same message is consumed multiple times, it does not lead to data inconsistency or duplicated operations.
Not all scenarios forbid duplicate consumption; we focus here on cases where a message may only be consumed once, such as inserting order information into the database. This requires each message to carry a unique business ID or distributed snowflake ID, and during consumption the database must be checked for that ID to determine whether the message has already been processed.
For example:
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message, EventHandlerDelegate<TestEvent> next)
{
var existId = await _bloggingContext.Posts.AnyAsync(x => x.PostId == message.Id);
if (existId)
{
return;
}
using (var transaction = _bloggingContext.Database.BeginTransaction())
{
await next(messageHeader, message, CancellationToken.None);
await transaction.CommitAsync();
}
}
}
Empty Compensation
In distributed transactions, when orchestrating interfaces for services A, B, and C, if C encounters an exception, the distributed transaction manager will call C's compensation interface, and then call B and A.
Each call here is done through interface calls, so it cannot be processed within a single database transaction.
There are two scenarios here.
One scenario: service C has already written to the database, increasing the user's balance by +100, but the program then crashes or times out. The transaction manager treats the call as failed and invokes the compensation interface, which removes the previously modified data; there is no problem here.
In the second scenario, service C has not completed the database operation when the exception occurs. If the transaction manager then calls the compensation interface and that interface decrements the user's balance by -100, the result is incorrect.
Thus, the compensation interface must determine whether the forward operation actually succeeded; if it did, the rollback proceeds, and if not, a successful compensation result is returned immediately.
Typically, Maomi.MQ does not encounter empty compensation issues because it is not a distributed transaction framework.
Although Maomi.MQ provides the CancelAsync() method for executing rollback processes, it is mainly intended for developers to log information rather than to perform compensation. Moreover, all steps in event orchestration are local, so the empty-compensation problem of distributed transactions does not arise; only local database transaction consistency, i.e., idempotence, needs to be ensured.
Hanging
In distributed transactions there are forward execution requests and rollback requests; if execution fails, the rollback interface is called. Because of the complexity of distributed networks, however, the transaction manager cannot always determine the state of service C, which behaves like a small black box. When a request fails, the transaction manager calls the compensation interface, but for various reasons (automatic retries in a gateway, delayed execution in the service), the forward execution interface may still run after the compensation interface has already been called. The transaction manager has by then treated the distributed transaction as failed and finished the compensation, yet service C performs an extra forward operation, increasing the user's balance by +100; everything looks normal but is actually incorrect. This phenomenon is called hanging.
Since Maomi.MQ does not deal with multi-service transaction orchestration, it only needs to ensure idempotence and doesn't have to worry about empty compensation and hanging issues. Whether idempotence needs assurance should depend on the business requirements, which is why Maomi.MQ does not design a distributed transaction mode with a local message table.
The configuration in the event mode is the same as in the consumer mode, so it won't be elaborated upon here. You can refer to Consumer Pattern.
Custom Consumers and Dynamic Subscriptions
Two main functionalities have been implemented:
- During program startup, consumers can be customized in configuration and model without using attribute annotations.
- A consumer can be started or stopped anytime after the program starts.
Reference example project: https://github.com/whuanle/Maomi.MQ/tree/main/example/consumer/DynamicConsumerWeb
Custom Consumers
Consumers can be implemented without attribute annotations by simply implementing IConsumer<TEvent>; during assembly scanning, consumers without attribute annotations are ignored.
Define the consumer model:
public class DynamicCustomConsumer : IConsumer<TestEvent>
{
public Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
{
throw new NotImplementedException();
}
public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
{
throw new NotImplementedException();
}
public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
{
throw new NotImplementedException();
}
}
Next, manually configure consumers and their options using DynamicConsumerTypeFilter.
DynamicConsumerTypeFilter dynamicConsumerTypeFilter = new();
dynamicConsumerTypeFilter.AddConsumer(typeof(DynamicCustomConsumer), new ConsumerOptions
{
Queue = "test1"
});
dynamicConsumerTypeFilter.AddConsumer(typeof(DynamicCustomConsumer), new ConsumerOptions
{
Queue = "test2"
});
Then, when injecting services, manually add the type filter.
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AutoQueueDeclare = true;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
// ... ...
};
}, [typeof(Program).Assembly], [
new ConsumerTypeFilter(), // Consumer type filter
new EventBusTypeFilter(), // Event bus type filter
dynamicConsumerTypeFilter // Dynamic consumer filter
]);
Dynamic Subscriptions
After the program starts, consumers can be dynamically started or stopped through the IDynamicConsumer service. Consumers that were already running at startup are not controlled by dynamic subscription and cannot be stopped at runtime.
Dynamically starting consumers:
private readonly IMessagePublisher _messagePublisher;
private readonly IDynamicConsumer _dynamicConsumer;
[HttpPost("create")]
public async Task<string> CreateConsumer([FromBody] ConsumerDto consumer)
{
foreach (var item in consumer.Queues)
{
await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions(item));
}
return "ok";
}
If you do not want to define a model class, you can also use a functional approach:
foreach (var item in consumer.Queues)
{
var consumerTag = await _dynamicConsumer.ConsumerAsync<TestEvent>(
consumerOptions: new ConsumerOptions(item),
execute: async (header, message) =>
{
await Task.CompletedTask;
},
faild: async (header, ex, retryCount, message) => { },
fallback: async (header, message, ex) => ConsumerState.Ack
);
}
return "ok";
To stop consumers dynamically, you can use the queue name:
[HttpPost("stop")]
public async Task<string> StopConsumer([FromBody] ConsumerDto consumer)
{
foreach (string queueName in consumer.Queues)
{
await _dynamicConsumer.StopConsumerAsync(queueName);
}
return "ok";
}
You can also use the consumer tag:
var consumerTag = await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions(item));
await _dynamicConsumer.StopConsumerTagAsync(consumerTag);
Configuration
When adding the Maomi.MQ framework, various properties can be configured, as shown in the example and descriptions below:
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
// Required: the node ID of the current program, used to configure distributed snowflake IDs.
// Setting a distinct WorkId per instance avoids generating duplicate message IDs under high concurrency.
options.WorkId = 1;
// Whether to automatically create queues.
options.AutoQueueDeclare = true;
// The current application name, used to identify the publisher and consumer programs.
options.AppName = "myapp";
// Required: RabbitMQ configuration.
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
options.Port = 5672;
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]); // Assemblies to be scanned.
Developers can manually manage the RabbitMQ connection through ConnectionFactory, for example to enable fault recovery or customize connection parameters, as sketched below.
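As a hedged sketch (the values below are illustrative, not recommendations), the RabbitMQ.Client ConnectionFactory exposes automatic recovery options that can be set inside the Rabbit delegate:
```csharp
options.Rabbit = (ConnectionFactory factory) =>
{
    factory.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
    factory.Port = 5672;

    // RabbitMQ.Client fault-recovery settings; the values here are examples only.
    factory.AutomaticRecoveryEnabled = true;                    // reconnect automatically after a network failure
    factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10); // delay between recovery attempts
    factory.TopologyRecoveryEnabled = true;                     // re-declare exchanges, queues and bindings after recovery
    factory.RequestedHeartbeat = TimeSpan.FromSeconds(30);      // heartbeat negotiated with the broker

    factory.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
```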
Type Filters
The interface for type filters is ITypeFilter, which is responsible for scanning and identifying types and registering them as consumers. By default, two type filters are enabled, ConsumerTypeFilter and EventBusTypeFilter, which recognize the consumer model and the event bus consumption mode respectively; both require the corresponding attribute annotations.
Additionally, there is a dynamic consumer filter, DynamicConsumerTypeFilter, for customizing consumer models and configurations.
If developers need to customize the consumer model or integrate an in-memory event bus such as MediatR, they only need to implement ITypeFilter.
Interceptors
Maomi.MQ enables both the consumer pattern and event bus pattern by default, and developers can freely configure whether to enable them.
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AutoQueueDeclare = true;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
// ... ...
};
},
[typeof(Program).Assembly],
[new ConsumerTypeFilter(), new EventBusTypeFilter()]); // Inject consumer and event bus modes.
Furthermore, the framework offers dynamic configuration interception, allowing modifications to consumer attribute configurations at program startup.
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AutoQueueDeclare = true;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
// ... ...
};
},
[typeof(Program).Assembly],
[new ConsumerTypeFilter(ConsumerInterceptor), new EventBusTypeFilter(EventInterceptor)]);
Implementing the interceptor functions:
```csharp
private static RegisterQueue ConsumerInterceptor(IConsumerOptions consumerOptions, Type consumerType)
{
var newConsumerOptions = new ConsumerOptions(consumerOptions.Queue);
consumerOptions.CopyFrom(newConsumerOptions);
// Modify the configuration on newConsumerOptions, then return it so the changes take effect.
return new RegisterQueue(true, newConsumerOptions);
}
private static RegisterQueue EventInterceptor(IConsumerOptions consumerOptions, Type eventType)
{
if (eventType == typeof(TestEvent))
{
var newConsumerOptions = new ConsumerOptions(consumerOptions.Queue);
consumerOptions.CopyFrom(newConsumerOptions);
newConsumerOptions.Queue = newConsumerOptions.Queue + "_1";
return new RegisterQueue(true, newConsumerOptions);
}
return new RegisterQueue(true, consumerOptions);
}
```
<p><br /></p>
<p>Developers can modify configuration values in the interceptors.</p>
<p>The interceptor also has a return value: when it returns false (the first argument of RegisterQueue), the framework skips registering that consumer or event, and no consumer will be started for that queue.</p>
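<p>For instance, a hedged sketch of an interceptor that skips registration for queues with a particular prefix (the <code>skip_</code> prefix is only an example) could look like this:</p>
<pre><code class="language-csharp">new ConsumerTypeFilter((consumerOptions, consumerType) =>
{
    // Returning false as the first argument tells the framework not to register
    // a consumer for this queue, so the queue will not be consumed.
    if (consumerOptions.Queue.StartsWith("skip_"))
    {
        return new RegisterQueue(false, consumerOptions);
    }

    return new RegisterQueue(true, consumerOptions);
});</code></pre>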
<h3>Consumer Configuration</h3>
<p>Consumer processing in Maomi.MQ is driven by the properties of the IConsumerOptions interface. Whether you use custom consumers, the event bus, or other consumption modes, they are all registered with the framework as IConsumerOptions.</p>
<p>The configuration is described as follows:</p>
<table>
<thead>
<tr>
<th>Name</th>
<th>Type</th>
<th>Required</th>
<th>Default Value</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>Queue</td>
<td>string</td>
<td>Required</td>
<td></td>
<td>Queue name</td>
</tr>
<tr>
<td>DeadExchange</td>
<td>string?</td>
<td>Optional</td>
<td></td>
<td>Bind to dead letter exchange name</td>
</tr>
<tr>
<td>DeadRoutingKey</td>
<td>string?</td>
<td>Optional</td>
<td></td>
<td>Bind to dead letter routing key</td>
</tr>
<tr>
<td>Expiration</td>
<td>int</td>
<td>Optional</td>
<td></td>
<td>Queue message expiration time, in milliseconds</td>
</tr>
<tr>
<td>Qos</td>
<td>ushort</td>
<td>Optional</td>
<td>100</td>
<td>The number of unacknowledged messages that can be prefetched at once, used to improve consumption throughput</td>
</tr>
<tr>
<td>RetryFaildRequeue</td>
<td>bool</td>
<td>Optional</td>
<td>false</td>
<td>Whether to requeue the message when consumption has failed the configured number of times</td>
</tr>
<tr>
<td>AutoQueueDeclare</td>
<td>AutoQueueDeclare</td>
<td>Optional</td>
<td>None</td>
<td>Whether to create the queue automatically</td>
</tr>
<tr>
<td>BindExchange</td>
<td>string?</td>
<td>Optional</td>
<td></td>
<td>Bind exchange name</td>
</tr>
<tr>
<td>ExchangeType</td>
<td>string?</td>
<td>Optional</td>
<td></td>
<td>Type of the exchange specified by BindExchange</td>
</tr>
<tr>
<td>RoutingKey</td>
<td>string?</td>
<td>Optional</td>
<td></td>
<td>Routing key used when binding to the BindExchange</td>
</tr>
</tbody>
</table>
<p><br /></p>
<p>As mentioned earlier, the framework scans the consumer and event bus consumer attributes to generate an IConsumerOptions binding for that consumer, which can be modified through interceptor functions.</p>
<pre><code class="language-csharp">new ConsumerTypeFilter((consumerOptions, type) =>
{
var newConsumerOptions = new ConsumerOptions(consumerOptions.Queue);
consumerOptions.CopyFrom(newConsumerOptions);
newConsumerOptions.Queue = "app1_" + newConsumerOptions.Queue;
    return new RegisterQueue(true, newConsumerOptions);
});</code></pre>
<p><br /></p>
<p>Additionally, the IRoutingProvider interface can dynamically map configurations to new ones. After the program starts, Maomi.MQ automatically creates exchanges and queues and calls IRoutingProvider to map each configuration; when publishing messages via model classes, the configuration is also mapped through IRoutingProvider. Developers can therefore adjust configuration properties dynamically by implementing this interface.</p>
<pre><code class="language-csharp">services.AddSingleton<IRoutingProvider, MyRoutingProvider>();</code></pre>
<h3>Environment Isolation</h3>
<blockquote>
<p>Currently considering whether to support a multi-tenant model.</p>
</blockquote>
<p><br /></p>
<p>During development, it is often necessary to debug locally. The locally started program connects to the development RabbitMQ server, and when a queue receives a message, it is pushed to only one of the consumers. As a result, a message published during local debugging may not be received by the local program, but instead be consumed by the instance running in the development environment.</p>
<p>In this case, we want to isolate the local debugging environment from the development environment by using the VirtualHost feature provided by RabbitMQ.</p>
<p><br /></p>
<p>First, create a new VirtualHost via a PUT request to RabbitMQ. Please refer to the documentation: <a href="https://www.rabbitmq.com/docs/vhosts#using-http-api">https://www.rabbitmq.com/docs/vhosts#using-http-api</a></p>
<p><img src="https://www.whuanle.cn/wp-content/uploads/2025/02/post-21721-67b7ba0b929a8.png" alt="image-20240612193415867" /></p>
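<p>If you prefer to create the VirtualHost from code instead of the management UI, a minimal sketch using the RabbitMQ management HTTP API could look like the following (the host <code>localhost:15672</code> and the <code>guest</code> credentials are assumptions for a local setup):</p>
<pre><code class="language-csharp">using System.Net.Http.Headers;
using System.Text;

// PUT /api/vhosts/{name} creates a VirtualHost through the RabbitMQ management plugin.
using var client = new HttpClient();
var token = Convert.ToBase64String(Encoding.UTF8.GetBytes("guest:guest"));
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", token);

var response = await client.PutAsync("http://localhost:15672/api/vhosts/debug", content: null);
response.EnsureSuccessStatusCode();</code></pre>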
<p><br />Then, configure the VirtualHost name in the code:</p>
<pre><code class="language-csharp">builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AutoQueueDeclare = true;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
options.Port = 5672;
#if DEBUG
options.VirtualHost = "debug";
#endif
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]);</code></pre>
<p><br /></p>
<p>When debugging locally, publishing and receiving messages will be isolated from the server environment.</p>
<h3>Snowflake ID Configuration</h3>
<p>Maomi.MQ.RabbitMQ uses an IdGenerator to generate snowflake IDs, ensuring that each event has a unique ID within the cluster.</p>
<p>The framework creates snowflake IDs through the IIdFactory interface. You can replace the <code>IIdFactory</code> interface to configure snowflake ID generation rules.</p>
<pre><code class="language-csharp">services.AddSingleton<IIdFactory>(new DefaultIdFactory((ushort)optionsBuilder.WorkId));</code></pre>
<p><br /></p>
<p>Example:</p>
<pre><code class="language-csharp">public class DefaultIdFactory : IIdFactory
{
/// <summary>
/// Initializes a new instance of the <see cref="DefaultIdFactory"/> class.
/// </summary>
/// <param name="workId"></param>
public DefaultIdFactory(ushort workId)
{
var options = new IdGeneratorOptions(workId) { SeqBitLength = 10 };
YitIdHelper.SetIdGenerator(options);
}
/// <inheritdoc />
public long NextId() => YitIdHelper.NextId();
}</code></pre>
<p><br /></p>
<p>For details on the IdGenerator framework's snowflake ID configuration, please refer to:</p>
<p><a href="https://github.com/yitter/IdGenerator/tree/master/C%23">https://github.com/yitter/IdGenerator/tree/master/C%23</a></p>
<h3>Debugging</h3>
<p>The Maomi.MQ framework publishes symbol packages to nuget.org, which makes stepping into the framework while debugging very convenient.</p>
<p><br /></p>
<p><img src="https://www.whuanle.cn/wp-content/uploads/2025/02/post-21721-67b7ba0b96316.png" alt="image-20240622110409621" /></p>
<p><img src="https://www.whuanle.cn/wp-content/uploads/2025/02/post-21721-67b7ba0b99d8f.png" alt="image-20240622110718661" /></p>
<p><br />It is recommended to load all modules and start the program the first time.</p>
<p><img src="https://www.whuanle.cn/wp-content/uploads/2025/02/post-21721-67b7ba0c972e4.png" alt="image-20240622112130250" /></p>
<p><br /></p>
<p>Later, you can manually select which modules to load.</p>
<p><img src="https://www.whuanle.cn/wp-content/uploads/2025/02/post-21721-67b7ba0c9b053.png" alt="image-20240622110227993" /></p>
<p><br />Press F12 to navigate to the location you want to debug; after starting the program, the breakpoint can be hit.</p>
<p><img src="https://www.whuanle.cn/wp-content/uploads/2025/02/post-21721-67b7ba0ca0150.png" alt="image-20240622112507607" /></p>
<p><br /></p>
<p>If you need to debug Maomi.MQ.RabbitMQ, you can set a breakpoint in your program (not in Maomi.MQ), wait for the program to reach this breakpoint after startup, configure the symbols, and click to load all symbols.</p>
<p>Then you can set breakpoints in Maomi.MQ.RabbitMQ to enter debugging.</p>
<p><img src="https://www.whuanle.cn/wp-content/uploads/2025/02/post-21721-67b7ba0ca3c8a.png" alt="image-20240622112753150" /></p>
<h2>Qos Concurrency and Order</h2>
<p>In both consumer mode and event bus mode, consumption behaviour is configured through attributes, and Qos is one of the most important properties. The default Qos value is 100, which is the number of unacknowledged messages a consumer is allowed to receive at once.</p>
<p><br /></p>
<h3>Qos Scenarios</h3>
<p>All consumers share a single IConnection object, while each consumer has its own IChannel.</p>
<p><br /></p>
<p>For queues that are consumed frequently but must not be consumed concurrently, be sure to set <code>Qos = 1</code>; RabbitMQ will then push messages one at a time, ensuring strictly ordered consumption.</p>
<pre><code class="language-csharp">[Consumer("web1", Qos = 1)]
public class MyConsumer : IConsumer<TestEvent>
{
}</code></pre>
<p><br /></p>
<p>When higher throughput is needed and the order of consumption does not matter, set a higher Qos; the RabbitMQ client will then prefetch more messages, allowing multiple messages to be consumed concurrently.</p>
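<p>For example (the queue name and class below are only illustrative), a consumer whose messages are independent of one another can use a larger prefetch value:</p>
<pre><code class="language-csharp">// Up to 100 unacknowledged messages are prefetched and may be handled concurrently,
// so no ordering guarantee applies to this queue.
[Consumer("web2", Qos = 100)]
public class MyThroughputConsumer : IConsumer<TestEvent>
{
    public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
    {
        await Task.CompletedTask;
    }

    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
        => Task.CompletedTask;

    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
        => Task.FromResult(ConsumerState.Ack);
}</code></pre>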
<h3>Concurrency and Exception Handling</h3>
<p>Exception handling depends mainly on Qos and RetryFaildRequeue; RetryFaildRequeue is true by default.</p>
<p>In the case of <code>Qos = 1</code>, combined with <code>IConsumerOptions.RetryFaildRequeue</code> and <code>FallbackAsync</code>, if the message is returned to the queue, the next time it will still consume that message.</p>
<p>In the case of <code>Qos > 1</code>, due to concurrency, failed messages can be returned to the queue, but it doesn't guarantee that the next time that exact message will be consumed immediately.</p>
<p>With <code>Qos</code> set to 1, strict sequential consumption is guaranteed, and ExceptionRequeue and RetryFaildRequeue determine whether failed messages are returned to the queue. If they are returned, the next consumption continues from the previously failed message; if the underlying error (such as a bug) is not resolved, this can lead to a cycle of consume, fail, requeue, and re-consume.</p>
<p><br /></p>
<h3>How to Set Qos</h3>
<p>Note that RabbitMQ Client 7.0 added many features, one of which is the consumer dispatch concurrency setting, ConsumerDispatchConcurrency, which defaults to 1. If this value is not changed, consumption can be very slow. It can be set individually on each IChannel or globally on the ConnectionFactory.</p>
<pre><code class="language-csharp">services.AddMaomiMQ(options =>
{
options.WorkId = 1;
options.AppName = "myapp-consumer";
options.Rabbit = (options) =>
{
options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
options.Port = 5672;
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
options.ConsumerDispatchConcurrency = 100; // Set here
};
}, new System.Reflection.Assembly[] { typeof(Program).Assembly });</code></pre>
<p><br /></p>
<p>In Maomi.MQ.RabbitMQ, Qos refers to prefetch_count, with a value range of 0-65535, where 0 means no limit. Generally, it is recommended to set the default to 100; setting Qos higher does not necessarily improve consumption efficiency.</p>
<p>Qos is not equal to the number of consumer concurrency threads; rather, it is the number of unacknowledged messages that can be received at once. Consumers can fetch N messages simultaneously and then consume them individually.</p>
<p><br /></p>
<p>According to the official <a href="https://www.rabbitmq.com/blog/2014/04/14/finding-bottlenecks-with-rabbitmq-3-3">Finding bottlenecks with RabbitMQ 3.3 | RabbitMQ</a> documentation, prefetch count can affect consumer queue utilization.</p>
<table>
<thead>
<tr>
<th>Prefetch limit</th>
<th>Consumer utilization</th>
</tr>
</thead>
<tbody>
<tr>
<td>1</td>
<td>14%</td>
</tr>
<tr>
<td>3</td>
<td>25%</td>
</tr>
<tr>
<td>10</td>
<td>46%</td>
</tr>
<tr>
<td>30</td>
<td>70%</td>
</tr>
<tr>
<td>1000</td>
<td>74%</td>
</tr>
</tbody>
</table>
<p><br /></p>
<p>In general, configure Qos by weighing several factors: network bandwidth, message size, publishing frequency, the program's expected total resource usage, and the number of service instances.</p>
<p>When the program requires strict sequential consumption, it can be set to 1.</p>
<p>If the connection to RabbitMQ is within an intranet, network bandwidth limitations can be disregarded; when the message content is very large and requires a high degree of concurrency, Qos can be set to 0. When Qos = 0, RabbitMQ.Client will try its best to utilize the machine's performance; please use with caution.</p>
<h3>Qos and Consumption Performance Testing</h3>
<p>To illustrate the impact of different Qos values on consumer performance, the code below consumes 1 million messages under different Qos settings; 1 million messages are pushed to the RabbitMQ server before the consumer is started.</p>
<p>Define the event:</p>
<pre><code class="language-csharp">public class TestEvent
{
public int Id { get; set; }
public string Message { get; set; }
public int[] Data { get; set; }
public override string ToString()
{
return Id.ToString();
}
}</code></pre>
<p>The message publisher code in the QosPublisher project, shown below, pushes 1 million messages to the server; each message is about 800 bytes, under 1 KB.</p>
```csharp
[HttpGet("publish")]
public async Task<string> Publisher()
{
int totalCount = 0;
List<Task> tasks = new();
var message = string.Join(",", Enumerable.Range(0, 100));
var data = Enumerable.Range(0, 100).ToArray();
for (var i = 0; i < 100; i++)
{
var task = Task.Factory.StartNew(async () =>
{
using var singlePublisher = _messagePublisher.CreateSingle();
for (int k = 0; k < 10000; k++)
{
var count = Interlocked.Increment(ref totalCount);
await singlePublisher.PublishAsync(exchange: string.Empty, routingKey: "qos", message: new TestEvent
{
Id = count,
Message = message,
Data = data
});
}
});
tasks.Add(task);
}
await Task.WhenAll(tasks);
return "ok";
}
```
After waiting for a while, the server already has 1 million messages.

Create the consumer project QosConsole, intentionally adding a 50ms delay for the consumer, and run the program.
```csharp
class Program
{
static async Task Main()
{
var host = new HostBuilder()
.ConfigureLogging(options =>
{
options.AddConsole();
options.AddDebug();
})
.ConfigureServices(services =>
{
services.AddMaomiMQ(options =>
{
options.WorkId = 1;
options.AppName = "myapp-consumer";
options.Rabbit = (options) =>
{
options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
options.Port = 5672;
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
options.ConsumerDispatchConcurrency = 1000;
};
}, new System.Reflection.Assembly[] { typeof(Program).Assembly });
}).Build();
Console.WriteLine($"start time:{DateTime.Now}");
await host.RunAsync();
}
}
[Consumer("qos", Qos = 30)]
public class QosConsumer : IConsumer<TestEvent>
{
private static int Count = 0;
public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
{
Interlocked.Increment(ref Count);
Console.WriteLine($"date time:{DateTime.Now},id:{message.Id}, count:{Count}");
await Task.Delay(50);
}
public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
{
return Task.CompletedTask;
}
public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
{
return Task.FromResult(ConsumerState.Ack);
}
}
```
<p>To have an intuitive comparison, here is a native consumer project RabbitMQConsole using RabbitMQ.Client.</p>
<pre><code class="language-csharp">static async Task Main()
{
ConnectionFactory connectionFactory = new ConnectionFactory
{
HostName = Environment.GetEnvironmentVariable("RABBITMQ")!,
Port = 5672,
ConsumerDispatchConcurrency = 1000
};
var connection = await connectionFactory.CreateConnectionAsync();
var channel = await connection.CreateChannelAsync(new CreateChannelOptions(
publisherConfirmationsEnabled: false,
publisherConfirmationTrackingEnabled: false,
consumerDispatchConcurrency: 1000));
var messageSerializer = new DefaultMessageSerializer();
var consumer = new AsyncEventingBasicConsumer(channel);
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1000, global: true);
consumer.ReceivedAsync += async (sender, eventArgs) =>
{
var testEvent = messageSerializer.Deserialize<TestEvent>(eventArgs.Body.Span);
Console.WriteLine($"start time:{DateTime.Now} {testEvent.Id}");
await Task.Delay(50);
await channel.BasicAckAsync(eventArgs.DeliveryTag, false);
};
await channel.BasicConsumeAsync(
queue: "qos",
autoAck: false,
consumer: consumer);
while (true)
{
await Task.Delay(10000);
}
}</code></pre>
<p><br /></p>
<p>Maomi.MQ.RabbitMQ is a wrapper over RabbitMQ.Client. When consuming through Maomi.MQ.RabbitMQ, extra time and resources are inevitably spent on logging, collecting additional observability information, and building new dependency injection containers, so it is worth comparing the two.</p>
<p><br /></p>
<p>Start QosConsole and RabbitMQConsole separately in Release mode in Visual Studio, one process at a time, to measure consumption speeds under different Qos settings.</p>
<h3>Stability Testing</h3>
<p>You can refer to <a href="7opentelemetry.md">Observability</a> to set up a monitoring environment, and refer to the code in OpenTelemetryConsole, where three consumers in a single program both publish and consume messages.</p>
<p><br /></p>
<p>Approximately 560 messages are published or consumed per second, with about 9 million messages published and consumed within three hours.</p>
<p><img src="https://www.whuanle.cn/wp-content/uploads/2025/02/post-21721-67b7ba0cab3a1.png" alt="image-20240629101521224" /></p>
<p><img src="https://www.whuanle.cn/wp-content/uploads/2025/02/post-21721-67b7ba0caf2b8.png" alt="image-20240629101645663" /></p>
<p><br /></p>
<p>Memory usage remains stable. The machine's CPU is relatively weak, and periodic GC and other factors also consume CPU, producing the fluctuations below:</p>
<p><img src="https://www.whuanle.cn/wp-content/uploads/2025/02/post-21721-67b7ba0cb295a.png" alt="image-20240629101738893" /></p>
<h2>Retry</h2>
<h3>Retry Timing</h3>
<p>When the consumer's <code>ExecuteAsync</code> method throws an exception, the framework retries up to three times by default, using an exponential backoff strategy with a base of 2 for the retry intervals.</p>
<p>With the default settings, the framework waits 2 seconds before the first retry, 4 seconds before the second, and 8 seconds before the third (2^1, 2^2, 2^3), matching the DefaultRetryPolicyFactory shown below.</p>
<p>Maomi.MQ.RabbitMQ uses the Polly framework for retry strategy management, with the default retry interval strategy generated through the DefaultRetryPolicyFactory service.</p>
<p><br /></p>
<p>An example of the DefaultRetryPolicyFactory code is as follows:</p>
<pre><code class="language-csharp">/// <summary>
/// Default retry policy.<br />
/// 默认的策略提供器.
/// </summary>
public class DefaultRetryPolicyFactory : IRetryPolicyFactory
{
protected readonly int RetryCount = 3;
protected readonly int RetryBaseDelaySeconds = 2;
protected readonly ILogger<DefaultRetryPolicyFactory> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="DefaultRetryPolicyFactory"/> class.
/// </summary>
/// <param name="logger"></param>
public DefaultRetryPolicyFactory(ILogger<DefaultRetryPolicyFactory> logger)
{
_logger = logger;
RetryCount = 3;
RetryBaseDelaySeconds = 2;
}
/// <inheritdoc/>
public virtual Task<AsyncRetryPolicy> CreatePolicy(string queue, string id)
{
// Create a retry policy.
// 创建重试策略.
var retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(
retryCount: RetryCount,
sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(RetryBaseDelaySeconds, retryAttempt)),
onRetry: async (exception, timeSpan, retryCount, context) =>
{
_logger.LogDebug("Retry execution event, queue [{Queue}], retry count [{RetryCount}], timespan [{TimeSpan}]", queue, retryCount, timeSpan);
await FaildAsync(queue, exception, timeSpan, retryCount, context);
});
return Task.FromResult(retryPolicy);
}
public virtual Task FaildAsync(string queue, Exception ex, TimeSpan timeSpan, int retryCount, Context context)
{
return Task.CompletedTask;
}
}</code></pre>
<p><br /></p>
<p>You can replace the default retry strategy service by implementing the IRetryPolicyFactory interface.</p>
<pre><code class="language-csharp">services.AddSingleton<IRetryPolicyFactory, DefaultRetryPolicyFactory>();</code></pre>
<h3>Persisting Remaining Retry Counts</h3>
<p>When a consumer fails to process a message, the default policy retries three times. If two retries have already happened and the program then restarts, the next attempt to consume that message should retry only one more time.</p>
<p>To achieve this, the remaining retry count must be persisted, so that after a restart retries continue from the remaining count.</p>
<p><br /></p>
<p>Import the Maomi.MQ.RedisRetry package.</p>
<p>Example configuration:</p>
<pre><code class="language-csharp">builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AutoQueueDeclare = true;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
// ... ...
};
}, [typeof(Program).Assembly]);
builder.Services.AddMaomiMQRedisRetry((s) =>
{
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.3.248");
IDatabase db = redis.GetDatabase();
return db;
});</code></pre>
<p><br /></p>
<p>The default key will only be kept for 5 minutes. This means that if the program is restarted and tries to consume the message after five minutes, the remaining retry counts will be reset.</p>
<h2>Dead Letter Queue</h2>
<p>You can bind a dead letter queue to a consumer or event. When a message in that queue fails and will not be requeued, it is pushed to the dead letter queue, for example:</p>
<p><br /></p>
<pre><code class="language-csharp">[Consumer("ConsumerWeb_dead", Qos = 1, DeadQueue = "ConsumerWeb_dead_queue", RetryFaildRequeue = false)]
public class DeadConsumer : IConsumer<DeadEvent>
{
// Consume
public Task ExecuteAsync(MessageHeader messageHeader, DeadEvent message)
{
Console.WriteLine($"event id:{message.Id}");
throw new OperationCanceledException();
}
}
// Messages that fail from ConsumerWeb_dead will be consumed by this consumer.
[Consumer("ConsumerWeb_dead_queue", Qos = 1)]
public class DeadQueueConsumer : IConsumer<DeadQueueEvent>
{
// Consume
public Task ExecuteAsync(MessageHeader messageHeader, DeadQueueEvent message)
{
Console.WriteLine($"Dead letter queue, event id:{message.Id}");
return Task.CompletedTask;
}
}</code></pre>
<p><img src="https://www.whuanle.cn/wp-content/uploads/2025/02/post-21721-67b7ba0cb5e5e.png" alt="image-20240601012127169" /></p>
<p><br /></p>
<p>When using a dead letter queue, make sure to set <code>RetryFaildRequeue</code> to false so that after multiple retry failures, the consumer sends a nack signal to RabbitMQ, which then forwards the message to the bound dead letter queue.</p>
<h3>Delayed Queue</h3>
<p>Create a consumer that inherits from EmptyConsumer. The queue is then declared when the program starts, but the framework will not create an IConnection to consume it. Set an expiration time for the queue's messages and bind a dead letter queue to it; the dead letter queue can be consumed using either the consumer mode or the event mode.</p>
<p><br /></p>
<pre><code class="language-csharp">[Consumer("ConsumerWeb_dead_2", Expiration = 6000, DeadQueue = "ConsumerWeb_dead_queue_2")]
public class EmptyDeadConsumer : EmptyConsumer<DeadEvent>
{
}
// Expired messages from ConsumerWeb_dead_2 are routed to this queue and consumed by this consumer.
[Consumer("ConsumerWeb_dead_queue_2", Qos = 1)]
public class Dead_2_QueueConsumer : IConsumer<DeadQueueEvent>
{
// Consume
public Task ExecuteAsync(MessageHeader messageHeader, DeadQueueEvent message)
{
Console.WriteLine($"event id:{message.Id} has expired");
return Task.CompletedTask;
}
}</code></pre>
<p><br /></p>
<p>For example, after a user places an order, if payment is not received within 15 minutes, the message expires automatically and the dead letter consumer can cancel the order, as sketched below.</p>
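<p>The sketch below is illustrative only; the <code>OrderCreatedEvent</code> type, queue names, and <code>OrderId</code> property are assumptions, and 900000 ms corresponds to 15 minutes:</p>
<pre><code class="language-csharp">// Orders are published to "order_create"; nothing consumes this queue,
// so each message expires after 15 minutes and is routed to "order_expired".
[Consumer("order_create", Expiration = 900000, DeadQueue = "order_expired")]
public class OrderCreateConsumer : EmptyConsumer<OrderCreatedEvent>
{
}

// Expired (unpaid) orders arrive here and are cancelled.
[Consumer("order_expired", Qos = 1)]
public class OrderExpiredConsumer : IConsumer<OrderCreatedEvent>
{
    public Task ExecuteAsync(MessageHeader messageHeader, OrderCreatedEvent message)
    {
        Console.WriteLine($"Order {message.OrderId} was not paid within 15 minutes, cancelling it.");
        return Task.CompletedTask;
    }

    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, OrderCreatedEvent message)
        => Task.CompletedTask;

    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, OrderCreatedEvent? message, Exception? ex)
        => Task.FromResult(ConsumerState.Ack);
}

public class OrderCreatedEvent
{
    public long OrderId { get; set; }
}</code></pre>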
<h2>Observability</h2>
<p>Please refer to the ActivitySourceApi and OpenTelemetryConsole examples.</p>
### Deployment Environment
To quickly set up an observability platform, you can use the sample provided by OpenTelemetry to deploy the related services, which include middleware such as Prometheus, Grafana, and Jaeger.
The official OpenTelemetry integration project address: [OpenTelemetry Demo](https://github.com/open-telemetry/opentelemetry-demo)
To download the sample repository's source code:
```bash
git clone -b 1.12.0 https://github.com/open-telemetry/opentelemetry-demo.git
```
Please do not clone the main branch, as it may contain bugs; you can change the version number to the latest release instead.
Since the docker-compose.yml sample includes a large number of demo microservices and we only need the infrastructure, open the docker-compose.yml file and retain only valkey-cart under the Core Demo Services and Dependent Services nodes, deleting the others. Alternatively, you can download the version modified by the author and replace the file in the project: docker-compose.yml. Note that different versions may vary.
Execute the command to deploy the observability services:
docker-compose up -d
The opentelemetry-collector-contrib component collects observability information such as link traces and supports both the gRPC and HTTP protocols, listening on the following ports:

| Port | Protocol | Endpoint | Function |
| --- | --- | --- | --- |
| 4317 | gRPC | n/a | Accepts traces in OpenTelemetry OTLP format (Protobuf). |
| 4318 | HTTP | /v1/traces | Accepts traces in OpenTelemetry OTLP format (Protobuf and JSON). |

After container port mapping, the externally exposed ports may no longer be 4317 or 4318.
Import the Maomi.MQ.Instrumentation package and other related OpenTelemetry packages:
<PackageReference Include="Maomi.MQ.Instrumentation" Version="1.1.0" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.8.1" />
Import the namespaces:
using OpenTelemetry.Logs;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using Maomi.MQ;
using OpenTelemetry.Exporter;
using RabbitMQ.Client;
using System.Reflection;
using OpenTelemetry;
Inject link tracing and monitoring for automatic reporting to OpenTelemetry.
builder.Services.AddOpenTelemetry()
.ConfigureResource(resource => resource.AddService(serviceName))
.WithTracing(tracing =>
{
tracing.AddMaomiMQInstrumentation(options =>
{
options.Sources.AddRange(MaomiMQDiagnostic.Sources);
options.RecordException = true;
})
.AddAspNetCoreInstrumentation()
.AddOtlpExporter(options =>
{
options.Endpoint = new Uri(Environment.GetEnvironmentVariable("OTLPEndpoint")! + "/v1/traces");
options.Protocol = OtlpExportProtocol.HttpProtobuf;
});
})
.WithMetrics(metrics =>
{
metrics.AddAspNetCoreInstrumentation()
.AddMaomiMQInstrumentation()
.AddOtlpExporter(options =>
{
options.Endpoint = new Uri(Environment.GetEnvironmentVariable("OTLPEndpoint")! + "/v1/metrics");
options.Protocol = OtlpExportProtocol.HttpProtobuf;
});
});
Link Tracing
After starting the ActivitySourceApi service, perform some publishing and consumption; link tracing information will be automatically pushed to the OpenTelemetry Collector and can be viewed through components such as Jaeger and SkyWalking.
Open the Jaeger UI panel mapped to port 16686:
Since the publisher and consumer produce sibling traces rather than a single trace, related traces should be queried via Tags, using the format event.id=xxx.
Monitoring
Maomi.MQ has the following built-in metrics:
| Name | Description |
| --- | --- |
| maomimq_consumer_message_pull_count_total | Total messages pulled |
| maomimq_consumer_message_faild_count_total | Total failed messages |
| maomimq_consumer_message_received_Byte_bucket | |
| maomimq_consumer_message_received_Byte_count | |
| maomimq_consumer_message_received_Byte_sum | Total bytes received |
| maomimq_publisher_message_count_total | Total messages sent |
| maomimq_publisher_message_faild_count_total | Total failed sends |
| maomimq_publisher_message_sent_Byte_bucket | |
| maomimq_publisher_message_sent_Byte_count | |
| maomimq_publisher_message_sent_Byte_sum | Total bytes sent |
Next, display the data in Grafana.
Download the template file: maomi.json
Then import the file in the Dashboards section of the Grafana panel to view the current message queue monitoring for all services.
Open Source Project Code Reference
The code of the included OpenTelemetry.Instrumentation.MaomiMQ project is sourced from OpenTelemetry .NET Contrib.