首頁

目前文章總數:172 篇

  

最後更新:2025年 03月 22日

0090. Kafka 開發指南:.NET Core 上 Kafka 事件流(消息隊列)推送

日期:2025年 04月 12日

標籤: Asp.NET Core Web MVC Docker Docker-Compose Container Ubuntu Linux Kafka Kafka UI Visual Studio

摘要:C# 學習筆記


應用所需:1. Linux Ubuntu (本篇 22.04)
     2. 已安裝 Docker、Kafka、Kafka-UI、Zookeeper
     3. Visual Studio 2022 以上版本
解決問題:1. 如何實現 C# 代碼下的 Kafka 事件流(消息隊列)推送
相關參考:0089. Linux Ubuntu 作業系統,搭配 Docker Compose 快速安裝 Kafka 流程
範例專案:範例代碼
基本介紹:本篇分為 4 部分。
第一部分:Web專案架構
第二部分:代碼說明
第三部分:DEMO 成果
第四部分:分布式 Kafka 特色 - 重放能力






第一部分:Web專案架構

Step 1:範例專案架構

打開範例代碼後,架構基本分成以下:

1. Service - BackGround 背景服務,持續輪詢 Kafka 持續消費指定的 TopicName 將資料寫進此 AP
2. Service - Producer 生產者模式,發送訊息到指定的 Kafka Broker + TopicName 上
3. Service - Consumer 消費者模式,這邊的服務是將從背景服務取到的消費者資料回傳給 Web控制器,用以渲染
4. Web控制器 提供生產者發送訊息 & 頁面 與 消費者檢視頁面
5. Html 畫面 提供 Html 畫面取資料、並且可發送訊息給 Kafka
6. 參數配置 配置 Kafka 的目標 Server Host ,發送的 TopicName 與自己應用端的消費者群組Id
7. 初始化配置 基本的依賴注入,並且引用 Kafka Confluent 套件,並註冊背景服務




第二部分:代碼說明

Step 1: Service - BackGround

若想要詳細的 Kafka 操作,可以參考官方的開發文件,官方寫的非常詳細。
※這篇以開發導向,能快速將代碼與 Kafka 服務器作互動為目標
背景服務會持續輪詢 Kafka ,並將生產者的訊息進行持續消費
方法分成持續進行、關閉時。

public class KafkaConsumerHostedService : BackgroundService
{
    private readonly ILogger<KafkaConsumerHostedService> _logger;
    private readonly IKafkaConsumerService _kafkaConsumerService;
    private readonly KafkaConfigOptions _kafkaConfig;
    private IConsumer<string, string> _consumer;
    private bool _isConnected = false;

    public KafkaConsumerHostedService(
        ILogger<KafkaConsumerHostedService> logger,
        IKafkaConsumerService kafkaConsumerService,
        IOptions<KafkaConfigOptions> kafkaConfigOptions)
    {
        _logger = logger;
        _kafkaConsumerService = kafkaConsumerService;
        _kafkaConfig = kafkaConfigOptions.Value;
    }

