首頁

目前文章總數:222 篇

  

最後更新:2026年 03月 07日

0002. 資料庫分散式架構 - 實現最終一致性的 SAGA 模式(Saga Pattern)- C# 範例並搭配 MySQL 分散式資料庫實作 Orchestration(中央協調式)

日期:2026年 03月 07日

標籤: Docker Docker-Compose Container Ubuntu Linux MySQL Asp.NET Core Web MVC Quartz.NET

摘要:資料庫分散式架構


應用所需:1. Visual Studio 2022 以上
     2. Mysql Database 8.0 以上並且有 2 個資料庫實例(範例代碼提供 docker-compose.yml)
程式說明:分散式資料庫架構 - SAGA 模式說明,並搭配代碼實現補償機制,達成最終一致性
範例檔案:Githu連結
基本介紹:本篇分為 5 大部分。
     
第一部分:SAGA 介紹
第二部分:SAGA 範例架構設計 - 準備環境
第三部分:SAGA 範例實作 - 主體流程
第四部分:SAGA 範例實作 - 補償服務
第五部分:範例執行結果 DEMO






第一部分:SAGA 介紹

Step 1:為什麼需要 SAGA - 較高的效能

資料庫架構中,以下是 XA 與 SAGA 兩者的差異

比較項目 Saga XA(2PC)
核心概念 將大交易拆成多個本地交易 + 補償機制 使用兩階段提交(Prepare / Commit)確保全域一致性
一致性模型 最終一致性(Eventually Consistent) 強一致性(Strong Consistency)
交易鎖定時間 無長時間鎖定 會長時間鎖定資源(直到全域提交)
效能 較好(無全域鎖) 較差(阻塞式提交)
可用性 高(失敗可補償) 較低(Coordinator 故障可能阻塞)
複雜度 業務邏輯複雜(需設計補償) 技術複雜度高(需支援 XA protocol)
對資料庫要求 不需要特別支援 需要資料庫支援 XA
適合架構 微服務架構 傳統單體 / 強一致性金融場景
失敗處理 透過補償交易回滾 透過全域回滾
可擴展性 較低(Coordinator 成為瓶頸)
併發能力 低(鎖競爭嚴重)
業務侵入性 高(需寫補償邏輯) 低(由交易管理器處理)


對比之下,若需要 分散式資料庫 + 效能高 + 非金融 + 開發時間充裕 ,那麼 SAGA 很適合考慮引用


Step 2:XA 與 SAGA 應用情境

在應用情境下,若非單式體的小型專案,那麼主流大多是 Saga 模式,解決分布式資料庫存儲問題

情境 建議
微服務架構 Saga
高併發電商 Saga
金融核心帳務 XA
跨多資料庫強一致 XA
需要高可用與水平擴展 Saga
Saga = 用補償換取高可用與高擴展
XA = 用鎖與阻塞換取強一致性



Step 3:實現 SAGA 的成本

Saga 模式實現的成本為以下 3 點:

成本要點 說明 補充
1. 補償交易設計 - 要對業務透徹理解才能實現補償機制 對系統的業務邏輯要完整理解,若一知半解會導致主體流程在補償中異常卡住
  - 不是所有行為都可逆 發送 Email 不可補償 / 扣庫存可以補償(加回)/ 金流請款要增加退款流程
2. 冪等設計 每個步驟都必須擁有冪等性設計 1. 可重試
    2. 不可重複執行造成副作用
    3. 能根據 SagaId / BusinessId 判斷是否已完成
3. 過渡狀態管理 中間狀態管理 Order = Created / Payment = Processing / Inventory = Reserved …
    每個狀態碼都需知道處理行為(呼應需理解系統的業務邏輯)


因此對產品系統沒有一定程度的理解,是不容易 設計出正確的 Saga 模型

Step 4:Saga 設計的責任歸屬

Saga 需要深度理解系統業務,而且比 XA 還要求高,因此 SAGA 模式,困難的地方是 複雜度提高(開發時間增加)
以下是 XA 與 Saga 的責任區分

項目 XA Saga
一致性 DB 保證 你自己保證
回滾 DB rollback 你設計補償
錯誤處理 交易失敗就 rollback 你要判斷「現在走到哪一步」
冪等性 DB 幫你處理 每個步驟都要自己設計


簡言之:

SAGA => 把「資料庫原本做的事」全部拿回來自己做。


開發團隊成員若以下其中一項沒有實現,就會造成嚴重的災難
※極有可能出現代碼都是對的,但常常出現 最終沒有一致性

1. 補償設計錯
2. 狀態機設計錯
3. 邊界條件沒考慮
4. 少一個冪等檢查
5. 沒考慮重試 + Timeout + Crash



Step 5:Saga 的 2 種實作方式

Saga 模型的實作方式有 2 種,分別是 Orchestration(中央協調式)、 Choreography(事件驅動式)
本偏代碼範例說明實現的是 Orchestration(中央協調式),以下是 2 者的差異:

