分享程式代碼相關筆記
目前文章總數:172 篇
最後更新:2025年 03月 22日
打開範例代碼後,架構基本分成以下:
1. Service - BackGround | : | 背景服務,持續輪詢 Kafka 持續消費指定的 TopicName 將資料寫進此 AP |
2. Service - Producer | : | 生產者模式,發送訊息到指定的 Kafka Broker + TopicName 上 |
3. Service - Consumer | : | 消費者模式,這邊的服務是將從背景服務取到的消費者資料回傳給 Web控制器,用以渲染 |
4. Web控制器 | : | 提供生產者發送訊息 & 頁面 與 消費者檢視頁面 |
5. Html 畫面 | : | 提供 Html 畫面取資料、並且可發送訊息給 Kafka |
6. 參數配置 | : | 配置 Kafka 的目標 Server Host ,發送的 TopicName 與自己應用端的消費者群組Id |
7. 初始化配置 | : | 基本的依賴注入,並且引用 Kafka Confluent 套件,並註冊背景服務 |
若想要詳細的 Kafka 操作,可以參考官方的開發文件,官方寫的非常詳細。
※這篇以開發導向,能快速將代碼與 Kafka 服務器作互動為目標。
背景服務會持續輪詢 Kafka ,並將生產者的訊息進行持續消費
方法分成持續進行、關閉時。
public class KafkaConsumerHostedService : BackgroundService
{
private readonly ILogger<KafkaConsumerHostedService> _logger;
private readonly IKafkaConsumerService _kafkaConsumerService;
private readonly KafkaConfigOptions _kafkaConfig;
private IConsumer<string, string> _consumer;
private bool _isConnected = false;
public KafkaConsumerHostedService(
ILogger<KafkaConsumerHostedService> logger,
IKafkaConsumerService kafkaConsumerService,
IOptions<KafkaConfigOptions> kafkaConfigOptions)
{
_logger = logger;
_kafkaConsumerService = kafkaConsumerService;
_kafkaConfig = kafkaConfigOptions.Value;
}
/// <summary>
/// 一、持續執行的背景工作
/// </summary>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
// 1. 記錄配置資訊
_logger.LogInformation($"嘗試連接到 Kafka 伺服器: {_kafkaConfig.BootstrapServers}");
_logger.LogInformation($"主題: {_kafkaConfig.TopicName}");
_logger.LogInformation($"消費者群組: {_kafkaConfig.ConsumerGroupId}");
// 2. 初始化 Kafka 消費者
var consumerConfig = new ConsumerConfig
{
BootstrapServers = _kafkaConfig.BootstrapServers,
GroupId = _kafkaConfig.ConsumerGroupId,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true,
SocketTimeoutMs = 10000,
RetryBackoffMs = 1000
};
// 2-2. 無法連接時,直接結束方法,不進入迴圈 (這篇範例假定 Kafka Server 是正常運行,不做其他例外處理)
try
{
_consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
_consumer.Subscribe(_kafkaConfig.TopicName);
_logger.LogInformation($"成功連接並訂閱主題: {_kafkaConfig.TopicName}");
}
catch (Exception ex)
{
_logger.LogError($"連接到 Kafka 失敗: {ex.Message}");
_logger.LogWarning("應用程式將繼續運行,但 Kafka 消費功能將不可用");
return;
}
// 3-1. 開啟 TaskCompletionSource 來控制執行流程
var taskCompletionSource = new TaskCompletionSource<bool>();
// 3-2. 啟用線程執行 Kafka 消費操作 (避免阻塞主程式)
_ = Task.Run(async () =>
{
try
{
// 3-3. 不斷檢查是否有觸發生產者資料
while (!stoppingToken.IsCancellationRequested)
{
try
{
// 4-1. 處理生產資料,取出
var consumeResult = _consumer.Consume(TimeSpan.FromSeconds(1));
if (consumeResult != null && !consumeResult.IsPartitionEOF)
{
_logger.LogInformation($"收到 Kafka 訊息: 鍵={consumeResult.Message.Key}, 值={consumeResult.Message.Value}");
// 4-2. 處理訊息...
_kafkaConsumerService.AddReceivedMessage(new KafkaMessageViewModel()
{
Key = consumeResult.Message.Key,
Message = consumeResult.Message.Value,
Topic = consumeResult.Topic,
Offset = consumeResult.TopicPartitionOffset.Offset
});
}
}
catch (ConsumeException ex)
{
_logger.LogError($"消費時出錯: {ex.Error.Reason}");
// 4-3. While 內發生錯誤後暫停一段時間,避免迴圈過快消耗資源
await Task.Delay(5000, stoppingToken);
}
catch (OperationCanceledException)
{
// 4-4. 確定中止強制跳出
break;
}
catch (Exception ex)
{
_logger.LogError($"未預期錯誤: {ex.Message}");
// 4-5. While 內發生錯誤後暫停一段時間,避免迴圈過快消耗資源
await Task.Delay(5000, stoppingToken);
}
}
}
catch (Exception ex)
{
_logger.LogError($"消費線程發生未處理錯誤: {ex.Message}");
}
finally
{
taskCompletionSource.TrySetResult(true);
}
}, stoppingToken);
// 5. 主線程等待取消信號
await taskCompletionSource.Task;
}
catch (Exception ex)
{
_logger.LogError($"背景服務執行時發生未處理異常: {ex.Message}");
}
}
/// <summary>
/// 二、停止背景服務時 EX: 關閉、停用
/// </summary>
public override Task StopAsync(CancellationToken stoppingToken)
{
if (_isConnected)
{
_logger.LogInformation("正在關閉 Kafka 消費者");
_consumer?.Close();
_consumer?.Dispose();
}
return base.StopAsync(stoppingToken);
}
}
生產者工作 - 將從前端傳來的訊息,發送到 Kafka 上,前端可攜帶 TopicName, Key, Value
目標 Server Host 都會從 Appsettings.json 取得。
public class KafkaProducerService : IKafkaProducerService
{
private readonly IProducer<string, string> _producer;
private readonly KafkaConfigOptions _kafkaConfig;
/// <summary>
/// 1. 建構式,準備連線對象
/// </summary>
public KafkaProducerService(IOptions<KafkaConfigOptions> kafkaConfigOptions)
{
_kafkaConfig = kafkaConfigOptions.Value;
var config = new ProducerConfig
{
BootstrapServers = _kafkaConfig.BootstrapServers,
Acks = Acks.All // Acks 參數定義了生產者發送訊息後,需要多少個 Broker 確認訊息已被接收,才能視為成功發送。
};
_producer = new ProducerBuilder<string, string>(config).Build();
}
/// <summary>
/// 2. 生產者 : 發送到 Kafka Broker 上
/// </summary>
public async Task<KafkaProduceResult> ProduceMessageAsync(
string topic, string key, string message)
{
try
{
var deliveryResult = await _producer.ProduceAsync(
topic,
new Message<string, string>
{
Key = string.IsNullOrEmpty(key) ? Guid.NewGuid().ToString() : key,
Value = message
});
return new KafkaProduceResult
{
IsSuccess = true,
TopicPartitionOffset = deliveryResult.TopicPartitionOffset.ToString()
};
}
catch (ProduceException<string, string> ex)
{
return new KafkaProduceResult
{
IsSuccess = false,
ErrorMessage = ex.Error.Reason
};
}
}
}
消費者 - 會由被竟服務持續將值寫進 AP (Application Program)中
當前端需要資料顯示時再從 GetReceivedMessages() 方法返回。
public class KafkaConsumerService : IKafkaConsumerService
{
private readonly ConcurrentQueue<KafkaMessageViewModel> _receivedMessages = new ConcurrentQueue<KafkaMessageViewModel>();
private const int MaxStoredMessages = 100;
public List<KafkaMessageViewModel> GetReceivedMessages()
{
return _receivedMessages.ToList();
}
/// <summary>
/// 將訊息加入隊列中
/// </summary>
/// <param name="message"></param>
public void AddReceivedMessage(KafkaMessageViewModel message)
{
_receivedMessages.Enqueue(message);
// 保持訊息數量不超過上限
while (_receivedMessages.Count > MaxStoredMessages)
{
_receivedMessages.TryDequeue(out _);
}
}
}
包含了檢視生產者頁面、消費者頁面,以及生產者專用的建立生產者訊息API
/// <summary>
/// 1. View 頁面 - 生產者的資訊 (發送資料)
/// </summary>
public IActionResult Produce()
{
return View(new KafkaMessageViewModel());
}
/// <summary>
/// 2. 生產者 - 發送資料到 Kafka
/// </summary>
[HttpPost]
public async Task<IActionResult> Produce(KafkaMessageViewModel model)
{
// 1. 用 C# 特性,自動驗證 Model 上的欄位
if (ModelState.IsValid)
{
//2. 將資料傳送到 Kafka 上 - 關鍵是 Topic : 主題 ; 鍵值 : Key 才能正確傳送到對象
var result = await _producerService.ProduceMessageAsync(model.Topic, model.Key, model.Message);
//3. 返回回傳結果
if (result.IsSuccess)
{
TempData["SuccessMessage"] = $"訊息發送成功!位置: {result.TopicPartitionOffset}";
}
else
{
TempData["ErrorMessage"] = $"訊息發送失敗: {result.ErrorMessage}";
}
return RedirectToAction(nameof(Produce));
}
return View(model);
}
/// <summary>
/// 3. View 頁面 - 消費者的資訊 (當前收到的資料)
/// </summary>
public IActionResult Consume()
{
// 1. 取得發送過的 Kafka 資料 (這篇簡單示範,還有其他持久化消費對列的方式)
var receivedMessages = _consumerService.GetReceivedMessages();
return View(receivedMessages);
}
生產者頁面,利用 Partial 建立生產者訊息,並且發送後顯示發送的資訊。
@using KafkaAspCoreWebExample.Models
@model KafkaMessageViewModel
@{
ViewData["Title"] = "發送 Kafka 訊息";
}
<h1>發送 Kafka 訊息</h1>
@if (TempData["SuccessMessage"] != null)
{
<div class="alert alert-success">
@TempData["SuccessMessage"]
</div>
}
@if (TempData["ErrorMessage"] != null)
{
<div class="alert alert-danger">
@TempData["ErrorMessage"]
</div>
}
<div class="row">
<div class="col-md-6">
<div class="card">
<div class="card-header">
<h5>發送訊息表單</h5>
</div>
<div class="card-body">
<form asp-action="Produce" method="post">
<div class="form-group mb-3">
<label asp-for="Topic" class="control-label"></label>
<input asp-for="Topic" class="form-control" value="test-topic" />
<span asp-validation-for="Topic" class="text-danger"></span>
</div>
<div class="form-group mb-3">
<label asp-for="Key" class="control-label"></label>
<input asp-for="Key" class="form-control" />
<small class="form-text text-muted">如果不提供鍵值,將自動生成一個唯一識別碼</small>
</div>
<div class="form-group mb-3">
<label asp-for="Message" class="control-label"></label>
<textarea asp-for="Message" class="form-control" rows="5"></textarea>
<span asp-validation-for="Message" class="text-danger"></span>
</div>
<div class="form-group">
<button type="submit" class="btn btn-primary">發送訊息</button>
<a asp-action="Index" class="btn btn-secondary">返回</a>
</div>
</form>
</div>
</div>
</div>
<div class="col-md-6">
<div class="card">
<div class="card-header">
<h5>使用說明</h5>
</div>
<div class="card-body">
<p>填寫上面的表單以發送訊息到 Kafka:</p>
<ul>
<li><strong>主題</strong>:填寫您想要發送訊息的 Kafka 主題名稱</li>
<li><strong>鍵值</strong>:(可選)用於指定訊息的鍵,影響分區分配</li>
<li><strong>訊息內容</strong>:要發送的實際訊息</li>
</ul>
<p>發送訊息後,您可以在「接收訊息」頁面查看已接收的訊息。</p>
</div>
</div>
</div>
</div>
@section Scripts {
@{
await Html.RenderPartialAsync("_ValidationScriptsPartial");
}
}
消費者頁面,純檢視,互動只有刷新畫面
@using KafkaAspCoreWebExample.Models
@model List<KafkaMessageViewModel>
@{
ViewData["Title"] = "接收 Kafka 訊息";
}
<h1>接收 Kafka 訊息</h1>
<div class="mb-3">
<a asp-action="Index" class="btn btn-secondary">返回</a>
<a asp-action="Consume" class="btn btn-primary">重新整理</a>
</div>
@if (Model.Any())
{
<div class="table-responsive">
<table class="table table-striped table-bordered">
<thead>
<tr>
<th>接收時間</th>
<th>主題</th>
<th>鍵值</th>
<th>訊息內容</th>
<th>偏移量</th>
</tr>
</thead>
<tbody>
@foreach (var message in Model.OrderByDescending(m => m.ReceivedAt))
{
<tr>
<td>@message.ReceivedAt?.ToString("yyyy-MM-dd HH:mm:ss")</td>
<td>@message.Topic</td>
<td>@message.Key</td>
<td>@message.Message</td>
<td>@message.Offset</td>
</tr>
}
</tbody>
</table>
</div>
}
else
{
<div class="alert alert-info">
尚未接收到任何訊息。請確保消費者服務正在運行,並且已經有訊息發送到訂閱的主題。
</div>
}
<div class="card mt-4">
<div class="card-header">
<h5>使用說明</h5>
</div>
<div class="card-body">
<p>此頁面顯示已從 Kafka 接收的訊息。背景服務正在監聽配置的主題,並將接收到的訊息添加到此列表中。</p>
<p>點擊「重新整理」按鈕可以更新訊息列表。</p>
<p><strong>注意</strong>:只顯示最近接收的最多 100 條訊息。</p>
</div>
</div>
配置主機指向位置,其中 TopicName、ConsumerGroudId 會有其他持久化的方式保存,不會每個 AP 都固定
這不利於橫向擴展,這邊只是範例,寫在 Config 中
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*",
"KafkaConfig": {
"BootstrapServers": "192.168.51.100:9092",
"TopicName": "test-topic",
"ConsumerGroupId": "aspnet-core-consumer-group"
}
}
基本的依賴注入配置
using KafkaAspCoreWebExample.Services.Background;
using KafkaAspCoreWebExample.Services;
using KafkaAspCoreWebExample.Models;
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddControllersWithViews();
// 1. 註冊 Kafka 配置選項
builder.Services.Configure<KafkaConfigOptions>(builder.Configuration.GetSection("KafkaConfig"));
// 2-1. 註冊 Kafka 服務
builder.Services.AddSingleton<IKafkaConsumerService, KafkaConsumerService>();
builder.Services.AddSingleton<IKafkaProducerService, KafkaProducerService>();
// 2-2. 註冊 Kafka 背景服務
builder.Services.AddHostedService<KafkaConsumerHostedService>();
var app = builder.Build();
// 略......
app.Run();
這邊使用的是 Kafka Confluent 套件,來實現與 kafka Server 的交互
※Confluent.Kafka 是免費且官方授權的,但是如果上到 Confluent Cloud 雲端託管,將會進行計費
使用 Visual Studio 開啟專案 -> 進入 Debug 模式 -> 進入 Kafka 首頁 -> 會有 2 個頁面可以操作
在頁面上填寫基本資訊 TopicName、Key,Value 後,點擊發送送出
送出成功後,會提示成功資訊
這時可以進入 Kafak-UI 上確認 TopicName,依序點擊
可以看到生產者的訊息
進入消費者檢視頁面,可以看到此消費者群組,消費該 TopicName 所有的生產者訊息
依序點擊 -> Consumer -> Consumer 群組Id
可以看到 Current Offset 與 End Offset 一致,表示此 Consumer 的群組,已經消費所有資訊了
這時再從新啟動專案 -> 進入 Debug 模式 -> 消費者檢視頁面
因為都已經被消費完畢,因此此群組無法再取得未消費的訊息
開啟專案的 Appsettings.json 檔,將 ConsumerGroupId 隨意改個名稱
"ConsumerGroupId": "aspnet-core-consumer-group2"
可以看到這個群組 aspnet-core-consumer-group2 進行所有訊息的重新消費
為何可以重複消費呢? => 因為Kafka 的設計理念是作為一個分佈式事件日誌系統,因此具有以下特性:
1. 偏移量跟踪: | Kafka 把消息存儲在持久化的日誌中 |
每個消費者群組維護自己的偏移量(offset),標記已讀取的位置 | |
更改 Consumer Group ID 等於創建了一個全新的消費者群組,沒有任何偏移量記錄 | |
2. 數據保留機制: | Kafka 會根據配置的保留策略保留消息(時間或大小限制) |
即使消息被消費,也不會被刪除,直到保留期過期 | |
這允許多個消費者群組獨立讀取相同的消息流 | |
3. 重新消費能力: | 消費者可以重置自己的偏移量到任何位置 |
創建新的 Consumer Group ID 會從配置的起始位置(通常是最早或最新)開始消費 |
Kafka 優先考慮高吞吐量和事件重放能力,因此都以持久化為核心,那麼成本勢必較保證一次性消費的 RabbitMQ 高出不少成本