首頁

目前文章總數:172 篇

  

最後更新:2025年 03月 22日

0084. Redis 生產者消費者模式 - Pub/Sub 與 Queue (List)的使用時機優劣分析與實作範例

日期:2025年 01月 18日

標籤: C# Nginx SignalR Redis Asp.net Core Web MVC

摘要:C# 學習筆記


應用所需:1. Visual Studio 2022 以上,支援 .net Core 8.0 以上
解決問題:1. 說明採用 **Redis** 的 Pub/Sub 與 Queue (List) 模式 實現生產者&消費者 模式有什麼優缺點
     2. Redis 與 RabbitMQ 都可以實現生產者&消費者模式,說明差異與使用時機
範例檔案:Redis Pub/Sub 生產者消費者代碼
     Redis Queue(List) 生產者消費者代碼
相關參考:0073. RabbitMQ 生產者-消費者模式下 & 如何使用 RabbitMQ 內建負載平衡進行推送
基本介紹:本篇分為三大部分。
第一部分:Redis 的生產者模式優劣
第二部分:Redis Pub/Sub 的生產者消費者模式
第三部分:Redis Queue (List) 的生產者消費者模式






第一部分:Redis 的生產者模式優劣

Step 1:差異性、優缺點

以下是整理的列表,並且加入 RabbitMQ 進行比較:

特性 Pub/Sub List RabbitMQ
即時性 高,即時推送給所有訂閱者 較低,消費者需主動拉取消息 高,支持即時消息推送
持久化 不支持,消息未消費即丟失 支持,未消費的消息保留在隊列中 支持,支持消息持久化到磁盤
多消費者支持 支持,所有訂閱者接收相同消息 不支持,每條消息只能被一個消費者處理 支持,可配置為多消費模式或單消費模式
消息順序性 不保證 保證 FIFO 支持 FIFO,且可以配置為嚴格順序
錯誤處理 不支持重試,消息可能丟失 消息可重新處理 支持,死信隊列和重試機制
消息篩選 不支持,所有訂閱者收到相同消息 不支持 支持通過交換機和綁定鍵過濾消息
消息吞吐量 高,且支持多種優化機制
消息確認 不支持,消息推送即完成 不需要 支持,確認後消息才認為成功處理
適用場景 廣播、通知 工作隊列、任務調度 複雜消息隊列架構,如分布式系統、事件驅動架構


由以上可知,在選擇時任務時,若小型專案,可以選用 Redis 處理生產者與消費者模式,如果資料沒有保存可靠性,可以只用 Pub/Sub 模式。



第二部分:Redis Pub/Sub 的生產者消費者模式

Step 1:範例專案架構

打開範例專案後 Redis Pub/Sub 生產者消費者代碼,架構基本分成以下:

1. Service 實現生產者、消費者的 Pub/Sub 方法
2. 背景服務 訂閱事件,實現消費者,處理 SubScribe Publish 到消費者的工作
3. Web控制器 提供 Html 畫面、生產者資料 API 接口
4. Html 畫面 畫面,呼叫生產者進行 Publish 工作
5. 初始化配置 Redis、Pub/Sub、背景服務所需的依賴注入



Step 2:[Pub/Sub] - RedisPubSubService.cs

實現生產者、消費者的 Pub/Sub
總共分成以下 2 個區塊:

1. 推送 Publish 到 Redis 頻道上
2. 訂閱 當前對象 Subscribe Redis 頻道,使被 Publish 時會收到資料工作
public class RedisPubSubService
{
    private readonly ISubscriber _subscriber;
    private readonly ConnectionMultiplexer _redis;
    public RedisPubSubService(string connectionString)
    {
        _redis = ConnectionMultiplexer.Connect(connectionString);
        _subscriber = _redis.GetSubscriber();
    }

    /// <summary>
    /// 1. 提供立即推送到 Redis 上 Subscribe 的對象
    /// </summary>
    public void Publish(string channel, string message)
    {
        _subscriber.Publish(channel, message);
    }

    /// <summary>
    /// 2. 提供對象進行訂閱,在這篇範例是背景服務
    /// </summary>
    public void Subscribe(string channel, Action<string> messageHandler)
    {
        _subscriber.Subscribe(channel, (redisChannel, value) =>
        {
            messageHandler(value);
        });
    }
}


Step 3:[Pub/Sub] - RedisConsumerService.cs

訂閱事件,實現消費者,處理 SubScribe Publish 到消費者的工作
建立背景工作,伺服器啟動時,都會保持處理 Publish 過來的資料,給訂閱者處理此工作

public class RedisConsumerService : BackgroundService
{
    private readonly RedisPubSubService _pubSubService;