比較項目 Choreography(事件驅動式) Orchestration(中央協調式)
核心概念 各服務透過事件自行決定下一步 由中央 Orchestrator 控制整個流程
流程控制者 沒有中央控制者 有一個 Saga Orchestrator
控制方式 Event-driven(發布 / 訂閱) Command-driven(下達指令)
耦合度 服務之間隱性耦合 與 Orchestrator 顯性耦合
可讀性 流程分散,較難看出全貌 流程集中,較清楚
擴展性 高(新增服務只需訂閱事件) 中等(需修改 Orchestrator)
除錯難度 高(流程散落各服務) 較低(集中在協調者)
業務流程複雜度 適合簡單流程 適合複雜流程
狀態管理 分散在各服務 集中管理 Saga 狀態
失敗處理 透過事件觸發補償 Orchestrator 負責補償流程
交易可視性 低(需追蹤多個事件) 高(可追蹤 SagaId 全流程)
維護成本 流程複雜時成本高 Orchestrator 可能成為複雜核心
單點風險 無單點 Orchestrator 可能成為單點(需 HA)
適合團隊成熟度 高(需良好事件設計能力) 中高(需架構治理能力)
常見搭配技術 Kafka / RabbitMQ 等 MQ 工作流引擎 / 自建 Saga Coordinator


簡言之:若是想要系統可維護性高,未來複雜度低、可適用擴展的大型專案,那麼推薦 Orchestration(中央協調式) 開始。

Step 6:Orchestration、Choreography 評估選擇情境

Saga 模型的實作方式有 2 種,分別是 Orchestration(中央協調式)、 Choreography(事件驅動式)
本偏代碼範例說明實現的是 Orchestration(中央協調式),以下是 2 者的差異:

情境 建議
流程簡單(下單 → 扣庫存 → 扣款) Choreography
流程複雜(多分支、多補償、多條件) Orchestration
團隊缺乏整體流程掌握能力 Orchestration
極度去中心化架構 Choreography
需要高度可觀察性與流程追蹤 Orchestration


Choreography: 是最佳的解耦合,但難於維護
Orchestration: 是最佳的可維護性,但中心服務會導致系統有高耦合

Step 7:SAGA - 優缺點

SAGA 模式的優點如下,實現 最終一致性
優點結論:高效能

1. 不需要全域鎖(無 2PC):不會長時間鎖住多個資料庫,適用於高併發、避免 XA 阻塞問題,並且不會因單點卡死而整筆交易卡住
2. 高可用、抗故障能力強:某個服務暫時掛掉, 不會讓整個系統停擺可以重試可以補償
3. 微服務架構:每個服務擁有自己的資料庫、不需要跨 DB 事務、不強耦合
4. 可橫向擴展:基於本地交易、非同步、最終一致性,因此更容易 scale out



並遺毒的缺點如下:
缺點結論:高複雜度

1. 系統複雜度高:需要設計補償交易、Saga Log、重試機制、冪等性
2. 中間狀態不一致:在流程中出現處於某種狀態 EX: A 成功 / B 失敗 / 補償尚未執行
3. 補償不一定能完全還原:補償交易 ≠ 回滾。 EX: 發送簡訊、發送 email (這不可回滾)
4. Debug 與追蹤困難:Orchestration 還可以接受,但在 Choreography (事件驅動) 故障時,排查成本很高



Step 8: SAGA - 適合應用情境

SAGA 模式具有 最終一致性高複雜度 主要特性
因此適合的應用情境如下:

電商訂單流程
庫存預扣
遊戲資產(非銀行存款)
訂單 + 通知 + 第三方 API
微服務間跨 DB 協調



Step 9: SAGA - 不可用情境

SAGA 模式具有 最終一致性高複雜度 主要特性
因此不適合的應用情境如下:

核心金融帳務(強一致性要求) 不允許任何時間點不一致,不允許補償式修復
無法設計補償交易的行為 實體商品已出貨 / 實體資產已轉移 (無法「補償」的狀況)
業務不接受短暫不一致 股票撮合引擎、即時競價系統 (不能允許「先成功一半」)
業務流程極度簡單 單資料(甚至 2 個資料庫)、業務極少、高一致性需求,那麼就不需花費太多成本實現





第二部分:SAGA 範例架構設計 - 準備環境

Step 1:Docker Compose - 下載

範例代碼下載後,將代碼根目錄的 .sql 與 docker-compose.yml 放進 Ubuntu 的目錄下,如圖:




Step 2:Docker Compos - 安裝Mysql

輸入以下指令,進行安裝

docker-compose up -d




確認 Mysql-Saga 容器(資料庫)有啟動:


Step 3:Docker Compos - 內容說明

此 dokcer-compose.yml 安裝腳本,安裝了以下內容:

