Golang 消息队列之 RabbitMQ

消息队列(Message Queue)是一种应用间的通信方式,一种应用间的异步协作机制。消息的生产者只需将消息发布到 MQ 中,消息消费者只需要从 MQ 中获取消息消费,消息的可靠性由消息系统来保证。

RabbitMQ 简介

RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。

RabbitMQ 特点

  1. 可靠性(Reliability)
  2. 灵活的路由(Flexible Routing)
  3. 消息集群(Clustering)
  4. 高可用(Highly Available Queues)
  5. 多种协议(Multi-protocol)
  6. 多语言客户端(Many Clients)
  7. 管理界面(Management UI)
  8. 跟踪机制(Tracing)
  9. 插件机制(Plugin System)

Exchange 类型

| 类型 | 描述 |
| --- | --- |
| direct | 路由键完全匹配,单播 |
| topic | 路由键模式匹配,路由键可以包含通配符:"#"(匹配零个或多个单词)、"*"(匹配一个单词) |
| fanout | 不处理路由键,广播,转发消息最快 |
| headers | 不处理路由键,根据发送的消息内容中的 headers 属性进行匹配,其他跟 direct 模式完全一致,性能较其他模式差很多 |

部署

容器化部署,简单方便

docker run -d --hostname rabbit1 --name rabbitmq1 -p 9419:9419 -p 8080:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.7.14-management

docker run -d --hostname rabbit2 --name rabbitmq2 -p 5673:5672 --link rabbitmq1:rabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.7.14-management

docker run -d --hostname rabbit3 --name rabbitmq3 -p 5674:5672 --link rabbitmq1:rabbit1 --link rabbitmq2:rabbit2 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.7.14-management

docker exec -it rabbitmq1 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit

docker exec -it rabbitmq2 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbit1
rabbitmqctl start_app
exit

docker exec -it rabbitmq3 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbit1
rabbitmqctl start_app
exit

RabbitMQ Exporter

repo: https://github.com/kbudde/rabbitmq_exporter

Start RabbitMQ Exporter:

RABBIT_EXPORTERS=exchange,node,queue RABBIT_CAPABILITIES=bert,no_sort RABBIT_USER=guest RABBIT_PASSWORD=guest OUTPUT_FORMAT=JSON PUBLISH_PORT=8082 RABBIT_URL=http://127.0.0.1:8080 MAX_QUEUES=5000 nohup rabbitmq_exporter &

Start in container:

docker run -d --net=container:rabbitmq1 kbudde/rabbitmq-exporter

Golang 客户端 Demo

配置

{
"URL": "amqp://guest:guest@10.5.124.213:5672/"
}

Client

package rabbitmq

import (
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"time"
)

// Config carries the settings a RabbitMQ client is built from.
type Config struct {
	// URL is the AMQP URI passed to amqp.Dial when connecting.
	URL string

	// QueueList maps a name to a list of names.
	// NOTE(review): presumably exchange name -> bound queue names; confirm
	// against the code that declares exchanges and queues.
	QueueList map[string][]string
}

// Client bundles the configuration with the live AMQP connection and the
// channel opened on it. Producer and Consumer embed it.
type Client struct {
	Config

	// Connection is assigned by connect from amqp.Dial; it is nil until the
	// first dial and after a failed one.
	Connection *amqp.Connection

	// Channel is assigned by connect from Connection.Channel; it is nil when
	// no channel could be opened.
	Channel *amqp.Channel
}

const (
	// reconnectDelay is how long handleConnect sleeps after a failed
	// connection attempt.
	reconnectDelay = 3 * time.Second

	// retryNum is the maximum number of connection attempts made by
	// handleConnect.
	retryNum = 5

	// DefaultExchangeName is not referenced in the visible code.
	// NOTE(review): presumably the exchange used when none is given; confirm.
	DefaultExchangeName = "default"

	// DefaultQueueName is the queue NewConsumer falls back to when it is
	// given an empty queue name.
	DefaultQueueName = "default"
)

// connect dials the broker at c.URL and opens a channel on the new
// connection, storing both on the client.
//
// Failures are reported only through the log; callers check isConnected
// afterwards. If the channel cannot be opened, the connection that was just
// dialed is closed again: the client is unusable without a channel, and the
// next attempt dials a fresh connection, so leaving this one open would leak
// a connection per retry.
func (c *Client) connect() {
	var err error

	log.WithFields(log.Fields{
		"module": "RabbitMQ",
	}).Info("Attempting to connect.")

	c.Connection, err = amqp.Dial(c.URL)
	if err != nil {
		log.WithFields(log.Fields{
			"module": "RabbitMQ",
			"err":    err,
		}).Error("Failed to connect to RabbitMQ.")
		return
	}

	c.Channel, err = c.Connection.Channel()
	if err != nil {
		log.WithFields(log.Fields{
			"module": "RabbitMQ",
			"err":    err,
		}).Error("Failed to open a channel.")

		// Make the "no channel" state explicit and release the connection.
		// c.Connection keeps pointing at the (now closed) connection so that
		// IsClosed reports true for it.
		c.Channel = nil
		if closeErr := c.Connection.Close(); closeErr != nil {
			log.WithFields(log.Fields{
				"module": "RabbitMQ",
				"err":    closeErr,
			}).Warn("Failed to close the connection.")
		}
	}
}

