首頁

目前文章總數:157 篇

  

最後更新:2024年 12月 07日

0073. RabbitMQ 生產者-消費者模式下 & 如何使用 RabbitMQ 內建負載平衡進行推送

日期:2024年 09月 28日

標籤: RabbitMQ Asp.net Core Web MVC Web Load Balancer

摘要:C# 學習筆記


應用所需:1. 已架設好 RabbitMQ 主機
     2. Visual Studio 2022 Asp.net Core
範例檔案:本篇範例代碼
相關參考:0029. Windows作業系統架設 RabbitMQ 的流程
     0030. RabbitMQ生產者與消費者範例說明
     0068. SignalR 橫向擴展部署 Server - RabbitMQ Backplane 解決方案
基本介紹:本篇分為四大部分。
第一部分:RabbitMQ 負載平衡原理
第二部分:Web專案架構
第三部分:代碼解析
第四部分:DEMO 驗證成果






第一部分:RabbitMQ 負載平衡原理

Step 1:解決問題

在實務工作中,可能會有像下單系統,我們準備多個伺服器,讓用戶可以下訂單,但是只需要一伺服器做處理。
如果這時有多個伺服器同時執行這個訂單就會發生重複生成訂單的問題,這時就可以借助 RabbitMQ 當負載平衡
每個[伺服器]只要負責建立資料,再提交給 RabbitMQ 再讓 RabbitMQ 發送給比較悠閒的[伺服器]進行處理。


Step 2:確保消息的穩定性與可靠性

假設我們有 2 個伺服器(代碼完全相同,並且都可以負責建訂單,處理訂單)
建立訂單的速度在實務上很快的

1. 每個伺服器只要建立訂單(生產者)後,立刻往 RabbitMQ 推送
2. MQ 機器收到後,會負責分派工作給某個伺服器處理,並且成功完成的伺服器必須做 ACK 響應後,此工作才會從 MQ 完成


※ Rabbit MQ 的負載平衡機制下,即使 MQ 當機、伺服器掛掉,只要重啟後,仍會持續將未完成的定單派送到訂閱的伺服器進行消費。


Step 3:優點、缺點

使用此機制的優點如下:

精確的路由控制 Rabbit MQ 的 Direct 交換器會根據消息的 routing key 來進行精確路由,也就是可以精確的指定使用隊列,不衝突
靈活的負載分配 Rabbit MQ 有內建的功能,可以分配交換器任務的優先級,並且可以下定自己的策略
簡單且易於配置 Direct 交換器的設置十分容易,只需配置 routing key 即可
不遺失消息與具可靠性 Direct 交換器會確保消息只被發送到匹配的隊列,這能避免消息被錯誤地路由到不相關的消費者,並且未響應完成的任務,會常駐於 Queue 中


相對的有以下缺點:

額外的成本 Rabbit MQ Server 是額外的服務,需要占用記憶體、硬碟空間
單點故障風險 如果某一特定 routing key 對應的隊列或消費者出現故障,該 routing key 的消息將無法被處理,這可能會導致該類型的消息積壓,嚴重
  會影響整個主機無法啟動
仍有消息分佈不均的風險 伺服器如果規格不統一,或者機器異常,仍會有消息分配不均的問題,MQ 的工作本質不是維護其他伺服器是否正常
  無法工作的伺服器,仍會被不均的跳過。
學習成本 使用 Rabbit MQ 仍需要理解此套件的用法





第二部分:Web專案架構

Step 1:範例專案架構

打開範例專案後,架構基本分成以下:

1. RabbitMQ 實現生產者、消費者與共用的工廠方法,提供任何專案進行訂閱
2. 假資料庫 為了方便說明,利用 Singleton 保存記憶體的資料,偽裝成資料庫,並提供訂單格式
3. RabbitMQ 訂閱 每個專案如果有實現生產者,需要對 RabbitMQ 做訂閱,並且實現自己的消費者行為
4. Service 這邊實現查詢、下訂單、處理訂單,模擬帳戶訂單流程
5. Web控制器 提供 Html 畫面取資料、進行下訂單的 API 接口
6. Html 畫面 與用戶互動,提供可視化的介面,可以知道處理訂單的消費者是哪一台機器
7. 初始化配置 RabbitMQ、Service 等所需的依賴注入