1. 安裝 1 個 Mysql 資料庫,並且容器化啟動,密碼都為 password
2. 容器 對宿主機 Port 為 3306(若有衝突要自行調整)
3-1. 容器 安裝 Mysql 資料庫後,執行 init-Balance.sql 語法,實例一個 Balance 資料庫
3-2. 執行 init-Log.sql 語法,實例一個 Log 資料庫
3-3. 執行 init-Member.sql 語法,實例一個 Member 資料庫
version: '3.8'

services:
  mysql-shop:
    image: mysql:8.0
    container_name: mysql-sage
    environment:
      MYSQL_ROOT_PASSWORD: password
      MYSQL_DATABASE: Member
    ports:
      - "3016:3306"
    volumes:
      - ./mysql-bank-a-data:/var/lib/mysql
      - ./init-Balance.sql:/docker-entrypoint-initdb.d/init-Balance.sql
      - ./init-Log.sql:/docker-entrypoint-initdb.d/init-Log.sql
      - ./init-Member.sql:/docker-entrypoint-initdb.d/init-Member.sql
    command: --default-authentication-plugin=mysql_native_password
            --innodb_lock_wait_timeout=120
            --max_connections=1000
    networks:
      - saga-network

networks:
  saga-network:
    driver: bridge



Step 4:資料庫結構

預設資料庫有 Member , Log , Balance,工作內容與存在的資料表如下:

資料庫 表名稱 用途
1. Log SagaTransaction 用於生成 SagaId 的追蹤紀錄
2. Balance AccountBalance 用於紀錄用戶在系統的可用額度
  BalanceTransaction 紀錄扣款成功的資料,並且保存必須資訊,用於補償機制
3. Member Member 用戶個人資訊,並且統計用戶消費額度
  Product 產品資訊,紀錄用戶購買產品時的價格
  Purchase 購買紀錄,並且保存必須資訊,用於補償機制


有預設資料的表包含 AccountBalance, Member, Product
※用於補償機制的表,都可以實現 SAGA 的核心 : 可重試、冪等性、狀態追蹤
※資料表欄位 Status 為了 Demo 說明才使用 Varchar 顯示,實務上請落實團隊規範(通常是 bool / enum / int 等)


Step 5:範例情境說明 - 主體架構

目前我們環境架構如下,我們是在一家購物平台,用戶購買了一台電視,消費金額,公司將用戶的金流資訊都存於 Balance 資料庫
個人用戶資訊存於 Member 資料庫,於是我們實現了 Log 資料庫,實現 Saga 模式,完成分布式資料庫 :
購物流程與分布式資料庫工作

步驟 目的 資料傳遞
1. 用戶下訂單 對伺服器發送購物請求  
2. 伺服器處理 驗證資料正確,開始對資料庫  
3. 生成 SagaId Log 庫,生成 1 筆 SagaId 表示最初的狀態 將 SagaId,傳遞下一步驟
4. 新增扣款紀錄 Balance 庫,進行該用戶的金額扣款 將 SagaId、購物相關資訊,傳遞下一步驟
5. 新增消費紀錄 Member 庫,新增消費紀錄,Member 統計用戶消費總額 將 SagaId、購物相關資訊,傳遞下一步驟
6. 標記完成 Log 庫,將該筆 SagaId 標記完成  


架構如圖:


Step 6:主體架構異常點 - 生成 SagaId 前

在新增 SagaId 前,資料庫就異常,那麼就直接無視,因為尚未進入系統


Step 7:主體架構異常點 - 扣款前

新增 SagaId 後(Log),扣款時異常(Balance),這時將此筆 SagaId 標記異常, 直接取消,因為沒有真正扣款


Step 8:主體架構異常點 - 新增消費紀錄前

扣款後(Balance),新增消費紀錄時異常(Member),因為有真正扣款,因此 需要將後續的,消費紀錄、用戶消費統計補齊


Step 9:主體架構異常點 - 更新標記前

新增消費紀錄後(Member),更新 SagaId 標記前(Log),這時的 補償只要將該筆 SagaId 補上”COMPLETED”即可


Step 10:範例專案說明

核心代碼主要有 3 大部分

1. Framework.Database 參考專案,實現 Mysql 的 UnitOfWork ,並實現 3 種資料庫的工廠方法庫
2. ShoppingCartService.cs 主專案下,實現正常伺服器與用戶互動的購物主體流程,並且模擬分布式資料庫某個階段異常
3. QuarzJobForCompensatory.cs 主專案下,實現主體流程某個階段異常時的補償機制


※代碼中的 DAO、DTO、Repository 實現等,因代碼龐大忽略介紹,只重點說明 Saga 實現方式。




第三部分:SAGA 範例實作 - 主體流程

Step 1:主體代碼 - 控制器

範例代碼下載後,並起且動專案
進入 HomeController.cs 後,有以下功能