    public RedisConsumerService(RedisPubSubService pubSubService)
    {
        _pubSubService = pubSubService;
    }

    //2-1. 背景建立 - 保持訂閱事件
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _pubSubService.Subscribe("myChannel", message =>
        {
            // 2-2. 消費者處理接收到的消息
            Console.WriteLine($"收到資料囉,從 myChannel 來,得到的資訊: {message}");
        });

        return Task.CompletedTask;
    }
}


Step 4:[Pub/Sub] - HomeController.cs

畫面,呼叫生產者進行 Publish 工作

public class HomeController : Controller
{
    private readonly ILogger<HomeController> _logger;

    // 1. 訂閱生產者
    private readonly RedisPubSubService _pubSubService;
    public HomeController(ILogger<HomeController> logger,
        RedisPubSubService pubSubService)
    {
        _logger = logger;
        _pubSubService = pubSubService;
    }
    public IActionResult Index()
    {
        return View();
    }

    /// <summary>
    /// 2. 實現 Pub/Sub 生產者推送訊息
    /// </summary>
    [HttpPost]
    public IActionResult ProduceMessage([FromBody] MessageRequest request)
    {
        if (string.IsNullOrEmpty(request.Message))
        {
            return BadRequest("Message is required.");
        }
        _pubSubService.Publish("myChannel", request.Message);
        return Ok(new { message = "Message sent: " + request.Message });
    }
    public class MessageRequest
    {
        public string Message { get; set; }
    }
}


Step 5:[Pub/Sub] - Index.html

呼叫 API 實現生產者 Publish 的工作

@{
    ViewData["Title"] = "Home Page";
}

<div class="text-center">
    <h1 class="display-4">Pub/Sub Redis 範例說明</h1>
    <input type="button" value="點擊送出生產者資料" onclick="ClickButtonMethod()" />
    
</div>

<script>
    // 1. 呼叫 API 生產資料
    function ClickButtonMethod() {
        fetch('/Home/ProduceMessage', {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json'
            },
            body: JSON.stringify({ message: 'Pub/Sub Redis 範例說明' })
        })
            .then(response => {
                if (!response.ok) {
                    // 錯誤處理:檢查非 2xx 狀態碼
                    throw new Error(`HTTP error! status: ${response.status}`);
                }
                return response.json(); // 確保響應是 JSON 格式
            })
            .then(data => {
                alert(data.message);
                location.reload();
            })
            .catch(error => {
                console.error('error:', error);
                alert('Error: ' + error.message);
            });
    }
</script>


Step 6:[Pub/Sub] - Program.cs

Redis、Pub/Sub、背景服務所需的依賴注入

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
builder.Services.AddControllersWithViews();

// 1. 依賴注入 Redis 生產者 ( 127.0.0.1:6379 是本地的 Redis)
builder.Services.AddSingleton(new RedisPubSubService("127.0.0.1:6379"));
// 2. 依賴注入 Redis 消費者背景服務
builder.Services.AddHostedService<RedisConsumerService>();
</script>

...
...
...

app.Run();



Step 7:[Pub/Sub] - Demo 啟動專案

啟動專案時,點擊 Button

Step 8:[Pub/Sub] - Demo 完成

Pub/Sub 模式,生產的瞬間,消費者事件就會立刻進行處理



第三部分:Redis Queue (List) 的生產者消費者模式

Step 1:範例專案架構

打開範例專案後 Redis Queue(List) 生產者消費者代碼,架構基本分成以下:

1. Service 實現生產者、消費者的 Queue(List) 方法
2. 背景服務 提供生產者將資料加入 Redis Queue 中,實現消費者 Redis DeQueue 取出工作
3. Web控制器 提供 Html 畫面、生產者資料 API 接口
4. Html 畫面 畫面,呼叫生產者進行 Publish 工作
5. 初始化配置 Redis、Queue(List)、背景服務所需的依賴注入



Step 2:[Queue(List)] - RedisQueueService.cs

實現生產者、消費者的 Queue(List) 方法
總共分成以下 2 個區塊:

1. 加入 Queue 將資料放進 Redis 的 Queue 中加入
2. 取出 Queue 將資料從 Redis 的 Queue 中取出
public class RedisQueueService
{
    private readonly IDatabase _database;
    private readonly ConnectionMultiplexer _redis;

    public RedisQueueService(string connectionString)
    {
        _redis = ConnectionMultiplexer.Connect(connectionString);
        _database = _redis.GetDatabase();
    }

    /// <summary>
    /// 1. 將資料送到 Redis 的 Queue 中
    /// </summary>
    public void Enqueue(string queueName, string message)
    {
        _database.ListRightPush(queueName, message);
    }