// isConnected reports whether the client holds an open connection and a
// channel.
//
// A nil Connection (never dialed, or the last dial failed) counts as not
// connected instead of being dereferenced. Only the connection's closed state
// is inspected; a channel that was closed while the connection stays open is
// not detected here.
func (c *Client) isConnected() bool {
	return c.Connection != nil && !c.Connection.IsClosed() && c.Channel != nil
}
// handleConnect tries to establish the connection up to retryNum times,
// sleeping reconnectDelay after every failed attempt (including the last).
// The outcome is logged: Info on success, Fatal when every attempt failed.
func (c *Client) handleConnect() {
	// entry builds a fresh log entry tagged with the module name.
	entry := func() *log.Entry {
		return log.WithFields(log.Fields{
			"module": "RabbitMQ",
		})
	}

	for attempt := 0; attempt < retryNum; attempt++ {
		c.connect()
		if c.isConnected() {
			break
		}
		entry().Warn("Failed to connect. Retrying...")
		time.Sleep(reconnectDelay)
	}

	if !c.isConnected() {
		entry().Fatal("Connect failed.")
		return
	}
	entry().Info("Connect succeed.")
}

Producer

// Producer publishes messages to RabbitMQ. It embeds Client, so the
// connection, channel and configuration fields are promoted onto it.
type Producer struct {
	Client
}

// NewProducer loads the configuration through util.ReadConfig under the key
// "RabbitMQ", copies its URL onto a new Producer and connects it via
// handleConnect. A configuration read error is logged at Fatal level.
func NewProducer() *Producer {
	var cfg Config
	if err := util.ReadConfig("RabbitMQ", &cfg); err != nil {
		log.WithFields(log.Fields{
			"module": "RabbitMQ",
			"err":    err,
		}).Fatal("Read config error.")
	}

	p := new(Producer)
	p.URL = cfg.URL
	p.handleConnect()
	return p
}

// UnsafePushExchange publishes data to the exchange named exchangeName.
//
// The payload is wrapped in a Msg together with the current UTC time,
// JSON-encoded, and published with an empty routing key as a persistent
// "application/json" message. When the client is not connected it first
// reconnects and calls initExchange. A nil data map is a no-op that returns
// nil. The marshal or publish error, if any, is logged and returned.
func (p *Producer) UnsafePushExchange(data map[string]string, exchangeName string) error {
	// Re-establish the connection before doing anything else.
	if !p.isConnected() {
		p.handleConnect()
		p.initExchange()
	}

	if data == nil {
		return nil
	}

	message := Msg{
		Data: data,
		Time: util.DecodeTime(time.Now().UTC()),
	}

	body, err := json.Marshal(message)
	if err != nil {
		log.WithFields(log.Fields{
			"module": "RabbitMQ",
			"data":   data,
			"err":    err,
		}).Error("Failed to marshal a message.")
		return err
	}

	publishing := amqp.Publishing{
		DeliveryMode: amqp.Persistent,
		ContentType:  "application/json",
		Body:         body,
	}
	// Arguments: exchange, routing key, mandatory, immediate, message.
	if err = p.Channel.Publish(exchangeName, "", false, false, publishing); err != nil {
		log.WithFields(log.Fields{
			"module": "RabbitMQ",
			"err":    err,
		}).Error("Failed to publish a message.")
		return err
	}

	log.WithFields(log.Fields{
		"module": "RabbitMQ",
		"data":   message,
	}).Info("Publish a message.")
	return nil
}

Consumer

// Consumer receives messages from a single RabbitMQ queue. It embeds Client,
// so the connection, channel and configuration fields are promoted onto it.
type Consumer struct {
	Client

	// queueName is the queue passed to Channel.Consume by Receive; it is set
	// by NewConsumer.
	queueName string
}

// NewConsumer loads the configuration through util.ReadConfig under the key
// "RabbitMQ", builds a Consumer for queueName (DefaultQueueName when
// queueName is empty), connects it via handleConnect and then calls
// initQueue. A configuration read error is logged at Fatal level.
func NewConsumer(queueName string) *Consumer {
	var cfg Config
	if err := util.ReadConfig("RabbitMQ", &cfg); err != nil {
		log.WithFields(log.Fields{
			"module": "RabbitMQ",
			"err":    err,
		}).Fatal("Read config error.")
	}

	name := queueName
	if name == "" {
		name = DefaultQueueName
	}

	c := &Consumer{queueName: name}
	c.URL = cfg.URL
	c.QueueList = cfg.QueueList

	c.handleConnect()
	c.initQueue()
	return c
}

// Receive registers a consumer on c.queueName and processes deliveries in a
// background goroutine until the delivery channel is closed. Each message
// body is passed to c.handle and the delivery is then acknowledged
// individually (auto-ack is disabled).
//
// Receive blocks until a value is received from forever or forever is closed.
// If the consumer cannot be registered, the error is logged and Receive
// returns immediately instead of blocking with nothing to consume.
func (c *Consumer) Receive(forever chan struct{}) {
	// Make sure there is a connection before registering the consumer.
	if !c.isConnected() {
		c.handleConnect()
	}

	msgs, err := c.Channel.Consume(
		c.queueName, // queue
		"",          // consumer
		false,       // auto-ack
		false,       // exclusive
		false,       // no-local
		false,       // no-wait
		nil,         // args
	)
	if err != nil {
		log.WithFields(log.Fields{
			"module": "RabbitMQ",
			"err":    err,
		}).Error("Failed to register a consumer.")
		// Without a delivery channel the worker below would range over a nil
		// channel and block forever, so stop here.
		return
	}

	go func() {
		for msg := range msgs {
			// NOTE(review): handleConnect replaces c.Channel, but msgs was
			// obtained from the channel Consume was called on above; the
			// consumer is not re-registered on the new channel here.
			if !c.isConnected() {
				c.handleConnect()
			}
			c.handle(msg.Body)
			if ackErr := msg.Ack(false); ackErr != nil {
				log.WithFields(log.Fields{
					"module": "RabbitMQ",
					"err":    ackErr,
				}).Error("Failed to ack a message.")
			}
		}
	}()

	<-forever
	log.Info("Consumer is Done.")
}

参考资料