1. FullSAGA() 正常流程,每個庫(Log, Balance, Member)的交易都正常寫入,沒有異常
2. Step1SAGA() 第一步執行前中斷 (未執行)
3. Step2SAGA() 第二步執行前中斷 (寫入 Log 結束)
4. Step3SAGA() 第三步執行前中斷 (寫入 Log + 扣款 Balance 表 結束)
5. Step4SAGA() 第四步執行前中斷 (寫入 Log + 扣款 Balance 表 + 本地 Member 表 結束)
6. GetMemberShoppingData() Mock (假資料),測試說明用,對應資料庫的 Member 庫的 Member 表會員資料
/// <summary>
/// 完整走完
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<ActionResult> FullSAGA()
{
    var resultData = GetMemberShoppingData();
    await _shoppingCartService.Shoppinng(resultData);
    return Ok("成功! 正常情況下,沒有任何異常的寫入 - SAGA (寫 3 種分散式庫都正常)");
   
}
/// <summary>
/// 第一步執行前中斷 (未執行)
/// </summary>
[HttpGet]
public async Task<ActionResult> Step1SAGA()
{
    var resultData = GetMemberShoppingData();
    await _shoppingCartService.Shoppinng(resultData, Enum.InterruptStepEnum.InterruptStep1);
    return Ok();
}
/// <summary>
/// 第二步執行前中斷 (寫入 Log 結束)
/// </summary>
[HttpGet]
public async Task<ActionResult> Step2SAGA()
{
    var resultData = GetMemberShoppingData();
    await _shoppingCartService.Shoppinng(resultData, Enum.InterruptStepEnum.InterruptStep2);
    return Ok();
}
/// <summary>
/// 第三步執行前中斷 (寫入 Log + 扣款 Balance 表 結束)
/// </summary>
[HttpGet]
public async Task<ActionResult> Step3SAGA()
{
    var resultData = GetMemberShoppingData();
    await _shoppingCartService.Shoppinng(resultData, Enum.InterruptStepEnum.InterruptStep3);
    return Ok();
}
/// <summary>
/// 第四步執行前中斷 (寫入 Log + 扣款 Balance 表 + 本地 Member 表 結束)
/// </summary>
[HttpGet]
public async Task<ActionResult> Step4SAGA()
{
    var resultData = GetMemberShoppingData();
    await _shoppingCartService.Shoppinng(resultData, Enum.InterruptStepEnum.InterruptStep4);
    return Ok();
}
/// <summary>
/// 取得用戶購買資料 - 範例使用,固定同個用戶
/// </summary>
private RequestModel GetMemberShoppingData()
{
    // 假資料 
    return new RequestModel()
    {
        MemberId = 1,
        ProductId = 1001,
        Count = 1
    };
}




Step 2:主體代碼 - 控制器

HomeController.cs 執行後,會觸發 ShoppingCartService.cs ,代碼主要功能內容如下:

1. Shoppinng() 正常流程,沒有異常,依序觸發 CreateLog() -> Deduction() -> UpdateMemberAndProudct() -> FinishLog()
2. CreateLog() 寫入Log庫,用於 第二步執行前中斷 (寫入 Log 結束)
3. Deduction() 寫入Balance庫,用於 第三步執行前中斷 (寫入 Log + 扣款 Balance 表 結束)
4. UpdateMemberAndProudct() 寫入Member庫,用於 第四步執行前中斷 (寫入 Log + 扣款 Balance 表 + 本地 Member 表 結束)
5. FinishLog() 最終一致性完成,標記此筆 SagaId 完成
5. GetProductInfo() 取得產品資訊
6. GetMemberInfo() 取得系統內會員資料
public class ShoppingCartService : IShoppingCartService
{
    private readonly IUnitOfWorkFactory _uowFactory;
    private readonly IUnitOfWorkAccessor _uowAccessor;
    private readonly ILogger<ShoppingCartService> _logger;
    private readonly IBalanceRepository _balanceRepository;
    private readonly ILogRepository _logRepository;
    private readonly IMemberRepository _memberRespository;

    public ShoppingCartService(IUnitOfWorkFactory uowFactory,
        IUnitOfWorkAccessor uowAccessor,
        ILogger<ShoppingCartService> logger,
        IBalanceRepository balanceRepository,
        ILogRepository logRepository,
        IMemberRepository memberRespository)
    {
        _uowFactory = uowFactory;
        _uowAccessor = uowAccessor;
        _logger = logger;
        _balanceRepository = balanceRepository;
        _logRepository = logRepository;
        _memberRespository = memberRespository;
    }