    /// <summary>
    /// 2. 背景服務會從 Redis 的 Queue 中取出
    /// </summary>
    public string Dequeue(string queueName)
    {
        return _database.ListLeftPop(queueName);
    }
}


Step 3:[Queue(List)] - RedisQueueConsumerService.cs

提供生產者將資料加入 Redis Queue 中,實現消費者 Redis DeQueue 取出工作
建立背景工作,只要伺服器啟動,就會開始從 DeQueue 中將資料取出,利用 Redis 持久化資料的特性,避免資料遺失。

/// <summary>
/// 1. 消費者的 Host 在 Server 啟動時,就會保生產者
/// </summary>
public class RedisQueueConsumerService : BackgroundService
{
    private readonly RedisQueueService _queueService;
    private readonly int _waitMilliSecond = 10000;//等待 10 秒

    public RedisQueueConsumerService(RedisQueueService queueService)
    {
        _queueService = queueService;
    }

    //2-1. 背景建立 - 保持消費事件
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // 2-2. 取出資料
            var message = _queueService.Dequeue("myQueue");

            if (message != null)
            {
                // 2-3. 消費者處理 Queue 堆積的消息
                Console.WriteLine($"從 myQueue 傳來的資料,進行消費 message: {message} 收到消費的時間: {DateTime.Now}");
            }
            else
            {
                // 2-4. 如果隊列為空,稍作等待
                await Task.Delay(_waitMilliSecond, stoppingToken);
            }
        }
    }
}


Step 4:[Queue(List)] - HomeController.cs

畫面,呼叫生產者進行 Publish 工作

public class HomeController : Controller
{
    private readonly ILogger<HomeController> _logger;
    private readonly RedisQueueService _queueService;

    public HomeController(ILogger<HomeController> logger
        , RedisQueueService queueService)
    {
        _queueService = queueService;
        _logger = logger;
    }

    public IActionResult Index()
    {
        return View();
    }

    /// <summary>
    /// 2. 實現 List Redis 生產者推送訊息
    /// </summary>
    [HttpPost]
    public IActionResult ProduceMessage([FromBody] MessageRequest request)
    {
        if (string.IsNullOrEmpty(request.Message))
        {
            return BadRequest("Message is required.");
        }
        request.Message = request.Message + $@" => 傳送資料的時間 : {DateTime.Now}";

        _queueService.Enqueue("myQueue", request.Message);
        return Ok(new { message = "Message added to queue: " + request.Message });
    }

    public class MessageRequest
    {
        public string Message { get; set; }
    }
}


Step 5:[Queue(List)] - Index.html

呼叫 API ,實現生產者進行 Queue 工作

@{
    ViewData["Title"] = "Home Page";
}

<div class="text-center">
    <h1 class="display-4">List Redis 範例說明</h1>
    <input type="button" value="點擊送出生產者資料" onclick="ClickButtonMethod()" />

</div>

<script>
    // 1. 呼叫 API 生產資料
    function ClickButtonMethod() {
        fetch('/Home/ProduceMessage', {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json'
            },
            body: JSON.stringify({ message: 'List Redis 範例說明' })
        })
            .then(response => {
                if (!response.ok) {
                    // 錯誤處理:檢查非 2xx 狀態碼
                    throw new Error(`HTTP error! status: ${response.status}`);
                }
                return response.json(); // 確保響應是 JSON 格式
            })
            .then(data => {
                alert(data.message);
                location.reload();
            })
            .catch(error => {
                console.error('error:', error);
                alert('Error: ' + error.message);
            });
    }
</script>


Step 6:[Queue(List)] - Program.cs

Redis、Pub/Sub、背景服務所需的依賴注入

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
builder.Services.AddControllersWithViews();

// 1. 依賴注入 Redis 生產者 ( 127.0.0.1:6379 是本地的 Redis)
builder.Services.AddSingleton(new RedisQueueService("127.0.0.1:6379"));
// 2. 依賴注入 Redis 消費者背景服務
builder.Services.AddHostedService<RedisQueueConsumerService>();
</script>

...
...
...

app.Run();



Step 7:[Queue(List)] - Demo 啟動專案

啟動專案時,點擊 Button

Step 8:[Queue(List)] - Demo 完成

Queue(List) 模式,生產後,消費者事件會在背景服務指定的時間觸發時才消費處理

Step 8:[Queue(List)] - Demo 持久化特性

若伺服器當機時,有資料在 Redis 的 Queue 中,當啟動機器會自動完成消費者模式的資料。
藉由 Redis 持久化特性,保持資料不遺失