第三部分:代碼解析

Step 1:[RabbitMQ]

RabbitMQ 資料夾下有以下六個大項、3, 5, 6, 會在進一步說明

1. Common 共用方法,包含生產者、接收者取得工廠、建立連線、取得通道
2. Consts 定義交換器名稱、Routing key
3. Factory 建立生產者的工廠,並提供生產者發送 MQ
4. Model RabbitMQ 基本配置、交換器基本配置、連線至 MQ 基本配置
5. RabbitMqMessagePublisher.cs 實作生產者的邏輯、保持持久化、傳至MQ、檢查連線等
6. RabbitMqMessageReceiver.cs 實作消費者的邏輯、連線開始、結束事件、提供消費者接收器、標記處理完成等




Step 2:[RabbitMQ] - Factory

整個 Factory 再注入時是單例模式(SingleTon),在關閉整個 Server 前,都會永久記錄資源。
總共分成以下 4 個區塊:

1. 建構式 建立基本連線,將 Server 啟動時的 MQ 連線紀錄
2. 取得生產者 每個叫用的 Server 可以取得自己訂閱的交換器的生產者紀錄
3. 取得鎖物件 建立生產者的工廠,並提供生產者發送 MQ
4. 釋放資源 Server 完全結束時,安全的釋放資源,中斷與 RabbitMQ 的連線


2. 取得生產者的部分,要進行鎖,避免多個地方併發同時發送到 MQ (如果代碼處理得好或邏輯簡單此段少鎖也不影響)
通常情況下為了併發造成異常而增加此鎖

/// <summary>
/// MQ 連線工廠
/// </summary>
public class RabbitMqFactory : IRabbitMqFactory, IDisposable
{
    private readonly IConfiguration _configuration;
    private readonly string _rabbitMqHostName;
    private readonly string _rabbitMqUserName;
    private readonly string _rabbitMqPassword;

    /// <summary>
    /// 1. 建構式
    /// </summary>        
    public RabbitMqFactory(IConfiguration configuration)
    {
        _configuration = configuration;
        var rabbitParam = _configuration.GetSection("RabbitMQ").Get<RabbitMQConnectionModel>();
        _rabbitMqHostName = rabbitParam?.HostName ?? string.Empty;
        _rabbitMqUserName = rabbitParam?.UserName ?? string.Empty;
        _rabbitMqPassword = rabbitParam?.Password ?? string.Empty;
    }

    private static readonly object _lockObject = new();
    private static readonly ConcurrentDictionary<string, object> _lockObjectDict = new();
    private static readonly ConcurrentDictionary<string, RabbitMqMessagePublisher> _publisherDict = new();

    /// <summary>
    /// 2. 取得生產者
    /// </summary>                
    public RabbitMqMessagePublisher Get(string mqExchangeName, string exchangeType = "Direct")
        {
            var key = $"{_rabbitMqHostName}_{mqExchangeName}";
            var publisher = GetPublisher(key, mqExchangeName, exchangeType);
            return publisher;

            // 取得生產者配置
            RabbitMqMessagePublisher GetPublisher(string key, string mqExchangeName, string exchangeType)
            {
                if (!_publisherDict.TryGetValue(key, out var publisher) || !publisher.IsOpen)
                {
                    var lockObject = GetLockObj(key);
                    lock (lockObject)
                    {
                        if (!_publisherDict.TryGetValue(key, out publisher) || !publisher.IsOpen)
                        {
                            publisher?.Dispose();

                            publisher = new RabbitMqMessagePublisher(new ExchangeModel
                            {
                                HostName = _rabbitMqHostName,
                                UserName = _rabbitMqUserName,
                                Password = _rabbitMqPassword,
                                ExchangeName = mqExchangeName,
                                ExchangeType = exchangeType
                            });
                            _publisherDict[key] = publisher;
                        }
                    }
                }
                return publisher;
            }
        }

