縮網址服務實作記錄(3) - 使用 RabbitMQ 以事件驅動的方式分析縮網址的資訊來源


縮網址服務為 https://url-ins.com/shorten/ 🔗 ,有任何想法或回饋,可以在 SurveyCake 🔗 留下寶貴的意見。(為了維持主機的維運,在頁面內放入 Google Adsense 廣告。)

在完成短網址的轉址服務,接下來,就要開始針對轉址的請求來源與動作,進行數據的統計。

數據的統計,往往會依需求開始無限的延展。從最簡單的總訪問次數,時段訪問次數,訪問來源地區等等。

最快的方式,當然是直接寫成現成轉址的 API 中,但個人覺得這種作法,對後續開發/維運都不是一種好的方式。

其一、直接在 Web API 內實作數據統計的情境。每當只是增加一種訪問數據分析,就要去異常到穩定運行服務。此方式的修改成本包含但不限於:改 A 壞 B 的風險、再測驗的 QA 成本、部署的成本、等等。

其二、轉址用的 API 的 HTTP Method 為 GET,在 GET 的行為內,夾雜著 INSERT/UPDATE 的行為。在後續維護、盤查、或調整程式時,都有很大機率遺漏這個地方。

設計架構時,由於已確定訪問數據分析會有多次調整。為了減少後續的開發維護成本,透過生產者與消費者模型(producer-consumers pattern),讓服務與數據分析的行為解耦。且數據分析結果無須即時呈現。

最終採用 Message Queue 的機制,並使用 RabbitMQ 作為 Message Queue 的解決方案。

這一篇文章的內容,主要的內容包含以下項目。

  • RabbitMQ 的安裝設定、與基本使用說明。
  • 在 ASP.NET Core Web API 中發送事件。
  • 使用 .NET Core Console 串接 RabbitMQ 事件。

開發環境

開發框架: .NET 8 SDK 解決方案: RabbitMQ 3.12.9

RabbitMQ 的安裝、設定與使用

現行的 Message Queue 解決方案有很多,例如 RabbitMQ、Apache ActiveMQ、Apache RocketMQ 等等,但在這邊,基於使用情境的考量,選擇使用 RabbitMQ。

運行 RabbitMQ 的 Container

RabbitMQ 的 Docker Image 可以直接從 Docker Hub 🔗 取得。

在這邊,我們選擇使用 rabbitmq:3-management 這個 Tag 版本,方便使用 RabbitMQ 提供的 Web 管理站台。

若依 RabbitMQ 在 Docker Hub 的 Overview 內的文件,直接使用官方的範例,只會得到一個無法進入使用者操作介面的 Container。

docker run -d --hostname my-rabbit --name some-rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management

這是因為範例中,並沒有指定 Bind 的 Port,而且在 Docker Hub 的 Overview 內的文件,並沒有詳細的說明使用 PORT。

當進一步查看 RabbitMQ Tag 為 latest 內容時,可以看到 RabbitMQ 的 Expose 設定。它開放 15691/TCP、15692/TCP、25672/TCP、4369/TCP、5671/TCP、5672/TCP 這 6 個 PORT。

rabbitmq:latest 預設開放的 Exposed Ports
rabbitmq:latest 預設開放的 Exposed Ports
rabbitmq:3-management 開放的 Exposed Ports
rabbitmq:3-management 開放的 Exposed Ports
  • 15671: 用於 HTTP API clients, 管理介面與 rabbitmqadmin 使用。HTTPS(TLS)
  • 15672: 用於 HTTP API clients, 管理介面與 rabbitmqadmin 使用。HTTP(no-TLS)
  • 15691: 提供 Prometheus metrics 使用。HTTPS(TLS)
  • 15692: 提供 Prometheus metrics 使用。HTTP(no-TLS)
  • 25672: RabbitMQ Node 內部通訊使用。
  • 4369: 用於 RabbitMQ Node 與 CLI Tools。
  • 5671: AMQP 1.0 用戶端,TLS 加密
  • 5672: AMQP 0-9-1 用戶端,不加密。

除了使用 AMQP 0-9-1 收發訊息外,同時使用使用介面。因此指定 5672 與 15672 兩個 PORT。

docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management
RabbitMQ 啟動後,Web 操作畫面
RabbitMQ 啟動後,Web 操作畫面
當點開 Ports and contexts 時,也會看到 Port 的用途
當點開 Ports and contexts 時,也會看到 Port 的用途

CloudAMQP 🔗 提供的流程圖,當 Producer 準備發送訊息給 RabbitMQ 時,首先會建立一個 channel,並在 channel 內指定使用的 ExchangeRouteKey。訊息會依不同 Exchange 類型,來判斷要派發到那一個 Queue

而 Consumer 要從 Queue 取得訊息時,也需要先建立 channel,並告知要從那一個 Queue 取得訊息。

AMQP (圖片來源: https://www.cloudamqp.com)
AMQP (圖片來源: https://www.cloudamqp.com)

Exchange 交換器

首先,來聊聊 Exchange,它是訊息的路由器,負責接收 Producer 訊息,並依 Exchange Type 與 Route Key 來決定將訊息派發到某個 Queue 之中。

Exchange Type 共有 DirectFanoutTopicHeader 四種類型。

Exchange typeDefault pre-declared names說明
Direct(Empty string) and amq.direct最簡單的使用方式,直接使用 Route Key 來派發到對應的 Queue
Fanoutamq.fanout將訊息發到所有 Bind 此 Exchange 的 Queue
Topicamq.topic通常用於 publish/subscribe pattern 的使用情境
Headersamq.match (and amq.headers in RabbitMQ)會忽略 Route Key,使用 Header 內的多個屬性來進行路由。
RabbitMQ Management 的 Exchange 頁面
RabbitMQ Management 的 Exchange 頁面

從 RabbitMQ Management 的 Exchange 頁面,可以看預設的 Exchange。

如果預設的 Exchange 不符合使用,可以直接在 Add a new exchange 區塊進行設定,也可以直接在程式中宣告。

// 宣告一個 name = logs-direct 且 Direct type 的 exchange
channel.ExchangeDeclare("logs-direct", ExchangeType.Direct);

Queue 佇列

Queue 是訊息的暫存區,用來存放訊息,直到 Consumer 前來處理。一個 Queue 可以綁定到一個或多個 Exchange。

RabbitMQ Management 的 Queue 頁面
RabbitMQ Management 的 Queue 頁面
channel.QueueDeclare(queue: "task_queue",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

在宣告 Queue 時,使用的參數如下

參數名稱說明
durable當重新啟動 RabbitMQ 後,Queue 是否還存在
exclusive獨佔模式,只能給一個連線使用,當該連線關閉後,自行刪除 Queue
autoDelete當最後一個 Consumer 離開 Queue 後,Queue 會自行刪除
arguments選項,可用於設定訊息的長度限制、TTL 等等

Channel 通道

Channel 是實際進行訊息發布、消費的通道。Connection 下可以開啟多個 Channel 進行操作。

var channel = connection.CreateModel(); // 建立 channel
channel.BasicPublish(...);

實際應用時,若 Queue 使用預設 Direct 的 Exchange, 在使用時,直接指定 BasicPublish 內的 exchange 為空字串即可。

var channel = connection.CreateModel();

channel.QueueDeclare("queue1", true, false, false, null);

channel.BasicPublish(exchange: "", routingkey: "routingKey", null, body);

若是要使用其他的 Exchange,則需先宣告 Exchange 和 Queue,並進行綁定。

var channel = connection.CreateModel();

channel.ExchangeDeclare("logs-direct", ExchangeType.Direct);

channel.QueueDeclare("queue1", true, false, false, null);
channel.QueueBind("queue1", "logs-direct", "routingKey");

channel.BasicPublish("logs-direct", "routingKey", null, body);

Producer: 從 ASP.NET Core Middleware 發送事件

ASP.NET Core Web API 作為 Event 的發送端,我們需要決定在什麼時機點進行 Event 訊息的發送。

考量只有呼叫轉址 API 的當下,才觸發分析事件,所以將發送 Event 訊息的時機點,定在 ApiControler 之中。

首先,先安裝 RabbitMQ.Client 的 Nuget 套件。

dotnet add package RabbitMQ.Client --version 6.7.0

接下來,我們希望使用 RabbitMQ.Client 時,可以使用 DI 的方式,直接取得 RabbitMQ 的 Connection

Program.cs 中,進行 RabbitMQ.Client.IConnection 的 DI 注入。

services.AddSingleton<RabbitMQ.Client.IConnection>(sp =>
{
    var factory = new RabbitMQ.Client.ConnectionFactory() { HostName = "localhost" };
    return factory.CreateConnection();
});

最後,就可以到 API Controller 中,進行 Event 訊息的發送。

此時,我們將請求來源的 IP 與 id 作為 Event 的 payload,發送到 redirect-event 的 RabbitMQ Queue 之中。

private readonly IConnection _connection;

public MyController(IConnection connection)
{
    this._connection = connection;

}

[HttpGet("{id}")]
public IActionResult Redirect(string id)
{
	var model = new
	{
	    Id = id,
	    IP = this.HttpContext.Connection.RemoteIpAddress?.ToString()
	};
    var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(model));

	var channel = connection.CreateModel();

	// 宣告 Queue
	channel.QueueDeclare(routingKey,
						 durable: true,
						 exclusive: false,
						 autoDelete: false,
						 arguments: null);

	// 發送訊息
    channel.BasicPublish(exchange: "",
						 routingKey: "redirect-event",
						 basicProperties: null,
						 body: body);
	channel.close();

    return Ok();
}

當外部呼叫 API 時,就會將 ID 與 IP 資料物件,轉為 JSON 發送到 redirect-event 的 RabbitMQ Queue。

Consumer: 使用 .NET Core Console 監控並處理分析事件

完成了 Event 訊息的發送到後,接下來就要來要實作 Consumer,負責處理 Event 事件。

而 Consumer 的部份,選擇使用 Console Application 開發。

老樣子,先安裝 RabbitMQ.Client 的 Nuget 套件,接著 Program.cs 內,進行 RabbitMQ Customer 的設定,以便接收 Event。

RabbitMQ 官方很貼心的提供了 Example,調整 Example Code 的內容,就可以直接使用。

// 設定 connection 與 channel
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// 設定 channel 一次只能取一筆訊息
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

// 設定 consumer
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
	// 取得 Body 資料,並轉為字串
    byte[] body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);

	// import: 實作處理資料的區塊
	Process(message);

    // here channel could also be accessed as ((EventingBasicConsumer)sender).Model
    // 回報 RabbitMQ, 特定 delivery Tag 的 Event 處理完成。
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};