    /// <summary>
    /// 一、持續執行的背景工作
    /// </summary>        
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            try
            {
                // 1. 記錄配置資訊
                _logger.LogInformation($"嘗試連接到 Kafka 伺服器: {_kafkaConfig.BootstrapServers}");
                _logger.LogInformation($"主題: {_kafkaConfig.TopicName}");
                _logger.LogInformation($"消費者群組: {_kafkaConfig.ConsumerGroupId}");

                // 2. 初始化 Kafka 消費者
                var consumerConfig = new ConsumerConfig
                {
                    BootstrapServers = _kafkaConfig.BootstrapServers,
                    GroupId = _kafkaConfig.ConsumerGroupId,
                    AutoOffsetReset = AutoOffsetReset.Earliest,
                    EnableAutoCommit = true,
                    SocketTimeoutMs = 10000,
                    RetryBackoffMs = 1000
                };

                // 2-2. 無法連接時,直接結束方法,不進入迴圈 (這篇範例假定 Kafka Server 是正常運行,不做其他例外處理)
                try
                {
                    _consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
                    _consumer.Subscribe(_kafkaConfig.TopicName);
                    _logger.LogInformation($"成功連接並訂閱主題: {_kafkaConfig.TopicName}");
                }
                catch (Exception ex)
                {
                    _logger.LogError($"連接到 Kafka 失敗: {ex.Message}");
                    _logger.LogWarning("應用程式將繼續運行,但 Kafka 消費功能將不可用");
                    return;
                }

                // 3-1. 開啟 TaskCompletionSource 來控制執行流程
                var taskCompletionSource = new TaskCompletionSource<bool>();

                // 3-2. 啟用線程執行 Kafka 消費操作 (避免阻塞主程式)
                _ = Task.Run(async () =>
                {
                    try
                    {
                        // 3-3. 不斷檢查是否有觸發生產者資料
                        while (!stoppingToken.IsCancellationRequested)
                        {
                            try
                            {
                                // 4-1. 處理生產資料,取出
                                var consumeResult = _consumer.Consume(TimeSpan.FromSeconds(1));

                                if (consumeResult != null && !consumeResult.IsPartitionEOF)
                                {
                                    _logger.LogInformation($"收到 Kafka 訊息: 鍵={consumeResult.Message.Key}, 值={consumeResult.Message.Value}");

                                    // 4-2. 處理訊息...
                                    _kafkaConsumerService.AddReceivedMessage(new KafkaMessageViewModel()
                                    {
                                        Key = consumeResult.Message.Key,
                                        Message = consumeResult.Message.Value,
                                        Topic = consumeResult.Topic,
                                        Offset = consumeResult.TopicPartitionOffset.Offset
                                    });
                                }
                            }
                            catch (ConsumeException ex)
                            {
                                _logger.LogError($"消費時出錯: {ex.Error.Reason}");

                                // 4-3. While 內發生錯誤後暫停一段時間,避免迴圈過快消耗資源
                                await Task.Delay(5000, stoppingToken);
                            }
                            catch (OperationCanceledException)
                            {
                                // 4-4. 確定中止強制跳出
                                break;
                            }
                            catch (Exception ex)
                            {
                                _logger.LogError($"未預期錯誤: {ex.Message}");
                                // 4-5. While 內發生錯誤後暫停一段時間,避免迴圈過快消耗資源
                                await Task.Delay(5000, stoppingToken);
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError($"消費線程發生未處理錯誤: {ex.Message}");
                    }
                    finally
                    {
                        taskCompletionSource.TrySetResult(true);
                    }
                }, stoppingToken);

                // 5. 主線程等待取消信號
                await taskCompletionSource.Task;
            }
            catch (Exception ex)
            {
                _logger.LogError($"背景服務執行時發生未處理異常: {ex.Message}");
            }
        }

    /// <summary>
    /// 二、停止背景服務時 EX: 關閉、停用
    /// </summary>        
    public override Task StopAsync(CancellationToken stoppingToken)
        {
            if (_isConnected)
            {
                _logger.LogInformation("正在關閉 Kafka 消費者");
                _consumer?.Close();
                _consumer?.Dispose();
            }
            return base.StopAsync(stoppingToken);
        }
}



Step 2: Service - Producer

生產者工作 - 將從前端傳來的訊息,發送到 Kafka 上,前端可攜帶 TopicName, Key, Value
目標 Server Host 都會從 Appsettings.json 取得。

public class KafkaProducerService : IKafkaProducerService
{
    private readonly IProducer<string, string> _producer;
    private readonly KafkaConfigOptions _kafkaConfig;

    /// <summary>
    /// 1. 建構式,準備連線對象
    /// </summary>        
    public KafkaProducerService(IOptions<KafkaConfigOptions> kafkaConfigOptions)
        {
            _kafkaConfig = kafkaConfigOptions.Value;

            var config = new ProducerConfig
            {
                BootstrapServers = _kafkaConfig.BootstrapServers,
                Acks = Acks.All // Acks 參數定義了生產者發送訊息後,需要多少個 Broker 確認訊息已被接收,才能視為成功發送。
            };

            _producer = new ProducerBuilder<string, string>(config).Build();
        }