    /// <summary>
    /// 3. 取得鎖物件
    /// </summary>
    private object GetLockObj(string key)
        {
            if (!_lockObjectDict.TryGetValue(key, out var obj) || obj == null)
            {
                lock (_lockObject)
                {
                    if (!_lockObjectDict.TryGetValue(key, out obj) || obj == null)
                    {
                        obj = new object();
                        _lockObjectDict[key] = obj;
                    }
                }
            }
            return obj;
        }

    #region
    /// <summary>
    /// 4. 釋放資源
    /// </summary>
    public void Dispose()
        {
            foreach (var publisher in _publisherDict.Values)
            {
                publisher.Dispose();
            }
            _publisherDict.Clear();
        }
    #endregion
}




Step 3:[RabbitMQ] - RabbitMqMessagePublisher.cs

生產者建立方法,並且成功建立的結果會記錄於 Factory 中,紀錄於記憶體中
總共分成以下 4 個區塊:

1. 生產者建構式 建立基本連線,並呼交工廠方法建立通道與 MQ 連線
2. 檢查連線狀態 檢查當前的生產者是否保持啟用,在伺服器中的各個方法執行生產者行為前都會進行檢查
3. 發送訊息 建立持久化配置,並且呼叫指定交換器的通道,將訊息推送到 MQ
4. 釋放資源 Server 完全結束時,安全的釋放資源,中斷與 RabbitMQ 的連線
public class RabbitMqMessagePublisher : RabbitMQBaseParameterModel, IDisposable
{
    /// <summary>
    /// 1. 生產者建構式
    /// </summary>        
    public RabbitMqMessagePublisher(ExchangeModel rabbitParameters)
        {
            SettingValue();
            
            // 設定生產者 - 基本設定
            void SettingValue()
            {
                _connection = RabbitMQHelper.GetConnection(rabbitParameters);
                _channel = RabbitMQHelper.GetModel(_connection);
                _exchangeType = rabbitParameters.ExchangeType;
                _channel.ExchangeDeclare(rabbitParameters.ExchangeName, rabbitParameters.ExchangeType.ToString().ToLower(), true, false, null);
            }
        }

    /// <summary>
    /// 2. 檢查 RabbitMQ 連線,確保當前連線是否開啟
    /// </summary>
    public bool IsOpen => _connection?.IsOpen == true;

    /// <summary>
    /// 3. 發送訊息
    /// </summary>        
    public void PblisherSend(string message, string exchangeName, string routingKey)
        {
            var properties = _channel.CreateBasicProperties();
            properties.Timestamp = new AmqpTimestamp(DateTime.Now.Ticks);
            // 設定消息的持久性,確保 RabbitMQ 伺服器重啟後仍然存在
            properties.Persistent = true; 
            _channel.BasicPublish(exchangeName, routingKey, false, properties, Encoding.UTF8.GetBytes(message));
        }

    #region 4. 解構式 - 釋放資源
        
        ~RabbitMqMessagePublisher()
        {
            Dispose();
        }

        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }
            _channel?.Dispose();
            _connection?.Dispose();
            _disposed = true;
            GC.SuppressFinalize(this);
        }

        #endregion
}



Step 4:[RabbitMQ] - RabbitMqMessageReceiver.cs

消費者接收器建立方法,並且成功建立的結果一樣會記錄於 Factory 中,並且實現接收 MQ 隊列消息,與反饋標記已完成
總共分成以下 7 個區塊:

1. 建構式 - 消費者接收器 建立基本連線,將 Server 啟動時的 MQ 連線紀錄
2. 設定消費者接收器 - 連線 消費者的連線會註冊以下事件
3. 連線結束事件 當觸發連線結束時,會先進行資源釋放與然後再嘗試重新連線,如果重試失敗數次後,確定中斷即釋放接收器
4. 消費者接收器 建立消費者事件,任何傳進來的字串,會序列化成 Json 再傳進 Server 建立的消費者事件中
5. 增加消費者訂閱的隊列 設定和管理 RabbitMQ 消費者的隊列訂閱,設置、宣告、綁定、消費
6. 標記訊息已處理 提供一個方法讓 Server 完成註冊的消費者事件後,進行標記,宣告此任務結束,可從 MQ 移除
7. 釋放資源 Server 完全結束時,安全的釋放資源,中斷與 RabbitMQ 的連線
public class RabbitMqMessageReceiver<TMessage> : RabbitMQBaseParameterModel, IDisposable
{
    protected ConnectionFactory _factory;
    protected Action<TMessage, RabbitMqMessageReceiver<TMessage>, ulong, long> _receivedAction;

