分享程式代碼相關筆記
目前文章總數:172 篇
最後更新:2025年 03月 22日
以下是整理的列表,並且加入 RabbitMQ 進行比較:
特性 | Pub/Sub | List | RabbitMQ |
---|---|---|---|
即時性 | 高,即時推送給所有訂閱者 | 較低,消費者需主動拉取消息 | 高,支持即時消息推送 |
持久化 | 不支持,消息未消費即丟失 | 支持,未消費的消息保留在隊列中 | 支持,支持消息持久化到磁盤 |
多消費者支持 | 支持,所有訂閱者接收相同消息 | 不支持,每條消息只能被一個消費者處理 | 支持,可配置為多消費模式或單消費模式 |
消息順序性 | 不保證 | 保證 FIFO | 支持 FIFO,且可以配置為嚴格順序 |
錯誤處理 | 不支持重試,消息可能丟失 | 消息可重新處理 | 支持,死信隊列和重試機制 |
消息篩選 | 不支持,所有訂閱者收到相同消息 | 不支持 | 支持通過交換機和綁定鍵過濾消息 |
消息吞吐量 | 高 | 高 | 高,且支持多種優化機制 |
消息確認 | 不支持,消息推送即完成 | 不需要 | 支持,確認後消息才認為成功處理 |
適用場景 | 廣播、通知 | 工作隊列、任務調度 | 複雜消息隊列架構,如分布式系統、事件驅動架構 |
由以上可知,在選擇時任務時,若小型專案,可以選用 Redis 處理生產者與消費者模式,如果資料沒有保存可靠性,可以只用 Pub/Sub 模式。
打開範例專案後 Redis Pub/Sub 生產者消費者代碼,架構基本分成以下:
1. Service | : | 實現生產者、消費者的 Pub/Sub 方法 |
2. 背景服務 | : | 訂閱事件,實現消費者,處理 SubScribe Publish 到消費者的工作 |
3. Web控制器 | : | 提供 Html 畫面、生產者資料 API 接口 |
4. Html 畫面 | : | 畫面,呼叫生產者進行 Publish 工作 |
5. 初始化配置 | : | Redis、Pub/Sub、背景服務所需的依賴注入 |
實現生產者、消費者的 Pub/Sub
總共分成以下 2 個區塊:
1. 推送 | : | Publish 到 Redis 頻道上 |
2. 訂閱 | : | 當前對象 Subscribe Redis 頻道,使被 Publish 時會收到資料工作 |
public class RedisPubSubService
{
private readonly ISubscriber _subscriber;
private readonly ConnectionMultiplexer _redis;
public RedisPubSubService(string connectionString)
{
_redis = ConnectionMultiplexer.Connect(connectionString);
_subscriber = _redis.GetSubscriber();
}
/// <summary>
/// 1. 提供立即推送到 Redis 上 Subscribe 的對象
/// </summary>
public void Publish(string channel, string message)
{
_subscriber.Publish(channel, message);
}
/// <summary>
/// 2. 提供對象進行訂閱,在這篇範例是背景服務
/// </summary>
public void Subscribe(string channel, Action<string> messageHandler)
{
_subscriber.Subscribe(channel, (redisChannel, value) =>
{
messageHandler(value);
});
}
}
訂閱事件,實現消費者,處理 SubScribe Publish 到消費者的工作
建立背景工作,伺服器啟動時,都會保持處理 Publish 過來的資料,給訂閱者處理此工作
public class RedisConsumerService : BackgroundService
{
private readonly RedisPubSubService _pubSubService;
public RedisConsumerService(RedisPubSubService pubSubService)
{
_pubSubService = pubSubService;
}
//2-1. 背景建立 - 保持訂閱事件
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
_pubSubService.Subscribe("myChannel", message =>
{
// 2-2. 消費者處理接收到的消息
Console.WriteLine($"收到資料囉,從 myChannel 來,得到的資訊: {message}");
});
return Task.CompletedTask;
}
}
畫面,呼叫生產者進行 Publish 工作
public class HomeController : Controller
{
private readonly ILogger<HomeController> _logger;
// 1. 訂閱生產者
private readonly RedisPubSubService _pubSubService;
public HomeController(ILogger<HomeController> logger,
RedisPubSubService pubSubService)
{
_logger = logger;
_pubSubService = pubSubService;
}
public IActionResult Index()
{
return View();
}
/// <summary>
/// 2. 實現 Pub/Sub 生產者推送訊息
/// </summary>
[HttpPost]
public IActionResult ProduceMessage([FromBody] MessageRequest request)
{
if (string.IsNullOrEmpty(request.Message))
{
return BadRequest("Message is required.");
}
_pubSubService.Publish("myChannel", request.Message);
return Ok(new { message = "Message sent: " + request.Message });
}
public class MessageRequest
{
public string Message { get; set; }
}
}
呼叫 API 實現生產者 Publish 的工作
@{
ViewData["Title"] = "Home Page";
}
<div class="text-center">
<h1 class="display-4">Pub/Sub Redis 範例說明</h1>
<input type="button" value="點擊送出生產者資料" onclick="ClickButtonMethod()" />
</div>
<script>
// 1. 呼叫 API 生產資料
function ClickButtonMethod() {
fetch('/Home/ProduceMessage', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ message: 'Pub/Sub Redis 範例說明' })
})
.then(response => {
if (!response.ok) {
// 錯誤處理:檢查非 2xx 狀態碼
throw new Error(`HTTP error! status: ${response.status}`);
}
return response.json(); // 確保響應是 JSON 格式
})
.then(data => {
alert(data.message);
location.reload();
})
.catch(error => {
console.error('error:', error);
alert('Error: ' + error.message);
});
}
</script>
Redis、Pub/Sub、背景服務所需的依賴注入
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddControllersWithViews();
// 1. 依賴注入 Redis 生產者 ( 127.0.0.1:6379 是本地的 Redis)
builder.Services.AddSingleton(new RedisPubSubService("127.0.0.1:6379"));
// 2. 依賴注入 Redis 消費者背景服務
builder.Services.AddHostedService<RedisConsumerService>();
</script>
...
...
...
app.Run();
啟動專案時,點擊 Button
Pub/Sub 模式,生產的瞬間,消費者事件就會立刻進行處理
打開範例專案後 Redis Queue(List) 生產者消費者代碼,架構基本分成以下:
1. Service | : | 實現生產者、消費者的 Queue(List) 方法 |
2. 背景服務 | : | 提供生產者將資料加入 Redis Queue 中,實現消費者 Redis DeQueue 取出工作 |
3. Web控制器 | : | 提供 Html 畫面、生產者資料 API 接口 |
4. Html 畫面 | : | 畫面,呼叫生產者進行 Publish 工作 |
5. 初始化配置 | : | Redis、Queue(List)、背景服務所需的依賴注入 |
實現生產者、消費者的 Queue(List) 方法
總共分成以下 2 個區塊:
1. 加入 Queue | : | 將資料放進 Redis 的 Queue 中加入 |
2. 取出 Queue | : | 將資料從 Redis 的 Queue 中取出 |
public class RedisQueueService
{
private readonly IDatabase _database;
private readonly ConnectionMultiplexer _redis;
public RedisQueueService(string connectionString)
{
_redis = ConnectionMultiplexer.Connect(connectionString);
_database = _redis.GetDatabase();
}
/// <summary>
/// 1. 將資料送到 Redis 的 Queue 中
/// </summary>
public void Enqueue(string queueName, string message)
{
_database.ListRightPush(queueName, message);
}
/// <summary>
/// 2. 背景服務會從 Redis 的 Queue 中取出
/// </summary>
public string Dequeue(string queueName)
{
return _database.ListLeftPop(queueName);
}
}
提供生產者將資料加入 Redis Queue 中,實現消費者 Redis DeQueue 取出工作
建立背景工作,只要伺服器啟動,就會開始從 DeQueue 中將資料取出,利用 Redis 持久化資料的特性,避免資料遺失。
/// <summary>
/// 1. 消費者的 Host 在 Server 啟動時,就會保生產者
/// </summary>
public class RedisQueueConsumerService : BackgroundService
{
private readonly RedisQueueService _queueService;
private readonly int _waitMilliSecond = 10000;//等待 10 秒
public RedisQueueConsumerService(RedisQueueService queueService)
{
_queueService = queueService;
}
//2-1. 背景建立 - 保持消費事件
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// 2-2. 取出資料
var message = _queueService.Dequeue("myQueue");
if (message != null)
{
// 2-3. 消費者處理 Queue 堆積的消息
Console.WriteLine($"從 myQueue 傳來的資料,進行消費 message: {message} 收到消費的時間: {DateTime.Now}");
}
else
{
// 2-4. 如果隊列為空,稍作等待
await Task.Delay(_waitMilliSecond, stoppingToken);
}
}
}
}
畫面,呼叫生產者進行 Publish 工作
public class HomeController : Controller
{
private readonly ILogger<HomeController> _logger;
private readonly RedisQueueService _queueService;
public HomeController(ILogger<HomeController> logger
, RedisQueueService queueService)
{
_queueService = queueService;
_logger = logger;
}
public IActionResult Index()
{
return View();
}
/// <summary>
/// 2. 實現 List Redis 生產者推送訊息
/// </summary>
[HttpPost]
public IActionResult ProduceMessage([FromBody] MessageRequest request)
{
if (string.IsNullOrEmpty(request.Message))
{
return BadRequest("Message is required.");
}
request.Message = request.Message + $@" => 傳送資料的時間 : {DateTime.Now}";
_queueService.Enqueue("myQueue", request.Message);
return Ok(new { message = "Message added to queue: " + request.Message });
}
public class MessageRequest
{
public string Message { get; set; }
}
}
呼叫 API ,實現生產者進行 Queue 工作
@{
ViewData["Title"] = "Home Page";
}
<div class="text-center">
<h1 class="display-4">List Redis 範例說明</h1>
<input type="button" value="點擊送出生產者資料" onclick="ClickButtonMethod()" />
</div>
<script>
// 1. 呼叫 API 生產資料
function ClickButtonMethod() {
fetch('/Home/ProduceMessage', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ message: 'List Redis 範例說明' })
})
.then(response => {
if (!response.ok) {
// 錯誤處理:檢查非 2xx 狀態碼
throw new Error(`HTTP error! status: ${response.status}`);
}
return response.json(); // 確保響應是 JSON 格式
})
.then(data => {
alert(data.message);
location.reload();
})
.catch(error => {
console.error('error:', error);
alert('Error: ' + error.message);
});
}
</script>
Redis、Pub/Sub、背景服務所需的依賴注入
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddControllersWithViews();
// 1. 依賴注入 Redis 生產者 ( 127.0.0.1:6379 是本地的 Redis)
builder.Services.AddSingleton(new RedisQueueService("127.0.0.1:6379"));
// 2. 依賴注入 Redis 消費者背景服務
builder.Services.AddHostedService<RedisQueueConsumerService>();
</script>
...
...
...
app.Run();
啟動專案時,點擊 Button
Queue(List) 模式,生產後,消費者事件會在背景服務指定的時間觸發時才消費處理
若伺服器當機時,有資料在 Redis 的 Queue 中,當啟動機器會自動完成消費者模式的資料。
藉由 Redis 持久化特性,保持資料不遺失