// 開始接收訊息
channel.BasicConsume(queue: "task_queue",
                     autoAck: false,
                     consumer: consumer);

Console.ReadLine();

要注意的是,如果指定的 Queue Name 不存在,在運行到 channel.BasicConsume 時,會出現以下的錯誤訊息

RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text='NOT_FOUND - no queue 'task_queue' in vhost '/'', classId=60, methodId=20

使用 docker compose 建立一個完整服務

我們分別將 Web API、RabbitMQ、Console App 三個服務包為 Docker Image,透過 docker-compose 整合,達到 Web API 發送訊息,Console App 處理訊息的流程。

下述 docker-compose.yml 為本機環境的設定檔。提供參考。

version: '3'

services:
  webapi:
    image: registry.gitlab.com/url-insight/url-insight/api:latest
    restart: always
    expose: 
      - 80
      - 443
    environment:
      - ASPNETCORE_ENVIRONMENT=Production
      - ConnectionStrings:${DB_ConnctionString}
    networks:
      - backend-api
    depends_on:
      - postgres
  
  postgres:
    image: postgres:16
    restart: always
    environment:
      - POSTGRES_USER=${PSG_USER}$
      - POSTGRES_PASSWORD=${PSG_PWD}
    expose:
      - 5432
    volumes:
      - postgres-data:/var/lib/postgresql/data
    networks:
      - backend

  rabbit:
    image: rabbitmq:3-management
    restart: always
    ports:
      - 15672:15672
      - 5672:5672
    networks:
      - backend-api

  event-proc:
    image: url-insight/event-processor:latest
    restart: always
    environment:
      - ConnectionStrings:${DB_ConnctionString}
      - RabbitMQ:Host=rabbit
    networks:
      - backend-api
    depends_on:
      - rabbit

networks:
  backend-api:

volumes:
  postgres-data:

補充資料

▶ 延伸閱讀

▶ 外部文章


Series
縮網址服務實作記錄