    /// <summary>
    /// 1. 建構式 - 消費者接收器
    /// </summary>        
    public RabbitMqMessageReceiver(
        ExchangeModel rabbitParameters,
        int concurrentCount,
        Action<TMessage, RabbitMqMessageReceiver<TMessage>, ulong, long> action,
        bool autoAck = false)
    {
        SettingValue();
        Connect();

        // 設定消費者接收器 - 基本設定
        void SettingValue()
        {
            _concurrentCount = concurrentCount;
            _receivedAction = action;
            _autoAck = autoAck;
            _factory = RabbitMQHelper.GetConstructFactory(rabbitParameters);
            _exchangeType = rabbitParameters.ExchangeType;
            _exchangeName = rabbitParameters.ExchangeName;
        }
    }

    #region 連線事件

    /// <summary>
    /// 2. 設定消費者接收器 - 連線
    /// </summary>
    public void Connect()
    {
        // 建立連線
        _connection = _factory.CreateConnection();
        _connection.ConnectionShutdown += ConnectionForConnectionShutdown;
        _channel = _connection.CreateModel();

        // 宣告交換機 - 確保存在
        _channel.ExchangeDeclare(_exchangeName, _exchangeType.ToString().ToLower(), true, false, null);

        // 消費者接收事件
        _consumer = new EventingBasicConsumer(_channel);
        _consumer.Received += OnConsumerForReceived;
    }

    /// <summary>
    /// 3. 連線結束事件
    /// </summary>        
    private void ConnectionForConnectionShutdown(object? sender, ShutdownEventArgs e)
    {
        ReleaseResource();
        RetryProcess();

        // 釋出資源
        void ReleaseResource()
        {
            try
            {
                _consumer = null!;
                _channel?.Dispose();
                _connection?.Dispose();
            }
            catch (Exception ex)
            {
                Console.Out.WriteLineAsync(ex.Message);
                throw;
            }
        }

        // 重連機制
        void RetryProcess()
        {
            var retryMaxTimes = 10;
            var retryTimes = 0;
            while (retryTimes <= retryMaxTimes)
            {
                try
                {
                    retryTimes++;
                    Connect();
                    AddQueue(_queueName, _routingKey);
                    break;
                }
                catch (Exception ex)
                {
                    Console.Out.WriteLineAsync(ex.Message);
                }

                // 指數延遲 : 2 的 tryTimes 次方 最大 10 秒
                var delaySeconds = Math.Min(10000, 1000 * (int)Math.Pow(2, retryTimes));
                Task.Delay(delaySeconds);
            }

            // 避免無限循環
            if (retryTimes > retryMaxTimes)
            {
                Console.Out.WriteLineAsync($@"Exceeded maximum {retryMaxTimes} retry attempts.");
            }
        }
    }

    /// <summary>
    /// 4. 消費者接收器
    /// </summary>
    public void OnConsumerForReceived(object sender, BasicDeliverEventArgs e)
    {
        try
        {
            var bodyStr = Encoding.UTF8.GetString(e.Body.ToArray());
            var message = JsonSerializer.Deserialize<TMessage>(bodyStr);
            _receivedAction.Invoke(message, this, e.DeliveryTag, e.BasicProperties.Timestamp.UnixTime);
        }
        catch (Exception ex)
        {
            Console.Out.WriteLineAsync(ex.Message);
            throw;
        }
    }

    /// <summary>
    /// 5. 增加消費者訂閱的隊列
    /// </summary>
    public void AddQueue(string queueName, string routingKey)
    {
        _queueName = queueName;
        _routingKey = routingKey;

        _channel.QueueDeclare(queueName, true, false, false, null);
        _channel.QueueBind(queueName, _exchangeName, routingKey, null);

        _channel.BasicQos(0, (ushort)_concurrentCount, false);
        _channel.BasicConsume(queueName, _autoAck, _consumer);
    }
    #endregion