    /// <summary>
    /// 2. 生產者 : 發送到 Kafka  Broker 上
    /// </summary>        
    public async Task<KafkaProduceResult> ProduceMessageAsync(
        string topic, string key, string message)
        {
            try
            {
                var deliveryResult = await _producer.ProduceAsync(
                    topic,
                    new Message<string, string>
                    {
                        Key = string.IsNullOrEmpty(key) ? Guid.NewGuid().ToString() : key,
                        Value = message
                    });

                return new KafkaProduceResult
                {
                    IsSuccess = true,
                    TopicPartitionOffset = deliveryResult.TopicPartitionOffset.ToString()
                };
            }
            catch (ProduceException<string, string> ex)
            {
                return new KafkaProduceResult
                {
                    IsSuccess = false,
                    ErrorMessage = ex.Error.Reason
                };
            }
        }
}



Step 3: Service - Consumer

消費者 - 會由被竟服務持續將值寫進 AP (Application Program)中
當前端需要資料顯示時再從 GetReceivedMessages() 方法返回。

public class KafkaConsumerService : IKafkaConsumerService
{
    private readonly ConcurrentQueue<KafkaMessageViewModel> _receivedMessages = new ConcurrentQueue<KafkaMessageViewModel>();
    private const int MaxStoredMessages = 100;
    public List<KafkaMessageViewModel> GetReceivedMessages()
    {
        return _receivedMessages.ToList();
    }

