首頁

目前文章總數:182 篇

  

最後更新:2025年 05月 31日

0095. Kafka 水平擴展消費處理 - 主主架構(Multi-Master),如何實現與配置方式

日期:2025年 06月 14日

標籤: Docker Docker-Compose Container Ubuntu Linux Kafka Kafka UI Multi Master Service

摘要:C# 學習筆記


應用所需:1. Linux Ubuntu (本篇 22.04)
     2. 已安裝 Docker、Kafka、Kafka-UI
     3. Visual Studio 2022 以上版本
解決問題:1. 如何實現主主服務架構下,多個伺服器同時消費 Kafka 的訊息隊列,並且每個主服務處理相同的生產者群組隊列的訊息時,每個主服務不會重複處理
     2. 展示在主主服務架構下 Kafka 的 Consumer 如何自動實現負載均衡
相關參考:0089. Linux Ubuntu 上使用 Docker Compose 快速部署 Kafka
相關參考:0090. Kafka 開發指南:.NET Core 上 Kafka 事件流(消息隊列)推送
相關參考:0093. Kafka Kraft 模式部署與高可用性模擬測試(附完整 docker-compose)
範例專案:範例代碼
基本介紹:本篇分為 5 部分。
第一部分:Kafka - 主主服務架構
第二部分:Web專案架構
第三部分:代碼說明
第四部分:DEMO 成果 - 主主服務架構 - 不重複消費
第五部分:DEMO 成果 - 主主服務架構 - 自動負載均衡






第一部分:Kafka - 主主服務架構

Step 1:Kafka 適合主主架構簡介

有以下 Kafka 本身的特性,因此適合主主服務,其中最關鍵的是粗體字的部分。

1.Consumer Group 機制 自動負載均衡分配
  消息不會重複消費
  故障自動轉移
2. 分區機制 天然的並行處理能力
  每個分區只分配給一個消費者
  支持水平擴展
3. 偏移量管理 精確控制消息處理進度
  支持手動提交
  故障恢復不丟失消息



Step 2:主主服務的優缺點

主主服務的優點:

優點 說明
高資源利用率 所有節點都在工作
自動故障轉移 秒級恢復,無需手動干預
水平擴展 加節點就能提升性能
無單點故障 任何節點都可以處理業務
維護簡單 所有節點代碼相同


主主服務的缺點:

缺點 說明
分布式複雜性 需要考慮分區依賴
調試困難 分布式環境調試較複雜
網絡依賴 對網絡穩定性要求較高



Step 3:主主架構 與 主從架構 - 如何選擇

主主服務 主從服務
高並發處理需求 強一致性需求(如金融交易)
追求高可用性和快速恢復 業務邏輯複雜且有狀態依賴
系統負載較高,需要充分利用資源 系統負載較低,不需要高並發
可以接受最終一致性 團隊對分布式系統經驗較少
有分布式系統經驗的團隊 調試和維護優先考慮


簡單結論:高可用性、低成本、分布式場景,建議使用主主服務

Step 4:主主架構下的 - 生產者與消費者

1. 生產者 / 消費者 各自訂閱 Kafka ,生產者訂閱 Topic Name ; 消費者訂閱 Topic Name,配置共有的 Consumer Group
2. 生者者發送消息 生產者只關注 Topic 並發送給此消息的 Topic
3. 消費者監聽 並且 Consumer Group 接收 Topic 訊息,由 Kafka 自動分配給底下的消費者
4. 主服務消費 多個主服務可以在 Consumer 形成負載平衡,並且若某個主服務消費失敗,會由另一台主服務接手處理


C# Kafka 中 EnableAutoCommit 與 EnableAutoOffsetStore 參數需要設定為 false 才能實現手動處理



第二部分:Web專案架構

Step 1:範例專案架構

打開範例代碼後,架構基本分成以下:
備註:此專案有參考此專案代碼,因此不用重新實作 Kafka

1. Service - BackGround 背景服務,持續輪詢 Kafka 持續消費指定的 TopicName 並使用共同的 ConsumerGroup 進行消費訊息
2. Web 控制器 參考專案後,需將 HomeController 改名成另一個名稱,避免衝突
3. 初始化配置 基本的依賴注入,並且引用 Kafka Confluent 套件,並註冊背景服務




第三部分:代碼說明

Step 1: Service - BackGround