    #region RabbitMQ 反饋 Ack 標記

    /// <summary>
    /// 6. 標記訊息已處理
    /// </summary>        
    public async Task BasicAck(ulong deliveryTag)
    {
        try
        {
            if (!_autoAck)
            {
                _channel.BasicAck(deliveryTag, false);
            }
        }
        catch (Exception ex)
        {
            await Console.Out.WriteLineAsync($@"RabbitMqMessageReceiver BasicAck error:{ex}");
            throw;
        }
    }

    #endregion

    #region 解構式 - 7. 釋放資源

    ~RabbitMqMessageReceiver()
    {
        Dispose(false);
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (_disposed)
        {
            return;
        }
        if (disposing)
        {
            {
                _channel?.Dispose();
                _connection?.Dispose();
            }
            _disposed = true;
        }

        #endregion
    }
}



Step 5:[假資料庫]

為了便於 Demo 說明,簡單用 Singleton 紀錄訂單資訊,保存於記憶體中,充當假的資料庫說明
總共分成以下 3 個區塊:

1. 紀錄模擬帳戶系統查詢訂單 - 當前資料 紀錄當前的資料於記憶體中
2. 模擬帳戶系統查詢訂單 - 當前資料 查詢當前記憶體中有的訂單資料
3. 模擬帳戶建單 - 消費者 - 更新/插入資料 若觸發生產者時會建立訂單(插入),當觸發接收器消費者會進行更新訂單
public class FakeDataBase
{
    /// <summary>
    /// 1. 紀錄模擬帳戶系統查詢訂單 - 當前資料
    /// </summary>
    protected readonly ConcurrentDictionary<int, AccountTradeOrderModel> _clientQuickPageDict = new();

    public IReadOnlyDictionary<int, AccountTradeOrderModel> ClientQuickPageDict => _clientQuickPageDict;

    /// <summary>
    /// 2. 模擬帳戶系統查詢訂單 - 當前資料
    /// </summary>
    public IReadOnlyDictionary<int, AccountTradeOrderModel> GetAccountTradeOrderAll()
    {
        var getResult = _clientQuickPageDict;
        var sortResult = _clientQuickPageDict.OrderBy(item => item.Value.DateTimeValue).ToDictionary();
        return sortResult;
    }

    /// <summary>
    /// 3. 模擬帳戶建單 - 消費者 - 更新/插入資料
    /// </summary>
    public void AddOrUpdate(AccountTradeOrderModel insertItem)
    {
        _clientQuickPageDict.AddOrUpdate(insertItem.AccountTradeOrderId, insertItem, (k, oldValue) => insertItem);
    }
}




Step 6:[RabbitMQ 訂閱]

Server 端需要配置自己對 MQ 的訂閱,並且實現自己的接收器事件
當從 Server 進行生產者發送後,才可以進行到正確的接收器事件
總共分成以下 3 個區塊:

1-1. 建構式 Server 端建立 MQ 連線,並且依照 Server 的 CPU 核心數動態切割出可執行併發執行的最大量
1-2. 建立接收器 註冊自己的交換器,來實現接受器,進行消費者行為
1-3. 帳戶訂單接收事件 實務上進行接收器後要執行的工作,這邊簡單模擬進行訂單完成
2. 注入 Asp.net core 初始化時,需要進行 Singleton 進行資源管理
3. 初始化配置 - 缺少就不能保持 MQ 背景運行 確保 RabbitMQSubscriber 已完成初始化,進行消費者接收器連結
public class RabbitMQSubscriber
{
    private readonly RabbitMQConnectionModel _selfParameters = new RabbitMQConnectionModel();
    private readonly IConfiguration _configuration;
    private readonly IServiceProvider _serviceProvider;
    private SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(5);
    private RabbitMqMessageReceiver<AccountTradeOrderModel> _orderBatchSequenceReceiver = null!;