    /// <summary>
    /// 將訊息加入隊列中
    /// </summary>
    /// <param name="message"></param>
    public void AddReceivedMessage(KafkaMessageViewModel message)
    {

        _receivedMessages.Enqueue(message);

        // 保持訊息數量不超過上限
        while (_receivedMessages.Count > MaxStoredMessages)
        {
            _receivedMessages.TryDequeue(out _);
        }
    }
}



Step 4: Web控制器

包含了檢視生產者頁面、消費者頁面,以及生產者專用的建立生產者訊息API

/// <summary>
/// 1. View 頁面 - 生產者的資訊 (發送資料)
/// </summary>        
public IActionResult Produce()
{
    return View(new KafkaMessageViewModel());
}

/// <summary>
/// 2. 生產者 - 發送資料到 Kafka 
/// </summary>        
[HttpPost]
public async Task<IActionResult> Produce(KafkaMessageViewModel model)
{
    // 1. 用 C# 特性,自動驗證 Model 上的欄位
    if (ModelState.IsValid)
    {
        //2. 將資料傳送到 Kafka  上 - 關鍵是 Topic : 主題 ; 鍵值 : Key  才能正確傳送到對象 
        var result = await _producerService.ProduceMessageAsync(model.Topic, model.Key, model.Message);
        //3. 返回回傳結果
        if (result.IsSuccess)
        {
            TempData["SuccessMessage"] = $"訊息發送成功!位置: {result.TopicPartitionOffset}";
        }
        else
        {
            TempData["ErrorMessage"] = $"訊息發送失敗: {result.ErrorMessage}";
        }
        return RedirectToAction(nameof(Produce));
    }
    return View(model);
}

/// <summary>
/// 3. View 頁面 - 消費者的資訊 (當前收到的資料)
/// </summary>   
public IActionResult Consume()
{
    // 1. 取得發送過的 Kafka 資料 (這篇簡單示範,還有其他持久化消費對列的方式)
    var receivedMessages = _consumerService.GetReceivedMessages();
    return View(receivedMessages);
}



Step 5: Html 畫面 - Producer

生產者頁面,利用 Partial 建立生產者訊息,並且發送後顯示發送的資訊。

@using KafkaAspCoreWebExample.Models
@model KafkaMessageViewModel
@{
    ViewData["Title"] = "發送 Kafka 訊息";
}

<h1>發送 Kafka 訊息</h1>

@if (TempData["SuccessMessage"] != null)
{
    <div class="alert alert-success">
        @TempData["SuccessMessage"]
    </div>
}

@if (TempData["ErrorMessage"] != null)
{
    <div class="alert alert-danger">
        @TempData["ErrorMessage"]
    </div>
}

<div class="row">
    <div class="col-md-6">
        <div class="card">
            <div class="card-header">
                <h5>發送訊息表單</h5>
            </div>
            <div class="card-body">
                <form asp-action="Produce" method="post">
                    <div class="form-group mb-3">
                        <label asp-for="Topic" class="control-label"></label>
                        <input asp-for="Topic" class="form-control" value="test-topic" />
                        <span asp-validation-for="Topic" class="text-danger"></span>
                    </div>
                    <div class="form-group mb-3">
                        <label asp-for="Key" class="control-label"></label>
                        <input asp-for="Key" class="form-control" />
                        <small class="form-text text-muted">如果不提供鍵值,將自動生成一個唯一識別碼</small>
                    </div>
                    <div class="form-group mb-3">
                        <label asp-for="Message" class="control-label"></label>
                        <textarea asp-for="Message" class="form-control" rows="5"></textarea>
                        <span asp-validation-for="Message" class="text-danger"></span>
                    </div>
                    <div class="form-group">
                        <button type="submit" class="btn btn-primary">發送訊息</button>
                        <a asp-action="Index" class="btn btn-secondary">返回</a>
                    </div>
                </form>
            </div>
        </div>
    </div>
    <div class="col-md-6">
        <div class="card">
            <div class="card-header">
                <h5>使用說明</h5>
            </div>
            <div class="card-body">
                <p>填寫上面的表單以發送訊息到 Kafka:</p>
                <ul>
                    <li><strong>主題</strong>:填寫您想要發送訊息的 Kafka 主題名稱</li>
                    <li><strong>鍵值</strong>:(可選)用於指定訊息的鍵,影響分區分配</li>
                    <li><strong>訊息內容</strong>:要發送的實際訊息</li>
                </ul>
                <p>發送訊息後,您可以在「接收訊息」頁面查看已接收的訊息。</p>
            </div>
        </div>
    </div>
</div>

@section Scripts {
    @{
        await Html.RenderPartialAsync("_ValidationScriptsPartial");
    }
}



Step 6: Html 畫面 - Consumer

消費者頁面,純檢視,互動只有刷新畫面

@using KafkaAspCoreWebExample.Models
@model List<KafkaMessageViewModel>
@{
    ViewData["Title"] = "接收 Kafka 訊息";
}

<h1>接收 Kafka 訊息</h1>

<div class="mb-3">
    <a asp-action="Index" class="btn btn-secondary">返回</a>
    <a asp-action="Consume" class="btn btn-primary">重新整理</a>
</div>

@if (Model.Any())
{
    <div class="table-responsive">
        <table class="table table-striped table-bordered">
            <thead>
                <tr>
                    <th>接收時間</th>
                    <th>主題</th>
                    <th>鍵值</th>
                    <th>訊息內容</th>
                    <th>偏移量</th>
                </tr>
            </thead>
            <tbody>
                @foreach (var message in Model.OrderByDescending(m => m.ReceivedAt))
                {
                    <tr>
                        <td>@message.ReceivedAt?.ToString("yyyy-MM-dd HH:mm:ss")</td>
                        <td>@message.Topic</td>
                        <td>@message.Key</td>
                        <td>@message.Message</td>
                        <td>@message.Offset</td>
                    </tr>
                }
            </tbody>
        </table>
    </div>
}
else
{
    <div class="alert alert-info">
        尚未接收到任何訊息。請確保消費者服務正在運行,並且已經有訊息發送到訂閱的主題。
    </div>
}

<div class="card mt-4">
    <div class="card-header">
        <h5>使用說明</h5>
    </div>
    <div class="card-body">
        <p>此頁面顯示已從 Kafka 接收的訊息。背景服務正在監聽配置的主題,並將接收到的訊息添加到此列表中。</p>
        <p>點擊「重新整理」按鈕可以更新訊息列表。</p>
        <p><strong>注意</strong>:只顯示最近接收的最多 100 條訊息。</p>
    </div>
</div>



Step 7: 參數配置

配置主機指向位置,其中 TopicName、ConsumerGroudId 會有其他持久化的方式保存,不會每個 AP 都固定
這不利於橫向擴展,這邊只是範例,寫在 Config 中

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*",
  "KafkaConfig": {
    "BootstrapServers": "192.168.51.100:9092",
    "TopicName": "test-topic",
    "ConsumerGroupId": "aspnet-core-consumer-group"
  }
}



