Document Explanation
Author: whuanle
Document URL: https://mmq.whuanle.cn
Repository URL: https://github.com/whuanle/Maomi.MQ
Author's Blog:
Introduction
Maomi.MQ is a message communication model project that currently supports only RabbitMQ.
Maomi.MQ.RabbitMQ is a communication model designed specifically for RabbitMQ publishers and consumers, greatly simplifying the code for message publishing and handling, and providing a range of convenient and practical features. Developers can achieve high-performance consumption and event orchestration through the consumption model provided by the framework, which also supports publisher confirmation mechanisms, custom retry mechanisms, compensation mechanisms, dead-letter queues, delayed queues, connection channel multiplexing, and other convenient functionalities. Developers can focus more on business logic, using the Maomi.MQ.RabbitMQ framework to simplify inter-process message communication, making it easier and more reliable.
Additionally, the framework supports distributed observability through built-in runtime APIs which can be further enhanced by using frameworks like OpenTelemetry to collect observability data and push it to infrastructure platforms.
Quick Start
This article will briefly introduce how to use Maomi.MQ.RabbitMQ.
Introduce the Maomi.MQ.RabbitMQ package and inject services in the Web configuration:
builder.Services.AddSwaggerGen();
builder.Services.AddLogging();
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]);
var app = builder.Build();
- WorkId: Specifies the node ID used for generating distributed Snowflake IDs, defaulting to 0.
- AppName: Used to identify the producer of the message, as well as for identifying the producer or consumer in logs and traceability.
- Rabbit: RabbitMQ client configuration, please refer to ConnectionFactory.
If it is a console project, you need to include the Microsoft.Extensions.Hosting package.
var host = new HostBuilder()
.ConfigureLogging(options =>
{
options.AddConsole();
options.AddDebug();
})
.ConfigureServices(services =>
{
services.AddMaomiMQ(options =>
{
options.WorkId = 1;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, new System.Reflection.Assembly[] { typeof(Program).Assembly });
// Your services.
services.AddHostedService<MyPublishAsync>();
}).Build();
await host.RunAsync();
Define the message model class, which will be serialized into binary content and transmitted to the RabbitMQ server.
public class TestEvent
{
public int Id { get; set; }
public override string ToString()
{
return Id.ToString();
}
}
Define the consumer, which needs to implement the IConsumer<TEvent>
interface and use the [Consumer]
attribute annotation to configure the consumer properties.
[Consumer("test", Qos = 1, RetryFaildRequeue = true)]
public class MyConsumer : IConsumer<TestEvent>
{
private static int _retryCount = 0;
// Consume
public async Task ExecuteAsync(EventBody<TestEvent> message)
{
Console.WriteLine($"Event id: {message.Id} {DateTime.Now}");
await Task.CompletedTask;
}
// Executed on each consumption failure
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;
// Compensation
public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true);
}
Then inject the IMessagePublisher service to publish messages:
[ApiController]
[Route("[controller]")]
public class IndexController : ControllerBase
{
private readonly IMessagePublisher _messagePublisher;
public IndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}
[HttpGet("publish")]
public async Task<string> Publisher()
{
// Publish message
await _messagePublisher.PublishAsync(queue: "test", message: new TestEvent
{
Id = i
});
return "ok";
}
}
Message Publisher
The message publisher is used to push messages to the RabbitMQ server.
By injecting the IMessagePublisher interface, you can push messages to RabbitMQ. Reference the example project at PublisherWeb.
Define an event model class:
public class TestEvent
{
public int Id { get; set; }
public override string ToString()
{
return Id.ToString();
}
}
After injecting the IMessagePublisher service, publish the message:
[ApiController]
[Route("[controller]")]
public class IndexController : ControllerBase
{
private readonly IMessagePublisher _messagePublisher;
public IndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}
[HttpGet("publish")]
public async Task<string> Publisher()
{
for (var i = 0; i < 100; i++)
{
await _messagePublisher.PublishAsync(queue: "PublisherWeb", message: new TestEvent
{
Id = i
});
}
return "ok";
}
}
IMessagePublisher
IMessagePublisher is relatively simple, containing only three methods and one property:
public ConnectionPool ConnectionPool { get; }
Task PublishAsync<TEvent>(string queue, TEvent message, Action<IBasicProperties>? properties = null)
where TEvent : class;
Task PublishAsync<TEvent>(string queue, TEvent message, IBasicProperties properties);
// It is not recommended to use this interface directly.
Task CustomPublishAsync<TEvent>(string queue, EventBody<TEvent> message, BasicProperties properties);
The three PublishAsync methods are used for publishing events, and the ConnectionPool property is used to obtain the RabbitMQ.Client.IConnection object.
As BasicProperties is directly exposed, developers have complete freedom to configure RabbitMQ's native message properties. Therefore, Maomi.MQ.RabbitMQ does not need to over-design and only provides simple functional interfaces.
For example, you can configure the expiration time of a single message through BasicProperties:
await _messagePublisher.PublishAsync(queue: "RetryWeb", message: new TestEvent
{
Id = i
}, (BasicProperties p) =>
{
p.Expiration = "1000";
});
When a message is published, the framework actually transmits an EventBody<T>
type, which contains some important additional message attributes that greatly facilitate message processing and fault diagnosis.
public class EventBody<TEvent>
{
// Unique event id.
public long Id { get; init; }
// Queue.
public string Queue { get; init; } = null!;
// App name.
public string Publisher { get; init; } = null!;
// Event creation time.
public DateTimeOffset CreationTime { get; init; }
// Event body.
public TEvent Body { get; init; } = default!;
}
Maomi.MQ implements IMessagePublisher through the DefaultMessagePublisher type, which defaults to a Singleton lifecycle:
services.AddSingleton<IMessagePublisher, DefaultMessagePublisher>();
The lifecycle is not important; if you need to change the default lifecycle, you can manually modify it.
services.AddScoped<IMessagePublisher, DefaultMessagePublisher>();
Developers can also implement the IMessagePublisher interface themselves; for specific examples, refer to the DefaultMessagePublisher type.
Connection Pool
To reuse RabbitMQ.Client.IConnection, Maomi.MQ.RabbitMQ internally implements the ConnectionPool type, which maintains reusable RabbitMQ.Client.IConnection objects through an object pool.
The default number of RabbitMQ.Client.IConnection in the object pool is 0; only when a connection is actively used does it get created from the object pool. Connection objects will automatically increase with the program's concurrency levels, but the default maximum number of connection objects is Environment.ProcessorCount * 2
.
In addition to the PublishAsync method offered by the IMessagePublisher interface for publishing events, developers can also obtain connection objects from the ConnectionPool and must return them to the connection pool through the ConnectionPool.Return()
method after use.
Publish messages using the IConnection object directly from the connection pool:
[HttpGet("publish")]
public async Task<string> Publisher()
{
for (var i = 0; i < 100; i++)
{
var connectionPool = _messagePublisher.ConnectionPool;
var connection = connectionPool.Get();
try
{
connection.Channel.BasicPublishAsync(
exchange: string.Empty,
routingKey: "queue",
basicProperties: properties,
body: _jsonSerializer.Serializer(message),
mandatory: true);
}
finally
{
connectionPool.Return(connection);
}
}
return "ok";
}
You can bypass IMessagePublisher and directly inject ConnectionPool to use RabbitMQ connection objects, but this is not recommended.
private readonly ConnectionPool _connectionPool;
public DefaultMessagePublisher(ConnectionPool connectionPool)
{
_connectionPool = connectionPool;
}
public async Task MyPublshAsync()
{
var connection = _connectionPool.Get();
try
{
await connection.Channel.BasicPublishAsync(...);
}
finally
{
_connectionPool.Return(connection);
}
}
To manage connection objects more conveniently, you can use the CreateAutoReturn()
function to create a connection management object, which will automatically return the IConnection to the connection pool when released.
using var poolObject = _messagePublisher.ConnectionPool.CreateAutoReturn();
poolObject.Channel.BasicPublishAsync(
exchange: string.Empty,
routingKey: "queue",
basicProperties: properties,
body: _jsonSerializer.Serializer(message),
mandatory: true);
If you use ConnectionPool to push messages to RabbitMQ, please ensure to serialize the EventBody<TEvent>
event object, so that the Maomi.MQ.RabbitMQ consumer can work properly. Additionally, Maomi.MQ supports observability; if you use ConnectionPool to obtain connection objects for pushing messages, it may lead to missing observability information.
Under normal circumstances, RabbitMQ.Client includes observability functionality, but the additional observability information from Maomi.MQ.RabbitMQ is helpful for diagnosing fault issues.
Please note:
-
Maomi.MQ.RabbitMQ publishes and receives events through the
EventBody<TEvent>
generic object. -
DefaultMessagePublisher includes observability code, such as traceability.
Message Expiration
IMessagePublisher exposes BasicProperties, which allows for freely configuring message properties.
For example, configure the expiration time for messages:
[HttpGet("publish")]
public async Task<string> Publisher()
{
for (var i = 0; i < 1; i++)
{
await _messagePublisher.PublishAsync(queue: "test", message: new TestEvent
{
Id = i
}, properties =>
{
properties.Expiration = "6000";
});
}
return "ok";
}
If a dead-letter queue is bound to test
, the message will be moved to another queue if it is not consumed for a long time. Please refer to Dead Letter Queue.
You can also achieve more functionalities by configuring message properties. Please refer to IBasicProperties.
Transactions
RabbitMQ supports transactions, but according to RabbitMQ's official documentation, transactions significantly decrease throughput by 250 times.
Using transactions in RabbitMQ is relatively straightforward and can ensure that published messages have been pushed to the RabbitMQ server; only when the transaction commits will the submitted messages be stored and pushed to consumers.
。
Usage example:
[HttpGet("publish_tran")]
public async Task<string> Publisher_Tran()
{
using var tranPublisher = await _messagePublisher.TxSelectAsync();
try
{
await tranPublisher.PublishAsync(queue: "publish_tran", message: new TestEvent
{
Id = 666
});
await tranPublisher.TxCommitAsync();
}
catch
{
await tranPublisher.TxRollbackAsync();
throw;
}
return "ok";
}
Or manually start a transaction:
[HttpGet("publish_tran")]
public async Task<string> Publisher_Tran()
{
using var tranPublisher = _messagePublisher.CreateTransaction();
try
{
await tranPublisher.TxSelectAsync();
await tranPublisher.PublishAsync(queue: "publish_tran", message: new TestEvent
{
Id = 666
});
await tranPublisher.TxCommitAsync();
}
catch
{
await tranPublisher.TxRollbackAsync();
throw;
}
return "ok";
}
Note that under this mode, when creating the TransactionPublisher object, a connection object will be retrieved from the object pool. Since starting the transaction mode may pollute the current connection channel, the TransactionPublisher will not return the connection object to the connection pool but will release it directly.
Publisher Confirm Mode
Although the transaction mode ensures that messages will be pushed to the RabbitMQ server, it reduces throughput by 250 times, making it a poor choice. To address this issue, RabbitMQ introduces a confirmation mechanism that acts like a sliding window, ensuring that messages are pushed to the server while maintaining high performance.
Please refer to https://www.rabbitmq.com/docs/confirms
Usage example:
[HttpGet("publish_confirm")]
public async Task<string> Publisher_Confirm()
{
using var confirmPublisher = await _messagePublisher.ConfirmSelectAsync();
for (var i = 0; i < 5; i++)
{
await confirmPublisher.PublishAsync(queue: "publish_confirm1", message: new TestEvent
{
Id = 666
});
var result = await confirmPublisher.WaitForConfirmsAsync();
// If nacks are not received within the timeout, the result is true; otherwise, it is false.
Console.WriteLine($"Published {i},{result}");
}
return "ok";
}
The WaitForConfirmsAsync
method returns a value: if the message is successfully confirmed by the server, the result is true; if it times out without confirmation, it returns false.
In addition, there is a WaitForConfirmsOrDieAsync
method that waits until all published messages on the channel are confirmed. Usage example:
using var confirmPublisher = await _messagePublisher.ConfirmSelectAsync();
for (var i = 0; i < 5; i++)
{
await confirmPublisher.PublishAsync(queue: "publish_confirm1", message: new TestEvent
{
Id = 666
});
Console.WriteLine($"Published {i}");
}
await confirmPublisher.WaitForConfirmsOrDieAsync();
Note that under this mode, when creating the ConfirmPublisher object, a connection object will be retrieved from the object pool. Since starting the transaction mode may pollute the current connection channel, the ConfirmPublisher will not return the connection object to the connection pool but will release it directly.
Note that the same channel cannot use both transaction and publisher confirm modes simultaneously.
Exclusive Mode
By default, each time IMessagePublisher.PublishAsync()
is used to publish a message, a connection object is retrieved from the connection pool and used to publish the message, which is returned to the connection pool after publishing.
If a large number of messages need to be published in a short period of time, it requires repeatedly acquiring and returning the connection object each time.
Using exclusive mode allows a connection object to be held exclusively for a period. After the scope ends, the connection object will automatically be returned to the connection pool. This mode significantly improves throughput for scenarios that require bulk message publishing. To ensure the connection channel is returned to the pool, it is essential to use the using
keyword for the variable or manually call the Dispose
function.
Usage example:
// Create exclusive mode
using var singlePublisher = _messagePublisher.CreateSingle();
for (var i = 0; i < 500; i++)
{
await singlePublisher.PublishAsync(queue: "publish_single", message: new TestEvent
{
Id = 666
});
}
Consumer
In Maomi.MQ.RabbitMQ, there are two consumption modes: consumer mode and event mode (event bus mode).
Let's briefly understand the usage of these two modes.
Consumer Mode
The consumer service needs to implement the IConsumer<TEvent>
interface and configure the [Consumer("queue")]
attribute to bind the queue name, controlling the consumption behavior through the consumer object.
The consumer mode has failure notification and compensation capabilities and is relatively simple to use.
public class TestEvent
{
public int Id { get; set; }
}
[Consumer("PublisherWeb", Qos = 1, RetryFaildRequeue = true)]
public class MyConsumer : IConsumer<TestEvent>
{
private static int _retryCount = 0;
// Consume or retry
public async Task ExecuteAsync(EventBody<TestEvent> message)
{
_retryCount++;
Console.WriteLine($"Execution count: {_retryCount} Event id: {message.Id} {DateTime.Now}");
await Task.CompletedTask;
}
// Failure
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;
// Compensation
public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true);
}
Event Mode
The event mode is implemented through an event bus, focusing on the event model and controlling consumption behavior through events.
[EventTopic("web2", Qos = 1, RetryFaildRequeue = true)]
public class TestEvent
{
public string Message { get; set; }
}
Then use the [EventOrder]
attribute to arrange the order of event execution.
// Arrange event consumption order
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id}, Event 1 has been executed");
}
}
[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id}, Event 2 has been executed");
}
}
Of course, the event mode also allows for compensation functionality by creating middleware. Through middleware, all ordered events can be placed in the same transaction, succeeding or failing together to avoid consistency issues that may arise when the program exits during event execution.
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public TestEventMiddleware(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
{
using (var transaction = _bloggingContext.Database.BeginTransaction())
{
await next(@event, CancellationToken.None);
await transaction.CommitAsync();
}
}
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
return Task.CompletedTask;
}
public Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
return Task.FromResult(true);
}
}
Grouping
Both consumer mode and event mode can set groups by placing the Group
attribute in the attributes. Events in the same group will be placed in a single connection channel (RabbitMQ.Client.IConnection
). For events with infrequent consumption, reusing the connection channel can effectively reduce resource consumption.
Example of consumer mode grouping:
[Consumer("ConsumerWeb_group_1", Qos = 1, Group = "group")]
public class Group_1_Consumer : IConsumer<GroupEvent>
{
public Task ExecuteAsync(EventBody<GroupEvent> message) => Task.CompletedTask;
public Task FaildAsync(Exception ex, int retryCount, EventBody<GroupEvent>? message) => Task.CompletedTask;
public Task<bool> FallbackAsync(EventBody<GroupEvent>? message) => Task.FromResult(true);
}
[Consumer("ConsumerWeb_group_2", Qos = 1, Group = "group")]
public class Group_2_Consumer : IConsumer<GroupEvent>
{
public Task ExecuteAsync(EventBody<GroupEvent> message) => Task.CompletedTask;
public Task FaildAsync(Exception ex, int retryCount, EventBody<GroupEvent>? message) => Task.CompletedTask;
public Task<bool> FallbackAsync(EventBody<GroupEvent>? message) => Task.FromResult(true);
}
Example of event bus mode grouping:
[EventTopic("web1", Qos = 1, RetryFaildRequeue = true, Group = "group")]
public class Test1Event
{
public string Message { get; set; }
}
[EventTopic("web2", Qos = 1, RetryFaildRequeue = true, Group = "group")]
public class Test2Event
{
public string Message { get; set; }
}
Consumer Mode
The consumer mode requires the service to implement the IConsumer<TEvent>
interface and add the [Consumer]
attribute.
The IConsumer<TEvent>
interface is relatively simple, defined as follows:
public interface IConsumer<TEvent>
where TEvent : class
{
// Message processing.
public Task ExecuteAsync(EventBody<TEvent> message);
// Immediately execute this code after an exception in ExecuteAsync.
public Task FaildAsync(Exception ex, int retryCount, EventBody<TEvent>? message);
// Executed when the last retry fails, used for compensation.
public Task<bool> FallbackAsync(EventBody<TEvent>? message);
}
In consumer mode, a model class needs to be defined first for message passing between publishers and consumers. The event model class can be any class that can be serialized and deserialized without any other requirements.
public class TestEvent
{
public int Id { get; set; }
public override string ToString()
{
return Id.ToString();
}
}
Then, inherit the IConsumer<TEvent>
interface to implement consumer functionality:
[Consumer("web1", Qos = 1)]
public class MyConsumer : IConsumer<TestEvent>
{
// Consume
public async Task ExecuteAsync(EventBody<TestEvent> message)
{
Console.WriteLine(message.Body.Id);
}
// Executed each time it fails
public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
Console.WriteLine($"Retry {message.Body.Id}, count {retryCount}");
await Task.CompletedTask;
}
// Executed during the last failure
public async Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
Console.WriteLine($"Last time {message.Body.Id}");
// If returns true, indicates compensation successful.
return true;
}
}
For a detailed explanation of attribute configuration, please refer to Consumer Configuration.
Consumption, Retry, and Compensation
When a consumer receives a message pushed by the server, the ExecuteAsync
method is automatically executed. If an exception occurs during the execution of ExecuteAsync
, the FaildAsync
method is immediately triggered, allowing developers to log relevant information.
// Executed each time it fails
public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
// When retryCount == -1, the error is not caused by the ExecuteAsync method
if (retryCount == -1)
{
_logger.LogError(ex, "Consumer error, event id: {Id}", message?.Id);
// You can add alarm notification code here
await Task.Delay(1000);
}
else
{
_logger.LogError(ex, "Consumer exception, event id: {Id}, retry count: {retryCount}", message!.Id, retryCount);
}
}
If the FaildAsync
method also throws an exception, it will not affect the overall process. The framework will wait until the interval time is reached and then continue retrying the ExecuteAsync
method.
It is recommended to enclose the FaildAsync
logic in try{}catch{}
and not throw exceptions externally. The logic in FaildAsync
should not contain too much logic and should be used only for logging or alerts.
FaildAsync
is executed in an additional scenario when an error occurs before consuming the message. For example, if an event model class has a constructor that prevents it from being deserialized, FaildAsync
will be executed immediately, and retryCount
will be -1
.
When exceptions occur during the execution of the ExecuteAsync
method, the framework will automatically retry, with a default of five retries. If all five attempts fail, the FallbackAsync
method will be executed for compensation.
The retry intervals will progressively increase; please refer to Retry.
After five retries, the compensation mechanism will be triggered immediately.
// Executed on the last failure
public async Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
return true;
}
The FallbackAsync
method needs to return a boolean. If it returns true
, it indicates that although ExecuteAsync
encountered an exception, the compensation performed by FallbackAsync
has successfully resolved the issue, and the message will be consumed normally. If it returns false
, it indicates that the compensation failed, and the message will be treated as a consumption failure.
Only when an exception occurs in ExecuteAsync
will FaildAsync
and FallbackAsync
be triggered. If the exception occurs before processing the message, it will fail directly.
Consumption Failure
When the number of failures of ExecuteAsync
reaches the threshold, and FallbackAsync
returns false
, then the message consumption fails, or it fails directly due to serialization errors.
The [Consumer]
attribute has three important configurations:
public class ConsumerAttribute : Attribute
{
// When the number of consumption failures reaches a condition, whether to return to the queue.
public bool RetryFaildRequeue { get; set; }
// Whether to return to the queue upon exception, for example, due to serialization errors, rather than exceptions occurring during consumption.
public bool ExecptionRequeue { get; set; } = true;
// Bind to the dead letter queue.
public string? DeadQueue { get; set; }
}
When the number of failures in ExecuteAsync
reaches the threshold, and FallbackAsync
returns false
, the message consumption fails.
If RetryFaildRequeue == false
, this message will be discarded by RabbitMQ.
If a dead letter queue is bound, it will first be pushed to the dead letter queue and then discarded.
If RetryFaildRequeue == true
, this message will be returned to the RabbitMQ queue, awaiting the next consumption.
Since the message will be returned to the queue after failing, the bound dead letter queue will not receive this message.
When serialization exceptions or other issues prevent entering the ExecuteAsync
method, the FaildAsync
method will be triggered first, with the retryCount parameter value as -1
.
Such issues are generally caused by developer bugs, and no compensation or other operations will be performed; developers can handle the event in FaildAsync
and log relevant information.
// Executed on each failure or when an exception that cannot enter ExecuteAsync occurs
public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
// When retryCount == -1, the error is not caused by the ExecuteAsync method
if (retryCount == -1)
{
_logger.LogError(ex, "Consumer error, event id: {Id}", message?.Id);
// Alert notification code can be added here
await Task.Delay(1000);
}
else
{
_logger.LogError(ex, "Consumer exception, event id: {Id}, retry count: {retryCount}", message!.Id, retryCount);
}
}
Handling this situation improperly will cause message loss; therefore, the framework defaults ExecptionRequeue
to true
. This means that when such an exception occurs, the message will be returned to the queue. If the problem remains unresolved, a cycle will occur: call FaildAsync
, return to the queue, call FaildAsync
, return to the queue...
Thus, FaildAsync
should include code to notify developers of relevant information and set an interval to avoid excessive retries.
Auto Queue Creation
The framework will automatically create queues by default. To disable this auto creation feature, set AutoQueueDeclare
to false
.
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AppName = "myapp";
options.AutoQueueDeclare = false;
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]);
Queues can also be configured individually for auto creation:
[Consumer("ConsumerWeb_create", AutoQueueDeclare = AutoQueueDeclare.Enable)]
By default, when global auto creation is disabled, queues will not be created automatically.
If global auto creation is disabled but a consumer is configured with AutoQueueDeclare = AutoQueueDeclare.Enable
, queues will still be created automatically.
If a consumer is configured with AutoQueueDeclare = AutoQueueDeclare.Disable
, it will ignore the global configuration and not create the queue.
Qos
If the program needs to consume strictly in order, Qos = 1
can be used, ensuring that messages are consumed one by one. If the program does not require sequential consumption and prefers to handle all messages quickly, a larger Qos value can be set. Since the combination of Qos with retry and compensation mechanisms can result in various scenarios, please refer to Retry.
Qos is configured through attributes:
[Consumer("ConsumerWeb", Qos = 1)]
By increasing the Qos value, the program can concurrently process messages, improving throughput.
Delayed Queues
There are two types of delayed queues: one sets the message expiration time, and the other sets the queue expiration time.
Setting a message expiration time means that if the message is not consumed within a certain time, it will be discarded or moved to the dead letter queue. This configuration only applies to individual messages; please refer to Message Expiration.
When a queue is set to expire, messages that are not consumed within a certain time will be discarded or moved to the dead letter queue. This configuration applies to all messages. Based on this, we can implement delayed queues.
First, create a consumer that inherits from EmptyConsumer
, which will create the queue when the program starts but will not create an IConnection
to consume messages. Then set the message expiration time for the queue and bind it to a dead letter queue, which can be implemented using either the consumer or event model.
[Consumer("ConsumerWeb_dead_2", Expiration = 6000, DeadQueue = "ConsumerWeb_dead_queue_2")]
public class EmptyDeadConsumer : EmptyConsumer<DeadEvent>
{
}
// ConsumerWeb_dead failed messages will be consumed by this consumer.
[Consumer("ConsumerWeb_dead_queue_2", Qos = 1)]
public class Dead_2_QueueConsumer : IConsumer<DeadQueueEvent>
{
// Consumption
public Task ExecuteAsync(EventBody<DeadQueueEvent> message)
{
Console.WriteLine($"Dead letter queue, event id:{message.Id}");
return Task.CompletedTask;
}
// Executed on every failure
public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadQueueEvent>? message) => Task.CompletedTask;
// Executed on the last failure
public Task<bool> FallbackAsync(EventBody<DeadQueueEvent>? message) => Task.FromResult(false);
}
Empty Consumers
When an empty consumer is detected, the framework will only create the queue and will not start the consumer to consume messages.
This can be used in conjunction with delayed queues. This queue will not have any consumers, and when the messages in this queue expire, they will be directly consumed by the dead letter queue, as shown below:
[Consumer("ConsumerWeb_empty", Expiration = 6000, DeadQueue = "ConsumerWeb_empty_dead")]
public class MyEmptyConsumer : EmptyConsumer<TestEvent> { }
[Consumer("ConsumerWeb_empty_dead", Qos = 10)]
public class MyDeadConsumer : IConsumer<TestEvent>
{
public Task ExecuteAsync(EventBody<TestEvent> message) => Task.CompletedTask;
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;
public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true);
}
For cross-process queues, Service A can publish without consuming, while Service B is responsible for consumption. An empty consumer can be added in Service A to ensure that the queue exists when Service A starts. Furthermore, the consumer service should not focus on the definition of the queue, nor should it be responsible for creating queues.
Grouping
By configuring the Group
attribute, multiple consumers can be executed in the same connection channel. For queues with low concurrency, reusing connection channels can reduce resource consumption.
Example:
[Consumer("ConsumerWeb_group_1", Qos = 1, Group = "group")]
public class Group_1_Consumer : IConsumer<GroupEvent>
{
}
[Consumer("ConsumerWeb_group_2", Qos = 1, Group = "group")]
public class Group_2_Consumer : IConsumer<GroupEvent>
{
}
Event Bus Pattern
Maomi.MQ internally designs an event bus that helps developers achieve event orchestration, implement local transactions, forward executions, and compensation.
First, define an event type that binds to a topic or queue. The event needs to be marked with [EventTopic]
and the corresponding queue name must be set.
The [EventTopic]
attribute has the same properties as [Consumer]
, and for event configuration, please refer to Consumer Configuration.
[EventTopic("EventWeb")]
public class TestEvent
{
public string Message { get; set; }
public override string ToString()
{
return Message;
}
}
Next, orchestrate event executors. Each executor must inherit from the IEventHandler<T>
interface and must be marked with the [EventOrder]
attribute to denote execution order.
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id}, event 1 has been executed");
}
}
[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id}, event 2 has been executed");
}
}
Each event handler must implement the IEventHandler<T>
interface and set the [EventOrder]
attribute to confirm the execution order. The framework will execute the ExecuteAsync
method of IEventHandler<T>
in order. When an exception occurs during ExecuteAsync
, CancelAsync
will be called in reverse order.
Since a program can crash at any time, it is impractical to achieve compensation through CancelAsync
. The CancelAsync
method is mainly used for logging associated information.
Middleware
Middleware allows developers to intercept events, log information, and implement local transactions. If developers do not configure it, the framework will automatically create a DefaultEventMiddleware<TEvent>
type as the middleware service for that event.
Example code for custom event middleware:
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
public async Task HandleAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
{
await next(@event, CancellationToken.None);
}
}
The next
delegate is the event execution chain created by the framework, allowing interception of events and decisions regarding whether to execute the event chain.
When the next()
delegate is called within the middleware, the framework begins sequentially executing the events, referring to My1EventEventHandler
, My2EventEventHandler
, mentioned earlier.
Given that a program can fail at any time when an event has several handlers, local transactions become essential.
For example, inject a database context in the middleware, begin a transaction to execute a database operation, and if one EventHandler
fails, the execution chain will roll back without committing the transaction.
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public TestEventMiddleware(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task HandleAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
{
using (var transaction = _bloggingContext.Database.BeginTransaction())
{
await next(@event, CancellationToken.None);
await transaction.CommitAsync();
}
}
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
return Task.CompletedTask;
}
public Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
return Task.FromResult(true);
}
}
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public My1EventEventHandler(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{{--event.Id} 被补偿,[1]");
}
public async Task HandlerAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
await _bloggingContext.Posts.AddAsync(new Post
{
Title = "鲁滨逊漂流记",
Content = "随便写写就对了"
});
await _bloggingContext.SaveChangesAsync();
}
}
[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public My2EventEventHandler(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{{--event.Id} 被补偿,[2]");
}
public async Task HandlerAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
await _bloggingContext.Posts.AddAsync(new Post
{
Title = "红楼梦",
Content = "贾宝玉初试云雨情"
});
await _bloggingContext.SaveChangesAsync();
throw new OperationCanceledException("故意报错");
}
}
When exceptions occur during event execution, they will also be retried, and the middleware TestEventMiddleware's FaildAsync and FallbackAsync will be executed in sequence.
You can refer to the Consumer Model or Retry.
Group Consumption
Event group consumption mainly utilizes the same IConnection to handle multiple message queues simultaneously, improving channel utilization.
Example:
[EventTopic("EventGroup_1", Group = "aaa")]
public class Test1Event
{
public string Message { get; set; }
public override string ToString()
{
return Message;
}
}
[EventTopic("EventGroup_2", Group = "aaa")]
public class Test2Event
{
public string Message { get; set; }
public override string ToString()
{
return Message;
}
}
Maomi.MQ's IConsumer<T>
is a consumer (a queue) that uses one IConnection, and by default, the event bus does too.
For queues with low concurrency or utilization, they can be merged into the same IConnection using event grouping for processing.
Using it is simple; just configure the Group
method of the [EventTopic]
attribute when defining events.
Since different queues are consumed in one IConnection, if events have set Qos, the framework will default to calculating the average value, for example:
[EventTopic("web3_1", Group = "aaa", Qos = 10)]
public class Test1Event
[EventTopic("web3_2", Group = "aaa", Qos = 6)]
public class Test2Event
At this time, the framework will set Qos to 8
.
Configuration
When introducing the Maomi.MQ framework, related properties can be configured. Here are examples and descriptions:
// this.
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
// Current program node, used for configuring distributed snowflake id
options.WorkId = 1;
// Whether to automatically create a queue
options.AutoQueueDeclare = true;
// Current application name, used to identify the message publisher and consumer program
options.AppName = "myapp";
// RabbitMQ configuration
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]); // Assembly to be scanned
Consumer Configuration
The consumer mode [Consumer]
and event bus mode [EventTopic]
have the same property configurations. Their configuration descriptions are as follows:
| Name | Type | Default Value | Description |
|-------------------|------------------|-----------------------|---------------------------------------------------------------|
| Queue | string | | Queue name |
| DeadQueue | string? | | Binds dead letter queue name |
| ExecptionRequeue | bool | true | Whether to return to the queue when an exception occurs, e.g., due to serialization errors, rather than exceptions during consumption |
| Expiration | int | 0 | Queue message expiration time in milliseconds |
| Qos | ushort | 1 | Qos |
| RetryFaildRequeue | bool | false | Whether to return to the queue when the number of consumption failures reaches the condition |
| Group | string? | | Group name |
| AutoQueueDeclare | AutoQueueDeclare | AutoQueueDeclare.None | Whether to automatically create a queue |
Environment Isolation
Currently considering whether to support multi-tenant mode.
In development, it is often necessary to debug locally. After the local program starts, it will connect to the development server. When a queue receives a message, it will push messages to one of the consumers. During local debugging, after publishing a message, the local program may not receive that message, but rather it is consumed by the program in the development environment.
At this time, we hope to isolate the local debugging environment from the development environment, which can be achieved using the VirtualHost feature provided by RabbitMQ.
First, create a new VirtualHost through a PUT request. Please refer to the documentation: https://www.rabbitmq.com/docs/vhosts#using-http-api
Then, configure the VirtualHost in the code:
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AutoQueueDeclare = true;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
#if DEBUG
options.VirtualHost = "debug";
#endif
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]);
Snowflake ID Configuration
Maomi.MQ.RabbitMQ uses IdGenerator to generate snowflake IDs, ensuring that each event has a unique ID within the cluster.
The framework creates snowflake IDs through the IIdFactory interface. You can replace the IIdFactory interface to configure the snowflake ID generation rules.
services.AddSingleton<IIdFactory>(new DefaultIdFactory((ushort)optionsBuilder.WorkId));
Example:
public class DefaultIdFactory : IIdFactory
{
/// <summary>
/// Initializes a new instance of the <see cref="DefaultIdFactory"/> class.
/// </summary>
/// <param name="workId"></param>
public DefaultIdFactory(ushort workId)
{
var options = new IdGeneratorOptions(workId) { SeqBitLength = 10 };
YitIdHelper.SetIdGenerator(options);
}
/// <inheritdoc />
public long NextId() => YitIdHelper.NextId();
}
For snowflake ID configuration using IdGenerator framework, please refer to:
https://github.com/yitter/IdGenerator/tree/master/C#
Qos Concurrency and Order
Both consumer mode and event mode allow configuration of consumption properties through attributes, with Qos being one of the important attributes.
Qos Scenarios
For both consumer mode and event bus mode, if the Group
attribute is not used to configure consumption behavior, each queue will occupy one IConnection and Host service exclusively.
For queues with high consumption frequency but cannot be concurrent, it is best not to set the Group
attribute and ensure that Qos = 1
. This ensures that the consumer occupies resources for consumption, which helps improve consumption capacity while guaranteeing order.
[Consumer("web1", Qos = 1)]
public class MyConsumer : IConsumer<TestEvent>
{
}
When there is a need to increase consumption throughput but order consumption is not required, you can set Qos higher. The RabbitMQ Client framework will improve throughput through pre-fetching, allowing multiple messages to be consumed concurrently.
If it is determined that some consumers will not have a high consumption frequency, those consumers can be grouped.
When multiple consumers or events are configured to share a group, the Qos of these events should be consistent; otherwise, it will be averaged.
Example:
[Consumer("web1", Qos = 10, Group = "group")]
public class My1Consumer : IConsumer<TestEvent>
{
}
[Consumer("web2", Qos = 6, Group = "group")]
public class My2Consumer : IConsumer<TestEvent>
{
}
Since both consumers use the same group, the reused channel's Qos will be set to 8.
If the consumption frequency is low but ordered consumption is needed, these consumers can be placed in the same group with Qos set to 1.
[Consumer("web1", Qos = 1, Group = "group1")]
public class My1Consumer : IConsumer<TestEvent>
{
}
[Consumer("web2", Qos = 1, Group = "group1")]
public class My2Consumer : IConsumer<TestEvent>
{
}
Concurrency and Exception Handling
In the first case, when Qos is set to 1 and neither ExecptionRequeue nor RetryFaildRequeue is set.
In the second case, when Qos is set to 1 and both ExecptionRequeue and RetryFaildRequeue are set.
When Qos is 1, strict order consumption is guaranteed. ExecptionRequeue and RetryFaildRequeue will affect whether failed messages are returned to the queue. If they are returned to the queue, the next consumption will continue to consume the previously failed messages. If an error (such as a bug) is not resolved, it will cause a cycle of consumption, failure, returning to the queue, and re-consumption.
In the third case, when Qos > 1 and neither ExecptionRequeue nor RetryFaildRequeue is set.
In the fourth case, when Qos > 1 and both ExecptionRequeue and RetryFaildRequeue are set.
When Qos is greater than 1, if RetryFaildRequeue = true
is set, failed messages will be returned to the queue; however, it does not guarantee that the next consumption will immediately re-consume that message.
Retry
Retry Time
When the consumer's ExecuteAsync
method throws an exception, the framework will retry, defaulting to five retries with an exponential backoff where the interval is set as 2.
After the first failure, it retries after 2 seconds. After the second failure, it retries after 4 seconds, followed by 8, 16, and 32 seconds.
Maomi.MQ.RabbitMQ uses the Polly framework as a retry strategy manager, and by default generates a retry interval strategy through DefaultRetryPolicyFactory service.
<br />
。
DefaultRetryPolicyFactory code example is as follows:
```csharp
/// <summary>
/// Default retry policy.<br />
/// 默认的策略提供器.
/// </summary>
public class DefaultRetryPolicyFactory : IRetryPolicyFactory
{
/// <inheritdoc/>
public virtual Task<AsyncRetryPolicy> CreatePolicy(string queue, long id)
{
// Create a retry policy.
// 创建重试策略.
var retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(
retryCount: 5,
sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
onRetry: async (exception, timeSpan, retryCount, context) =>
{
_logger.LogDebug("Retry execution event,queue [{Queue}],retry count [{RetryCount}],timespan [{TimeSpan}]", queue, retryCount, timeSpan);
await FaildAsync(queue, exception, timeSpan, retryCount, context);
});
return Task.FromResult(retryPolicy);
}
public virtual Task FaildAsync(string queue, Exception ex, TimeSpan timeSpan, int retryCount, Context context)
{
return Task.CompletedTask;
}
}
You can replace the default retry policy service by implementing the IRetryPolicyFactory interface.
services.AddSingleton<IRetryPolicyFactory, DefaultRetryPolicyFactory>();
Retry Mechanism
The consumer code is as follows:
[Consumer("web1", Qos = 1 , RetryFaildRequeue = true)]
public class MyConsumer : IConsumer<TestEvent>
{
private int _retryCount = 0;
// Consume
public async Task ExecuteAsync(EventBody<TestEvent> message)
{
Console.WriteLine($"执行 {message.Body.Id} 第几次:{_retryCount} {DateTime.Now}");
_retryCount++;
throw new Exception("1");
}
// Executed each time it fails
public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
Console.WriteLine($"重试 {message.Body.Id} 第几次:{retryCount} {DateTime.Now}");
await Task.CompletedTask;
}
// Executed on the last failure
public async Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
Console.WriteLine($"执行 {message.Body.Id} 补偿 {DateTime.Now}");
return true;
}
}
}
First, IConsumer<TEvent>.ExecuteAsync()
or IEventMiddleware<TEvent>.ExecuteAsync()
will execute to consume the message. At this moment, if ExecuteAsync()
fails, it will immediately trigger the FaildAsync()
function.
Then, after waiting for a period of time, ExecuteAsync()
method will be retried.
For instance, if the default retry mechanism retries five times, then ultimately IConsumer<TEvent>.ExecuteAsync()
or IEventMiddleware<TEvent>.ExecuteAsync()
will execute six times: once for normal consumption and five times for retry consumption.
The FallbackAsync()
method will be called after the last retry fails, and this function must return a bool type.
After multiple retry failures, the framework will call the FallbackAsync
method. If this method returns true, the framework will consider that although ExecuteAsync()
failed, it has been compensated through FallbackAsync()
, and that message will be treated as successfully consumed, with the framework sending an ACK to the RabbitMQ server, then proceeding to consume the next message.
If FallbackAsync()
returns false, the framework will consider that the message has completely failed. If RetryFaildRequeue = true
is set, this message will be put back into the message queue for the next consumption attempt; otherwise, this message will be discarded outright.
Persisting Remaining Retry Count
When the consumer fails to process a message, the default consumer will retry five times. If it has already retried three times and then the application restarts, the next time this message is consumed, it will still be retried five times.
It is necessary to keep track of the retry count so that upon program restart, it can continue the retries based on the remaining count.
Introduce the Maomi.MQ.RedisRetry package.
Configuration example:
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AutoQueueDeclare = true;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]);
builder.Services.AddMaomiMQRedisRetry((s) =>
{
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.3.248");
IDatabase db = redis.GetDatabase();
return db;
});
The default key will only be retained for five minutes. This means that if the program tries to consume this message after five minutes, the remaining retry count will be reset.
Dead Letter Queue
Dead Letter Queue
You can bind a dead letter queue to a consumer or event. When the messages in this queue fail and are not returned to the queue, the message will be pushed to the dead letter queue, example:
[Consumer("ConsumerWeb_dead", Qos = 1, DeadQueue = "ConsumerWeb_dead_queue", RetryFaildRequeue = false)]
public class DeadConsumer : IConsumer<DeadEvent>
{
// Consume
public Task ExecuteAsync(EventBody<DeadEvent> message)
{
Console.WriteLine($"事件 id:{message.Id}");
throw new OperationCanceledException();
}
// Executed each time it fails
public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadEvent>? message) => Task.CompletedTask;
// Executed on the last failure
public Task<bool> FallbackAsync(EventBody<DeadEvent>? message) => Task.FromResult(false);
}
// Messages that fail to consume in ConsumerWeb_dead will be consumed by this consumer.
[Consumer("ConsumerWeb_dead_queue", Qos = 1)]
public class DeadQueueConsumer : IConsumer<DeadQueueEvent>
{
// Consume
public Task ExecuteAsync(EventBody<DeadQueueEvent> message)
{
Console.WriteLine($"死信队列,事件 id:{message.Id}");
return Task.CompletedTask;
}
// Executed each time it fails
public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadQueueEvent>? message) => Task.CompletedTask;
// Executed on the last failure
public Task<bool> FallbackAsync(EventBody<DeadQueueEvent>? message) => Task.FromResult(false);
}
If using a dead letter queue, be sure to set RetryFaildRequeue
to false; then, after multiple retry failures, the consumer will send a nack signal to RabbitMQ, which will forward the message to the bound dead letter queue.
Delayed Queue
Create a consumer that inherits from EmptyConsumer; this queue will be created when the program starts, but no IConnection will be created for consumption. Then set the message expiration time for the queue and bind it to a dead letter queue, which can either be implemented with a consumer pattern or an event pattern.
[Consumer("ConsumerWeb_dead_2", Expiration = 6000, DeadQueue = "ConsumerWeb_dead_queue_2")]
public class EmptyDeadConsumer : EmptyConsumer<DeadEvent>
{
}
// Messages that fail to consume in ConsumerWeb_dead will be consumed by this consumer.
[Consumer("ConsumerWeb_dead_queue_2", Qos = 1)]
public class Dead_2_QueueConsumer : IConsumer<DeadQueueEvent>
{
// Consume
public Task ExecuteAsync(EventBody<DeadQueueEvent> message)
{
Console.WriteLine($"事件 id:{message.Id} 已到期");
return Task.CompletedTask;
}
// Executed each time it fails
public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadQueueEvent>? message) => Task.CompletedTask;
// Executed on the last failure
public Task<bool> FallbackAsync(EventBody<DeadQueueEvent>? message) => Task.FromResult(false);
}
For example, after a user places an order, if payment is not made within 15 minutes, the message will expire and the order will be automatically canceled.
Observability
This feature is still being improved. Please refer to the ActivitySourceApi example.
To quickly deploy the observability platform, you can use the example packages provided by OpenTelemetry for rapid deployment of the related services.
Download the example repository source code:
git clone https://github.com/open-telemetry/opentelemetry-demo.git
Because the example includes a large number of demo microservices, we need to open the docker-compose.yml file, directly delete the Core Demo Services
and Dependent Services
services from the services node, and only keep the observability components. Alternatively, you can directly download the modified version: docker-compose.yml
Execute the command to deploy the observability services:
docker-compose up -d
opentelemetry-collector-contrib is used to collect observability information for link tracing, available in both grpc and http, with the following listening ports:
| Port | Protocol | Endpoint | Function |
| :--- | :------- | :----------- | :----------------------------------------------------------- |
| 4317 | gRPC | n/a | Accepts traces in OpenTelemetry OTLP format (Protobuf). |
| 4318 | HTTP | /v1/traces
| Accepts traces in OpenTelemetry OTLP format (Protobuf and JSON). |
After container port mapping, the external port may not be 4317 or 4318.
Introduce the Maomi.MQ.Instrumentation package, along with other related OpenTelemetry packages.
<PackageReference Include="Maomi.MQ.Instrumentation " Version="1.1.0" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.8.1" />
Then inject the services:
const string serviceName = "myapp";
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AutoQueueDeclare = true;
options.AppName = serviceName;
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]);
builder.Services.AddOpenTelemetry()
.ConfigureResource(resource => resource.AddService(serviceName))
.WithTracing(tracing =>
{
tracing.AddMaomiMQInstrumentation(options =>
{
options.Sources.AddRange(MaomiMQDiagnostic.Sources);
options.RecordException = true;
})
.AddAspNetCoreInstrumentation()
.AddOtlpExporter(options =>
{
options.Endpoint = new Uri("http://127.0.0.1:32772/v1/traces");
options.Protocol = OtlpExportProtocol.HttpProtobuf;
});
});
After starting the service, the link tracing information will be automatically pushed to the OpenTelemetry Collector during publishing and consumption, which can be retrieved through components like Jaeger and Skywalking.
由于 publish
和 consumer
属于兄弟 trace
而不是同一个 trace
,因此需要通过 Tags
查询相关联的 trace
,格式 event.id=xxx
。
文章评论