    /// <summary>
    /// 購物車 - 正常流程
    /// </summary>
    /// <param name="shoppingData">會員購物資料</param>
    /// <param name="stepEnum">模擬中斷</param>
    /// <returns></returns>
    public async Task Shoppinng(RequestModel shoppingData, InterruptStepEnum stepEnum= InterruptStepEnum.None)
        {
            // 取得資料
            var prdouctInfo = await GetProductInfo(shoppingData.ProductId);
            var memberInfo = await GetMemberInfo(shoppingData.MemberId);

            // 簡易驗證
            if (memberInfo == null || prdouctInfo == null)
                throw new Exception("傳入參數錯誤");

            if (stepEnum == InterruptStepEnum.InterruptStep1)
                throw new Exception("第一步執行前中斷 (未執行)");
            // 1.Log DB:建立交易紀錄(Pending)
            var sagaId = await CreateLog(shoppingData, prdouctInfo);


            if (stepEnum == InterruptStepEnum.InterruptStep2)
                throw new Exception("第二步執行前中斷 (寫入 Log 結束)");
            // 2.Balance DB:扣款
            await Deduction(shoppingData, prdouctInfo, sagaId);


            if (stepEnum == InterruptStepEnum.InterruptStep3)
                throw new Exception("第三步執行前中斷 (寫入 Log + 扣款 Balance 表 結束)");
            // 3.Member DB:更新會員消費 / 訂單狀態
            await UpdateMemberAndProudct(shoppingData, prdouctInfo, sagaId);
            

            if (stepEnum == InterruptStepEnum.InterruptStep4)
                throw new Exception("第四步執行前中斷(寫入 Log + 扣款 Balance 表 + 本地 Member 表 結束)");
            // 4.Log DB:更新交易狀態 = Completed

            await FinishLog(sagaId);
        }

    /// <summary>
    /// 1. 建立交易紀錄 => DB : Log
    /// </summary>
    public async Task<string> CreateLog(RequestModel shoppingData, ProductDao product)
        {
            using var uow = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Log);            
            _uowAccessor.Current = uow;

            var insertData = new SagaTransactionDao() 
            {
                 Amount = product.Price * shoppingData.Count,
                 MemberId = shoppingData.MemberId,
                 ProductId = product.ProductId,
                 Status = SagaTransactionStatusEnum.PENDING.ToString(),                  
            };
            var sagaId = await _logRepository.CreateLog(insertData);

            return sagaId;
        }

    /// <summary>
    /// 2. 扣款 => DB : Balance
    /// </summary>
    public async Task Deduction(RequestModel shoppingData, ProductDao product, string sagaId)
        {            
            using var uow = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Balance);
            _uowAccessor.Current = uow;
            var totalCost = product.Price * shoppingData.Count;
            try
            {
                await uow.BeginTransactionAsync();

                await _balanceRepository.UpdateBalance(shoppingData.MemberId, totalCost);                
                
                await _balanceRepository.CreateBalanceTransaction(shoppingData.MemberId, totalCost, product.ProductId, sagaId);

                await uow.CommitAsync();

            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
                await uow.RollbackAsync();
            }

        }

    /// <summary>
    /// 3. 更新會員消費 / 訂單狀態 => DB : Member
    /// </summary>
    public async Task UpdateMemberAndProudct(RequestModel shoppingData, ProductDao product, string sagaId)
        {
            using var uow = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Member);
            _uowAccessor.Current = uow;
            var totalCost = product.Price * shoppingData.Count;

            var insertData = new PurchaseDao()
            {
                SagaId = sagaId,
                Amount = totalCost,
                MemberId = shoppingData.MemberId,
                ProductId = product.ProductId,
                Status = SagaTransactionStatusEnum.COMPLETED.ToString(),
            };

            try
            {
                await uow.BeginTransactionAsync();

                await _memberRespository.InsertPurchase(insertData);
                await _memberRespository.UpdateMemberSpentMoney(shoppingData.MemberId, totalCost);

                await uow.CommitAsync();

            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
                await uow.RollbackAsync();
            }

        }

    /// <summary>
    // 4. 更新交易狀態 = Completed => DB : Log
    /// </summary>
    public async Task FinishLog(string sagaId)
        {
            using var uow = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Log);
            _uowAccessor.Current = uow;

            try
            {
                await _logRepository.UpdateLogStatus(sagaId, "COMPLETED");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
        }


    /// <summary>
    /// 取得產品資訊
    /// </summary>
    public async Task<ProductDao> GetProductInfo(long productId)
        {
            using var uow = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Member);            
            _uowAccessor.Current = uow;
            return await _memberRespository.GetProduct(productId);
        }

    /// <summary>
    /// 取得系統內會員資料
    /// </summary>
    public async Task<MemberDao> GetMemberInfo(long memberId)
        {
            using var uow = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Member);
            _uowAccessor.Current = uow;
            return await _memberRespository.GetMember(memberId);
        }
}



第四部分:SAGA 範例實作 - 補償服務

Step 1:補償服務 - 補償機制

開啟 QuartzJobForCompensatory.cs 檔案後,這是由 Quartz.Net 觸發,每 5 分鐘進行一次,模擬主體流程,分布式資料庫各階段異常時如何補償

