Message Queue 概念
Message Queue 是常用的一種資料結構(Queue),是採用 FIFO(First In First Out) 資料先進先出的模式,來分散處理資料,可以是做為內部 RPC 溝通或內部微服務建立之訊息系統。
Message Queue 使用情境範例:
RabbitMQ
RabbitMQ 是熱門的 Message Queue 系統之一,是使用 Erlang 寫的,其連接工具包含多種程式語言。
https://www.rabbitmq.com
佈署 RabbitMQ Server
從 RabbitMQ 網站下載 Server 安裝後,若是 Windows 系統,則打開 RabbitMQ Service start 程式,連接端會預設從 localhost:5672 這裡服務。
Golang 的部分需要先裝 plugin:
go get github.com/streadway/amqp
使用 Golang 基本連接程序是先建立 RabbitMQ 連接,然後是建立一個 Channel,在本文章的程式中你可以盡量跳過去看這兩個步驟,省去讀程式的麻煩,
然後本文章指定的 err 為了避免程式碼過多混淆,不寫 if err != nil{...} ,程式碼中須自行實作錯誤處理。
角色說明
在 RabbitMQ 系統架構中,有 Exchange 和 Queue 兩個大角色,Exchange 是一個交換器,交換器可以控制訊息發送的方式,必須是 RabbitMQ 的三種訊息發送機制的其中一種 (稍後會作介紹),此 Exchange 底下就會有很多 Queue,所以如果訊息是希望分類、有機制性的發送,最好可以把 Queue 綁定到 Exchange 上。
Queue 如果沒有綁定任何 Exchange ,那訊息就只會是單純的 Work Queue,沒有任何機制可言。
RabbitMQ 的系統服務架構為下圖 (有 Exchange 綁定的 Queue):
圖中是一個 RabbitMQ 服務的架構,有綠色標記的區塊是說明 RabbitMQ 其中的一個訊息傳遞的模式: direct。
名詞解釋:
Producer: 訊息生產者 (就是 message sender 的程式)
Producing: 訊息發送處理
Routing Key: 從 Producer 發送的訊息,其中包含 RoutingKey,那可以幫助把訊息綁定到正確的 Exchange 上,然後對應 Binding Key 把訊息送到正確的 Queue 上。
Binding Key: Queue 在 Exchange 裡面建立一個 Binding Key,使訊息可以按照 Key 正確發送到 Queue
Consumer: 訊息接收者 (就是 client 的程式)
Consuming: 訊息接受處理
Exchange 交換器有三種訊息發送機制:
- Fanout 廣播機制: 針對該 Exchange 下所有 Queue 做廣播
- Direct 廣播到指定的 Binding Key: 針對 Exchange 下指定的 Binding Key 做廣播
- Topic 廣播道指定的 Binding Key 而且 Key 可以受到正規表達式匹配: 真對 Exchange 下匹配到字串的 Binding Key 做廣播
單純的 Work Queue
Queue 中的機制是 Producer(sender) 發送訊息丟到 Queue 中時,等待 Consumer(client) 來領取並且在領取完該 Queue 的單一值之後,把領過的值清掉 ;然而,在 RabbitMQ 中的 Queue 要是碰到關機或當機後,重新開機後 Queue 的內容會是清空的,解決此問題可以利用"持久化" 來處理,將會在後面的小節詳細說明。
需要特別注意廣播與單純的 Work Queue 不一樣,如果是有 Exchange 的模式,則如果出現兩個一樣的 Consumer Queue 在等待同一個 Key ,那麼兩個人會同時收到這個廣播訊息。
(注意,這裡是兩個不同的 Queue ID,但等待的 Key 相同)
但是如果是 Work Queue ,則若兩個人等待同一個 Queue ,則兩人會輪流接收到訊息(1,2,1,2...)。
(注意,這裡是兩個一樣的人,等待同一個 Queue ID)
從上圖可以看出來 Producer 傳遞訊息時, Consumer A,B 是如何取得訊息的。
發送端 go:
package main
import (
"fmt"
"github.com/streadway/amqp"
)
//this is producer
func main() {
//start connection
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println(err)
}
defer conn.Close()
//open channel
channel, err := conn.Channel()
if err != nil {
fmt.Println(err)
}
defer channel.Close()
//declare queue first
q, err := channel.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
//demo to send message to queue "hello"
err = channel.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("Hello World!"),
})
fmt.Println("Message sent.")
}
接收端 go:
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
func main() {
//open connection
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println(err)
}
defer conn.Close()
//open channel
channel, err := conn.Channel()
if err != nil {
fmt.Println(err)
}
defer channel.Close()
//declare queue first
q, err := channel.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
//declare want to receive message from queue "hello"
msgs, err := channel.Consume(
q.Name, // queue (q.Name is "hello")
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
fmt.Println(err)
}
//using go routine to wait msgs to receive
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
//blocking program in here, because we still want to receive value from msgs
forever := make(chan bool)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
從程式碼中可以看到發送端或接收端都有在定義 channel 之後,做了 channel.QueueDeclare(),這是因為 RabbitMQ 的 Queue 或 Exchange 都是惰性的,若沒有先定義開出來一個 Queue ,則會在使用上出現錯誤,所以在一開始連接進 RabbitMQ 時不管是接收端或是發送端,都先開一個 Queue 出來等待 ,若先前 Queue 已經存在並在使用,這樣的 QueueDeclare() 也不會影響。
然後在接收端的 channel.Consume 其實就是在宣告要等待該 Queue 上面的訊息進來,然後會在 go func 的 go routine 中迭代 msgs 出來,連接中的時候 msgs 會持續 block 執行緒直到下一個訊息出來,而因為本文程式碼選擇使用 go routine 做等待,因此必須在最後程式做阻塞,用 golang 的 channel 機制或無窮迴圈塞住皆可。
另外,可以自行實驗,在上面的程式碼中,可以打開兩個 client 端來測試接收,測試發送訊息的接收優先順序。
Queue 的 QoS 機制
在分散式系統中,可能會發生訊息過多,導致工作量增倍的情形,這個時候可以使用 QoS 機制來要求 RabbitMQ 在該 Message 處理完之前,不要再塞訊息到 Consumer(Client) 來。
使用方式是在定義完 Channel 之後,在底下加入程式:
err = channel.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
交換器 Exchange 三種工作模式
RabbitMQ 有三種交換器的模式可以使用,其跟純 Work Queue 在程式上的差異是除了 QueueDeclare 之外,還需要在開一個 ExchangeDeclare() 然後把定義完的 Queue 綁定到 Exchange 上。
所以基本順序應該是在 channel 定義完之後做:
- ExchangeDeclare
- QueueDeclare
- QueueBind
Fanout 廣播模式
廣播模式會對該 Exchange 下不管是誰的 Queue 都廣播,而且需要注意,這個廣播機制是實時的,如果廣播的時候,Consumer 沒有開機來等待,就不會收到訊息(一般情況下會放在 queue 中等待領取)。
(注意,接收此廣播的 Queue ,其類型也應該為 fanout)。
發送端go:
package main
import (
"fmt"
"github.com/streadway/amqp"
)
//this is producer
func main() {
exchangeName := "school"
//start connection
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println(err)
}
defer conn.Close()
//open channel
channel, err := conn.Channel()
if err != nil {
fmt.Println(err)
}
defer channel.Close()
//declare EXCHANGE first
err = channel.ExchangeDeclare(
exchangeName,
"fanout",
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
//boradcast message (fanout mode)
err = channel.Publish(
exchangeName, // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("Hello Everyone!!!!"),
})
fmt.Println("Message sent.")
}
接收端 go:
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
func main() {
exchangeName := "school" //*********************
//open connection
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println(err)
}
defer conn.Close()
//open channel
channel, err := conn.Channel()
if err != nil {
fmt.Println(err)
}
defer channel.Close()
//declare EXCHANGE first
err = channel.ExchangeDeclare(
exchangeName,
"fanout",
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
//must declare queue
//if you dont given name, then it will randomly given one.
q, err := channel.QueueDeclare(
"", // name
false, // durable
false, // delete when usused
true, // exclusive
false, // no-wait
nil, // arguments
)
//bind queue to target exchange of queue.
err = channel.QueueBind(
q.Name, // queue name
"", // routing key
exchangeName, // exchange
false,
nil)
//declare want to receive message from queue "xxxxx"
msgs, err := channel.Consume(
q.Name, // queue (q.Name is randomly, because we didn't given any name.)
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
//using go routine to wait msgs to receive
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
//blocking program in here, because we still want to receive value from msgs
forever := make(chan bool)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
從程式碼中可以發現這裡的 QueueDeclare 已經不在是固定的名稱,表示著其實使用 Exchange 的情景可能是很多的 Consumer ,而且使用各自獨立不共用的 Queue 來處理。
Direct 指定廣播模式
Direct 指定廣播模式下,只對 Binding Key 完全匹配的 Queue 做廣播,注意,他是從 Exchange 去分配訊息到 Queue 的運作方式;Binding Key 的說明會在程式碼之後解說。
實驗的部分請注意,本文章使用的 exchange 名稱都是 "school" ,但由於執行先前的 fanout 廣播模式時, RabbitMQ 已經存在上一個 fanout 的 exchange 叫做 "school" ,因此請您自行該改
exchangeName 變數,使各實驗的 exchangeName 不要一樣,否則將會收不到任何訊息。
發送端 go:
package main
import (
"fmt"
"github.com/streadway/amqp"
)
//this is producer
func main() {
exchangeName := "school" //**************
directName := "class.3-3" //*************
//start connection
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println(err)
}
defer conn.Close()
//open channel
channel, err := conn.Channel()
if err != nil {
fmt.Println(err)
}
defer channel.Close()
//declare EXCHANGE first
err = channel.ExchangeDeclare(
exchangeName,
"direct",
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
//direct message (direct mode)
err = channel.Publish(
exchangeName, // exchange
directName, // routing key (queue name)
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("Hello Class 3-3 Student!"),
})
fmt.Println("Message sent.")
}
接收端 go:package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
func main() {
exchangeName := "school"
directName := "class.3-3"
//open connection
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println(err)
}
defer conn.Close()
//open channel
channel, err := conn.Channel()
if err != nil {
fmt.Println(err)
}
defer channel.Close()
//declare EXCHANGE first
err = channel.ExchangeDeclare(
exchangeName,
"direct",
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
//must declare queue
//if you dont given name, then it will randomly given one.
q, err := channel.QueueDeclare(
"", // name
false, // durable
false, // delete when usused
true, // exclusive
false, // no-wait
nil, // arguments
)
//bind queue to target exchange of queue.
err = channel.QueueBind(
q.Name, // queue name
directName, // routing key
exchangeName, // exchange
false,
nil)
//declare want to receive message from queue "xxxxx"
msgs, err := channel.Consume(
q.Name, // queue (q.Name is randomly, because we didn't given any name.)
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
//using go routine to wait msgs to receive
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
//blocking program in here, because we still want to receive value from msgs
forever := make(chan bool)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
從程式碼中可以特別注意到 Binding Key 其實就是指要收到的 Routing Key ,這個 Key 是最上面的 directName 叫做 "class.3-3" ,在接收端來看,在 Queue 綁定到 Exchange 的時候特別在 Routing Key 那個參數位置放上了 "class.3-3" ,其實綁定後,就是一個 Binding Key 與 Exchange 的連結。
然後在發送端的 Publish 就特別在參數中放入了 Routing Key "class.3-3" 指定要廣播給有 "class.3-3" Binding Key 的 Queue。
Topic 指定匹配廣播模式
Topic 跟 Direct 是相似的,差別只在於 Direct 只對完全匹配到字串 "class.3-3" 的 Queue 做發送,Topic 匹配的方式可以是一個表達式,像是 "3-grade.*",這樣就會匹配到 "3-grade"(三年級) 下的全部班級,像是 "3-grade.class3","3-grade.class2","3-grade.class1" 這樣,但 "3-grade.*" 只能匹配到 點 (.) 後面的一個值,如果想匹配 "3-grade.class2.response.teacher", "3-grade.class2.studnet" 本字串裡面所有 response 底下的人 (情景就是負責 3-2 班的老師們) 以及學生們,則使用 "3-grade.class2.#" 的表達式。
而 Direct 必須要完全匹配 "class.3-3" 才能廣播,如果是 "class.3-3.stuednet" 則沒辦法被 direct 廣播到。
然而,在使用 Topic 的匹配模式時,需要注意,匹配的字不是從 sender 送出 routine key去匹配,而是在 clinet 端綁定 queue 到 exchange 時,其 binding key 才是匹配的字串,如下圖:
發送端 go:
package main
import (
"fmt"
"github.com/streadway/amqp"
)
//this is producer
func main() {
exchangeName := "schoola"
topicName := "school.3-grade.TimeToLeaveSchool"
//start connection
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println(err)
}
defer conn.Close()
//open channel
channel, err := conn.Channel()
if err != nil {
fmt.Println(err)
}
defer channel.Close()
//declare EXCHANGE first
err = channel.ExchangeDeclare(
exchangeName,
"topic",
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
//direct message (direct mode)
err = channel.Publish(
exchangeName, // exchange
topicName, // routing key (queue name)
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("Hello 3 Grade Student! you can leave school!"),
})
fmt.Println("Message sent.")
}
接收端 go:
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
func main() {
exchangeName := "schoola"
topicName := "school.3-grade.*" //"#" means all, "*" means the one after dot "."
//open connection
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println(err)
}
defer conn.Close()
//open channel
channel, err := conn.Channel()
if err != nil {
fmt.Println(err)
}
defer channel.Close()
//declare EXCHANGE first
err = channel.ExchangeDeclare(
exchangeName,
"topic",
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
//must declare queue
//if you dont given name, then it will randomly given one.
q, err := channel.QueueDeclare(
"", // name
false, // durable
false, // delete when usused
true, // exclusive
false, // no-wait
nil, // arguments
)
//bind queue to target exchange of queue.
err = channel.QueueBind(
q.Name, // queue name
topicName, // routing key
exchangeName, // exchange
false,
nil)
//declare want to receive message from queue "xxxxx"
msgs, err := channel.Consume(
q.Name, // queue (q.Name is randomly, because we didn't given any name.)
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
//using go routine to wait msgs to receive
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
//blocking program in here, because we still want to receive value from msgs
forever := make(chan bool)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
從程式碼可以看到,匹配不是從 sender 用 routine key 的表達式(. , #)去匹配 binding key 的字串,而是 binding key 看到 routine key 的字串去匹配應該分配給哪個 binding key 的 queue 訊息。
RabbitMQ 消息持久化
在上面的小節提及到 RabbitMQ 在關機後或當機重開時,訊息佇列會被清空掉,要解決這個問題,則可以使用持久化機制,方法很簡單,就是把 QueueDeclare 或 ExchangeDeclare 裡面的 durable 參數改為 ture 即可。
程式碼為:
q, err := ch.QueueDeclare(
"hello", // name
ture, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
err = channel.ExchangeDeclare(
"school",
"topic",
ture, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
可以看到 queue 或 exchange 程式碼中的 durable 參數改為 true,此後消息就會盡量儲存在硬碟空間中,但官方也說明不保證會在當機時復原所有完整的消息。
也就是說至少會有幾個情形:
- 如果是崩潰導致 RabbitMQ 重新啟動,不一定能夠救回訊息
- 訊息必須已經傳到有持久化設定的 Exchange 上
- 訊息必須已經傳到有持久化設定的 Queue 上
常見狀況
本文章的實驗其中 exchange 名稱幾乎都一樣,名稱一樣的 exchange 無法被重新定義,所以在 fanout, direct, topic 實驗時,要注意 exchange 應該自行修改為不一樣的名稱。
Reference:
https://www.rabbitmq.com/getstarted.html
https://www.rabbitmq.com/tutorials/tutorial-two-go.html
https://www.cnblogs.com/vipstone/p/9275256.html
https://blog.csdn.net/sinat_18882775/article/details/50964471
https://www.cnblogs.com/limich/p/7477200.html
https://andyyou.github.io/2017/06/08/rabbitmq-notes/
沒有留言:
張貼留言