這是主主服務最核心部分:背景服務分成 2 部分,第一部分是執行背景工作 - 持續消費 ; 第二部分是關閉時的釋放資源
要實現主主服務架構,必須在 2-1. ~ 2-5. 設定好手動執行消費
然後 3-1. 才能讓主服務正常響應 Kafka 已經消費完畢。

public class KafkaMultiMasterConsumerHostedService : BackgroundService
{
    private readonly ILogger<KafkaMultiMasterConsumerHostedService> _logger;
    private readonly IKafkaConsumerService _kafkaConsumerService;
    private readonly KafkaConfigOptions _kafkaConfig;
    private IConsumer<string, string> _consumer;
    private readonly IConfiguration _configuration;
    private bool _isConnected = false;

    public KafkaMultiMasterConsumerHostedService(ILogger<KafkaMultiMasterConsumerHostedService> logger,
        IKafkaConsumerService kafkaConsumerService,
        IConfiguration configuration,
        IOptions<KafkaConfigOptions> kafkaConfigOptions)
    {
        _logger = logger;
        _kafkaConsumerService = kafkaConsumerService;
        _kafkaConfig = kafkaConfigOptions.Value;
        _configuration = configuration;
    }

    /// <summary>
    /// 一、持續執行的背景工作
    /// </summary>        
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            // 1. 記錄配置資訊
            var urls = _configuration["urls"] ?? string.Empty;
            _logger.LogInformation($"當前主服務位置: {urls}");
            _logger.LogInformation($"嘗試連接到 Kafka 伺服器: {_kafkaConfig.BootstrapServers}");
            _logger.LogInformation($"傾聽生產者的主題: {_kafkaConfig.TopicName}");
            _logger.LogInformation($"消費者群組多個主主都是用相同的: {_kafkaConfig.ConsumerGroupId}");
            
            
            // 2. 初始化 Kafka 消費者
            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = _kafkaConfig.BootstrapServers,
                GroupId = _kafkaConfig.ConsumerGroupId,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = false,// 2-1. 自動提交 true:啟動 false:關閉,若想要代碼邏輯處理完成在自行設定為消費完成需要使用 false
                EnableAutoOffsetStore = false,   // 2-2 加入這行:關閉自動偏移量存儲 ,若不關閉,即使手動沒提交也會自動往前 Offset ,導致無法重新消費此筆訊息
                SocketTimeoutMs = 10000,
                RetryBackoffMs = 1000,
                SessionTimeoutMs = 6000,        // 2-3. Session Timeout 時間 6 秒
                HeartbeatIntervalMs = 2000,     // 2-4. 心跳 2秒 (必須 < SessionTimeoutMs)
                MaxPollIntervalMs = 10000,      // 2-5. 設定輪詢超時超過 10 秒就換機器處理此筆訊息 (必須 >= SessionTimeoutMs)                    
            };

          
            try
            {
                _consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
                _consumer.Subscribe(_kafkaConfig.TopicName);
                _logger.LogInformation($"成功連接並訂閱主題: {_kafkaConfig.TopicName}");
            }
            catch (Exception ex)
            {
                _logger.LogError($"連接到 Kafka 失敗: {ex.Message}");
                _logger.LogWarning("應用程式將繼續運行,但 Kafka 消費功能將不可用");
                return;
            }

            var taskCompletionSource = new TaskCompletionSource<bool>();
            _ = Task.Run(async () =>
            {
                try
                {
                    while (!stoppingToken.IsCancellationRequested)
                    {
                        try
                        {
                            // 3. 處理生產資料,取出
                            var consumeResult = _consumer.Consume(TimeSpan.FromSeconds(1));

                            if (consumeResult != null && !consumeResult.IsPartitionEOF)
                            {
                                _consumer.Commit(consumeResult); // 3-1. 完成手動提交給 Kafka 告知此筆訊息已消費
                                _logger.LogInformation($"收到 Kafka 訊息: 鍵={consumeResult.Message.Key}, 值={consumeResult.Message.Value}");
                            }
                        }
                        catch (ConsumeException ex)
                        {
                            // 3-2. 失敗時重新丟回 Kafka 處理剛剛那筆資訊
                            _logger.LogError($"消費時出錯: {ex.Error.Reason}");                                
                            await Task.Delay(5000, stoppingToken);
                        }
                        catch (OperationCanceledException)
                        {                                
                            break;
                        }
                        catch (Exception ex)
                        {
                            _logger.LogError($"未預期錯誤: {ex.Message}");                                
                            await Task.Delay(5000, stoppingToken);
                        }
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError($"消費線程發生未處理錯誤: {ex.Message}");
                }
                finally
                {
                    taskCompletionSource.TrySetResult(true);
                }
            }, stoppingToken);

            // 4. 主線程等待取消信號
            await taskCompletionSource.Task;
        }
        catch (Exception ex)
        {
            _logger.LogError($"背景服務執行時發生未處理異常: {ex.Message}");
        }
    }

    /// <summary>
    /// 二、停止背景服務時 EX: 關閉、停用
    /// </summary>        
    public override Task StopAsync(CancellationToken stoppingToken)
    {
        if (_isConnected)
        {
            _logger.LogInformation("正在關閉 Kafka 消費者");
            _consumer?.Close();
            _consumer?.Dispose();
        }
        return base.StopAsync(stoppingToken);
    }
}