1. Execute() 呼叫 GetSagaIdCompensatoryStatus() 找出需要補償的 異常 SagaId (Pending 狀態)
2. GetSagaIdCompensatoryStatus() 回傳異常狀態時間大於 5 分鐘 且 Pending 狀態的 SagaId
3. CompensatoryNotDeduction() 補償機制 -> 第二步執行前中斷 (寫入 Log 結束) -> 取消此筆訂單
  對應第二部分 -> Step 7:主體架構異常點 - 扣款前
4. CompensatoryNotPurchase() 補償機制 -> 第三步執行前中斷 (寫入 Log + 扣款 Balance 表 結束) -> 繼續處理,因為已經扣錢
  對應第二部分 -> Step 8:主體架構異常點 - 新增消費紀錄前
5. CompensatorynotMarkComplate() 補償機制 -> 第四步執行前中斷(寫入 Log + 扣款 Balance 表 + 本地 Member 表 結束) -> 繼續處理,已經完成只差在標記
  對應第二部分 -> Step 9:主體架構異常點 - 更新標記前
    public class QuartzJobForCompensatory : IJob
    {
        private readonly IUnitOfWorkFactory _uowFactory;
        private readonly IUnitOfWorkAccessor _uowAccessor;
        private readonly ILogger<QuartzJobForCompensatory> _logger;
        private readonly IBalanceRepository _balanceRepository;
        private readonly ILogRepository _logRepository;
        private readonly IMemberRepository _memberRespository;
        private readonly IShoppingCartService _shoppingCartService;

        public QuartzJobForCompensatory(
            IUnitOfWorkFactory uowFactory,
            IUnitOfWorkAccessor uowAccessor,
            ILogger<QuartzJobForCompensatory> logger,
            IBalanceRepository balanceRepository,
            ILogRepository logRepository,
            IMemberRepository memberRespository,
            IShoppingCartService shoppingCartService)
        {
            _uowFactory = uowFactory;
            _uowAccessor = uowAccessor;
            _logger = logger;
            _balanceRepository = balanceRepository;
            _logRepository = logRepository;
            _memberRespository = memberRespository;
            _shoppingCartService = shoppingCartService;
        }

        /// <summary>
        /// 1. 實際執行
        /// </summary>
        public async Task Execute(IJobExecutionContext context)
        {
            var currentTiem = DateTime.Now;
            var seqTime = currentTiem.ToString("yyyyMMddHHmmss");
            Console.WriteLine("===== 開始處理 =====");
            Console.WriteLine("時間紀錄 : " + currentTiem.ToString("yyyy/MM/dd HH:mm:ss") + " 流水時間序 : "+ seqTime);

            // 1-1. 取出補償需處理的資料
            var compensatoryInfo = await GetSagaIdCompensatoryStatus();

            // 1-2. 補償1 : 捨棄未扣款的資料 - 影響 Log 庫
            if (compensatoryInfo.notDeductionSagaIds.Any())
            {
                await CompensatoryNotDeduction(compensatoryInfo.notDeductionSagaIds);
                Console.WriteLine($@"[{seqTime}] 補償1 : SagaId : {string.Join(", " , compensatoryInfo.notDeductionSagaIds)}");
            }

            // 1-3. 補償2 : 繼續將 Balance 已扣款資料統計到 Purchase - 影響 Member 庫
            if (compensatoryInfo.notPurchaseSagaIds.Any())
            {
                await CompensatoryNotPurchase(compensatoryInfo.notPurchaseSagaIds);
                Console.WriteLine($@"[{seqTime}] 補償2 : SagaId : {string.Join(", ", compensatoryInfo.notPurchaseSagaIds)}");

            }

            // 1-4. 補償3 : 將尚未標記 Complate 的 SagaId 處理完成 - 影響 Log 庫
            // 標記要包含本次有執行 (寫入 Log + 扣款 Balance 表 結束) 的資料
            var executeMarkIds = compensatoryInfo.notMarkComplateSagaIds;
            executeMarkIds.AddRange(compensatoryInfo.notPurchaseSagaIds);
            if (executeMarkIds.Any())
            {
                await CompensatorynotMarkComplate(executeMarkIds);
                Console.WriteLine($@"[{seqTime}] 補償3  : SagaId : {string.Join(", ", executeMarkIds)}");
            }

            Console.WriteLine("===== 結束處理 =====");
            Console.WriteLine("");
            // 上述補償 1~3 每個都是獨立的庫 EX:
            // 1. 當補償2 有 SagaId = 'A' 完成寫入 Member 庫後,這時發生中斷
            // 2. 下次服務再次觸發,會發現只差標記 Log 已完成,會直接進入 補償3
            // ※ 補償1 直接捨棄,若上次未執行,則下次服務觸發仍會在補償1 讓對應 SagaId 標記 FAILED
        }

        /// <summary>
        /// 1-1. 取得補償狀態
        /// </summary>
        private async Task<CompensatoryDto> GetSagaIdCompensatoryStatus()
        {
            var reuslt = new CompensatoryDto();

            using var uow = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Log);
            _uowAccessor.Current = uow;
            try
            {
                var getPendingSagaIds = await GetOverTimeSagaId();

                // 流程中尚需 : 扣款 + 購買統計 + 標記完成 (取消整筆訂單)
                reuslt.notDeductionSagaIds = await GetNotDeductionSagaIds(getPendingSagaIds);

                // 流程中尚需 : 購買統計 + 標記完成 (繼續處理,因為已經扣錢)
                reuslt.notPurchaseSagaIds = await GetNotPurchaseSagaIds(getPendingSagaIds.Except(reuslt.notDeductionSagaIds)
                                                                                                       .ToList());
                // 流程中尚需 : 標記完成 (繼續處理,已經完成只差在標記)
                reuslt.notMarkComplateSagaIds = getPendingSagaIds.Except(reuslt.notDeductionSagaIds)
                                                                 .Except(reuslt.notPurchaseSagaIds)
                                                                 .ToList();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
                throw;
            }
            return reuslt;

            /// 取出超時要處理的資料 - SagaId + Status
            async Task<List<string>> GetOverTimeSagaId()
            {
                var getResult = new List<string>();
                var sql = $@"
SELECT SagaId 
  FROM SagaTransaction 
 WHERE SagaTransaction.STATUS = 'PENDING'
   AND NOW() > DATE_ADD(CreatedAt, INTERVAL 5 MINUTE)
;";
                try
                {
                    getResult = (await uow.Connection.QueryAsync<string>(sql)).ToList();
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                }
                return getResult;
            }

            // 取得尚未執行扣款的資料
            async Task<List<string>> GetNotDeductionSagaIds(List<string> sourcePendingSagaIds)
            {
                using var uowb = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Balance);
                _uowAccessor.Current = uowb;
                var missingIds = new List<string>();
                var sql = $@"
SELECT SagaId 
  FROM BalanceTransaction 
 WHERE sagaId IN @SagaIds
;";
                try
                {
                    var getResult = (await uowb.Connection.QueryAsync<string>(sql, new
                    {
                        @SagaIds = sourcePendingSagaIds
                    })).ToList();
                    // 排除法
                    missingIds = sourcePendingSagaIds.Except(getResult).ToList();
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                }

                return missingIds;
            }

            // 取得尚未執行統計-添加到 Purchase 的資料
            async Task<List<string>> GetNotPurchaseSagaIds(List<string> sourcePendingSagaIds)
            {
                using var uowm = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Member);
                _uowAccessor.Current = uowm;
                var missingIds = new List<string>();
                var sql = $@"