    /// <summary>
    /// 1-1. 建構式
    /// </summary>        
    public RabbitMQSubscriber(
        IConfiguration configuration,
        IServiceProvider serviceProvider
        )
        {
            _configuration = configuration;
            var rabbitParam = _configuration.GetSection("RabbitMQ").Get<RabbitMQConnectionModel>();
            _serviceProvider = serviceProvider;
            _selfParameters.HostName = rabbitParam?.HostName ?? string.Empty;
            _selfParameters.UserName = rabbitParam?.UserName ?? string.Empty;
            _selfParameters.Password = rabbitParam?.Password ?? string.Empty;

            var serverName = _configuration.GetSection("ServreName").Get<string>();
            _selfParameters.ServerName = serverName ?? string.Empty;
            DynamicLimitSet();

            // 設置 SemaphoreSlim 限制
            void DynamicLimitSet()
            {
                // 依照本機的 CPU 核心數量動態設定最大並行值 ※設太大會導致 CPU 過載,因此應該動態設定適當的值
                int cpuCoreCount = Environment.ProcessorCount;
                _semaphoreSlim = new SemaphoreSlim(Math.Max(1, cpuCoreCount - 1));
            }
        }

    /// <summary>
    /// 1-2. 建立接收器
    /// </summary>
    public void BuildReceive()
        {
            // 建立批次訂單接收器
            var initParameters = new ExchangeModel
            {
                HostName = _selfParameters.HostName,
                UserName = _selfParameters.UserName,
                Password = _selfParameters.Password,
                ExchangeType = ExchangeType.Direct,
                ExchangeName = RabbitMQConsts.MY_EXCHANGE_NAME
            };
            _orderBatchSequenceReceiver = new(initParameters, 5, OnAccountTradeOrderReceived);

            // Direct 模式要帶 Rounting Key
            _orderBatchSequenceReceiver.AddQueue(RabbitMQConsts.MY_EXCHANGE_NAME, RabbitMQConsts.MY_ROUTING_KEY);
        }

    /// <summary>
    /// 1-3. 帳戶訂單接收事件
    /// </summary>    
    private void OnAccountTradeOrderReceived(AccountTradeOrderModel dto, RabbitMqMessageReceiver<AccountTradeOrderModel> receiver, ulong deliverTag, long timestamp)
        {
            _semaphoreSlim.Wait();
            Task.Run(async () =>
            {
                try
                {
                    using (var scope = _serviceProvider.CreateScope())
                    {
                        var orderBatchService = scope.ServiceProvider.GetRequiredService<IAccountTradeOrder>();

                        // 模擬調用消費者事件 - 完成此單
                        dto.MechineName = _selfParameters.ServerName;
                        await orderBatchService.FinishAccountTradeOrder(dto);

                        // 完成後響應MQ - 回覆MQ完成
                        await receiver.BasicAck(deliverTag);
                    }
                }
                catch (Exception ex)
                {
                    Console.Out.WriteLine(ex);
                }
                finally
                {
                    _semaphoreSlim.Release();
                }
            });
        }

}

/// <summary>
/// 2. 注入
/// </summary>
public static class MqSubscriberServiceCollectionExtensions
    {
        public static IServiceCollection AddRabbitMqSubscriber(this IServiceCollection serviceCollection)
        {
            return serviceCollection.AddSingleton(typeof(RabbitMQSubscriber));
        }
    }

/// <summary>
/// 3. 初始化配置 - 缺少就不能保持 MQ 背景運行
/// </summary>
public static class MqSubscriberHostExtensions
{
    public static IHost InitMqSubscriber(this IHost host)
    {
        var mqs = host.Services.GetService<RabbitMQSubscriber>();
        mqs!.BuildReceive();
        return host;
    }
}



Step 7:[Service]

實現訂單的建立、更新、查詢的邏輯與消費者事件(更新訂單,並回報 MQ 已完成)
總共分成以下 3 個區塊:

1. 查詢訂單 從假資料庫,撈取所有的資料,回傳控制器顯示
2-1. 建立接收器 為了展示,使用 Bogus 建立假資料,建立訂單時保存於假資料庫、並且將訂單送到 MQ 隊列
2-2. 提交到 RabbitMQ 隊列 訂單發送於 MQ 隊列中,請當前可工作的 Server 消費
3. 更新訂單 消費者事件才會觸發,模擬訂單一系列工作完成,回應 MQ 已完成,可以解除此筆隊列資料
public class AccountTradeOrder : IAccountTradeOrder
{       
    private readonly IRabbitMqFactory _mqFactory;
    private readonly FakeDataBase _fakeDb;        

