分享程式代碼相關筆記
目前文章總數:217 篇
最後更新:2026年 01月 31日
在分布式環境下,多個應用實例可能同時訪問同一個資源(如數據庫記錄、文件等),如果沒有適當的協調機制,會導致以下狀況:
| 1. 數據不一致 | 多個實例同時修改數據,造成相互覆蓋或髒數據 |
| 2. 重複執行 | 同一任務被多個實例重複處理,例如重複發送消息、重複扣款 |
| 3. 資源競爭 | 多個實例爭搶有限資源,造成系統混亂 |
這時有了 Redis 的分布式鎖,可以防止以上的狀況發生,假設電商庫存系統如圖示意:
沒有分布式鎖的狀況:
有分布式鎖的狀況,確保了原子性:
會說明如何透過 主從架構下 + 同時觸發 執行的業務,展示透過 Redis 分布式鎖達到寫入資料庫時保持原子性
會透過 Quartz.NET 精確地讓兩個服務同時觸發同個業務,這時若 Redis 分布式鎖有效,那麼就只會有一筆資料寫進到 Mysql 資料庫中
| 1. 主從服務 | 同時觸發業務 |
| 2. Redis 分布式鎖 | 第一道牆,會說明 阻塞式 & 非阻塞式 差異 |
| 3. Mysql 資料庫 | 正確的使用分布式鎖,可以保證資料庫的寫入原子性 |
範例會有一個工作流的資料表,Demo 寫入的結果。
※實務上經常可以看到像 Log 都會流水號 Id 遞增。
CREATE TABLE `WorkerFlowMessage` (
`WorkerFlowMessageId` INT(10) NOT NULL AUTO_INCREMENT COMMENT '工作流Id',
`ServiceName` VARCHAR(100) NOT NULL COMMENT '工作名稱' COLLATE 'utf8mb4_0900_ai_ci',
`Message` VARCHAR(50) NOT NULL DEFAULT '' COMMENT '訊息' COLLATE 'utf8mb4_0900_ai_ci',
`CreateTime` DATETIME NULL DEFAULT CURRENT_TIMESTAMP COMMENT '建立時間',
PRIMARY KEY (`WorkerFlowMessageId`) USING BTREE
)
COMMENT='分布式鎖工作流紀錄表'
COLLATE='utf8mb4_0900_ai_ci'
ENGINE=InnoDB
;
啟動 3 個 Redis 實例容器, RedLock.NET 有 過半數投票制 ,避免 Redis 單點故障問題
Nuget 安裝以下套件,RedLock.NET 是主流的 Reids 分布式套件,但相依於 ExchangeRedis
RedLock.NET
StackExchange.Redis
Nuget 安裝以下套件,Quartz.NET 是高效能的排程套件,並且可以精確到微秒等級的執行時間
也是模擬併發業務事件的有利輔助套件
Quartz.NET
啟動 1 個 Mysql 實例容器,只會用到一張表來做為 Demo 說明 Redis 分布式鎖應用
打開本篇範例代碼後,架構基本分成以下:
| 1. 資料庫 Framework | : | 實現 IUnitOfWork 、工廠方法,支援 Mysql 交易寫入、回滾,易於擴展 |
| 2. Redis 分布式實作 | : | 實現 RedLock.NET 的阻塞式、非阻塞式分布式鎖 |
| 3. 背景程式 | : | 實現 Quartz.NET Job,業務目的是寫入 Mysql 資料庫 |
| 4. 擴展工具 | : | 實現上述 3 種的 DI 注入封裝方法 |
| 5. 設定檔案 | : | Mysql、Redis 連線設定 |
| 6. 初始化配置 | : | 基本的依賴注入 |
在 Program.cs 中,注入 Quartz.NET ,當程式啟動時,會同時啟動 2 個相同的 Job 在同一時間下
並且每隔 10 秒會在執行一次,模擬併發情境
/// <summary>
/// Quartz.NET 排程 DI
/// </summary>
public static IServiceCollection AddQuartzNETJob(this IServiceCollection services, DateTime startAt)
{
try
{
services.AddQuartz(q =>
{
// 1. 建立 Job
var selfKey = new JobKey(nameof(QuartzJobForDistributed));
q.AddJob<QuartzJobForDistributed>(opts => opts.WithIdentity(selfKey));
// 2. 啟動時間,目的是模擬高併發精準到微秒
var startedDateTime = new DateTimeOffset(startAt,
TimeSpan.FromHours(8));//Tw 時間
// 3-1. 建立 Trigger 1
q.AddTrigger(opts => opts
.ForJob(selfKey)
.WithIdentity("WorkerFlowJob-trigger1")
.UsingJobData("SourceTag", "Server_A") // 標記 - 假設架設在機器A
.StartAt(startedDateTime)
.WithSimpleSchedule(x => x
.WithIntervalInSeconds(10)// 模擬同時每 10s 併發碰撞
.RepeatForever()));
// 3-2. 建立 Trigger 2
q.AddTrigger(opts => opts
.ForJob(selfKey)
.WithIdentity("WorkerFlowJob-trigger2")
.UsingJobData("SourceTag", "Server_B") // 標記 - 假設架設在機器B
.StartAt(startedDateTime)
.WithSimpleSchedule(x => x
.WithIntervalInSeconds(10)// 模擬同時每 10s 併發碰撞
.RepeatForever()));
});
// 4. 新增 Quartz 託管服務,這會負責啟動/停止排程器
services.AddQuartzHostedService(q => q.WaitForJobsToComplete = true);
}
catch (Exception ex)
{
Console.WriteLine($"Quartz.NET DI Error:{ex.Message}");
}
return services;
}
啟動範例代碼後,會進入實際執行服務,以下是無分布式鎖的狀況
public class QuartzJobForDistributed : IJob
{
private readonly IDistributedLock _distributedLock;
private readonly IUnitOfWorkFactory _uowFactory;
private readonly ILogger<QuartzJobForDistributed> _logger;
public QuartzJobForDistributed(IDistributedLock distributedLock,
IUnitOfWorkFactory uowFactory,
ILogger<QuartzJobForDistributed> logger)
{
_distributedLock = distributedLock;
_uowFactory = uowFactory;
_logger = logger;
}
/// <summary>
/// 實際執行
/// </summary>
public async Task Execute(IJobExecutionContext context)
{
// 1. 當前運行的 Job (標記判斷)
var sourceTag = context.MergedJobDataMap.GetString("SourceTag") ?? string.Empty;
// 2. Redis 阻塞鎖 : WaitLockExecuteAsync
Console.WriteLine(DateTime.Now.ToLongTimeString());
// 3. 寫入資料庫,模擬併發碰撞
await InsertData(sourceTag);
}
/// <summary>
/// 寫入到資料庫
/// </summary>
private async Task InsertData(string sourceTag)
{
//略
}
}
啟動後, 2 個 Job 每 10 秒同時啟動,並且會各自寫到資料庫,我們預期只會有一筆資料寫進資料庫中
目標是主從架構,只會有一個服務實際寫入,當其中一個服務在寫入時,其他伺服器可以跳過
重複寫入資料表的資料
替換代碼,呼叫 WaitLockExecuteAsync() 來進行寫入資料庫
public class QuartzJobForDistributed : IJob
{
private readonly IDistributedLock _distributedLock;
private readonly IUnitOfWorkFactory _uowFactory;
private readonly ILogger<QuartzJobForDistributed> _logger;
public QuartzJobForDistributed(IDistributedLock distributedLock,
IUnitOfWorkFactory uowFactory,
ILogger<QuartzJobForDistributed> logger)
{
_distributedLock = distributedLock;
_uowFactory = uowFactory;
_logger = logger;
}
/// <summary>
/// 實際執行
/// </summary>
public async Task Execute(IJobExecutionContext context)
{
var redisKey = $@"{nameof(QuartzJobForDistributed)}";
// 1. 當前運行的 Job (標記判斷)
var sourceTag = context.MergedJobDataMap.GetString("SourceTag") ?? string.Empty;
// 2. Redis 阻塞鎖 : WaitLockExecuteAsync
Console.WriteLine(DateTime.Now.ToLongTimeString());
await _distributedLock.WaitLockExecuteAsync(redisKey, async() =>
{
// 3. 寫入資料庫,模擬併發碰撞
await InsertData(sourceTag);
}, expiryTime: TimeSpan.FromSeconds(10),
waitTime: TimeSpan.FromSeconds(2),
retryTime: TimeSpan.FromSeconds(1));
}
/// <summary>
/// 寫入到資料庫
/// </summary>
private async Task InsertData(string sourceTag)
{
//略
}
}
實際 WaitLockExecuteAsync() 的方法內容如下
每個拿到鎖的排程,都會有最小等待時間 waitTime: wait ,並且執行完畢會自動釋放鎖
不管 expiryTime: expiry 是否到期, 做完立刻釋放
/// <summary>
/// 阻塞式
/// </summary>
public async Task WaitLockExecuteAsync(string redisKey, Func<Task> act, TimeSpan expiry = default, TimeSpan wait = default, TimeSpan retry = default)
{
if (expiry == default(TimeSpan))// 未傳入時,預設 expire 時間
expiry = TimeSpan.FromSeconds(2);
if (wait == default(TimeSpan)) // 未傳入時,預設放棄重試時間
wait = TimeSpan.FromSeconds(1);
if (retry == default(TimeSpan)) // 未傳入時,預設重試間隔時間
retry = TimeSpan.FromMilliseconds(20);
var redisLockKey = $"{_keyPrefix}_{redisKey}";
// 使用 await using 自動管理生命週期
await using var redLock = await _lockFactory.CreateLockAsync(
resource: redisLockKey,
expiryTime: expiry,
waitTime: wait,// 存在最小排隊時間,形成阻塞式鎖
retryTime: retry
);
if (redLock.IsAcquired)
{
await act();
}
}
運行阻塞式鎖的代碼,可以觀察到同時啟動紀錄
仍然會重複插入資料,這是因為 阻塞式鎖 第一個工作拿到鎖做完就立刻釋放,並且第二個鎖在等待排隊一下,然後就進入了
如圖,假設 Server A 先拿到鎖, Server B 會看到被鎖住後會有以下行為
| 1. Server A | : | 17:40 拿到鎖,進入執行工作,1秒內順利執行完成 |
| 2. Server B | : | 17:40 看到被鎖了,但是阻塞式鎖可以等待 2 秒,剛好 ServerA 在1秒內做為,輪到我工作了 |
因此阻塞式鎖適合多執行緒的工作,而非需要原子性的工作
替換代碼,呼叫 TryLockExecuteAsync() 來進行寫入資料庫
public class QuartzJobForDistributed : IJob
{
private readonly IDistributedLock _distributedLock;
private readonly IUnitOfWorkFactory _uowFactory;
private readonly ILogger<QuartzJobForDistributed> _logger;
public QuartzJobForDistributed(IDistributedLock distributedLock,
IUnitOfWorkFactory uowFactory,
ILogger<QuartzJobForDistributed> logger)
{
_distributedLock = distributedLock;
_uowFactory = uowFactory;
_logger = logger;
}
/// <summary>
/// 實際執行
/// </summary>
public async Task Execute(IJobExecutionContext context)
{
var redisKey = $@"{nameof(QuartzJobForDistributed)}";
// 1. 當前運行的 Job (標記判斷)
var sourceTag = context.MergedJobDataMap.GetString("SourceTag") ?? string.Empty;
// 2. Redis 非阻塞鎖 : TryLockExecuteAsync
Console.WriteLine(DateTime.Now.ToLongTimeString());
await _distributedLock.TryLockExecuteAsync(redisKey, async () =>
{
// 3. 寫入資料庫,模擬併發碰撞
await InsertData(sourceTag);
}, expiryTime: TimeSpan.FromSeconds(60) // 要給定合理時間,避免過久鎖住事務,未釋放
);
}
/// <summary>
/// 寫入到資料庫
/// </summary>
private async Task InsertData(string sourceTag)
{
//略
}
}
實際 TryLockExecuteAsync() 的方法內容如下
每個拿到鎖的排程,仍然執行完畢會自動釋放鎖
並且在鎖的其中,其他人看到會立刻放棄,也不排隊 waitTime: TimeSpan.Zero,
await Task.Delay(1000); 這是不讓同時併發的情況下,另一個服務也能訪問,避免誤入臨界值
/// <summary>
/// 非阻塞式
/// </summary>
public async Task TryLockExecuteAsync(string redisKey, Func<Task> act, TimeSpan expiry)
{
var redisLockKey = $"{_keyPrefix}_{redisKey}";
await using var redLock = await _lockFactory.CreateLockAsync(
resource: redisLockKey,
expiryTime: expiry,
// 非阻塞式強制等待時間為 0 ,任何訪問者使用時若正在執行,直接跳過
waitTime: TimeSpan.Zero,
retryTime: TimeSpan.Zero
);
if (redLock.IsAcquired)
{
await act();
await Task.Delay(1000);//至少1秒處理時間,避免執行太快,另一個業務也能拿到
}
}
運行非阻塞式鎖的代碼,可以觀察到同時啟動紀錄
雖然同時觸發,但最終只會有一個執行寫入工作,達成服務的主從目的
兩者特性差異:
| 特性 | 阻塞式鎖 (Blocking / Wait) | 非阻塞式鎖 (Non-blocking / TryLock) |
|---|---|---|
| 行為模式 | 沒搶到鎖時,會在那裡等 (Retry),直到拿到鎖或超時為止。 | 沒搶到鎖時,立刻轉頭就走 (回傳失敗),不浪費時間。 |
| 程式碼體現 | waitTime 設定為 5~10 秒。 | waitTime 設定為 Zero。 |
| 執行次數 | 所有請求最終都會執行(只要沒超時)。 | 只有第一個請求會執行,其餘直接跳過。 |
| 系統負擔 | 較高。因為執行緒會掛在那裡等待並重複嘗試。 | 較低。不佔用等待執行緒。 |
非阻塞 = 高效率,適合「重複性」任務。
阻塞 = 高可靠,適合「交易性」任務。
我們實現了分散式排程器 (Quartz.NET):確保多個節點(Server A/B)不會同時執行同一個重量級任務。
但需要在非阻塞式中添加 Task.Delay(1000),在精確的同個時間點的併發下,避免臨界進入
以下是 RedLock.NET 的特性:
| 特性 | RedLock.NET 的行為 | 備註 |
|---|---|---|
| 節點數量 | 建議 3 台以上獨立節點 | 達成 N/2+1 的過半數共識。 |
| 生命週期 | using 結束立即釋放 | 主動解鎖,不適合用來做「強制冷卻」。 |
| 安全性 | 處理了 Failover 遺失問題 | 比單機 LockTake 更安全。 |
| 時間管理 | 自動續期 (Renew) | 防止業務邏輯跑太久導致鎖失效。 |