SELECT SagaId 
  FROM Purchase 
 WHERE sagaId IN @SagaIds
;";
                try
                {
                    var getResult = (await uowm.Connection.QueryAsync<string>(sql, new
                    {
                        @SagaIds = sourcePendingSagaIds
                    })).ToList();
                    // 排除法
                    missingIds = sourcePendingSagaIds.Except(getResult).ToList();
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                }
                return missingIds;
            }
        }

        /// <summary>
        /// 1-2. 補償機制 -> 第二步執行前中斷 (寫入 Log 結束) -> 取消此筆訂單
        /// </summary>
        /// <returns></returns>
        private async Task CompensatoryNotDeduction(List<string> notDeductionSagaIds)
        {
            // 因為在 Balance 表中並未建立對應資料,沒有對實際的錢扣款,因此取消後續流程
            using var uow = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Log);
            _uowAccessor.Current = uow;
            try
            {
                await uow.BeginTransactionAsync();

                // 範例說明每筆皆會處理,實際上應改用 Bulk Update SQL 語法
                foreach (var sagaId in notDeductionSagaIds)
                {
                    await _logRepository.UpdateLogStatus(sagaId, "FAILED");
                }

                await uow.CommitAsync();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
                await uow.RollbackAsync();
            }
        }

        /// <summary>
        /// 1-3. 補償機制 -> 第三步執行前中斷 (寫入 Log + 扣款 Balance 表 結束) -> 繼續處理,因為已經扣錢
        /// </summary>
        /// <returns></returns>
        private async Task CompensatoryNotPurchase(List<string> notPurchaseSagaIds)
        {
            using var uowb = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Balance);
            _uowAccessor.Current = uowb;
            var collectionItems = (await _balanceRepository.GetBalanceTransactionItems(notPurchaseSagaIds)).ToList();

            // 1. 繼續處理,因為已經扣錢
            using var uowm = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Member);
            _uowAccessor.Current = uowm;

            // 2. 處理 Member 庫
            try
            {
                
                await uowm.BeginTransactionAsync();

                // 範例說明每筆皆會處理,實際上應改用 Bulk Update SQL 語法
                foreach (var sagaId in notPurchaseSagaIds)
                {
                    var processItem = collectionItems.FirstOrDefault(item => item.SagaId == sagaId);

                    if (processItem != null)
                    {
                        var insertData = new PurchaseDao()
                        {
                            SagaId = sagaId,
                            Amount = processItem.Amount,
                            MemberId = processItem.MemberId,
                            ProductId = processItem.ProductId,
                            Status = SagaTransactionStatusEnum.COMPLETED.ToString(),
                        };

                        await _memberRespository.InsertPurchase(insertData);
                        await _memberRespository.UpdateMemberSpentMoney(insertData.MemberId, insertData.Amount);
                    }
                }

                await uowm.CommitAsync();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
                await uowm.RollbackAsync();
            }
        }

        /// <summary>
        /// 1-4. 補償機制 -> 第四步執行前中斷(寫入 Log + 扣款 Balance 表 + 本地 Member 表 結束) -> 繼續處理,已經完成只差在標記
        /// </summary>
        /// <returns></returns>
        private async Task CompensatorynotMarkComplate(List<string> notMarkComplateSagaIds)
        {
            // 1. 繼續處理,已經完成只差在標記
            using var uow = await _uowFactory.CreateAsync(MysqlDbConnectionEnum.Log);
            _uowAccessor.Current = uow;
            try
            {
                await uow.BeginTransactionAsync();

                // 2. 範例說明每筆皆會處理,實際上應改用 Bulk Update SQL 語法
                foreach (var sagaId in notMarkComplateSagaIds)
                {
                    await _logRepository.UpdateLogStatus(sagaId, "COMPLETED");
                }

                await uow.CommitAsync();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
                await uow.RollbackAsync();
            }
        }

    }