Step 2: Web 控制器

cs 檔案、Namespace、建構式更名即可,避免與參考專案的 HomeController 衝突

public class MulitMasterHomeController : Controller
{
    private readonly ILogger<MulitMasterHomeController> _logger;
    public MulitMasterHomeController(ILogger<MulitMasterHomeController> logger)
    {
        _logger = logger;
    }
}



Step 3: 初始化配置

依賴注入所需的 Program.cs 配置,此專案只負責持續運行背景服務。

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
builder.Services.AddControllersWithViews();

// 1. 註冊 Kafka 配置選項
builder.Services.Configure<KafkaConfigOptions>(builder.Configuration.GetSection("KafkaConfig"));

// 2-1. 註冊 Kafka 服務
builder.Services.AddSingleton<IKafkaConsumerService, KafkaConsumerService>();
builder.Services.AddSingleton<IKafkaProducerService, KafkaProducerService>();

// 2-2. 註冊 Kafka 主服務
builder.Services.AddHostedService<KafkaMultiMasterConsumerHostedService>();


var app = builder.Build();

... 略

app.Run();



第四部分:DEMO 成果 - 主主服務架構 - 不重複消費

Step 1:預期結果

每個生產者消息會分配到每個主服務器上進行消費,讓 A , B 服務器都可收到資料,並且不會重複執行。


Step 2:部署相同專案代碼 - 第 1 台

Visual Studio 2022 -> 發布 -> 某個資料夾
發布檔案後,先將 appsettings.json 中的 ConsumerGroupId 參數, 2 台服務必須相同

 "ConsumerGroupId": "multi-master-consumer-group"




Step 3:部署相同專案代碼 - 第 2 台

另一台也相同配置

 "ConsumerGroupId": "multi-master-consumer-group"




Step 4:啟動 2 台主服務 & 一筆生產者訊息

  1. 對第 1 台機器執行以下指令
dotnet KafkaMultiMasterServerExample.dll --urls=http://localhost:6001



2. 對第 2 台機器執行以下指令

dotnet KafkaMultiMasterServerExample.dll --urls=http://localhost:6002



3. 可以在生產者發送訊息裡面發送 1 筆資料,可以看到目前只有 Port : 6002 的機器收到該消息


Step 5:再次發送生產者訊息

然後再持續推送生產者訊息


Step 6:另一台機器收到此消息 - Demo成功

這次輪到另一台進行消費,實現主主服務的架構


第五部分:DEMO 成果 - 主主服務架構 - 自動負載均衡

Step 1:預期結果

當一條消息在主服務器 A 時,若異常會讓主服務器 B 進行消費。 (左 -> 右)


Step 2:模擬 1 台服務消費失敗的狀況

將代碼手動反饋 Kafka 完成的地方註解,並且增加 Log

_logger.LogInformation($"【故意不手動提交,模擬此主服務異常】 當前主服務位置: {urls}");
//_consumer.Commit(consumeResult); // 3-1. 完成手動提交給 Kafka 告知此筆訊息已消費




Step 3:模擬代碼 - 部署到第 1 台機器

Visual Studio -> 發布


將第 1 台機器的部署檔案更新,並且重新啟動

dotnet KafkaMultiMasterServerExample.dll --urls=http://localhost:6001




Step 4:發送生產者消息

發送 1 筆 Key: 623 Value:623 的資料


Step 5:異常時,另台機器重新消費此訊息 - Demo成功

當主服務 1 機器異常時,會將消息還給 Kafka ,然後 Kafka 會再自動負載均衡的將此訊息給其他活著並且訂閱 multi-master-consumer-group 的消費者群組的服務。