分享程式代碼相關筆記
目前文章總數:182 篇
最後更新:2025年 05月 31日
有以下 Kafka 本身的特性,因此適合主主服務,其中最關鍵的是粗體字的部分。
1.Consumer Group 機制 | 自動負載均衡分配 |
消息不會重複消費 | |
故障自動轉移 | |
2. 分區機制 | 天然的並行處理能力 |
每個分區只分配給一個消費者 | |
支持水平擴展 | |
3. 偏移量管理 | 精確控制消息處理進度 |
支持手動提交 | |
故障恢復不丟失消息 |
主主服務的優點:
優點 | 說明 |
---|---|
高資源利用率 | 所有節點都在工作 |
自動故障轉移 | 秒級恢復,無需手動干預 |
水平擴展 | 加節點就能提升性能 |
無單點故障 | 任何節點都可以處理業務 |
維護簡單 | 所有節點代碼相同 |
主主服務的缺點:
缺點 | 說明 |
---|---|
分布式複雜性 | 需要考慮分區依賴 |
調試困難 | 分布式環境調試較複雜 |
網絡依賴 | 對網絡穩定性要求較高 |
主主服務 | 主從服務 |
---|---|
高並發處理需求 | 強一致性需求(如金融交易) |
追求高可用性和快速恢復 | 業務邏輯複雜且有狀態依賴 |
系統負載較高,需要充分利用資源 | 系統負載較低,不需要高並發 |
可以接受最終一致性 | 團隊對分布式系統經驗較少 |
有分布式系統經驗的團隊 | 調試和維護優先考慮 |
簡單結論:高可用性、低成本、分布式場景,建議使用主主服務
1. 生產者 / 消費者 | 各自訂閱 Kafka ,生產者訂閱 Topic Name ; 消費者訂閱 Topic Name,配置共有的 Consumer Group |
2. 生者者發送消息 | 生產者只關注 Topic 並發送給此消息的 Topic |
3. 消費者監聽 | 並且 Consumer Group 接收 Topic 訊息,由 Kafka 自動分配給底下的消費者 |
4. 主服務消費 | 多個主服務可以在 Consumer 形成負載平衡,並且若某個主服務消費失敗,會由另一台主服務接手處理 |
C# Kafka 中 EnableAutoCommit 與 EnableAutoOffsetStore 參數需要設定為 false 才能實現手動處理
打開範例代碼後,架構基本分成以下:
備註:此專案有參考此專案代碼,因此不用重新實作 Kafka
1. Service - BackGround | : | 背景服務,持續輪詢 Kafka 持續消費指定的 TopicName 並使用共同的 ConsumerGroup 進行消費訊息 |
2. Web 控制器 | : | 參考專案後,需將 HomeController 改名成另一個名稱,避免衝突 |
3. 初始化配置 | : | 基本的依賴注入,並且引用 Kafka Confluent 套件,並註冊背景服務 |
這是主主服務最核心部分:背景服務分成 2 部分,第一部分是執行背景工作 - 持續消費 ; 第二部分是關閉時的釋放資源
要實現主主服務架構,必須在 2-1. ~ 2-5. 設定好手動執行消費
然後 3-1. 才能讓主服務正常響應 Kafka 已經消費完畢。
public class KafkaMultiMasterConsumerHostedService : BackgroundService
{
private readonly ILogger<KafkaMultiMasterConsumerHostedService> _logger;
private readonly IKafkaConsumerService _kafkaConsumerService;
private readonly KafkaConfigOptions _kafkaConfig;
private IConsumer<string, string> _consumer;
private readonly IConfiguration _configuration;
private bool _isConnected = false;
public KafkaMultiMasterConsumerHostedService(ILogger<KafkaMultiMasterConsumerHostedService> logger,
IKafkaConsumerService kafkaConsumerService,
IConfiguration configuration,
IOptions<KafkaConfigOptions> kafkaConfigOptions)
{
_logger = logger;
_kafkaConsumerService = kafkaConsumerService;
_kafkaConfig = kafkaConfigOptions.Value;
_configuration = configuration;
}
/// <summary>
/// 一、持續執行的背景工作
/// </summary>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
// 1. 記錄配置資訊
var urls = _configuration["urls"] ?? string.Empty;
_logger.LogInformation($"當前主服務位置: {urls}");
_logger.LogInformation($"嘗試連接到 Kafka 伺服器: {_kafkaConfig.BootstrapServers}");
_logger.LogInformation($"傾聽生產者的主題: {_kafkaConfig.TopicName}");
_logger.LogInformation($"消費者群組多個主主都是用相同的: {_kafkaConfig.ConsumerGroupId}");
// 2. 初始化 Kafka 消費者
var consumerConfig = new ConsumerConfig
{
BootstrapServers = _kafkaConfig.BootstrapServers,
GroupId = _kafkaConfig.ConsumerGroupId,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false,// 2-1. 自動提交 true:啟動 false:關閉,若想要代碼邏輯處理完成在自行設定為消費完成需要使用 false
EnableAutoOffsetStore = false, // 2-2 加入這行:關閉自動偏移量存儲 ,若不關閉,即使手動沒提交也會自動往前 Offset ,導致無法重新消費此筆訊息
SocketTimeoutMs = 10000,
RetryBackoffMs = 1000,
SessionTimeoutMs = 6000, // 2-3. Session Timeout 時間 6 秒
HeartbeatIntervalMs = 2000, // 2-4. 心跳 2秒 (必須 < SessionTimeoutMs)
MaxPollIntervalMs = 10000, // 2-5. 設定輪詢超時超過 10 秒就換機器處理此筆訊息 (必須 >= SessionTimeoutMs)
};
try
{
_consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
_consumer.Subscribe(_kafkaConfig.TopicName);
_logger.LogInformation($"成功連接並訂閱主題: {_kafkaConfig.TopicName}");
}
catch (Exception ex)
{
_logger.LogError($"連接到 Kafka 失敗: {ex.Message}");
_logger.LogWarning("應用程式將繼續運行,但 Kafka 消費功能將不可用");
return;
}
var taskCompletionSource = new TaskCompletionSource<bool>();
_ = Task.Run(async () =>
{
try
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
// 3. 處理生產資料,取出
var consumeResult = _consumer.Consume(TimeSpan.FromSeconds(1));
if (consumeResult != null && !consumeResult.IsPartitionEOF)
{
_consumer.Commit(consumeResult); // 3-1. 完成手動提交給 Kafka 告知此筆訊息已消費
_logger.LogInformation($"收到 Kafka 訊息: 鍵={consumeResult.Message.Key}, 值={consumeResult.Message.Value}");
}
}
catch (ConsumeException ex)
{
// 3-2. 失敗時重新丟回 Kafka 處理剛剛那筆資訊
_logger.LogError($"消費時出錯: {ex.Error.Reason}");
await Task.Delay(5000, stoppingToken);
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.LogError($"未預期錯誤: {ex.Message}");
await Task.Delay(5000, stoppingToken);
}
}
}
catch (Exception ex)
{
_logger.LogError($"消費線程發生未處理錯誤: {ex.Message}");
}
finally
{
taskCompletionSource.TrySetResult(true);
}
}, stoppingToken);
// 4. 主線程等待取消信號
await taskCompletionSource.Task;
}
catch (Exception ex)
{
_logger.LogError($"背景服務執行時發生未處理異常: {ex.Message}");
}
}
/// <summary>
/// 二、停止背景服務時 EX: 關閉、停用
/// </summary>
public override Task StopAsync(CancellationToken stoppingToken)
{
if (_isConnected)
{
_logger.LogInformation("正在關閉 Kafka 消費者");
_consumer?.Close();
_consumer?.Dispose();
}
return base.StopAsync(stoppingToken);
}
}
cs 檔案、Namespace、建構式更名即可,避免與參考專案的 HomeController 衝突
public class MulitMasterHomeController : Controller
{
private readonly ILogger<MulitMasterHomeController> _logger;
public MulitMasterHomeController(ILogger<MulitMasterHomeController> logger)
{
_logger = logger;
}
}
依賴注入所需的 Program.cs 配置,此專案只負責持續運行背景服務。
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddControllersWithViews();
// 1. 註冊 Kafka 配置選項
builder.Services.Configure<KafkaConfigOptions>(builder.Configuration.GetSection("KafkaConfig"));
// 2-1. 註冊 Kafka 服務
builder.Services.AddSingleton<IKafkaConsumerService, KafkaConsumerService>();
builder.Services.AddSingleton<IKafkaProducerService, KafkaProducerService>();
// 2-2. 註冊 Kafka 主服務
builder.Services.AddHostedService<KafkaMultiMasterConsumerHostedService>();
var app = builder.Build();
... 略
app.Run();
每個生產者消息會分配到每個主服務器上進行消費,讓 A , B 服務器都可收到資料,並且不會重複執行。
Visual Studio 2022 -> 發布 -> 某個資料夾
發布檔案後,先將 appsettings.json 中的 ConsumerGroupId 參數, 2 台服務必須相同
"ConsumerGroupId": "multi-master-consumer-group"
另一台也相同配置
"ConsumerGroupId": "multi-master-consumer-group"
dotnet KafkaMultiMasterServerExample.dll --urls=http://localhost:6001
2. 對第 2 台機器執行以下指令
dotnet KafkaMultiMasterServerExample.dll --urls=http://localhost:6002
3. 可以在生產者發送訊息裡面發送 1 筆資料,可以看到目前只有 Port : 6002 的機器收到該消息
然後再持續推送生產者訊息
這次輪到另一台進行消費,實現主主服務的架構
當一條消息在主服務器 A 時,若異常會讓主服務器 B 進行消費。 (左 -> 右)
將代碼手動反饋 Kafka 完成的地方註解,並且增加 Log
_logger.LogInformation($"【故意不手動提交,模擬此主服務異常】 當前主服務位置: {urls}");
//_consumer.Commit(consumeResult); // 3-1. 完成手動提交給 Kafka 告知此筆訊息已消費
Visual Studio -> 發布
將第 1 台機器的部署檔案更新,並且重新啟動
dotnet KafkaMultiMasterServerExample.dll --urls=http://localhost:6001
發送 1 筆 Key: 623 Value:623 的資料
當主服務 1 機器異常時,會將消息還給 Kafka ,然後 Kafka 會再自動負載均衡的將此訊息給其他活著並且訂閱 multi-master-consumer-group 的消費者群組的服務。