分享程式代碼相關筆記
目前文章總數:222 篇
最後更新:2026年 03月 07日
資料庫架構中,以下是 XA 與 SAGA 兩者的差異
| 比較項目 | Saga | XA(2PC) |
|---|---|---|
| 核心概念 | 將大交易拆成多個本地交易 + 補償機制 | 使用兩階段提交(Prepare / Commit)確保全域一致性 |
| 一致性模型 | 最終一致性(Eventually Consistent) | 強一致性(Strong Consistency) |
| 交易鎖定時間 | 無長時間鎖定 | 會長時間鎖定資源(直到全域提交) |
| 效能 | 較好(無全域鎖) | 較差(阻塞式提交) |
| 可用性 | 高(失敗可補償) | 較低(Coordinator 故障可能阻塞) |
| 複雜度 | 業務邏輯複雜(需設計補償) | 技術複雜度高(需支援 XA protocol) |
| 對資料庫要求 | 不需要特別支援 | 需要資料庫支援 XA |
| 適合架構 | 微服務架構 | 傳統單體 / 強一致性金融場景 |
| 失敗處理 | 透過補償交易回滾 | 透過全域回滾 |
| 可擴展性 | 高 | 較低(Coordinator 成為瓶頸) |
| 併發能力 | 高 | 低(鎖競爭嚴重) |
| 業務侵入性 | 高(需寫補償邏輯) | 低(由交易管理器處理) |
對比之下,若需要 分散式資料庫 + 效能高 + 非金融 + 開發時間充裕 ,那麼 SAGA 很適合考慮引用
在應用情境下,若非單式體的小型專案,那麼主流大多是 Saga 模式,解決分布式資料庫存儲問題
| 情境 | 建議 |
|---|---|
| 微服務架構 | Saga |
| 高併發電商 | Saga |
| 金融核心帳務 | XA |
| 跨多資料庫強一致 | XA |
| 需要高可用與水平擴展 | Saga |
Saga = 用補償換取高可用與高擴展
XA = 用鎖與阻塞換取強一致性
Saga 模式實現的成本為以下 3 點:
| 成本要點 | 說明 | 補充 |
|---|---|---|
| 1. 補償交易設計 | - 要對業務透徹理解才能實現補償機制 | 對系統的業務邏輯要完整理解,若一知半解會導致主體流程在補償中異常卡住 |
| - 不是所有行為都可逆 | 發送 Email 不可補償 / 扣庫存可以補償(加回)/ 金流請款要增加退款流程 | |
| 2. 冪等設計 | 每個步驟都必須擁有冪等性設計 | 1. 可重試 |
| 2. 不可重複執行造成副作用 | ||
| 3. 能根據 SagaId / BusinessId 判斷是否已完成 | ||
| 3. 過渡狀態管理 | 中間狀態管理 | Order = Created / Payment = Processing / Inventory = Reserved … |
| 每個狀態碼都需知道處理行為(呼應需理解系統的業務邏輯) |
因此對產品系統沒有一定程度的理解,是不容易 設計出正確的 Saga 模型
Saga 需要深度理解系統業務,而且比 XA 還要求高,因此 SAGA 模式,困難的地方是 複雜度提高(開發時間增加)
以下是 XA 與 Saga 的責任區分
| 項目 | XA | Saga |
|---|---|---|
| 一致性 | DB 保證 | 你自己保證 |
| 回滾 | DB rollback | 你設計補償 |
| 錯誤處理 | 交易失敗就 rollback | 你要判斷「現在走到哪一步」 |
| 冪等性 | DB 幫你處理 | 每個步驟都要自己設計 |
簡言之:
SAGA => 把「資料庫原本做的事」全部拿回來自己做。
開發團隊成員若以下其中一項沒有實現,就會造成嚴重的災難
※極有可能出現代碼都是對的,但常常出現 最終沒有一致性
| 1. 補償設計錯 |
| 2. 狀態機設計錯 |
| 3. 邊界條件沒考慮 |
| 4. 少一個冪等檢查 |
| 5. 沒考慮重試 + Timeout + Crash |
Saga 模型的實作方式有 2 種,分別是 Orchestration(中央協調式)、 Choreography(事件驅動式)
本偏代碼範例說明實現的是 Orchestration(中央協調式),以下是 2 者的差異:
| 比較項目 | Choreography(事件驅動式) | Orchestration(中央協調式) |
|---|---|---|
| 核心概念 | 各服務透過事件自行決定下一步 | 由中央 Orchestrator 控制整個流程 |
| 流程控制者 | 沒有中央控制者 | 有一個 Saga Orchestrator |
| 控制方式 | Event-driven(發布 / 訂閱) | Command-driven(下達指令) |
| 耦合度 | 服務之間隱性耦合 | 與 Orchestrator 顯性耦合 |
| 可讀性 | 流程分散,較難看出全貌 | 流程集中,較清楚 |
| 擴展性 | 高(新增服務只需訂閱事件) | 中等(需修改 Orchestrator) |
| 除錯難度 | 高(流程散落各服務) | 較低(集中在協調者) |
| 業務流程複雜度 | 適合簡單流程 | 適合複雜流程 |
| 狀態管理 | 分散在各服務 | 集中管理 Saga 狀態 |
| 失敗處理 | 透過事件觸發補償 | Orchestrator 負責補償流程 |
| 交易可視性 | 低(需追蹤多個事件) | 高(可追蹤 SagaId 全流程) |
| 維護成本 | 流程複雜時成本高 | Orchestrator 可能成為複雜核心 |
| 單點風險 | 無單點 | Orchestrator 可能成為單點(需 HA) |
| 適合團隊成熟度 | 高(需良好事件設計能力) | 中高(需架構治理能力) |
| 常見搭配技術 | Kafka / RabbitMQ 等 MQ | 工作流引擎 / 自建 Saga Coordinator |
簡言之:若是想要系統可維護性高,未來複雜度低、可適用擴展的大型專案,那麼推薦 Orchestration(中央協調式) 開始。
Saga 模型的實作方式有 2 種,分別是 Orchestration(中央協調式)、 Choreography(事件驅動式)
本偏代碼範例說明實現的是 Orchestration(中央協調式),以下是 2 者的差異:
| 情境 | 建議 |
|---|---|
| 流程簡單(下單 → 扣庫存 → 扣款) | Choreography |
| 流程複雜(多分支、多補償、多條件) | Orchestration |
| 團隊缺乏整體流程掌握能力 | Orchestration |
| 極度去中心化架構 | Choreography |
| 需要高度可觀察性與流程追蹤 | Orchestration |
Choreography: 是最佳的解耦合,但難於維護
Orchestration: 是最佳的可維護性,但中心服務會導致系統有高耦合
SAGA 模式的優點如下,實現 最終一致性:
優點結論:高效能
| 1. 不需要全域鎖(無 2PC):不會長時間鎖住多個資料庫,適用於高併發、避免 XA 阻塞問題,並且不會因單點卡死而整筆交易卡住 |
| 2. 高可用、抗故障能力強:某個服務暫時掛掉, 不會讓整個系統停擺 、 可以重試 、 可以補償 |
| 3. 微服務架構:每個服務擁有自己的資料庫、不需要跨 DB 事務、不強耦合 |
| 4. 可橫向擴展:基於本地交易、非同步、最終一致性,因此更容易 scale out |
並遺毒的缺點如下:
缺點結論:高複雜度
| 1. 系統複雜度高:需要設計補償交易、Saga Log、重試機制、冪等性 |
| 2. 中間狀態不一致:在流程中出現處於某種狀態 EX: A 成功 / B 失敗 / 補償尚未執行 |
| 3. 補償不一定能完全還原:補償交易 ≠ 回滾。 EX: 發送簡訊、發送 email (這不可回滾) |
| 4. Debug 與追蹤困難:Orchestration 還可以接受,但在 Choreography (事件驅動) 故障時,排查成本很高 |
SAGA 模式具有 最終一致性、高複雜度 主要特性
因此適合的應用情境如下:
| 電商訂單流程 |
| 庫存預扣 |
| 遊戲資產(非銀行存款) |
| 訂單 + 通知 + 第三方 API |
| 微服務間跨 DB 協調 |
SAGA 模式具有 最終一致性、高複雜度 主要特性
因此不適合的應用情境如下:
| 核心金融帳務(強一致性要求) | 不允許任何時間點不一致,不允許補償式修復 |
| 無法設計補償交易的行為 | 實體商品已出貨 / 實體資產已轉移 (無法「補償」的狀況) |
| 業務不接受短暫不一致 | 股票撮合引擎、即時競價系統 (不能允許「先成功一半」) |
| 業務流程極度簡單 | 單資料(甚至 2 個資料庫)、業務極少、高一致性需求,那麼就不需花費太多成本實現 |
將範例代碼下載後,將代碼根目錄的 .sql 與 docker-compose.yml 放進 Ubuntu 的目錄下,如圖:
輸入以下指令,進行安裝
docker-compose up -d
確認 Mysql-Saga 容器(資料庫)有啟動:
此 dokcer-compose.yml 安裝腳本,安裝了以下內容:
| 1. 安裝 1 個 Mysql 資料庫,並且容器化啟動,密碼都為 password |
| 2. 容器 對宿主機 Port 為 3306(若有衝突要自行調整) |
| 3-1. 容器 安裝 Mysql 資料庫後,執行 init-Balance.sql 語法,實例一個 Balance 資料庫 |
| 3-2. 執行 init-Log.sql 語法,實例一個 Log 資料庫 |
| 3-3. 執行 init-Member.sql 語法,實例一個 Member 資料庫 |
version: '3.8'
services:
mysql-shop:
image: mysql:8.0
container_name: mysql-sage
environment:
MYSQL_ROOT_PASSWORD: password
MYSQL_DATABASE: Member
ports:
- "3016:3306"
volumes:
- ./mysql-bank-a-data:/var/lib/mysql
- ./init-Balance.sql:/docker-entrypoint-initdb.d/init-Balance.sql
- ./init-Log.sql:/docker-entrypoint-initdb.d/init-Log.sql
- ./init-Member.sql:/docker-entrypoint-initdb.d/init-Member.sql
command: --default-authentication-plugin=mysql_native_password
--innodb_lock_wait_timeout=120
--max_connections=1000
networks:
- saga-network
networks:
saga-network:
driver: bridge
預設資料庫有 Member , Log , Balance,工作內容與存在的資料表如下:
| 資料庫 | 表名稱 | 用途 |
| 1. Log | SagaTransaction | 用於生成 SagaId 的追蹤紀錄 |
| 2. Balance | AccountBalance | 用於紀錄用戶在系統的可用額度 |
| BalanceTransaction | 紀錄扣款成功的資料,並且保存必須資訊,用於補償機制 | |
| 3. Member | Member | 用戶個人資訊,並且統計用戶消費額度 |
| Product | 產品資訊,紀錄用戶購買產品時的價格 | |
| Purchase | 購買紀錄,並且保存必須資訊,用於補償機制 |
有預設資料的表包含 AccountBalance, Member, Product
※用於補償機制的表,都可以實現 SAGA 的核心 : 可重試、冪等性、狀態追蹤
※資料表欄位 Status 為了 Demo 說明才使用 Varchar 顯示,實務上請落實團隊規範(通常是 bool / enum / int 等)
目前我們環境架構如下,我們是在一家購物平台,用戶購買了一台電視,消費金額,公司將用戶的金流資訊都存於 Balance 資料庫
個人用戶資訊存於 Member 資料庫,於是我們實現了 Log 資料庫,實現 Saga 模式,完成分布式資料庫 :
購物流程與分布式資料庫工作
| 步驟 | 目的 | 資料傳遞 |
| 1. 用戶下訂單 | 對伺服器發送購物請求 | |
| 2. 伺服器處理 | 驗證資料正確,開始對資料庫 | |
| 3. 生成 SagaId | Log 庫,生成 1 筆 SagaId 表示最初的狀態 | 將 SagaId,傳遞下一步驟 |
| 4. 新增扣款紀錄 | Balance 庫,進行該用戶的金額扣款 | 將 SagaId、購物相關資訊,傳遞下一步驟 |
| 5. 新增消費紀錄 | Member 庫,新增消費紀錄,Member 統計用戶消費總額 | 將 SagaId、購物相關資訊,傳遞下一步驟 |
| 6. 標記完成 | Log 庫,將該筆 SagaId 標記完成 |
架構如圖:
在新增 SagaId 前,資料庫就異常,那麼就直接無視,因為尚未進入系統
新增 SagaId 後(Log),扣款時異常(Balance),這時將此筆 SagaId 標記異常, 直接取消,因為沒有真正扣款
扣款後(Balance),新增消費紀錄時異常(Member),因為有真正扣款,因此 需要將後續的,消費紀錄、用戶消費統計補齊
新增消費紀錄後(Member),更新 SagaId 標記前(Log),這時的 補償只要將該筆 SagaId 補上”COMPLETED”即可
核心代碼主要有 3 大部分
| 1. Framework.Database | : | 參考專案,實現 Mysql 的 UnitOfWork ,並實現 3 種資料庫的工廠方法庫 |
| 2. ShoppingCartService.cs | : | 主專案下,實現正常伺服器與用戶互動的購物主體流程,並且模擬分布式資料庫某個階段異常 |
| 3. QuarzJobForCompensatory.cs | : | 主專案下,實現主體流程某個階段異常時的補償機制 |
※代碼中的 DAO、DTO、Repository 實現等,因代碼龐大忽略介紹,只重點說明 Saga 實現方式。
將範例代碼下載後,並起且動專案
進入 HomeController.cs 後,有以下功能
| 1. FullSAGA() | 正常流程,每個庫(Log, Balance, Member)的交易都正常寫入,沒有異常 |
| 2. Step1SAGA() | 第一步執行前中斷 (未執行) |
| 3. Step2SAGA() | 第二步執行前中斷 (寫入 Log 結束) |
| 4. Step3SAGA() | 第三步執行前中斷 (寫入 Log + 扣款 Balance 表 結束) |
| 5. Step4SAGA() | 第四步執行前中斷 (寫入 Log + 扣款 Balance 表 + 本地 Member 表 結束) |
| 6. GetMemberShoppingData() | Mock (假資料),測試說明用,對應資料庫的 Member 庫的 Member 表會員資料 |
/// <summary>
/// 完整走完
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<ActionResult> FullSAGA()
{
var resultData = GetMemberShoppingData();
await _shoppingCartService.Shoppinng(resultData);
return Ok("成功! 正常情況下,沒有任何異常的寫入 - SAGA (寫 3 種分散式庫都正常)");
}
/// <summary>
/// 第一步執行前中斷 (未執行)
/// </summary>
[HttpGet]
public async Task<ActionResult> Step1SAGA()
{
var resultData = GetMemberShoppingData();
await _shoppingCartService.Shoppinng(resultData, Enum.InterruptStepEnum.InterruptStep1);
return Ok();
}
/// <summary>
/// 第二步執行前中斷 (寫入 Log 結束)
/// </summary>
[HttpGet]
public async Task<ActionResult> Step2SAGA()
{
var resultData = GetMemberShoppingData();
await _shoppingCartService.Shoppinng(resultData, Enum.InterruptStepEnum.InterruptStep2);
return Ok();
}
/// <summary>
/// 第三步執行前中斷 (寫入 Log + 扣款 Balance 表 結束)
/// </summary>
[HttpGet]
public async Task<ActionResult> Step3SAGA()
{
var resultData = GetMemberShoppingData();
await _shoppingCartService.Shoppinng(resultData, Enum.InterruptStepEnum.InterruptStep3);
return Ok();
}
/// <summary>
/// 第四步執行前中斷 (寫入 Log + 扣款 Balance 表 + 本地 Member 表 結束)
/// </summary>
[HttpGet]
public async Task<ActionResult> Step4SAGA()
{
var resultData = GetMemberShoppingData();
await _shoppingCartService.Shoppinng(resultData, Enum.InterruptStepEnum.InterruptStep4);
return Ok();
}
/// <summary>
/// 取得用戶購買資料 - 範例使用,固定同個用戶
/// </summary>
private RequestModel GetMemberShoppingData()
{
// 假資料
return new RequestModel()
{
MemberId = 1,
ProductId = 1001,
Count = 1
};
}
HomeController.cs 執行後,會觸發 ShoppingCartService.cs ,代碼主要功能內容如下:
| 1. Shoppinng() | 正常流程,沒有異常,依序觸發 CreateLog() -> Deduction() -> UpdateMemberAndProudct() -> FinishLog() |
| 2. CreateLog() | 寫入Log庫,用於 第二步執行前中斷 (寫入 Log 結束) |
| 3. Deduction() | 寫入Balance庫,用於 第三步執行前中斷 (寫入 Log + 扣款 Balance 表 結束) |
| 4. UpdateMemberAndProudct() | 寫入Member庫,用於 第四步執行前中斷 (寫入 Log + 扣款 Balance 表 + 本地 Member 表 結束) |
| 5. FinishLog() | 最終一致性完成,標記此筆 SagaId 完成 |
| 5. GetProductInfo() | 取得產品資訊 |
| 6. GetMemberInfo() | 取得系統內會員資料 |
public class ShoppingCartService : IShoppingCartService
{
private readonly IUnitOfWorkFactory _uowFactory;
private readonly IUnitOfWorkAccessor _uowAccessor;
private readonly ILogger<ShoppingCartService> _logger;
private readonly IBalanceRepository _balanceRepository;
private readonly ILogRepository _logRepository;
private readonly IMemberRepository _memberRespository;
public ShoppingCartService(IUnitOfWorkFactory uowFactory,
IUnitOfWorkAccessor uowAccessor,
ILogger<ShoppingCartService> logger,
IBalanceRepository balanceRepository,
ILogRepository logRepository,
IMemberRepository memberRespository)
{
_uowFactory = uowFactory;
_uowAccessor = uowAccessor;
_logger = logger;
_balanceRepository = balanceRepository;
_logRepository = logRepository;
_memberRespository = memberRespository;
}
/// <summary>
/// 購物車 - 正常流程
/// </summary>
/// <param name="shoppingData">會員購物資料</param>
/// <param name="stepEnum">模擬中斷</param>
/// <returns></returns>
public async Task Shoppinng(RequestModel shoppingData, InterruptStepEnum stepEnum= InterruptStepEnum.None)
{
// 取得資料
var prdouctInfo = await GetProductInfo(shoppingData.ProductId);
var memberInfo = await GetMemberInfo(shoppingData.MemberId);
// 簡易驗證
if (memberInfo == null || prdouctInfo == null)
throw new Exception("傳入參數錯誤");
if (stepEnum == InterruptStepEnum.InterruptStep1)
throw new Exception("第一步執行前中斷 (未執行)");
// 1.Log DB:建立交易紀錄(Pending)
var sagaId = await CreateLog(shoppingData, prdouctInfo);
if (stepEnum == InterruptStepEnum.InterruptStep2)
throw new Exception("第二步執行前中斷 (寫入 Log 結束)");
// 2.Balance DB:扣款
await Deduction(shoppingData, prdouctInfo, sagaId);
if (stepEnum == InterruptStepEnum.InterruptStep3)
throw new Exception("第三步執行前中斷 (寫入 Log + 扣款 Balance 表 結束)");
// 3.Member DB:更新會員消費 / 訂單狀態
await UpdateMemberAndProudct(shoppingData, prdouctInfo, sagaId);
if (stepEnum == InterruptStepEnum.InterruptStep4)
throw new Exception("第四步執行前中斷(寫入 Log + 扣款 Balance 表 + 本地 Member 表 結束)");
// 4.Log DB:更新交易狀態 = Completed
await FinishLog(sagaId);
}
/// <summary>
/// 1. 建立交易紀錄 => DB : Log
/// </summary>
public async Task<string> CreateLog(RequestModel shoppingData, ProductDao product)
{
using var uow = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Log);
_uowAccessor.Current = uow;
var insertData = new SagaTransactionDao()
{
Amount = product.Price * shoppingData.Count,
MemberId = shoppingData.MemberId,
ProductId = product.ProductId,
Status = SagaTransactionStatusEnum.PENDING.ToString(),
};
var sagaId = await _logRepository.CreateLog(insertData);
return sagaId;
}
/// <summary>
/// 2. 扣款 => DB : Balance
/// </summary>
public async Task Deduction(RequestModel shoppingData, ProductDao product, string sagaId)
{
using var uow = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Balance);
_uowAccessor.Current = uow;
var totalCost = product.Price * shoppingData.Count;
try
{
await uow.BeginTransactionAsync();
await _balanceRepository.UpdateBalance(shoppingData.MemberId, totalCost);
await _balanceRepository.CreateBalanceTransaction(shoppingData.MemberId, totalCost, product.ProductId, sagaId);
await uow.CommitAsync();
}
catch (Exception ex)
{
Console.WriteLine(ex);
await uow.RollbackAsync();
}
}
/// <summary>
/// 3. 更新會員消費 / 訂單狀態 => DB : Member
/// </summary>
public async Task UpdateMemberAndProudct(RequestModel shoppingData, ProductDao product, string sagaId)
{
using var uow = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Member);
_uowAccessor.Current = uow;
var totalCost = product.Price * shoppingData.Count;
var insertData = new PurchaseDao()
{
SagaId = sagaId,
Amount = totalCost,
MemberId = shoppingData.MemberId,
ProductId = product.ProductId,
Status = SagaTransactionStatusEnum.COMPLETED.ToString(),
};
try
{
await uow.BeginTransactionAsync();
await _memberRespository.InsertPurchase(insertData);
await _memberRespository.UpdateMemberSpentMoney(shoppingData.MemberId, totalCost);
await uow.CommitAsync();
}
catch (Exception ex)
{
Console.WriteLine(ex);
await uow.RollbackAsync();
}
}
/// <summary>
// 4. 更新交易狀態 = Completed => DB : Log
/// </summary>
public async Task FinishLog(string sagaId)
{
using var uow = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Log);
_uowAccessor.Current = uow;
try
{
await _logRepository.UpdateLogStatus(sagaId, "COMPLETED");
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}
/// <summary>
/// 取得產品資訊
/// </summary>
public async Task<ProductDao> GetProductInfo(long productId)
{
using var uow = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Member);
_uowAccessor.Current = uow;
return await _memberRespository.GetProduct(productId);
}
/// <summary>
/// 取得系統內會員資料
/// </summary>
public async Task<MemberDao> GetMemberInfo(long memberId)
{
using var uow = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Member);
_uowAccessor.Current = uow;
return await _memberRespository.GetMember(memberId);
}
}
開啟 QuartzJobForCompensatory.cs 檔案後,這是由 Quartz.Net 觸發,每 5 分鐘進行一次,模擬主體流程,分布式資料庫各階段異常時如何補償
| 1. Execute() | 呼叫 GetSagaIdCompensatoryStatus() 找出需要補償的 異常 SagaId (Pending 狀態) |
| 2. GetSagaIdCompensatoryStatus() | 回傳異常狀態時間大於 5 分鐘 且 Pending 狀態的 SagaId |
| 3. CompensatoryNotDeduction() | 補償機制 -> 第二步執行前中斷 (寫入 Log 結束) -> 取消此筆訂單 |
| 對應第二部分 -> Step 7:主體架構異常點 - 扣款前 | |
| 4. CompensatoryNotPurchase() | 補償機制 -> 第三步執行前中斷 (寫入 Log + 扣款 Balance 表 結束) -> 繼續處理,因為已經扣錢 |
| 對應第二部分 -> Step 8:主體架構異常點 - 新增消費紀錄前 | |
| 5. CompensatorynotMarkComplate() | 補償機制 -> 第四步執行前中斷(寫入 Log + 扣款 Balance 表 + 本地 Member 表 結束) -> 繼續處理,已經完成只差在標記 |
| 對應第二部分 -> Step 9:主體架構異常點 - 更新標記前 |
public class QuartzJobForCompensatory : IJob
{
private readonly IUnitOfWorkFactory _uowFactory;
private readonly IUnitOfWorkAccessor _uowAccessor;
private readonly ILogger<QuartzJobForCompensatory> _logger;
private readonly IBalanceRepository _balanceRepository;
private readonly ILogRepository _logRepository;
private readonly IMemberRepository _memberRespository;
private readonly IShoppingCartService _shoppingCartService;
public QuartzJobForCompensatory(
IUnitOfWorkFactory uowFactory,
IUnitOfWorkAccessor uowAccessor,
ILogger<QuartzJobForCompensatory> logger,
IBalanceRepository balanceRepository,
ILogRepository logRepository,
IMemberRepository memberRespository,
IShoppingCartService shoppingCartService)
{
_uowFactory = uowFactory;
_uowAccessor = uowAccessor;
_logger = logger;
_balanceRepository = balanceRepository;
_logRepository = logRepository;
_memberRespository = memberRespository;
_shoppingCartService = shoppingCartService;
}
/// <summary>
/// 1. 實際執行
/// </summary>
public async Task Execute(IJobExecutionContext context)
{
var currentTiem = DateTime.Now;
var seqTime = currentTiem.ToString("yyyyMMddHHmmss");
Console.WriteLine("===== 開始處理 =====");
Console.WriteLine("時間紀錄 : " + currentTiem.ToString("yyyy/MM/dd HH:mm:ss") + " 流水時間序 : "+ seqTime);
// 1-1. 取出補償需處理的資料
var compensatoryInfo = await GetSagaIdCompensatoryStatus();
// 1-2. 補償1 : 捨棄未扣款的資料 - 影響 Log 庫
if (compensatoryInfo.notDeductionSagaIds.Any())
{
await CompensatoryNotDeduction(compensatoryInfo.notDeductionSagaIds);
Console.WriteLine($@"[{seqTime}] 補償1 : SagaId : {string.Join(", " , compensatoryInfo.notDeductionSagaIds)}");
}
// 1-3. 補償2 : 繼續將 Balance 已扣款資料統計到 Purchase - 影響 Member 庫
if (compensatoryInfo.notPurchaseSagaIds.Any())
{
await CompensatoryNotPurchase(compensatoryInfo.notPurchaseSagaIds);
Console.WriteLine($@"[{seqTime}] 補償2 : SagaId : {string.Join(", ", compensatoryInfo.notPurchaseSagaIds)}");
}
// 1-4. 補償3 : 將尚未標記 Complate 的 SagaId 處理完成 - 影響 Log 庫
// 標記要包含本次有執行 (寫入 Log + 扣款 Balance 表 結束) 的資料
var executeMarkIds = compensatoryInfo.notMarkComplateSagaIds;
executeMarkIds.AddRange(compensatoryInfo.notPurchaseSagaIds);
if (executeMarkIds.Any())
{
await CompensatorynotMarkComplate(executeMarkIds);
Console.WriteLine($@"[{seqTime}] 補償3 : SagaId : {string.Join(", ", executeMarkIds)}");
}
Console.WriteLine("===== 結束處理 =====");
Console.WriteLine("");
// 上述補償 1~3 每個都是獨立的庫 EX:
// 1. 當補償2 有 SagaId = 'A' 完成寫入 Member 庫後,這時發生中斷
// 2. 下次服務再次觸發,會發現只差標記 Log 已完成,會直接進入 補償3
// ※ 補償1 直接捨棄,若上次未執行,則下次服務觸發仍會在補償1 讓對應 SagaId 標記 FAILED
}
/// <summary>
/// 1-1. 取得補償狀態
/// </summary>
private async Task<CompensatoryDto> GetSagaIdCompensatoryStatus()
{
var reuslt = new CompensatoryDto();
using var uow = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Log);
_uowAccessor.Current = uow;
try
{
var getPendingSagaIds = await GetOverTimeSagaId();
// 流程中尚需 : 扣款 + 購買統計 + 標記完成 (取消整筆訂單)
reuslt.notDeductionSagaIds = await GetNotDeductionSagaIds(getPendingSagaIds);
// 流程中尚需 : 購買統計 + 標記完成 (繼續處理,因為已經扣錢)
reuslt.notPurchaseSagaIds = await GetNotPurchaseSagaIds(getPendingSagaIds.Except(reuslt.notDeductionSagaIds)
.ToList());
// 流程中尚需 : 標記完成 (繼續處理,已經完成只差在標記)
reuslt.notMarkComplateSagaIds = getPendingSagaIds.Except(reuslt.notDeductionSagaIds)
.Except(reuslt.notPurchaseSagaIds)
.ToList();
}
catch (Exception ex)
{
Console.WriteLine(ex);
throw;
}
return reuslt;
/// 取出超時要處理的資料 - SagaId + Status
async Task<List<string>> GetOverTimeSagaId()
{
var getResult = new List<string>();
var sql = $@"
SELECT SagaId
FROM SagaTransaction
WHERE SagaTransaction.STATUS = 'PENDING'
AND NOW() > DATE_ADD(CreatedAt, INTERVAL 5 MINUTE)
;";
try
{
getResult = (await uow.Connection.QueryAsync<string>(sql)).ToList();
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
return getResult;
}
// 取得尚未執行扣款的資料
async Task<List<string>> GetNotDeductionSagaIds(List<string> sourcePendingSagaIds)
{
using var uowb = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Balance);
_uowAccessor.Current = uowb;
var missingIds = new List<string>();
var sql = $@"
SELECT SagaId
FROM BalanceTransaction
WHERE sagaId IN @SagaIds
;";
try
{
var getResult = (await uowb.Connection.QueryAsync<string>(sql, new
{
@SagaIds = sourcePendingSagaIds
})).ToList();
// 排除法
missingIds = sourcePendingSagaIds.Except(getResult).ToList();
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
return missingIds;
}
// 取得尚未執行統計-添加到 Purchase 的資料
async Task<List<string>> GetNotPurchaseSagaIds(List<string> sourcePendingSagaIds)
{
using var uowm = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Member);
_uowAccessor.Current = uowm;
var missingIds = new List<string>();
var sql = $@"
SELECT SagaId
FROM Purchase
WHERE sagaId IN @SagaIds
;";
try
{
var getResult = (await uowm.Connection.QueryAsync<string>(sql, new
{
@SagaIds = sourcePendingSagaIds
})).ToList();
// 排除法
missingIds = sourcePendingSagaIds.Except(getResult).ToList();
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
return missingIds;
}
}
/// <summary>
/// 1-2. 補償機制 -> 第二步執行前中斷 (寫入 Log 結束) -> 取消此筆訂單
/// </summary>
/// <returns></returns>
private async Task CompensatoryNotDeduction(List<string> notDeductionSagaIds)
{
// 因為在 Balance 表中並未建立對應資料,沒有對實際的錢扣款,因此取消後續流程
using var uow = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Log);
_uowAccessor.Current = uow;
try
{
await uow.BeginTransactionAsync();
// 範例說明每筆皆會處理,實際上應改用 Bulk Update SQL 語法
foreach (var sagaId in notDeductionSagaIds)
{
await _logRepository.UpdateLogStatus(sagaId, "FAILED");
}
await uow.CommitAsync();
}
catch (Exception ex)
{
Console.WriteLine(ex);
await uow.RollbackAsync();
}
}
/// <summary>
/// 1-3. 補償機制 -> 第三步執行前中斷 (寫入 Log + 扣款 Balance 表 結束) -> 繼續處理,因為已經扣錢
/// </summary>
/// <returns></returns>
private async Task CompensatoryNotPurchase(List<string> notPurchaseSagaIds)
{
using var uowb = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Balance);
_uowAccessor.Current = uowb;
var collectionItems = (await _balanceRepository.GetBalanceTransactionItems(notPurchaseSagaIds)).ToList();
// 1. 繼續處理,因為已經扣錢
using var uowm = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Member);
_uowAccessor.Current = uowm;
// 2. 處理 Member 庫
try
{
await uowm.BeginTransactionAsync();
// 範例說明每筆皆會處理,實際上應改用 Bulk Update SQL 語法
foreach (var sagaId in notPurchaseSagaIds)
{
var processItem = collectionItems.FirstOrDefault(item => item.SagaId == sagaId);
if (processItem != null)
{
var insertData = new PurchaseDao()
{
SagaId = sagaId,
Amount = processItem.Amount,
MemberId = processItem.MemberId,
ProductId = processItem.ProductId,
Status = SagaTransactionStatusEnum.COMPLETED.ToString(),
};
await _memberRespository.InsertPurchase(insertData);
await _memberRespository.UpdateMemberSpentMoney(insertData.MemberId, insertData.Amount);
}
}
await uowm.CommitAsync();
}
catch (Exception ex)
{
Console.WriteLine(ex);
await uowm.RollbackAsync();
}
}
/// <summary>
/// 1-4. 補償機制 -> 第四步執行前中斷(寫入 Log + 扣款 Balance 表 + 本地 Member 表 結束) -> 繼續處理,已經完成只差在標記
/// </summary>
/// <returns></returns>
private async Task CompensatorynotMarkComplate(List<string> notMarkComplateSagaIds)
{
// 1. 繼續處理,已經完成只差在標記
using var uow = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Log);
_uowAccessor.Current = uow;
try
{
await uow.BeginTransactionAsync();
// 2. 範例說明每筆皆會處理,實際上應改用 Bulk Update SQL 語法
foreach (var sagaId in notMarkComplateSagaIds)
{
await _logRepository.UpdateLogStatus(sagaId, "COMPLETED");
}
await uow.CommitAsync();
}
catch (Exception ex)
{
Console.WriteLine(ex);
await uow.RollbackAsync();
}
}
}
啟動後,有以下 5 個按鈕
| 正常情況下,沒有任何異常的寫入 |
| 1. 第一步執行前中斷 (未執行) |
| 2. 第二步執行前中斷 (寫入 Log 結束) |
| 3. 第三步執行前中斷 (寫入 Log + 扣款 Balance 表 結束) |
| 4. 第四步執行前中斷(寫入 Log + 扣款 Balance 表 + 本地 Member 表 結束) |
執行後,會完整生成一筆資訊, SagaId 會一致於 Log 庫 -> Balance 庫 -> Member 庫
執行後,因為未寫入 Log 資料庫,沒有 SagaId 因此,後續動作都執行無視,視為 Server 端與用戶交互異常
沒有真正進入資料業務寫入
如圖,Log 庫,未生成 SagaId:
執行後,有一筆 SagaId 在 Log 庫中,但是補償服務,檢查確認 沒有扣款
最終將此筆 SagaId 取消,設定為 FAILED
執行後,補償服務,檢查確認 有一筆 SagaId 在 Log 庫中 + 並且 Balance 庫該筆 SagaId 已經扣款會員金額 + ** Member 庫的 Purchase 沒有 SagaId 的消費紀錄**
因為已經有扣款,那麼合理的補償業務邏輯就是繼續完成後續工作 (寫 Member 消費紀錄 + 寫 Log 庫的標記更新)
※此階段若 Member 消費紀錄成功,但是 Log 庫寫入失敗(異常),也不用擔心,會在下一個階段的補償處理, 實現可重試性
最終將此筆 SagaId 在 Member 庫的 Purchase 新增資料,並且更新 Member 庫的 Member 表統計消費金額,並且將 Log 庫的 SagaId 標記為 “COMPLETED”
執行後,補償服務,檢查確認,三個庫的 SagaId 都有相關資訊存在,並且有處理,那麼只需要標記為 “COMPLETED”
最終將此筆 SagaId 於 Log 庫標記為 “COMPLETED”
至此,完成所有資料庫異常的處理 (補償機制)