    public AccountTradeOrder(
        IRabbitMqFactory mqFactory,
        FakeDataBase fakeDb)
    {
        _mqFactory = mqFactory;
        _fakeDb = fakeDb;
    }

    /// <summary>
    /// 1. 查詢訂單
    /// </summary>
    /// <returns></returns>
    public IEnumerable<AccountTradeOrderModel> GetAccountTraderOrder()
        {
            var result = _fakeDb.GetAccountTradeOrderAll().Values;
            return result;
        }

    /// <summary>
    /// 2-1. 建立訂單
    /// </summary>
    public Task BuildAccountTradeOrder()
        {
            // Bogus 套件,目的是產生假資料
            var faker = new Faker("zh_CN");
            var now = DateTime.Now;
            var newItem = new AccountTradeOrderModel()
            {
                AccountName = faker.Name.FullName(),
                AccountTradeOrderId = Guid.NewGuid().GetHashCode(),
                IsSuccessful = false,
                DateTimeValue = now,
                Remark = $@"建立時間 : {now.ToString("yyyy-MM-dd HH:mm:ss")}"
            };
            // 新增至資料庫
            _fakeDb.AddOrUpdate(newItem);

            // 發送至 RabbitMQ
            SendToRabbitMQ(newItem);

            return Task.CompletedTask;
        }

    /// <summary>
    /// 3. 更新訂單
    /// </summary>        
    public Task FinishAccountTradeOrder(AccountTradeOrderModel tradeOrder)
        {
            // 模擬處理為成功
            tradeOrder.IsSuccessful = true;
            tradeOrder.MechineName = tradeOrder.MechineName;//從消費者處理時才寫入
            // 更新至資料庫
            _fakeDb.AddOrUpdate(tradeOrder);

            return Task.CompletedTask;
        }

    /// <summary>
    /// 2-2. 提交到 RabbitMQ 隊列
    /// </summary>
    private void SendToRabbitMQ(AccountTradeOrderModel tradeOrder)
        {
            var json = JsonSerializer.Serialize(tradeOrder);
            var publisher = _mqFactory.Get(RabbitMQConsts.MY_EXCHANGE_NAME);
            publisher.PblisherSend(json, RabbitMQConsts.MY_EXCHANGE_NAME, RabbitMQConsts.MY_ROUTING_KEY);
        }
}




Step 8:[Web控制器]

HomeController 實現與 Views 前端頁面的交互,並作為 Service 的中間處理者
總共分成以下 3 個區塊:

1. 查詢頁面 Query頁面的查詢
2. 建立假訂單 API 當用戶從網頁上點擊建立訂單時觸發
3. 查詢頁面(更新Table) API 前端 Javascript 觸發輪詢更新,當 MQ 消費完成後會將訂單標記完成,並且記錄機器位置
/// <summary>
/// 1. 查詢頁面
/// </summary>        
public IActionResult QueryPage()
{
    var result = _accountTradeOrder.GetAccountTraderOrder();
    return View(result);
}

/// <summary>
/// 2. 建立假訂單 API
/// </summary>        
[HttpGet]
public IActionResult BuildAccountTradeOrder()
{
    _accountTradeOrder.BuildAccountTradeOrder();
    var result = _accountTradeOrder.GetAccountTraderOrder();
    return PartialView("_AccountTradeOrderPartial", result);
}

/// <summary>
/// 3. 查詢頁面(更新Table)
/// </summary>   
[HttpGet]
public IActionResult QueryPagePartial()
{
    var result = _accountTradeOrder.GetAccountTraderOrder();
    return PartialView("_AccountTradeOrderPartial", result);
}



Step 9:[Html 畫面]

分成 3 個部分,提供模擬用戶建立訂單,顯示畫面資訊,對應 QueryPage.cshtml 代碼
總共分成以下 3 個區塊:

1-1. 建立訂單按鈕 Dom 元件,提供點擊按鈕
1-2. 畫面顯示 Dom 元件,顯示訊息,當錯誤時會顯示於此
1-3. 渲染 Partial View Dom 元件,_AccountTradeOrderPartial.cshtml 是一個 Table 元件,顯示表內容
2. 建立訂單 點擊按鈕後觸發的事件,對後端 API 進行建立訂單
3. 輪詢用 - 更新訂單資料 每隔3秒執行一次,更新 Partial View 的 Table 資料
<html>
<body>
    <!-- 1-1. 建立訂單按鈕 -->
    <button id="buildTradeOrderBtn">建立假訂單</button>

    <!-- 1-2. 畫面顯示 -->
    <div id="resultMessage"></div>

    <!-- 1-3. 渲染 Partial View -->
    <div id="tradeOrderContainer">
        @await Html.PartialAsync("_AccountTradeOrderPartial", Model)
    </div>
</body>
<script>
    // 呼叫 API 事件 - 建立假訂單
    $(document).ready(function () {
        // 每3秒更新一次
        setInterval(queryPage, 3000);

        // 按鈕事件
        $('#buildTradeOrderBtn').click(function () {
            updateTradeOrders();
        });

        // 2. 建立訂單
        function updateTradeOrders() {
            $.ajax({
                url: '@Url.Action("BuildAccountTradeOrder", "Home")',
                type: 'GET',
                success: function (response) {
                    $('#tradeOrderContainer').html(response);
                },
                error: function () {
                    $('#resultMessage').text('Error occurred while processing the request.').css('color', 'red');
                }
            });
        }

        // 3. 輪詢用 - 更新訂單資料
        function queryPage() {
            $.ajax({
                url: '@Url.Action("QueryPagePartial", "Home")',
                type: 'GET',
                success: function (response) {
                    $('#tradeOrderContainer').html(response);
                },
                error: function () {
                    $('#resultMessage').text('Error occurred while processing the request.').css('color', 'red');
                }
            });
        }
    });
</script>
</html>



Step 10:[初始化配置]

初始化配置進行 RabbitMQ 的依賴注入,以及生產 & 消費者 Server 的建立

// 1. 注入相依
builder.Services.AddSingleton<IRabbitMqFactory, RabbitMqFactory>();
builder.Services.AddSingleton<FakeDataBase>();
builder.Services.AddScoped<IAccountTradeOrder, AccountTradeOrder>();

// 2. 注入 RabbitMQ Subscriber
builder.Services.AddRabbitMqSubscriber();

var app = builder.Build();

// ... Other Thing

// 3. RabbitMQ Subscriber 啟用交換器
app.InitMqSubscriber();



第四部分:DEMO 驗證成果

Step 1:建立機器A

我們發布相同的代碼發布,在本機上。
並且先調整 Appsetting.json 下的 ServerName 用於辨識

  "ServreName": "機器A" 


然後開啟 Windows Command Line 輸入以下指令啟動,機器 Port 為 6001

dotnet RabbitMQLoadBalanceAspCoreWebExample.dll --urls=http://localhost:6001




Step 2:建立機器B

同上一步驟,在另一個資料夾執行相同的步驟,但做點微調
調整 Appsetting.json 下的 ServerName 為以下:

  "ServreName": "機器B" 


然後開啟 Windows Command Line 輸入以下指令啟動,機器 Port 為 6002

dotnet RabbitMQLoadBalanceAspCoreWebExample.dll --urls=http://localhost:6002




Step 3:驗證成果

如下圖,我們依序執行

1. 機器B-建立訂單 畫面一開始會是空的,點擊建立訂單按鈕
2. 機器B-畫面顯示 這時產生訂單在下面的顯示區塊顯示,並且執行狀態是 False ,機器名稱因為還沒有消費處理所以空
3. 機器A-消費處理 這時透過 Rabbit MQ 發送隊列到 機器A ,A處裡完成更新自己的顯示畫面,並且標記處理者是[機器A]
  ※有網頁輪詢,實際操作時,每隔 3 秒會自動抓最新資料,機器B也會被更新成跟機器A一樣的顯示內容




Step 4:確認 Rabbit MQ

進入 Rabbit MQ 的 Web 檢視頁面,可以發現有資料透過我們設定的交換器並且產生過隊列 & 發送