消息队列(Message Queue)是一种应用间的通信方式,一种应用间的异步协作机制。消息的生产者只需将消息发布到 MQ 中,消息消费者只需要从 MQ 中获取消息消费,消息的可靠性由消息系统来保证。
RabbitMQ 简介
RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
RabbitMQ 特点
- 可靠性(Reliability)
- 灵活的路由(Flexible Routing)
- 消息集群(Clustering)
- 高可用(Highly Available Queues)
- 多种协议(Multi-protocol)
- 多语言客户端(Many Clients)
- 管理界面(Management UI)
- 跟踪机制(Tracing)
- 插件机制(Plugin System)
Exchange 类型
类型 | 描述 |
---|
direct | 路由键完全匹配,单播 |
topic | 路由键模式匹配,路由键可以包含通配符:”#”、”*” |
fanout | 不处理路由键,广播,转发消息最快 |
headers | 不处理路由键,根据发送的消息内容中的 headers 属性进行匹配,其他跟 direct 模式完全一致,性能较其他模式差很多 |
部署
容器化部署,简单方便
docker run -d --hostname rabbit1 --name rabbitmq1 -p 9419:9419 -p 8080:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.7.14-management
docker run -d --hostname rabbit2 --name rabbitmq2 -p 5673:5672 --link rabbitmq1:rabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.7.14-management
docker run -d --hostname rabbit3 --name rabbitmq3 -p 5674:5672 --link rabbitmq1:rabbit1 --link rabbitmq2:rabbit2 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.7.14-management
docker exec -it rabbitmq1 bash rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl start_app exit
docker exec -it rabbitmq2 bash rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster --ram rabbit@rabbit1 rabbitmqctl start_app exit
docker exec -it rabbitmq3 bash rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster --ram rabbit@rabbit1 rabbitmqctl start_app exit
|
RabbitMQ Exporter
repo: https://github.com/kbudde/rabbitmq_exporter
Start RabbitMQ Exporter:
RABBIT_EXPORTERS=exchange,node,queue RABBIT_CAPABILITIES=bert,no_sort RABBIT_USER=guest RABBIT_PASSWORD=guest OUTPUT_FORMAT=JSON PUBLISH_PORT=8082 RABBIT_URL=http://127.0.0.1:8080 MAX_QUEUES=5000 nohup rabbitmq_exporter &
|
Start in container:
docker run -d --net=container:my-rabbit kbudde/rabbitmq-exporter
|
Golang 客户端 Demo
配置
{ "URL": "amqp://guest:guest@10.5.124.213:5672/", }
|
Client
package rabbitmq
import ( log "github.com/sirupsen/logrus" "github.com/streadway/amqp" "time" )
type Config struct { URL string QueueList map[string][]string }
type Client struct { Config Connection *amqp.Connection Channel *amqp.Channel }
const ( reconnectDelay = 3 * time.Second retryNum = 5 DefaultExchangeName = "default" DefaultQueueName = "default" )
func (c *Client) connect() { var ( err error ) log.WithFields(log.Fields{ "module": "RabbitMQ", }).Info("Attempting to connect.") c.Connection, err = amqp.Dial(c.URL) if err != nil { log.WithFields(log.Fields{ "module": "RabbitMQ", "err": err, }).Error("Failed to connect to RabbitMQ.") return }
c.Channel, err = c.Connection.Channel() if err != nil { log.WithFields(log.Fields{ "module": "RabbitMQ", "err": err, }).Error("Failed to open a channel.") return } }
func (c *Client) isConnected() bool { if c.Connection.IsClosed() || c.Channel == nil { return false } return true } func (c *Client) handleConnect() { for index := 0; index < retryNum; index++ { c.connect() if c.isConnected() { break } else { log.WithFields(log.Fields{ "module": "RabbitMQ", }).Warn("Failed to connect. Retrying...") time.Sleep(reconnectDelay) } } if c.isConnected() { log.WithFields(log.Fields{ "module": "RabbitMQ", }).Info("Connect succeed.") } else { log.WithFields(log.Fields{ "module": "RabbitMQ", }).Fatal("Connect failed.") } }
|
Producer
type Producer struct { Client }
func NewProducer() *Producer { rabbitConfig := new(Config) err := util.ReadConfig("RabbitMQ", rabbitConfig) if err != nil { log.WithFields(log.Fields{ "module": "RabbitMQ", "err": err, }).Fatal("Read config error.") }
producer := &Producer{} producer.URL = rabbitConfig.URL
producer.handleConnect()
return producer }
func (p *Producer) UnsafePushExchange(data map[string]string, exchangeName string) error { if !p.isConnected() { p.handleConnect() p.initExchange() } var ( bytesData []byte err error )
if data == nil { return nil }
publishData := Msg{ Data: data, Time: util.DecodeTime(time.Now().UTC()), }
bytesData, err = json.Marshal(publishData) if err != nil { log.WithFields(log.Fields{ "module": "RabbitMQ", "data": data, "err": err, }).Error("Failed to marshal a message.") return err }
err = p.Channel.Publish( exchangeName, "", false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, ContentType: "application/json", Body: bytesData, }) if err != nil { log.WithFields(log.Fields{ "module": "RabbitMQ", "err": err, }).Error("Failed to publish a message.") } else { log.WithFields(log.Fields{ "module": "RabbitMQ", "data": publishData, }).Info("Publish a message.") } return err }
|
Consumer
type Consumer struct { Client }
func NewConsumer(queueName string) *Consumer { rabbitConfig := new(Config) err := util.ReadConfig("RabbitMQ", rabbitConfig) if err != nil { log.WithFields(log.Fields{ "module": "RabbitMQ", "err": err, }).Fatal("Read config error.") }
if queueName == "" { queueName = DefaultQueueName }
consumer := &Consumer{ queueName: queueName, } consumer.URL = rabbitConfig.URL consumer.QueueList = rabbitConfig.QueueList
consumer.handleConnect()
consumer.initQueue()
return consumer }
func (c *Consumer) Receive(forever chan struct{}) { if !c.isConnected() { c.handleConnect() }
msgs, err := c.Channel.Consume( c.queueName, "", false, false, false, false, nil, ) if err != nil { log.WithFields(log.Fields{ "module": "RabbitMQ", "err": err, }).Info("Failed to register a consumer.") }
go func() { for msg := range msgs { if !c.isConnected() { c.handleConnect() } c.handle(msg.Body) msg.Ack(false) } }()
<-forever log.Info("Consumer is Done.") }
|
参考资料