Step 8: 初始化配置

基本的依賴注入配置

using KafkaAspCoreWebExample.Services.Background;
using KafkaAspCoreWebExample.Services;
using KafkaAspCoreWebExample.Models;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
builder.Services.AddControllersWithViews();

// 1. 註冊 Kafka 配置選項
builder.Services.Configure<KafkaConfigOptions>(builder.Configuration.GetSection("KafkaConfig"));

// 2-1. 註冊 Kafka 服務
builder.Services.AddSingleton<IKafkaConsumerService, KafkaConsumerService>();
builder.Services.AddSingleton<IKafkaProducerService, KafkaProducerService>();

// 2-2. 註冊 Kafka 背景服務
builder.Services.AddHostedService<KafkaConsumerHostedService>();

var app = builder.Build();

// 略......

app.Run();



Step 9: 安裝 Nuget 套件

這邊使用的是 Kafka Confluent 套件,來實現與 kafka Server 的交互
※Confluent.Kafka 是免費且官方授權的,但是如果上到 Confluent Cloud 雲端託管,將會進行計費


第三部分:DEMO 成果

Step 1:啟動專案

使用 Visual Studio 開啟專案 -> 進入 Debug 模式 -> 進入 Kafka 首頁 -> 會有 2 個頁面可以操作


Step 2:專案 - 生產者測試頁面 - 填寫

在頁面上填寫基本資訊 TopicName、Key,Value 後,點擊發送送出


Step 3:專案 - 生產者測試頁面 - 送出

送出成功後,會提示成功資訊


Step 4:確認 Kafka-UI

這時可以進入 Kafak-UI 上確認 TopicName,依序點擊
可以看到生產者的訊息


Step 5:專案 - 消費者檢視頁面 - 完成DEMO

進入消費者檢視頁面,可以看到此消費者群組,消費該 TopicName 所有的生產者訊息


第四部分:分布式 Kafka 特色 - 重放能力

Step 1:Kafak-UI 檢視消費者訊息 - 1

依序點擊 -> Consumer -> Consumer 群組Id


Step 2:Kafak-UI 檢視消費者訊息 - 2

可以看到 Current Offset 與 End Offset 一致,表示此 Consumer 的群組,已經消費所有資訊了


Step 3:專案 - 進行重新消費 - 重啟專案

這時再從新啟動專案 -> 進入 Debug 模式 -> 消費者檢視頁面
因為都已經被消費完畢,因此此群組無法再取得未消費的訊息


Step 4:專案 - 進行重新消費 - 更換消費者群組Id

開啟專案的 Appsettings.json 檔,將 ConsumerGroupId 隨意改個名稱

"ConsumerGroupId": "aspnet-core-consumer-group2"




Step 5:專案 - 進行重新消費 - 消費者檢視頁面

可以看到這個群組 aspnet-core-consumer-group2 進行所有訊息的重新消費


Step 6:Kafka 的事件串流模型

為何可以重複消費呢? => 因為Kafka 的設計理念是作為一個分佈式事件日誌系統,因此具有以下特性:

1. 偏移量跟踪: Kafka 把消息存儲在持久化的日誌中
  每個消費者群組維護自己的偏移量(offset),標記已讀取的位置
  更改 Consumer Group ID 等於創建了一個全新的消費者群組,沒有任何偏移量記錄
   
2. 數據保留機制: Kafka 會根據配置的保留策略保留消息(時間或大小限制)
  即使消息被消費,也不會被刪除,直到保留期過期
  這允許多個消費者群組獨立讀取相同的消息流
   
3. 重新消費能力: 消費者可以重置自己的偏移量到任何位置
  創建新的 Consumer Group ID 會從配置的起始位置(通常是最早或最新)開始消費


Kafka 優先考慮高吞吐量事件重放能力,因此都以持久化為核心,那麼成本勢必較保證一次性消費的 RabbitMQ 高出不少成本