Maomi.MQ 2.0 | Powerful .NET Messaging Queue Communication Model Framework

February 21, 2025

Description

Author: Chizhe Gongliang

Document Address: https://mmq.whuanle.cn

Repository Address: https://github.com/whuanle/Maomi.MQ

Author's Blog:

Introduction

Maomi.MQ is a communication framework that simplifies the use of message queues, currently supporting RabbitMQ.

Maomi.MQ.RabbitMQ is a communication model designed specifically for RabbitMQ publishers and consumers. It greatly simplifies the code for publishing and consuming messages while providing a range of convenient and practical features. Developers can achieve high-performance consumption and event orchestration through the consumption model provided by the framework. The framework also supports publisher acknowledgment, custom retry mechanisms, compensation mechanisms, dead-letter queues, delayed queues, connection channel reuse, and various other convenient features. By simplifying cross-process message communication patterns, Maomi.MQ.RabbitMQ lets developers focus on business logic and makes inter-process message delivery simpler and more reliable.

In addition, the framework supports distributed observability through its built-in runtime APIs; frameworks such as OpenTelemetry can collect the observability data and push it to infrastructure platforms.

Quick Start

This tutorial introduces how to use Maomi.MQ.RabbitMQ, helping readers quickly understand the usage and features of the framework.

Create a web project (refer to the WebDemo project), introduce the Maomi.MQ.RabbitMQ package, and inject the service into the web configuration:

// using Maomi.MQ;
// using RabbitMQ.Client;

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    options.WorkId = 1;
    options.AppName = "myapp";
    options.Rabbit = (ConnectionFactory options) =>
    {
        options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
        options.Port = 5672;
        options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
    };
}, [typeof(Program).Assembly]);

var app = builder.Build();

  • WorkId: Specifies the node ID used to generate distributed snowflake IDs, defaulting to 0.

    Each message generates a unique ID for tracking purposes. If the snowflake ID is not set, duplicate IDs may occur during parallel operations of multiple instances in distributed services.

  • AppName: Used to identify the message producer and mark the producer or consumer in logs and traceability.

  • Rabbit: RabbitMQ client configuration, refer to ConnectionFactory.

Define a message model class; this model class is the basis of MQ communication and will be serialized into binary content for transmission to the RabbitMQ server.

public class TestEvent
{
    public int Id { get; set; }

    public override string ToString()
    {
        return Id.ToString();
    }
}

Define a consumer that needs to implement the IConsumer<TEvent> interface and use the [Consumer] attribute to configure consumer properties. For example, [Consumer("test")] indicates that the queue subscribed to by this consumer is test.

The IConsumer<TEvent> interface contains three methods. The ExecuteAsync method handles the message, FaildAsync executes immediately upon an exception in ExecuteAsync, and if the code continues to throw exceptions, the FallbackAsync method will eventually be called. The Maomi.MQ framework will determine whether to return the message to the queue for re-consumption or perform other actions based on the ConsumerState value.

[Consumer("test")]
public class MyConsumer : IConsumer<TestEvent>
{
    // Consumption
    public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
    {
        Console.WriteLine($"Event ID: {message.Id} {DateTime.Now}");
        await Task.CompletedTask;
    }

    // Executed on every consumption failure
    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message) 
        => Task.CompletedTask;

    // Compensation
    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex) 
        => Task.FromResult(ConsumerState.Ack);
}

Maomi.MQ also features multiple consumer patterns, with different code implementations that will be elaborated on later.

To publish a message, simply inject the IMessagePublisher service.

[ApiController]
[Route("[controller]")]
public class IndexController : ControllerBase
{
    private readonly IMessagePublisher _messagePublisher;

    public IndexController(IMessagePublisher messagePublisher)
    {
        _messagePublisher = messagePublisher;
    }

    [HttpGet("publish")]
    public async Task<string> Publisher()
    {
        // Publish message
        await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: "test", message: new TestEvent
        {
            Id = 123
        });
        return "ok";
    }
}

Start the web service, and upon requesting the API interface on the Swagger page, the MyConsumer service will immediately receive the published message.


If it is a console project, the Microsoft.Extensions.Hosting package needs to be introduced to allow the consumer to subscribe to the queue in the background.

Refer to the ConsoleDemo project.

using Maomi.MQ;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using System.Reflection;

var host = new HostBuilder()
    .ConfigureLogging(options =>
    {
        options.AddConsole();
        options.AddDebug();
    })
    .ConfigureServices(services =>
    {
        services.AddMaomiMQ(options =>
        {
            options.WorkId = 1;
            options.AppName = "myapp";
            options.Rabbit = (ConnectionFactory options) =>
            {
                options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
                options.Port = 5672;
                options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
            };
        }, new System.Reflection.Assembly[] { typeof(Program).Assembly });

    }).Build();

// Run in background
var task = host.RunAsync();

Console.ReadLine();

Message Publisher

The message publisher is used to push messages to the RabbitMQ server. Maomi.MQ supports various message publisher patterns and RabbitMQ transaction modes. For example projects, please refer to PublisherWeb.

Maomi.MQ provides message-pushing services to developers through IMessagePublisher.

Before publishing messages, an event model class needs to be defined for message transmission.

public class TestEvent
{
    public int Id { get; set; }

    public override string ToString()
    {
        return Id.ToString();
    }
}

Then, inject the IMessagePublisher service to publish messages:

[ApiController]
[Route("[controller]")]
public class IndexController : ControllerBase
{
    private readonly IMessagePublisher _messagePublisher;

    public IndexController(IMessagePublisher messagePublisher)
    {
        _messagePublisher = messagePublisher;
    }

    [HttpGet("publish")]
    public async Task<string> Publisher()
    {
        for (var i = 0; i < 100; i++)
        {
            await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
            {
                Id = i
            });
        }

        return "ok";
    }
}

In general, a model class should be used by only one consumer, so that the unique consumer can be found through the event type, i.e., finding the IConsumerOptions corresponding to the event type, allowing the framework to use the corresponding configuration to send messages.

The TestMessageEvent model has only one consumer:

[Consumer("publish", Qos = 1, RetryFaildRequeue = true)]
public class TestEventConsumer : IConsumer<TestMessageEvent>
{
    // ... ...
}

Messages can be sent directly without the need to specify an exchange or routing key.

[HttpGet("publish_message")]
public async Task<string> PublisherMessage()
{
    // TestMessageEvent has only one consumer in this project, so the framework automatically resolves the publish configuration from the event type
    for (var i = 0; i < 100; i++)
    {
        await _messagePublisher.PublishAsync(model: new TestMessageEvent
        {
            Id = i
        });
    }

    return "ok";
}

IMessagePublisher

IMessagePublisher is the basic message publishing interface of Maomi.MQ, with the following methods:

// Message publisher.
public interface IMessagePublisher
{
    Task PublishAsync<TMessage>(string exchange,    // Exchange name.
                                string routingKey,  // Queue/routing key name.
                                TMessage message,   // Event object.
                                Action<BasicProperties> properties, 
                                CancellationToken cancellationToken = default)
        where TMessage : class;

    Task PublishAsync<TMessage>(string exchange, 
                                string routingKey, 
                                TMessage message, 
                                BasicProperties? properties = default, 
                                CancellationToken cancellationToken = default);

    Task PublishAsync<TMessage>(TMessage message, 
                                Action<BasicProperties>? properties = null, 
                                CancellationToken cancellationToken = default)
        where TMessage : class;

    Task PublishAsync<TMessage>(TMessage model, 
                                BasicProperties? properties = default, 
                                CancellationToken cancellationToken = default);
    
    Task CustomPublishAsync<TMessage>(string exchange, 
                                      string routingKey, 
                                      TMessage message, 
                                      BasicProperties? properties = default, 
                                      CancellationToken cancellationToken = default);
}

Maomi.MQ keeps its message publishing interfaces few in number because it directly exposes BasicProperties, allowing developers to configure RabbitMQ's native message properties freely. The interfaces therefore stay simple while remaining flexible and easy to use.

BasicProperties is the basic properties object of messages in RabbitMQ, directly oriented towards developers, allowing message publishing and consumption to become flexible with rich functionalities. For example, you can configure the expiration time of a single message with BasicProperties:

await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
{
    Id = i
}, (BasicProperties p) =>
{
    p.Expiration = "1000";
});

Maomi.MQ implements IMessagePublisher through the DefaultMessagePublisher type, with its default lifecycle being Scoped:

services.AddScoped<IMessagePublisher, DefaultMessagePublisher>();

Developers can also implement the IMessagePublisher interface by themselves to create their own message publishing models. For specific examples, refer to the DefaultMessagePublisher type.

Native Channel

Developers can obtain native connection objects via the ConnectionPool service to publish messages directly on the IConnection using RabbitMQ interfaces:

private readonly ConnectionPool _connectionPool;

var connectionObject = _connectionPool.Get();
connectionObject.DefaultChannel.BasicPublishAsync(... ...);

Persistent In-Memory Connection Objects

Maomi.MQ manages RabbitMQ connection objects via ConnectionPool. After injecting the ConnectionPool service, you can obtain the global default connection instance using the .Get() interface.

If developers have their own requirements, they can create new connection objects through the .Create() interface.

using var newConnectionObject = _connectionPool.Create();
using var newConnection = newConnectionObject.Connection;
using var newChannel = await newConnection.CreateChannelAsync();

Please make sure to use connection objects properly. Avoid frequently creating and releasing them, and do not forget to manage their lifecycle; otherwise, memory leaks may occur.

A single IConnection is sufficient for most scenarios, and its throughput is adequate. After extensive testing, I found that one IConnection meets the requirements and that multiple IConnection instances offered no advantage. The old connection pool has therefore been removed; there is now only one global IConnection by default, and different consumers isolate themselves with their own IChannel.

With the program maintaining only one IConnection and four publishers publishing simultaneously, the per-second publish rate is shown below:

[Figure: publish rate per second with a single IConnection and four concurrent publishers]

If the message content is very large, a single IConnection can handle it as well, depending on the bandwidth.

Each message is 478 KiB.

[Figure: publish rate when each message is 478 KiB over a single IConnection]

Message Expiration

IMessagePublisher exposes BasicProperties, allowing developers to configure message attributes freely.

For example, configuring expiration time for messages:

[HttpGet("publish")]
public async Task<string> Publisher()
{
	for (var i = 0; i < 1; i++)
	{
		await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
		{
			Id = i
		}, properties =>
		{
			properties.Expiration = "6000";
		});
	}

	return "ok";
}

After setting an expiration time for the message, if the queue is bound to a dead letter queue, the message will be moved to another queue if it has not been consumed for a long time. Please refer to Dead Letter Queue.

More functionalities can also be configured via message properties. Please refer to the IBasicProperties documentation for more information.
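
For example, persistence, priority, and custom headers can be set the same way. The snippet below is only a minimal sketch: the header key is an arbitrary placeholder, and the Persistent, Priority, and Headers members are assumed from the RabbitMQ.Client BasicProperties API rather than taken from the original example project.

await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
{
    Id = 1
}, properties =>
{
    // Persistent messages survive a broker restart (the queue must also be durable).
    properties.Persistent = true;

    // Priority only takes effect on priority queues.
    properties.Priority = 5;

    // Custom headers travel with the message and can be read on the consumer side.
    properties.Headers = new Dictionary<string, object?>
    {
        ["x-source"] = "demo"
    };
});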

Transactions

RabbitMQ natively supports a transactional model. For RabbitMQ’s transactional communication protocol, refer to https://www.rabbitmq.com/docs/semantics.

According to the official RabbitMQ documentation, transaction mode can reduce throughput by a factor of 250. This is because transaction mode not only guarantees that a message has been pushed to the Rabbit broker, but also that the brokers have synchronized it across nodes; even if a broker fails, the message is fully replicated. Such strict guarantees are usually unnecessary, so the acknowledgment mechanism described in the next subsection can be used instead.

The transaction interface in Maomi.MQ is straightforward to use: open an ITransactionPublisher through the extension method and use it directly. Here is an example:

[HttpGet("publish_tran")]
public async Task<string> Publisher_Tran()
{
	using var tranPublisher = _messagePublisher.CreateTransaction();
	await tranPublisher.TxSelectAsync();

	try
	{
		await tranPublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
		{
			Id = 666
		});
		await Task.Delay(5000);
		await tranPublisher.TxCommitAsync();
	}
	catch
	{
		await tranPublisher.TxRollbackAsync();
		throw;
	}

	return "ok";
}

Publisher Confirm Mode

The transaction mode ensures that messages will be pushed to the RabbitMQ server and synchronized across nodes, but due to the throughput reduction of 250 times, RabbitMQ has introduced a confirmation mechanism. This mechanism works like a sliding window, ensuring messages are pushed to the server while maintaining high-performance characteristics, with a throughput that is 100 times greater than that of transaction mode. Reference materials:

https://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms

https://www.rabbitmq.com/docs/confirms

However, the new version of the .NET RabbitMQClient library has removed several API interfaces, and detailed information about the modifications can be found in: Issue #1682, RabbitMQ tutorial - Reliable Publishing with Publisher Confirms.

Maomi.MQ has simplified and adjusted its implementation for the new client version. To use it, create a message publisher backed by its own channel and pass the desired channel options as parameters.

using var confirmPublisher = _messagePublisher.CreateSingle(
	new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true));

for (var i = 0; i < 5; i++)
{
	await confirmPublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
	{
		Id = 666
	});
}

The transaction-mode publisher and the confirm-mode publisher are isolated from each other. Each is created with its own new IChannel by default, so there is no need to worry about conflicts.

If developers have their own requirements, they can create a separate IChannel using CreateSingle and customize the CreateChannelOptions property. For a description of CreateChannelOptions, please refer to:

https://rabbitmq.github.io/rabbitmq-dotnet-client/api/RabbitMQ.Client.CreateChannelOptions.html

Broadcast Mode

Broadcast mode pushes a message to an exchange, which forwards it to every bound queue, so consumers on different queues all receive the same message at the same time.


In RabbitMQ, there are four types of exchanges, which are defined as constants:

public static class ExchangeType
{
	public const string Direct = "direct";
	public const string Fanout = "fanout";
	public const string Headers = "headers";
	public const string Topic = "topic";
	private static readonly string[] s_all = { Fanout, Direct, Topic, Headers };
}

However, different exchange types behave differently. For example, when queues are bound to a fanout exchange, the Rabbit broker ignores the RoutingKey and pushes the message to all bound queues.

So we define two consumers that are bound to the same fanout type exchange:

[Consumer("fanout_1", BindExchange = "fanouttest", ExchangeType = "fanout")]
public class FanoutEvent_1_Consumer : IConsumer<FanoutEvent>
{
    // Consume
    public virtual async Task ExecuteAsync(MessageHeader messageHeader, FanoutEvent message)
    {
        Console.WriteLine($"【fanout_1】, event id: {message.Id} {DateTime.Now}");
        await Task.CompletedTask;
    }
    
    // ... ...
}

[Consumer("fanout_2", BindExchange = "fanouttest", ExchangeType = "fanout")]
public class FanoutEvent_2_Consumer : IConsumer<FanoutEvent>
{
    // Consume
    public virtual async Task ExecuteAsync(MessageHeader messageHeader, FanoutEvent message)
    {
        Console.WriteLine($"【fanout_2】, event id: {message.Id} {DateTime.Now}");
        await Task.CompletedTask;
    }
    
    // ... ...
}


When publishing messages, you only need to set the exchange name, and both consumer services will receive the messages simultaneously:

[HttpGet("publish_fanout")]
public async Task<string> Publisher_Fanout()
{
	for (var i = 0; i < 5; i++)
	{
		await _messagePublisher.PublishAsync(exchange: "fanouttest", routingKey: string.Empty, message: new FanoutEvent
		{
			Id = 666
		});
	}

	return "ok";
}

Topic exchanges and queues are used in the same way. Here are two consumer definitions:

[Consumer("red.yellow.#", BindExchange = "topictest", ExchangeType = "topic")]
public class TopicEvent_1_Consumer : IConsumer<TopicEvent>
{
    // Consume
    public virtual async Task ExecuteAsync(MessageHeader messageHeader, TopicEvent message)
    {
        Console.WriteLine($"【red.yellow.#】, event id: {message.Id} {DateTime.Now}");
        await Task.CompletedTask;
    }
    
    // ... ...
}

[Consumer("red.#", BindExchange = "topictest", ExchangeType = "topic")]
public class TopicEvent_2_Consumer : IConsumer<TopicEvent>
{
    // Consume
    public virtual async Task ExecuteAsync(MessageHeader messageHeader, TopicEvent message)
    {
        Console.WriteLine($"【red.#】, event id: {message.Id} {DateTime.Now}");
        await Task.CompletedTask;
    }
    
    // ... ...
}

Publishing messages looks like this:

[HttpGet("publish_topic")]
public async Task<string> Publisher_Topic()
{
	for (var i = 0; i < 5; i++)
	{
		await _messagePublisher.PublishAsync(exchange: "topictest", routingKey: "red.a", message: new TopicEvent
		{
			Id = 666
		});
		await _messagePublisher.PublishAsync(exchange: "topictest", routingKey: "red.yellow.a", message: new TopicEvent
		{
			Id = 666
		});
	}

	return "ok";
}

Unrouteable Messages

When a message is published and it is unrouteable (e.g., no corresponding queue), the IBreakdown.BasicReturnAsync interface will be triggered. The BasicReturnEventArgs property provides detailed information about the error reason.


In the case of network failures, RabbitMQ service crashes, or missing corresponding exchange names, exceptions will occur on the current thread, and the TCP connection will automatically reconnect.

It is also important to note that RabbitMQ pushes messages asynchronously. If a message merely fails to route, no exception is raised on the current thread, so the publisher cannot tell from the call itself whether the message was delivered successfully.

For unrouteable messages, Maomi.MQ only raises this interface notification and provides no other handling, so developers need to handle them themselves. By comparison, the community MQ framework EasyNetQ by default creates a new queue and pushes unrouteable messages into it for persistent storage.

Developers can implement this interface and then register it into the container:

services.AddScoped<IBreakdown, MyDefaultBreakdown>();

For example, to push unrouteable messages to a new queue:

public class MyDefaultBreakdown : IBreakdown
{
    private readonly ConnectionPool _connectionPool;

    public MyDefaultBreakdown(ConnectionPool connectionPool)
    {
        _connectionPool = connectionPool;
    }

    /// <inheritdoc />
    public async Task BasicReturnAsync(object sender, BasicReturnEventArgs @event)
    {
        var connectionObject = _connectionPool.Get();
        await connectionObject.DefaultChannel.BasicPublishAsync<BasicProperties>(
            @event.Exchange, 
            @event.RoutingKey + ".failed", 
            true, 
            new BasicProperties(@event.BasicProperties), 
            @event.Body);
    }

    /// <inheritdoc />
    public Task NotFoundConsumerAsync(string queue, Type messageType, Type consumerType)
    {
        return Task.CompletedTask;
    }
}

In practice, handling unrouteable messages is not just a matter of forwarding and storing them; you should also check whether a queue was deleted by mistake, whether the queue name used when publishing is correct, and so on.

Consumers

In Maomi.MQ.RabbitMQ, there are three consumption modes: consumer mode, event mode (event bus mode), and dynamic consumer mode, where the dynamic consumer mode also supports multiple consumption models.

Below is a brief introduction to the usage of these three modes, which will be explained in detail later.

Consumer Mode

Consumer services need to implement the IConsumer<TEvent> interface and configure the [Consumer("queue")] attribute to bind the queue name. The consumer object controls the consumption behavior, and the consumer mode possesses failure notification and compensation capabilities, making it relatively simple to use.

The configuration of [ConsumerAttribute] can be modified at runtime.

public class TestEvent
{
    public int Id { get; set; }
}

[Consumer("PublisherWeb", Qos = 1, RetryFaildRequeue = true)]
public class MyConsumer : IConsumer<TestEvent>
{
    private static int _retryCount = 0;

    // Consume
    public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
    {
        _retryCount++;
        Console.WriteLine($"Execution count: {_retryCount} Event id: {message.Id} {DateTime.Now}");
        await Task.CompletedTask;
    }

    // Executed on each consumption failure
    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
        => Task.CompletedTask;

    // Compensation
    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
        => Task.FromResult(ConsumerState.Ack);
}

Event Pattern

The event pattern is implemented through an event bus, centered around an event model to control consumption behavior via events.

[EventTopic("web2", Qos = 1, RetryFaildRequeue = true)]
public class TestEvent
{
    public string Message { get; set; }
}

The event execution order is orchestrated using the [EventOrder] attribute.

// Orchestrating event consumption order
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
    public async Task CancelAsync(TestEvent @event, CancellationToken cancellationToken)
    {
    }

    public async Task ExecuteAsync(TestEvent @event, CancellationToken cancellationToken)
    {
        Console.WriteLine($"{@event.Id}, Event 1 has been executed");
    }
}

[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
    public async Task CancelAsync(TestEvent @event, CancellationToken cancellationToken)
    {
    }

    public async Task ExecuteAsync(TestEvent @event, CancellationToken cancellationToken)
    {
        Console.WriteLine($"{@event.Id}, Event 2 has been executed");
    }
}

Of course, the event pattern can also add compensation functionality by creating middleware. Middleware can also group all ordered events into the same transaction to succeed or fail together, avoiding consistency issues caused by program termination during event execution.

public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
    private readonly BloggingContext _bloggingContext;

    public TestEventMiddleware(BloggingContext bloggingContext)
    {
        _bloggingContext = bloggingContext;
    }

    public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message, EventHandlerDelegate<TestEvent> next)
    {
        // Group all ordered event handlers into one local transaction.
        using (var transaction = _bloggingContext.Database.BeginTransaction())
        {
            await next(messageHeader, message, CancellationToken.None);
            await transaction.CommitAsync();
        }
    }

    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent? message)
    {
        return Task.CompletedTask;
    }

    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
    {
        return Task.FromResult(ConsumerState.Ack);
    }
}

Both the consumer pattern and the event bus pattern can handle large messages; in the figure below, each message is nearly 500 KiB and multiple queues consume concurrently.

[Figure: concurrent consumption of ~500 KiB messages across multiple queues]

If the message content is not large, very high consumption speeds can be achieved.

[Figure: consumption rate for small messages]

Dynamic Consumers

Dynamic consumers can subscribe to queues at runtime and support three approaches: consumer types, event bus types, and function binding.

You can use dynamic consumer services by injecting IDynamicConsumer.

await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions("myqueue")
{
    Qos = 10
});
// Automatically matches the consumer registered for the event model
await _dynamicConsumer.ConsumerAsync<TestEvent>(new ConsumerOptions("myqueue")
{
    Qos = 10
});
// Function-based consumption
await _dynamicConsumer.ConsumerAsync<TestEvent>(new ConsumerOptions("myqueue")
{
    Qos = 10
}, async (header, message) =>
{
    Console.WriteLine($"Event id: {message.Id} {DateTime.Now}");
    await Task.CompletedTask;
});

Consumer Registration Pattern

Maomi.MQ provides the ITypeFilter interface, allowing developers to implement a custom consumer registration pattern using this interface.

Maomi.MQ has three built-in ITypeFilter:

  • Consumer pattern ConsumerTypeFilter
  • Event bus pattern EventBusTypeFilter
  • Custom consumer pattern CustomConsumerTypeFilter

The framework registers ConsumerTypeFilter and EventBusTypeFilter by default; developers can adjust which pattern to use.

var consumerTypeFilter = new CustomConsumerTypeFilter();
// ...
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    // ... ...
}, 
[typeof(Program).Assembly],  // Assembly to scan automatically
[new ConsumerTypeFilter(), new EventBusTypeFilter(), consumerTypeFilter]);  // Configuration of the consumer registration pattern to use

Consumer Pattern

The consumer pattern requires services to implement the IConsumer<TEvent> interface. There are three ways to register consumer services.

  • Add the [Consumer] attribute, and the application will automatically scan and inject it at startup, allowing dynamic modification of the [Consumer].
  • Without the [Consumer] attribute, using CustomConsumerTypeFilter to manually register the consumer service and its configuration.
  • Dynamically bind consumers at runtime using IDynamicConsumer.

This example can refer to the ConsumerWeb project.

The IConsumer<TEvent> interface is quite simple, defined as follows:

public interface IConsumer<TMessage>
    where TMessage : class
{
    public Task ExecuteAsync(MessageHeader messageHeader, TMessage message);

    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TMessage message);

    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TMessage? message, Exception? ex);
}

When using the consumer pattern, a model class needs to be defined for passing messages between the publisher and consumer. The event model class can be any class that can be serialized and deserialized properly, with no other requirements.

public class TestEvent
{
    public int Id { get; set; }

    public override string ToString()
    {
        return Id.ToString();
    }
}

Then implement the IConsumer<TEvent> interface to provide the consumer functionality:

[Consumer("ConsumerWeb", Qos = 1)]
public class MyConsumer : IConsumer<TestEvent>
{
    private readonly ILogger<MyConsumer> _logger;

    public MyConsumer(ILogger<MyConsumer> logger)
    {
        _logger = logger;
    }

    // Consume
    public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
    {
        Console.WriteLine($"Event id: {message.Id}");
    }

    // Executed on each failure
    public async Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
    {
        _logger.LogError(ex, "Consumer exception, event id: {Id}, retry count: {retryCount}", message!.Id, retryCount);
    }

    // Executed on the last failure
    public async Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
    {
        return ConsumerState.Ack;
    }
}

For the description of the attribute configuration, refer to Consumer Configuration.

Manually Injecting Consumers

Developers can manually register consumer services through CustomConsumerTypeFilter by simply configuring ConsumerOptions.

var consumerOptions = new ConsumerOptions("test-queue_2")
{
    DeadExchange = "test-dead-exchange_2",
    DeadRoutingKey = "test-dead-routing-key_2",
    Expiration = 60000,
    Qos = 10,
    RetryFaildRequeue = true,
    AutoQueueDeclare = AutoQueueDeclare.Enable,
    BindExchange = "test-bind-exchange_2",
    ExchangeType = "direct",
    RoutingKey = "test-routing_2"
};

// Create a custom consumer pattern
var consumerTypeFilter = new CustomConsumerTypeFilter();
var consumerType = typeof(TestConsumer);
consumerTypeFilter.AddConsumer(consumerType, consumerOptions);

When registering MQ services, add the custom consumer pattern:

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    // ... ...
}, 
[typeof(Program).Assembly], 
[new ConsumerTypeFilter(), new EventBusTypeFilter(), consumerTypeFilter]); // Add custom consumer pattern

Dynamic Consumers

Inject IDynamicConsumer to use dynamic consumer services; the added consumers will run in the background automatically.

var consumerTag = await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions("myqueue")
{
    Qos = 10
});

If you need to unsubscribe, you can do so using the consumerTag or queue name.

await _dynamicConsumer.StopConsumerTagAsync(consumerTag);
await _dynamicConsumer.StopConsumerAsync(queueName);

Consumption, Retrying, and Compensation

When the consumer receives a message pushed by the server, the ExecuteAsync method will be automatically executed. If an exception occurs during ExecuteAsync, the FaildAsync method will be immediately triggered, allowing developers to log relevant information.

// Executed on each failure
public async Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
{
    _logger.LogError(ex, "Consumer exception, event id: {Id}, retry count: {retryCount}", message!.Id, retryCount);
}

By default, the framework will retry a maximum of three times, meaning that the ExecuteAsync method can be executed a total of four times.

If the FaildAsync method also throws an exception, it will not affect the overall process—the framework will wait for the interval time to continue retrying the ExecuteAsync method.

It is recommended to wrap FaildAsync logic in try{}catch{} and not throw exceptions outward; FaildAsync logic should not include too much logic and should only be used for logging or alerts.
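
For example, a minimal sketch of such a FaildAsync, assuming an ILogger has been injected as in the earlier consumer example:

// Executed on each failure: only record the failure, and never let the logging itself throw.
public async Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
{
    try
    {
        _logger.LogError(ex, "Consume failed, event id: {Id}, retry count: {RetryCount}", message?.Id, retryCount);
    }
    catch
    {
        // Swallow logging/alerting errors so they do not interfere with the retry flow.
    }

    await Task.CompletedTask;
}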

When the ExecuteAsync method throws an exception, the framework will automatically retry, defaulting to three retries. If all three fail, it will execute the FallbackAsync method for compensation.

The retry interval will gradually increase; please refer to Retry.

After three retries, the compensation mechanism will be triggered immediately.

// Executed on the last failure
public async Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
{
    return ConsumerState.Ack;
}

The FallbackAsync method must return a ConsumerState value. Returning ConsumerState.Ack indicates that, although ExecuteAsync kept failing, the FallbackAsync compensation succeeded and the message is treated as consumed normally. Other return values indicate that compensation failed and the message is treated as a consumption failure.

Only when ExecuteAsync fails will FaildAsync and FallbackAsync be triggered.
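
As a sketch of how the return value can be chosen (the TimeoutException check is only an illustrative assumption), FallbackAsync can map the failure to one of the ConsumerState values described in the next section:

public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
{
    // Transient infrastructure errors: let the configured RetryFaildRequeue decide whether to requeue.
    if (ex is TimeoutException)
    {
        return Task.FromResult(ConsumerState.Nack);
    }

    // Anything else is treated as a permanent failure: acknowledge so the message is not redelivered.
    return Task.FromResult(ConsumerState.Ack);
}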

Consumption Failure

When the ExecuteAsync failure count reaches the threshold, or when serialization or other errors cause consumption to fail immediately, the message is treated as a failed consumption and FallbackAsync is eventually triggered.

There are three important configurations in IConsumerOptions:

public class IConsumerOptions : Attribute
{
    // When the number of failed consumption reaches the condition, whether to requeue.
    public bool RetryFaildRequeue { get; set; }

    /// Binds the dead letter exchange
    public string? DeadExchange { get; set; }

    /// Binds the dead letter queue
    public string? DeadRoutingKey { get; set; }

}

The return value of FallbackAsync is the ConsumerState enumeration, defined as follows:

/// After receiving a RabbitMQ message, determine the action (ACK, NACK, requeue, etc.) based on the state enumeration.
public enum ConsumerState
{
    /// ACK.
    Ack = 1,

    /// Immediately NACK and use default configuration to determine whether to requeue the message.
    Nack = 1 << 1,

    /// Immediately NACK and requeue the message.
    NackAndRequeue = 1 << 2,

    /// Immediately NACK, and the message will be removed from the server queue.
    NackAndNoRequeue = 1 << 3,

    /// Exception occurred.
    Exception = 1 << 4
}

There are various situations that can lead to consumption failures, with specific logic outlined below:

  • If there is a deserialization exception or an exception occurs when executing FallbackAsync, ConsumerState.Exception will be triggered directly. Finally, based on IConsumerOptions.RetryFaildRequeue, it will be determined whether to requeue the message for the next consumption attempt.
  • If FallbackAsync returns ConsumerState.Ack, it indicates that although the consumption of the message has consistently failed, the message is still acknowledged.
  • If FallbackAsync returns ConsumerState.Nack, it indicates a consumption failure, but whether to requeue is decided by IConsumerOptions.RetryFaildRequeue.
  • If FallbackAsync returns ConsumerState.NackAndRequeue, it indicates an immediate consumption failure and that the message will be requeued.
  • If FallbackAsync returns ConsumerState.NackAndNoRequeue, it indicates an immediate consumption failure and that the message will not be requeued.

Automatic Queue Creation

The framework automatically creates queues by default. To disable the automatic creation feature, simply set AutoQueueDeclare to false.

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
	options.WorkId = 1;
	options.AppName = "myapp";
	options.AutoQueueDeclare = false;
	options.Rabbit = (ConnectionFactory options) =>
	{
        // ... ...
	};
}, [typeof(Program).Assembly]);

Consumers can also be configured separately for automatic queue creation:

[Consumer("ConsumerWeb_create", AutoQueueDeclare = AutoQueueDeclare.Enable)]

By default, if global automatic creation is disabled, queues will not be created automatically.

If global automatic creation is disabled but a consumer is configured with AutoQueueDeclare = AutoQueueDeclare.Enable, the queue will still be created automatically.

If a consumer is configured with AutoQueueDeclare = AutoQueueDeclare.Disable, it will ignore the global configuration and the queue will not be created.
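
For example, a consumer that never declares its queue regardless of the global setting (the queue name is a placeholder):

[Consumer("ConsumerWeb_nodeclare", AutoQueueDeclare = AutoQueueDeclare.Disable)]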

Qos

The default Qos is 100.

When the program needs to strictly consume in order, you can use Qos = 1, and the framework will ensure strict one-by-one consumption. If the program does not require ordered consumption and aims to process all messages quickly, a larger Qos can be set. Because the combination of Qos with retry and compensation mechanisms creates various scenarios, please refer to Retry.

Qos is configured via attributes:

[Consumer("ConsumerWeb", Qos = 1)]

By increasing the Qos value, the program can handle messages concurrently, increasing throughput.

Setting the Qos value according to network environment, server performance, and instance count can effectively improve message processing speed. Please refer to Qos.

Delay Queues

There are two types of delay queues: one sets message expiration time and the other sets queue expiration time.

Setting a message expiration time means that if the message is not consumed within a certain time, it will be discarded or moved to the dead letter queue. This configuration applies only to individual messages; please refer to Message Expiry.

Setting an expiration on the queue means that any message not consumed within the specified time is discarded or moved to the dead letter queue; this configuration applies to all messages in the queue. A delay queue can be built on top of this.

First, create a consumer that inherits from EmptyConsumer, so that the queue will be created when the program starts, but no IConnection will be created for consumption. Then set the message expiration time for the queue and bind the dead letter queue. The bound dead letter queue can utilize either the consumer pattern or the event pattern.

[Consumer("consumerWeb_dead", Expiration = 6000, DeadRoutingKey = "consumerWeb_dead_queue")]
public class EmptyDeadConsumer : EmptyConsumer<DeadEvent>
{
}

// Messages that fail to be consumed in consumerWeb_dead will be consumed by this consumer.
[Consumer("consumerWeb_dead_queue", Qos = 1)]
public class Dead_QueueConsumer : IConsumer<DeadQueueEvent>
{
    // Consume
    public Task ExecuteAsync(MessageHeader messageHeader, DeadQueueEvent message)
    {
        Console.WriteLine($"Dead letter queue, event id:{message.Id}");
        return Task.CompletedTask;
    }

    // Executed on each failure
    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, DeadQueueEvent message) => Task.CompletedTask;

    // Executed on the last failure
    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, DeadQueueEvent? message, Exception? ex)
        => Task.FromResult(ConsumerState.Ack);
}

Empty Consumers

When an empty consumer is identified, the framework will create only the queue and will not start consuming messages.

It can be combined with a delay queue: the queue itself has no consumers, and when its messages expire they are routed to the dead letter queue and consumed there, as shown below:

[Consumer("ConsumerWeb_empty", Expiration = 6000, DeadQueue = "ConsumerWeb_empty_dead")]
public class MyEmptyConsumer : EmptyConsumer<TestEvent> { }

[Consumer("ConsumerWeb_empty_dead", Qos = 10)]
public class MyDeadConsumer : IConsumer<TestEvent>
{
    // ... ...
}

For cross-process queues, Service A publishes but does not consume, while Service B is responsible for consumption. Service A can register an empty consumer so that the queue exists at startup, while the consuming service does not concern itself with the queue definition and does not create queues, as sketched below.
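
A minimal sketch of this split, where the queue name and the OrderEvent model are placeholders. Service A declares the queue through an empty consumer; Service B hosts the real consumer and leaves queue creation to Service A:

public class OrderEvent
{
    public int Id { get; set; }
}

// Service A (publisher side): declare the queue at startup, but never consume it.
[Consumer("order_created")]
public class OrderCreatedQueueDeclare : EmptyConsumer<OrderEvent> { }

// Service B (consumer side): the real consumer; it does not create the queue itself.
[Consumer("order_created", AutoQueueDeclare = AutoQueueDeclare.Disable)]
public class OrderCreatedConsumer : IConsumer<OrderEvent>
{
    public Task ExecuteAsync(MessageHeader messageHeader, OrderEvent message)
    {
        Console.WriteLine($"Order created, id: {message.Id}");
        return Task.CompletedTask;
    }

    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, OrderEvent message)
        => Task.CompletedTask;

    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, OrderEvent? message, Exception? ex)
        => Task.FromResult(ConsumerState.Ack);
}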

Broadcast Mode

In RabbitMQ, after setting up a Fanout or Topic exchange, if multiple queues are bound to the exchange, each queue will receive identical messages. In microservice scenarios, for example, after an employee leaves a company, a message needs to be published so that all systems subscribed to this message can process the relevant employee separation data.

Create two consumer queues with unique names, and bind them to the same exchange, which can be named arbitrarily, e.g., exchange.

[Consumer("ConsumerWeb_exchange_1", BindExchange = "exchange")]
public class Exchange_1_Consumer : IConsumer<TestEvent>
{
    /// ... ...
}

[Consumer("ConsumerWeb_exchange_2", BindExchange = "exchange")]
public class Exchange_2_Consumer : IConsumer<TestEvent>
{
    // ... ...
}

When publishing messages, the publisher needs to use broadcast mode; refer to: Broadcast Mode

Of course, Maomi.MQ allows for custom exchange types and exchange names.

Based on Events

Maomi.MQ is internally designed with an event bus to help developers implement event orchestration, local transactions, forward execution, and compensation.

Maomi.MQ does not design local message tables or other distributed transaction assurance mechanisms based on several considerations:

  • Maomi.MQ is a communication model based on message queues and is not specifically designed for distributed transactions, lacking coordination capabilities. To utilize distributed transaction orchestration, platforms such as DTM, Seata, etc., should be employed as transaction coordination centers.
  • Maomi.MQ has designed retry and compensation strategies that can address exceptions to some extent.
  • Maomi.MQ itself cannot guarantee idempotency, null compensation, and so on, but strict guarantees are not necessary for every consumption scenario.
  • By using the middleware functionality of the event pattern, developers can easily handle issues such as idempotency, null compensation, and suspension.

Using Event Patterns

First, define an event type that binds to a topic or queue. The event needs to be marked with the [EventTopic] attribute and set the corresponding queue name.

The [EventTopic] attribute possesses the same characteristics as the [Consumer] attribute; refer to the configuration of [Consumer] for event usage, as detailed in Consumer Configuration.

[EventTopic("EventWeb")]
public class TestEvent
{
	public string Message { get; set; }

	public override string ToString()
	{
		return Message;
	}
}

Next, orchestrate the event handlers. Each handler implements IEventHandler<T> and uses the [EventOrder] attribute to mark its execution order.

[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
    public Task CancelAsync(TestEvent message, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    public Task ExecuteAsync(TestEvent message, CancellationToken cancellationToken)
    {
        Console.WriteLine($"{message.Message}, Event 1 has been executed");
        return Task.CompletedTask;
    }
}

[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
    public Task CancelAsync(TestEvent message, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    public Task ExecuteAsync(TestEvent message, CancellationToken cancellationToken)
    {
        Console.WriteLine($"{message.Message}, Event 2 has been executed");
        return Task.CompletedTask;
    }
}

Each event handler must implement the IEventHandler<T> interface and set the [EventOrder] attribute to confirm the execution order. The framework will execute the ExecuteAsync method of IEventHandler<T> in sequence; if an exception occurs in ExecuteAsync, CancelAsync will be called in reverse order.

Since the program may crash at any time, compensation via CancelAsync cannot be relied upon; CancelAsync is primarily used for recording related information.

Middleware

The purpose of middleware is to allow developers to intercept events, log information, and implement local transactions. If developers do not configure middleware, the framework will automatically create a DefaultEventMiddleware<TEvent> type as the middleware service for the event.

Example code for custom event middleware:

public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
    public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message, EventHandlerDelegate<TestEvent> next)
    {
        await next(messageHeader, message, CancellationToken.None);
    }
    
    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent? message) => Task.CompletedTask;
    
    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex) => Task.FromResult(ConsumerState.Ack);
}

The next delegate is the event execution chain constructed by the framework; it lets the middleware intercept events and decide whether to execute the chain.

Calling the next() delegate in the middleware starts the sequential execution of events, as previously mentioned with My1EventEventHandler and My2EventEventHandler.

When an event has multiple handlers, local transactions become essential since the program may crash at any time.

For example, you could inject a database context into the middleware and start a transaction for database operations. If one of the EventHandlers fails, the execution chain will roll back and the transaction will not be committed.

You can refer to Consumer Pattern for implementing retry and compensation methods within middleware.

The example is as follows:

public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
    private readonly BloggingContext _bloggingContext;

    public TestEventMiddleware(BloggingContext bloggingContext)
    {
        _bloggingContext = bloggingContext;
    }

    public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message, EventHandlerDelegate<TestEvent> next)
    {
        using (var transaction = _bloggingContext.Database.BeginTransaction())
        {
            await next(messageHeader, message, CancellationToken.None);
            await transaction.CommitAsync();
        }
    }

    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent? message)
    {
        return Task.CompletedTask;
    }

    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
    {
        return Task.FromResult(ConsumerState.Ack);
    }
}

[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
    private readonly BloggingContext _bloggingContext;

    public My1EventEventHandler(BloggingContext bloggingContext)
    {
        _bloggingContext = bloggingContext;
    }

    public async Task CancelAsync(TestEvent message, CancellationToken cancellationToken)
    {
        Console.WriteLine($"{message.Message} is being compensated, [1]");
    }

    public async Task ExecuteAsync(TestEvent message, CancellationToken cancellationToken)
    {
        await _bloggingContext.Posts.AddAsync(new Post
        {
            Title = "Robinson Crusoe",
            Content = "Just writing casually"
        });
        await _bloggingContext.SaveChangesAsync();
    }
}

[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
    private readonly BloggingContext _bloggingContext;

    public My2EventEventHandler(BloggingContext bloggingContext)
    {
        _bloggingContext = bloggingContext;
    }
    public async Task CancelAsync(TestEvent message, CancellationToken cancellationToken)
    {
        Console.WriteLine($"{message.Id} is being compensated, [2]");
    }

    public async Task ExecuteAsync(TestEvent message, CancellationToken cancellationToken)
    {
        await _bloggingContext.Posts.AddAsync(new Post
        {
            Title = "Dream of the Red Chamber",
            Content = "Jia Baoyu's first encounter with love"
        });
        await _bloggingContext.SaveChangesAsync();

        throw new OperationCanceledException("Intentional error");
    }
}


When executing the event, if an exception occurs, a retry will also take place. The FaildAsync and FallbackAsync methods of the middleware TestEventMiddleware will be executed in sequence.

You can refer to the Consumer Pattern or Retry.

Idempotency, Null Compensation, and Suspension

In microservices, a service may crash and restart at any time, which can lead to issues such as idempotency, null compensation, and suspension.

Idempotency

For example, consumer A consumes message 01 and writes the result to the database, but the program restarts before Maomi.MQ has sent the ack to RabbitMQ; after restart, message 01 is redelivered and consumed again. If the program writes to the database again at this point, the data is duplicated. Developers therefore need to ensure that consuming the same message multiple times does not cause inconsistent or duplicated data.

Of course, not all cases require non-repetitive consumption. We focus here on situations that can only be consumed once, for instance, inserting order information into the database.

This requires that each message has a specific business ID or a distributed snowflake ID. During consumption, it needs to check whether this ID already exists in the database to determine if the program is re-consuming.

For example:

public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
    private readonly BloggingContext _bloggingContext;

    public TestEventMiddleware(BloggingContext bloggingContext)
    {
        _bloggingContext = bloggingContext;
    }

    public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message, EventHandlerDelegate<TestEvent> next)
    {
        // Use the message id (a business id or distributed snowflake id) to detect repeated consumption.
        var existId = await _bloggingContext.Posts.AnyAsync(x => x.PostId == message.Id);
        if (existId)
        {
            return;
        }

        using (var transaction = _bloggingContext.Database.BeginTransaction())
        {
            await next(messageHeader, message, CancellationToken.None);
            await transaction.CommitAsync();
        }
    }
}

Null Compensation

In distributed transactions, when orchestrating interfaces of three services A, B, and C, if C encounters an exception, then the distributed transaction manager will first call the compensation interface of C, followed by B and A.

Each call is made via an interface, so it cannot be handled in a single database transaction.

There are two cases here.

One case is that C has already completed the database insertion operation, giving the user's balance +100. However, if the program then crashes or times out, the transaction manager may mistakenly think it failed and call the compensation interface. In this case, the compensation interface will undo the previously modified data, and there's no issue there.

The second case is that C has not yet completed the database operation when the exception occurs, and the transaction manager then calls the compensation interface. If the compensation interface blindly deducts 100 from the user's balance, the data becomes incorrect.

Hence, the compensation interface must first check whether the forward operation actually succeeded: if it did, it undoes the change; if it did not, it does nothing and immediately returns a result indicating that compensation succeeded. This do-nothing case is called a null compensation.

In general, Maomi.MQ does not experience null compensation issues because it is not a distributed transaction framework.

Although Maomi.MQ does provide a CancelAsync() method for performing rollback processes, this is mainly intended for developers to log information and not to execute compensation. Furthermore, all the processes in event orchestration are local and do not involve issues of null compensation in distributed transactions. Therefore, it is only necessary to ensure local database transaction integrity and guarantee idempotency.

Suspension

In distributed transactions, there is a forward (execution) request and a rollback (compensation) request; if execution fails, the compensation interface is called. Because distributed networks are complex, the transaction manager cannot know the real status of service C and treats it as a black box. When a request fails, the transaction manager calls the compensation interface. After the compensation interface has been invoked, the forward request may still arrive, for example because of an automatic gateway retry or a delayed service response. If the compensation interface executes before the forward interface, the transaction manager has already finished rolling back and ended the transaction; when C then executes the forward request and adds +100 to the user's balance, the data looks normal but is actually wrong. This situation is referred to as suspension.

Since Maomi.MQ does not involve multi-service transaction orchestration, it only needs to focus on idempotency without concerning about null compensation and suspension issues. Whether idempotency needs to be guaranteed depends on the developer and their business requirements. Therefore, Maomi.MQ did not design a distributed transaction work model with a local message table.

The configuration under the event pattern is consistent with that of the consumer pattern; thus, it will not be elaborated here. Refer to the Consumer Pattern.

Custom Consumers and Dynamic Subscriptions

This mainly implements two functionalities.

  • At program startup, custom consumer configurations and consumer models can be defined without using attribute annotations for configuration.
  • After program startup, a consumer can be started or stopped at any time.

Sample project reference: https://github.com/whuanle/Maomi.MQ/tree/main/example/consumer/DynamicConsumerWeb

Custom Consumers

Consumers can be defined without attribute annotations; implementing IConsumer<TEvent> is sufficient. During assembly scanning, consumers without attribute annotations will be ignored.

Define consumer model:

public class DynamicCustomConsumer : IConsumer<TestEvent>
{
    public Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
    {
        throw new NotImplementedException();
    }

    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
    {
        throw new NotImplementedException();
    }

    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
    {
        throw new NotImplementedException();
    }
}

Then manually configure consumers and attributes through DynamicConsumerTypeFilter.

DynamicConsumerTypeFilter dynamicConsumerTypeFilter = new();

dynamicConsumerTypeFilter.AddConsumer(typeof(DynamicCustomConsumer), new ConsumerOptions
{
	Queue = "test1"
});
dynamicConsumerTypeFilter.AddConsumer(typeof(DynamicCustomConsumer), new ConsumerOptions
{
	Queue = "test2"
});

When injecting services, manually add the type filter.

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
	options.WorkId = 1;
	options.AutoQueueDeclare = true;
	options.AppName = "myapp";
	options.Rabbit = (ConnectionFactory options) =>
	{
        // ... ...
	};
}, [typeof(Program).Assembly], [
    new ConsumerTypeFilter(),  // Consumer type filter
    new EventBusTypeFilter(),  // Event bus type filter
    dynamicConsumerTypeFilter  // Dynamic consumer filter
]);

Dynamic Subscription

After the program starts, a consumer can be dynamically started or stopped via the IDynamicConsumer service. Consumers that are already running at program startup will not be controlled by dynamic subscriptions and cannot be stopped while the program is running.

Dynamically start a consumer:

private readonly IMessagePublisher _messagePublisher;
private readonly IDynamicConsumer _dynamicConsumer;

[HttpPost("create")]
public async Task<string> CreateConsumer([FromBody] ConsumerDto consumer)
{
	foreach (var item in consumer.Queues)
	{
		await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions(item));
	}

	return "ok";
}

If you do not want to define a model class, you can directly use a function approach:

foreach (var item in consumer.Queues)
{
	var consumerTag = await _dynamicConsumer.ConsumerAsync<TestEvent>(
		consumerOptions: new ConsumerOptions(item),
		execute: async (header, message) =>
		{
			await Task.CompletedTask;
		},
		faild: async (header, ex, retryCount, message) => { },
		fallback: async (header, message, ex) => ConsumerState.Ack
	);
}

return "ok";

Using the queue name, you can dynamically stop consumers:

[HttpPost("stop")]
public async Task<string> StopConsumer([FromBody] ConsumerDto consumer)
{
	foreach (string queueName in consumer.Queues)
	{
		await _dynamicConsumer.StopConsumerAsync(queueName);
	}

	return "ok";
}

You can also stop consumers using the consumer tag:

var consumerTag = await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions(item));
await _dynamicConsumer.StopConsumerTagAsync(consumerTag);

Configuration

When introducing the Maomi.MQ framework, you can configure related properties, as shown in the example and explanation below:

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    // Required: Current program node, used for configuring distributed snowflake IDs.
    // Configuring WorkId can prevent ID duplication of the same message under high concurrency.
	options.WorkId = 1;

    // Whether to automatically create queues
	options.AutoQueueDeclare = true;

    // Current application name, used to identify the message publisher and consumer programs
	options.AppName = "myapp";

    // Required: RabbitMQ configuration
	options.Rabbit = (ConnectionFactory options) =>
	{
        options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
        options.Port = 5672;
		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
	};
}, [typeof(Program).Assembly]);  // The assembly to be scanned

Developers can manage the RabbitMQ connection directly through ConnectionFactory, for example to configure automatic connection recovery or custom connection parameters.
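
For example, here is a minimal sketch that enables the client's automatic connection recovery inside the Rabbit delegate; AutomaticRecoveryEnabled, NetworkRecoveryInterval, and TopologyRecoveryEnabled are standard RabbitMQ.Client ConnectionFactory properties, and the 10-second interval is an arbitrary value chosen for illustration.

options.Rabbit = (ConnectionFactory options) =>
{
    options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
    options.Port = 5672;
    options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;

    // Re-establish the connection automatically after a network failure,
    // retry every 10 seconds, and restore exchanges, queues and bindings.
    options.AutomaticRecoveryEnabled = true;
    options.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
    options.TopologyRecoveryEnabled = true;
};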

Type Filters

Type filters implement the ITypeFilter interface; their job is to scan and recognize types and register them as consumers. By default, ConsumerTypeFilter and EventBusTypeFilter are enabled, which handle the consumer model and the event bus consumer pattern respectively; both require the corresponding attribute annotations.

In addition, there is a dynamic consumer filter named DynamicConsumerTypeFilter, which allows custom consumers and configurations.

If developers need to customize consumer models or integrate in-memory event buses like MediatR, they just need to implement ITypeFilter.

Interceptors

Maomi.MQ enables both consumer and event bus modes by default, and developers can freely configure whether to enable them.

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
	options.WorkId = 1;
	options.AutoQueueDeclare = true;
	options.AppName = "myapp";
	options.Rabbit = (ConnectionFactory options) =>
	{
        // ... ...
	};
},
[typeof(Program).Assembly], 
[new ConsumerTypeFilter(), new EventBusTypeFilter()]); // Inject consumer and event bus modes

Additionally, the framework offers dynamic configuration interception, allowing modification of consumer attribute configurations at program startup.

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
	options.WorkId = 1;
	options.AutoQueueDeclare = true;
	options.AppName = "myapp";
	options.Rabbit = (ConnectionFactory options) =>
	{
        // ... ...
	};
},
[typeof(Program).Assembly],
[new ConsumerTypeFilter(ConsumerInterceptor), new EventBusTypeFilter(EventInterceptor)]);

Implementing Interceptor Functions:

private static RegisterQueue ConsumerInterceptor(IConsumerOptions consumerOptions, Type consumerType)
{
	var newConsumerOptions = new ConsumerOptions(consumerOptions.Queue);
	consumerOptions.CopyFrom(newConsumerOptions);

	// Modify configuration in newConsumerOptions, then return it so the changes take effect.

	return new RegisterQueue(true, newConsumerOptions);
}

private static RegisterQueue EventInterceptor(IConsumerOptions consumerOptions, Type eventType)
{
	if (eventType == typeof(TestEvent))
	{
		var newConsumerOptions = new ConsumerOptions(consumerOptions.Queue);
		consumerOptions.CopyFrom(newConsumerOptions);
		newConsumerOptions.Queue = newConsumerOptions.Queue + "_1";

		return new RegisterQueue(true, newConsumerOptions);
	}
	return new RegisterQueue(true, consumerOptions);
}

Developers can modify configuration values within the interceptors.

The interceptor returns a RegisterQueue value; when its first argument is false, the framework skips registering the consumer or event, and no consumer will be started for that queue.
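
As a sketch, an interceptor can also refuse registration for particular queues; the "_disabled" suffix check below is purely illustrative.

private static RegisterQueue ConsumerInterceptor(IConsumerOptions consumerOptions, Type consumerType)
{
	// Skip consumers whose queue name is marked as disabled;
	// the framework will not start a consumer for these queues.
	if (consumerOptions.Queue.EndsWith("_disabled"))
	{
		return new RegisterQueue(false, consumerOptions);
	}

	return new RegisterQueue(true, consumerOptions);
}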

Consumer Configuration

In Maomi.MQ, the logical processing of consumers is handled through the properties of the IConsumerOptions interface. Whether it is a custom consumer or an event bus, all consumption patterns are registered with the framework through IConsumerOptions.

The configurations are explained as follows:

| Name | Type | Required | Default Value | Description |
| ----------------- | ---------------- | -------- | ------------- | ------------------------------------------------------- |
| Queue | string | Required | | Queue name |
| DeadExchange | string? | Optional | | Binding dead letter exchange name |
| DeadRoutingKey | string? | Optional | | Binding dead letter routing key |
| Expiration | int | Optional | | Message expiration time in milliseconds |
| Qos | ushort | Optional | 100 | Number of messages that can be pulled at a time, helps increase consumption capability |
| RetryFaildRequeue | bool | Optional | false | Whether to return to the queue upon reaching the failure threshold |
| AutoQueueDeclare | AutoQueueDeclare | Optional | None | Whether to automatically create the queue |
| BindExchange | string? | Optional | | Binding exchange name |
| ExchangeType | string? | Optional | | Type of the exchange bound |
| RoutingKey | string? | Optional | | Routing key name of the bound exchange |

As mentioned before, the framework will scan the consumer and event bus consumer attributes, and then generate IConsumerOptions binding to the consumer. You can modify configuration properties through interception functions.

new ConsumerTypeFilter((consumerOptions, type) =>
{
	var newConsumerOptions = new ConsumerOptions(consumerOptions.Queue);
	consumerOptions.CopyFrom(newConsumerOptions);

	newConsumerOptions.Queue = "app1_" + newConsumerOptions.Queue;

	// Return the modified options so the prefixed queue name takes effect.
	return new RegisterQueue(true, newConsumerOptions);
});

In addition, the IRoutingProvider interface can be used to dynamically map new configurations. After the application starts, Maomi.MQ automatically creates exchanges and queues and calls IRoutingProvider to map the configuration; when a message is published by its model class, the configuration is also mapped through IRoutingProvider, so developers can dynamically modify configuration properties by implementing this interface.

services.AddSingleton<IRoutingProvider, MyRoutingProvider>();

Environment Isolation

Currently considering whether to support multi-tenant mode.

In development, local debugging often requires connecting the local program to the development server. When a queue receives a message, it pushes the message to one of the consumers. During local debugging, if I publish a message, my local program may not receive it, as it may be consumed by the program in the development environment.

At this point, we would like to isolate the local debugging environment from the development environment, which can be accomplished using the VirtualHost feature provided by RabbitMQ.

First, create a new VirtualHost through a PUT request to RabbitMQ; refer to the documentation: https://www.rabbitmq.com/docs/vhosts#using-http-api
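
If you prefer to do this from C#, the following is a minimal sketch that calls the RabbitMQ management HTTP API; the localhost address, management port 15672, guest/guest credentials, and the debug vhost name are assumptions to adjust for your environment.

using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;

using var client = new HttpClient();

// Basic authentication for the RabbitMQ management API.
var token = Convert.ToBase64String(Encoding.UTF8.GetBytes("guest:guest"));
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", token);

// PUT /api/vhosts/{name} creates the virtual host if it does not already exist.
var response = await client.PutAsync(
    "http://localhost:15672/api/vhosts/debug",
    new StringContent("{}", Encoding.UTF8, "application/json"));
response.EnsureSuccessStatusCode();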

image-20240612193415867

Then configure the VirtualHost name in code:

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
	options.WorkId = 1;
	options.AutoQueueDeclare = true;
	options.AppName = "myapp";
	options.Rabbit = (ConnectionFactory options) =>
	{
        options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
        options.Port = 5672;
#if DEBUG
		options.VirtualHost = "debug";
#endif
		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
	};
}, [typeof(Program).Assembly]);

When local debugging, publishing and receiving messages will be isolated from the server environment.

Snowflake ID Configuration

Maomi.MQ.RabbitMQ uses IdGenerator to generate snowflake IDs, ensuring each event has a unique ID within the cluster.

The framework creates snowflake IDs through the IIdFactory interface, and you can replace the IIdFactory interface to configure the generation rules for the snowflake ID.

services.AddSingleton<IIdFactory>(new DefaultIdFactory((ushort)optionsBuilder.WorkId));

Example:

public class DefaultIdFactory : IIdFactory
{
    /// <summary>
    /// Initializes a new instance of the <see cref="DefaultIdFactory"/> class.
    /// </summary>
    /// <param name="workId"></param>
    public DefaultIdFactory(ushort workId)
    {
        var options = new IdGeneratorOptions(workId) { SeqBitLength = 10 };
        YitIdHelper.SetIdGenerator(options);
    }

    /// <inheritdoc />
    public long NextId() => YitIdHelper.NextId();
}

For IdGenerator framework snowflake ID configuration, please refer to:

https://github.com/yitter/IdGenerator/tree/master/C#
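
As a sketch of replacing the default factory, the CustomIdFactory below reuses the same IdGenerator calls shown above but widens the sequence bits; the class name and the SeqBitLength value of 12 are illustrative assumptions.

public class CustomIdFactory : IIdFactory
{
    public CustomIdFactory(ushort workId)
    {
        // A wider sequence allows more IDs to be generated per millisecond per node.
        var options = new IdGeneratorOptions(workId) { SeqBitLength = 12 };
        YitIdHelper.SetIdGenerator(options);
    }

    public long NextId() => YitIdHelper.NextId();
}

// Register it in place of the default factory.
services.AddSingleton<IIdFactory>(new CustomIdFactory(1));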

Debugging

The Maomi.MQ framework publishes symbol packages to nuget.org, which makes it easy to step into and debug the framework's source.

image-20240622110409621

image-20240622110718661

For first-time use, it is recommended to enable loading of all modules and then start the program.

image-20240622112130250

Later, you can manually select which modules to load.

image-20240622110227993

Use F12 to navigate to the code you want to debug; after the program starts, the breakpoint will be hit.

image-20240622112507607

If you need to debug Maomi.MQ.RabbitMQ, you can set a breakpoint in your program (not in Maomi.MQ), wait for the program to start and reach this breakpoint, then configure symbols and click to load all symbols.

After setting the breakpoint in Maomi.MQ.RabbitMQ, you can start debugging.

image-20240622112753150

Qos Concurrency and Order

Both consumer mode and event model are configured through attributes, with Qos being one of the important properties. The default value of Qos is 100, and it indicates how many unacknowledged messages can be received by consumers at a time.

Qos Scenarios

All consumers share a single IConnection object, while each consumer has its own exclusive IChannel.

For queues that are consumed frequently but cannot be processed concurrently, be sure to set Qos = 1; RabbitMQ will then push messages one by one, ensuring strictly ordered consumption.

[Consumer("web1", Qos = 1)]
public class MyConsumer : IConsumer<TestEvent>
{
}

When there is a need to increase consumption throughput without needing ordered consumption, you can set a higher Qos, allowing the RabbitMQ Client framework to increase throughput via prefetch, enabling concurrent consumption of multiple messages.

Concurrency and Exception Handling

Concurrency and exception handling are mainly governed by Qos and RetryFaildRequeue, where RetryFaildRequeue defaults to true.

Under the condition of Qos = 1, combining IConsumerOptions.RetryFaildRequeue and FallbackAsync, when the message is returned to the queue, the next attempt will still continue consuming this message.

In the case where Qos > 1, due to concurrency, any failed consumption messages will be returned to the queue, but it does not guarantee immediate re-consumption of that message on the next attempt.

When Qos is set to 1, strict sequential consumption is guaranteed, and ExecptionRequeue and RetryFaildRequeue determine whether a failed message is returned to the queue. If it is returned, the next consumption continues with the previously failed message; if the underlying error (such as a bug) remains unresolved, this creates a loop of consuming, failing, requeuing, and re-consuming.
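
If such a loop must be avoided, the following sketch keeps strict ordering but does not return failed messages to the queue, so a poisoned message cannot block the queue forever; the queue name is illustrative, and a dead letter queue can additionally be bound if failed messages must be kept.

[Consumer("ordered", Qos = 1, RetryFaildRequeue = false)]
public class OrderedConsumer : IConsumer<TestEvent>
{
    public Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
    {
        Console.WriteLine($"event id: {message.Id}");
        return Task.CompletedTask;
    }

    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
        => Task.CompletedTask;

    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
        => Task.FromResult(ConsumerState.Ack);
}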

How to Set Qos

Note that RabbitMQ.Client 7.0 added many new features, one of which is the consumer concurrency setting ConsumerDispatchConcurrency, which defaults to 1. If this setting is not changed, consumption can be very slow. The property can be set individually on each IChannel, or as a global default on the ConnectionFactory.

services.AddMaomiMQ(options =>
{
	options.WorkId = 1;
	options.AppName = "myapp-consumer";
	options.Rabbit = (options) =>
	{
		options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
		options.Port = 5672;
		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
		options.ConsumerDispatchConcurrency = 100;		// Set here
	};
}, new System.Reflection.Assembly[] { typeof(Program).Assembly });

In Maomi.MQ.RabbitMQ, Qos refers to prefetch_count, with a range of 0-65535, where 0 means no limit, and generally defaults to 100. Setting Qos higher than this does not necessarily improve consumption efficiency.

Qos is not equal to the consumer concurrency thread count; it refers to the number of unprocessed messages that can be received at a time. Consumers can pull N messages at once and then consume them one by one.

According to the official document Finding bottlenecks with RabbitMQ 3.3 | RabbitMQ, prefetch counts affect the utilization rate of consumers.

| Prefetch limit | Consumer utilisation |
|----------------|---------------------|
| 1 | 14% |
| 3 | 25% |
| 10 | 46% |
| 30 | 70% |
| 1000 | 74% |

Generally, developers need to consider various factors when configuring Qos, including machine network bandwidth, message size, message publishing frequency, estimated overall resource consumption, and service instances.

When strict ordered consumption is required, it can be set to 1.

If RabbitMQ is connected in a local network, and network bandwidth limitations are not a concern, or when messages are large and high concurrency is needed, you can set Qos = 0. When Qos = 0, RabbitMQ.Client will utilize machine performance as much as possible, so use it cautiously.

Qos and Consumption Performance Testing

To illustrate the impact of different Qos values on consumer performance, the following test consumes one million messages under different Qos settings; one million messages are first pushed to the RabbitMQ server.

Define the event:

public class TestEvent
{
    public int Id { get; set; }
    public string Message { get; set; }
    public int[] Data { get; set; }

    public override string ToString()
    {
        return Id.ToString();
    }
}

The message publisher code in the QosPublisher project is shown below; it pushes one million messages to the server, each roughly 800 bytes, i.e. under 1 KB.

[HttpGet("publish")]
public async Task<string> Publisher()
{
	int totalCount = 0;
	List<Task> tasks = new();
	var message = string.Join(",", Enumerable.Range(0, 100));
	var data = Enumerable.Range(0, 100).ToArray();
	for (var i = 0; i < 100; i++)
	{
		// Task.Run unwraps the async lambda, so Task.WhenAll below waits for all publishes to finish.
		var task = Task.Run(async () =>
		{
			using var singlePublisher = _messagePublisher.CreateSingle();

			for (int k = 0; k < 10000; k++)
			{
				var count = Interlocked.Increment(ref totalCount);
				await singlePublisher.PublishAsync(exchange: string.Empty, routingKey: "qos", message: new TestEvent
				{
					Id = count,
					Message = message,
					Data = data
				});
			}
		});
		tasks.Add(task);
	}

	await Task.WhenAll(tasks);
	return "ok";
}

After waiting for a while, the server already has 1 million messages.

image-20240621130733745

Create a consumer project, QosConsole, add an artificial 50 ms delay to the consumer, and run the program.

class Program
{
    static async Task Main()
    {
        var host = new HostBuilder()
            .ConfigureLogging(options =>
            {
                options.AddConsole();
                options.AddDebug();
            })
            .ConfigureServices(services =>
            {
                services.AddMaomiMQ(options =>
                {
                    options.WorkId = 1;
                    options.AppName = "myapp-consumer";
                    options.Rabbit = (options) =>
                    {
                        options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
                        options.Port = 5672;
                        options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
                        options.ConsumerDispatchConcurrency = 1000;
                    };
                }, new System.Reflection.Assembly[] { typeof(Program).Assembly });

            }).Build();

        Console.WriteLine($"start time:{DateTime.Now}");
        await host.RunAsync();
    }
}


[Consumer("qos", Qos = 30)]
public class QosConsumer : IConsumer<TestEvent>
{
    private static int Count = 0;

    public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
    {
        Interlocked.Increment(ref Count);
        Console.WriteLine($"date time:{DateTime.Now},id:{message.Id}, count:{Count}");
        await Task.Delay(50);
    }

    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
    {
        return Task.CompletedTask;
    }

    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
    {
        return Task.FromResult(ConsumerState.Ack);
    }
}

For an intuitive comparison, a native consumer project, RabbitMQConsole, is also implemented directly with RabbitMQ.Client.

static async Task Main()
{
	ConnectionFactory connectionFactory = new ConnectionFactory
	{
		HostName = Environment.GetEnvironmentVariable("RABBITMQ")!,
		Port = 5672,
		ConsumerDispatchConcurrency = 1000
	};

	var connection = await connectionFactory.CreateConnectionAsync();
	var channel = await connection.CreateChannelAsync(new CreateChannelOptions(
		publisherConfirmationsEnabled: false,
		publisherConfirmationTrackingEnabled: false,
		consumerDispatchConcurrency: 1000));
	var messageSerializer = new DefaultMessageSerializer();

	var consumer = new AsyncEventingBasicConsumer(channel);
	await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1000, global: true);

	consumer.ReceivedAsync += async (sender, eventArgs) =>
	{
		var testEvent = messageSerializer.Deserialize<TestEvent>(eventArgs.Body.Span);
		Console.WriteLine($"start time:{DateTime.Now} {testEvent.Id}");
		await Task.Delay(50);
		await channel.BasicAckAsync(eventArgs.DeliveryTag, false);
	};

	await channel.BasicConsumeAsync(
		queue: "qos",
		autoAck: false,
		consumer: consumer);

	while (true)
	{
		await Task.Delay(10000);
	}
}

Maomi.MQ.RabbitMQ is built on top of RabbitMQ.Client. When consuming messages it also writes logs, records observability information, creates a new dependency injection scope, and so on, so it inevitably spends more time and resources than raw RabbitMQ.Client; this is why the two are compared.

Start the programs in Release mode in Visual Studio, launching QosConsole and RabbitMQConsole separately to measure consumption speed under different Qos settings.

Stability Testing

Refer to the Observability section to set up a monitoring environment. As shown in the OpenTelemetryConsole example, a single program hosts three consumers and both publishes and consumes messages.

About 560 messages are published or consumed per second, with roughly 9 million messages published and consumed within three hours.

image-20240629101521224

image-20240629101645663

Memory usage is stable. The machine's CPU is not powerful, and periodic GC and other activity also consumes CPU, so there are fluctuations as shown below:

image-20240629101738893

Retry

Retry Timing

When the consumer's ExecuteAsync method throws an exception, the framework retries by default up to three times, with the interval between attempts growing as a power of 2: 2 seconds after the first failure, then 4 seconds, then 8 seconds.

Maomi.MQ.RabbitMQ utilizes the Polly framework to manage the retry strategy, generating retry interval strategies by default through the DefaultRetryPolicyFactory service.

An example of the DefaultRetryPolicyFactory code is as follows:

/// <summary>
/// Default retry policy.
/// </summary>
public class DefaultRetryPolicyFactory : IRetryPolicyFactory
{
    protected readonly int RetryCount = 3;
    protected readonly int RetryBaseDelaySeconds = 2;

    protected readonly ILogger<DefaultRetryPolicyFactory> _logger;

    /// <summary>
    /// Initializes a new instance of the <see cref="DefaultRetryPolicyFactory"/> class.
    /// </summary>
    /// <param name="logger"></param>
    public DefaultRetryPolicyFactory(ILogger<DefaultRetryPolicyFactory> logger)
    {
        _logger = logger;

        RetryCount = 3;
        RetryBaseDelaySeconds = 2;
    }

    /// <inheritdoc/>
    public virtual Task<AsyncRetryPolicy> CreatePolicy(string queue, string id)
    {
        // Create a retry policy.
        var retryPolicy = Policy
            .Handle<Exception>()
            .WaitAndRetryAsync(
                retryCount: RetryCount,
                sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(RetryBaseDelaySeconds, retryAttempt)),
                onRetry: async (exception, timeSpan, retryCount, context) =>
                {
                    _logger.LogDebug("Retry execution event,queue [{Queue}],retry count [{RetryCount}],timespan [{TimeSpan}]", queue, retryCount, timeSpan);
                    await FaildAsync(queue, exception, timeSpan, retryCount, context);
                });

        return Task.FromResult(retryPolicy);
    }

    
    public virtual Task FaildAsync(string queue, Exception ex, TimeSpan timeSpan, int retryCount, Context context)
    {
        return Task.CompletedTask;
    }
}

You can replace the default retry policy service by implementing the IRetryPolicyFactory interface.

services.AddSingleton<IRetryPolicyFactory, DefaultRetryPolicyFactory>();
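
For instance, here is a minimal sketch of a replacement policy that always retries five times with a fixed 5-second delay; it assumes that CreatePolicy, as shown above, is the only member required by IRetryPolicyFactory.

public class FixedDelayRetryPolicyFactory : IRetryPolicyFactory
{
    // Retry up to 5 times, waiting a fixed 5 seconds between attempts.
    public Task<AsyncRetryPolicy> CreatePolicy(string queue, string id)
    {
        var retryPolicy = Policy
            .Handle<Exception>()
            .WaitAndRetryAsync(
                retryCount: 5,
                sleepDurationProvider: _ => TimeSpan.FromSeconds(5));

        return Task.FromResult(retryPolicy);
    }
}

services.AddSingleton<IRetryPolicyFactory, FixedDelayRetryPolicyFactory>();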

Persisting Remaining Retry Count

When the consumer fails to process a message, the default behavior is to retry 3 times; if it has already attempted 2 times and the program restarts, the next time the message is consumed, it will retry one last time.

It is necessary to remember the retry count so that when the program restarts, it can retry according to the remaining count.

Introduce the Maomi.MQ.RedisRetry package.

Example configuration:

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
	options.WorkId = 1;
	options.AutoQueueDeclare = true;
	options.AppName = "myapp";
	options.Rabbit = (ConnectionFactory options) =>
	{
        // ... ... 
	};
}, [typeof(Program).Assembly]);

builder.Services.AddMaomiMQRedisRetry((s) =>
{
	ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.3.248");
	IDatabase db = redis.GetDatabase();
	return db;
});

By default, keys will only be retained for 5 minutes. This means that if the program starts consuming a message after five minutes, the remaining retry count will reset.

Dead Letter Queue

A consumer or event can be bound to a dead letter queue so that when messages fail and are not put back into the queue, they will be pushed to the dead letter queue. Example:

[Consumer("ConsumerWeb_dead", Qos = 1, DeadQueue = "ConsumerWeb_dead_queue", RetryFaildRequeue = false)]
public class DeadConsumer : IConsumer<DeadEvent>
{
	// Consumption
	public Task ExecuteAsync(MessageHeader messageHeader, DeadEvent message)
	{
		Console.WriteLine($"event id:{message.Id}");
		throw new OperationCanceledException();
	}
}

// The messages that fail from ConsumerWeb_dead will be consumed by this consumer.
[Consumer("ConsumerWeb_dead_queue", Qos = 1)]
public class DeadQueueConsumer : IConsumer<DeadQueueEvent>
{
    // Consumption
    public Task ExecuteAsync(MessageHeader messageHeader, DeadQueueEvent message)
    {
        Console.WriteLine($"Dead letter queue, event id:{message.Id}");
        return Task.CompletedTask;
    }
}

image-20240601012127169

When using a dead letter queue, be sure to set RetryFaildRequeue to false, so the consumer will send a nack signal to RabbitMQ after multiple failed retries, ensuring that RabbitMQ forwards the message to the bound dead letter queue.

Delay Queue

Create a consumer that inherits from EmptyConsumer. The corresponding queue will be created when the program starts, but no consumer connection will be created for it. Set a message expiration time on the queue and bind it to a dead letter queue; this can be implemented with either the consumer pattern or the event pattern.

[Consumer("ConsumerWeb_dead_2", Expiration = 6000, DeadQueue = "ConsumerWeb_dead_queue_2")]
public class EmptyDeadConsumer : EmptyConsumer<DeadEvent>
{
}

// Messages that expire in ConsumerWeb_dead_2 will be consumed by this consumer.
[Consumer("ConsumerWeb_dead_queue_2", Qos = 1)]
public class Dead_2_QueueConsumer : IConsumer<DeadQueueEvent>
{
    // Consumption
    public Task ExecuteAsync(MessageHeader messageHeader, DeadQueueEvent message)
    {
        Console.WriteLine($"event id:{message.Id} has expired");
        return Task.CompletedTask;
    }
}

For example, if a user places an order and does not pay within 15 minutes, the order message expires, is routed to the dead letter queue, and its consumer cancels the order.
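
A minimal sketch of that scenario is shown below; the OrderEvent type, the queue names, and the 15-minute (900000 ms) expiration are illustrative assumptions.

public class OrderEvent
{
    public int OrderId { get; set; }
}

// No consumer is started for this queue; messages wait here until they
// expire after 15 minutes and are routed to the dead letter queue.
[Consumer("order_create", Expiration = 900000, DeadQueue = "order_timeout")]
public class OrderCreateConsumer : EmptyConsumer<OrderEvent>
{
}

// Expired (unpaid) orders arrive here and are cancelled.
[Consumer("order_timeout", Qos = 1)]
public class OrderTimeoutConsumer : IConsumer<OrderEvent>
{
    public Task ExecuteAsync(MessageHeader messageHeader, OrderEvent message)
    {
        Console.WriteLine($"order {message.OrderId} was not paid within 15 minutes, cancelling");
        return Task.CompletedTask;
    }

    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, OrderEvent message)
        => Task.CompletedTask;

    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, OrderEvent? message, Exception? ex)
        => Task.FromResult(ConsumerState.Ack);
}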

Observability

Please refer to ActivitySourceApi and OpenTelemetryConsole examples.


Deployment Environment

To quickly deploy an observability platform, you can use the sample package provided by OpenTelemetry to rapidly deploy relevant services, which include middleware such as Prometheus, Grafana, and Jaeger.

The official integration project for OpenTelemetry can be found at: https://github.com/open-telemetry/opentelemetry-demo


Download the source code of the sample repository:

git clone -b 1.12.0 https://github.com/open-telemetry/opentelemetry-demo.git

Please note, do not download the main branch as it may contain bugs.

You can replace the version number with the latest release tag.

Since the sample docker-compose.yml includes a large number of demo microservices and we only need the infrastructure, open docker-compose.yml and, under Core Demo Services and Dependent Services, keep only valkey-cart and delete the rest. Alternatively, download the author's modified version and replace the file in the project: docker-compose.yml

Note that different versions may vary.

image-20250208154943481

Execute the command to deploy the observability service:

docker-compose up -d

image-20240612201100976

The opentelemetry-collector-contrib is used to collect observability information for tracing, supporting both gRPC and HTTP protocols. The listening ports are as follows:

| Port | Protocol | Endpoint | Function |
| :--- | :------- | :----------- | :----------------------------------------------------------- |
| 4317 | gRPC | n/a | Accepts traces in OpenTelemetry OTLP format  (Protobuf). |
| 4318 | HTTP | /v1/traces | Accepts traces in OpenTelemetry OTLP format  (Protobuf and JSON). |

After container port mapping, the external ports may not be 4317 and 4318.

1718196602032.png

Import the Maomi.MQ.Instrumentation package and other related OpenTelemetry packages.

<PackageReference Include="Maomi.MQ.Instrumentation " Version="1.1.0" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.8.1" />

Import the namespaces:

using OpenTelemetry.Logs;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using Maomi.MQ;
using OpenTelemetry.Exporter;
using RabbitMQ.Client;
using System.Reflection;
using OpenTelemetry;

Inject tracing and monitoring to automatically report to OpenTelemetry.

builder.Services.AddOpenTelemetry()
	  .ConfigureResource(resource => resource.AddService(serviceName))
	  .WithTracing(tracing =>
	  {
		  tracing.AddMaomiMQInstrumentation(options =>
		  {
			  options.Sources.AddRange(MaomiMQDiagnostic.Sources);
			  options.RecordException = true;
		  })
		  .AddAspNetCoreInstrumentation()
		  .AddOtlpExporter(options =>
		  {
			  options.Endpoint = new Uri(Environment.GetEnvironmentVariable("OTLPEndpoint")! + "/v1/traces");
			  options.Protocol = OtlpExportProtocol.HttpProtobuf;
		  });
	  })
	  .WithMetrics(metrices =>
	  {
		  metrices.AddAspNetCoreInstrumentation()
		  .AddMaomiMQInstrumentation()
		  .AddOtlpExporter(options =>
		  {
			  options.Endpoint = new Uri(Environment.GetEnvironmentVariable("OTLPEndpoint")! + "/v1/metrics");
			  options.Protocol = OtlpExportProtocol.HttpProtobuf;
		  });
	  });

Tracing

After starting the ActivitySourceApi service, the trace information will be automatically pushed to the OpenTelemetry Collector during publishing and consuming actions, and can be retrieved through components like Jaeger and Skywalking.

Open the Jaeger UI panel that maps to port 16686:

image-20240612205140595

Since publishing and consuming belong to sibling traces rather than the same trace, you need to find related traces by querying Tags, in the format event.id=xxx.

1718196773292

3662d0c35aaac72c77046a430988e87

Monitoring

Maomi.MQ has the following built-in metrics:

| Name | Description |
| --------------------------------------------- | ------------------------------------------------- |
| maomimq_consumer_message_pull_count_total | Total number of pulled messages |
| maomimq_consumer_message_faild_count_total | Total number of messages that failed to be consumed |
| maomimq_consumer_message_received_Byte_bucket | Histogram buckets of received message sizes |
| maomimq_consumer_message_received_Byte_count | Number of received message size samples |
| maomimq_consumer_message_received_Byte_sum | Total bytes of received messages |
| maomimq_publisher_message_count_total | Total number of sent messages |
| maomimq_publisher_message_faild_count_total | Total number of messages that failed to send |
| maomimq_publisher_message_sent_Byte_bucket | Histogram buckets of sent message sizes |
| maomimq_publisher_message_sent_Byte_count | Number of sent message size samples |
| maomimq_publisher_message_sent_Byte_sum | Total bytes of sent messages |

Next, we need to display the data in Grafana.

Download the template file: maomi.json

Then import the file in the Dashboards section of the Grafana panel to view the monitoring of the message queue for all current services.

image-20240629011543582

image-20250220212204225

Open Source Project Code Reference

The OpenTelemetry.Instrumentation.MaomiMQ project includes code sourced from https://github.com/open-telemetry/opentelemetry-dotnet-contrib/tree/main/src/Shared