第五部分:範例執行結果 DEMO

Step 1:啟動專案

啟動後,有以下 5 個按鈕

正常情況下,沒有任何異常的寫入
1. 第一步執行前中斷 (未執行)
2. 第二步執行前中斷 (寫入 Log 結束)
3. 第三步執行前中斷 (寫入 Log + 扣款 Balance 表 結束)
4. 第四步執行前中斷(寫入 Log + 扣款 Balance 表 + 本地 Member 表 結束)




Step 2: Demo 執行按鈕 - 正常情況下,沒有任何異常的寫入

執行後,會完整生成一筆資訊, SagaId 會一致於 Log 庫 -> Balance 庫 -> Member 庫




Step 3:Demo 執行按鈕 - 第一步執行前中斷 (未執行)

執行後,因為未寫入 Log 資料庫,沒有 SagaId 因此,後續動作都執行無視,視為 Server 端與用戶交互異常
沒有真正進入資料業務寫入
如圖,Log 庫,未生成 SagaId:


Step 4:Demo 執行按鈕 - 第二步執行前中斷 (寫入 Log 結束) - 開始

執行後,有一筆 SagaId 在 Log 庫中,但是補償服務,檢查確認 沒有扣款


Step 5:Demo 執行按鈕 - 第二步執行前中斷 (寫入 Log 結束) - 補償方式:取消

最終將此筆 SagaId 取消,設定為 FAILED


Step 6:Demo 執行按鈕 - 第三步執行前中斷 (寫入 Log + 扣款 Balance 表 結束) - 開始

執行後,補償服務,檢查確認 有一筆 SagaId 在 Log 庫中 + 並且 Balance 庫該筆 SagaId 已經扣款會員金額 + ** Member 庫的 Purchase 沒有 SagaId 的消費紀錄**
因為已經有扣款,那麼合理的補償業務邏輯就是繼續完成後續工作 (寫 Member 消費紀錄 + 寫 Log 庫的標記更新)
※此階段若 Member 消費紀錄成功,但是 Log 庫寫入失敗(異常),也不用擔心,會在下一個階段的補償處理, 實現可重試性


Step 7:Demo 執行按鈕 - 第三步執行前中斷 (寫入 Log + 扣款 Balance 表 結束) - 補償方式:繼續完成

最終將此筆 SagaId 在 Member 庫的 Purchase 新增資料,並且更新 Member 庫的 Member 表統計消費金額,並且將 Log 庫的 SagaId 標記為 “COMPLETED”


Step 8:Demo 執行按鈕 - 第四步執行前中斷(寫入 Log + 扣款 Balance 表 + 本地 Member 表 結束) - 開始

執行後,補償服務,檢查確認,三個庫的 SagaId 都有相關資訊存在,並且有處理,那麼只需要標記為 “COMPLETED”


Step 9:Demo 執行按鈕 - 第四步執行前中斷(寫入 Log + 扣款 Balance 表 + 本地 Member 表 結束) - 補償方式:繼續完成

最終將此筆 SagaId 於 Log 庫標記為 “COMPLETED”
至此,完成所有資料庫異常的處理 (補償機制)