在我的新课程中,使用到了rabbitMQ消息队列
使用消息队列是为了将我们的服务进行解耦
并且支持高并发
消息队列的优势

解耦
例如在我的威胁诱捕项目中,系统服务和矩阵管理服务只需要发布订阅消息
每个节点去订阅消息即可,这样三个服务之间就完全解耦了
异步
使用消息队列还有一个特别明显的好处,那就是可以将同步的逻辑异步化
削峰
在未使用消息队列的系统中,系统面对突发大流量会导致系统崩溃。
在使用消息队列之后,可以将大量消息积压在消息队列中,下游服务慢慢的从消息队列中消费即可
只要是消息队列,上面的优势都有,只是具体的技能点不一样
rabbitMQ安装
rabbitMQ的安装非常简单
version: '3' # 使用Docker Compose版本3.8
services:
rabbitmq:
image: rabbitmq:3.9-management # 使用带有管理插件的RabbitMQ镜像
container_name: rabbitmq # 容器名称
ports:
- "5672:5672" # RabbitMQ的AMQP端口
- "15672:15672" # RabbitMQ的管理界面端口
environment:
RABBITMQ_DEFAULT_USER: admin # 设置默认用户名
RABBITMQ_DEFAULT_PASS: password # 设置默认密码
5672是AMQP的端口,也就是我们使用代码去和消息队列交互的端口
15672是web端口,我们可以在web界面上管理消息队列
安全配置
上面的配置,很容易通过网络抓包,就能看到消息内容
很多时候需要使用tls进行消息加密
配置稍微麻烦一点点,大家如果配置过http的ssl,下面的操作应该就能理解了
编写一个shell脚本,用来生成ca证书、服务端证书、客户端证书
ca.sh
# 创建根证书颁发机构(CA)
openssl genrsa -out ca_key.pem 2048
openssl req -x509 -new -nodes -key ca_key.pem -days 3650 -out ca_certificate.pem -subj "/CN=MyCA"
# 创建服务端私钥
openssl genrsa -out server_key.pem 2048
# 1. 创建配置文件 ssl.conf
cat > ssl.conf <<EOF
[req]
req_extensions = v3_req
distinguished_name = req_distinguished_name
[req_distinguished_name]
[v3_req]
basicConstraints = CA:FALSE
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
subjectAltName = @alt_names
[alt_names]
IP.1 = 192.168.80.164
DNS.1 = rabbitmq-server
EOF
# 2. 生成服务器证书请求(包含SAN)
openssl req -new -key server_key.pem -out server.csr -subj "/CN=rabbitmq-server" -config ssl.conf
# 3. 使用CA签名(包含SAN扩展)
openssl x509 -req -in server.csr \
-CA ca_certificate.pem \
-CAkey ca_key.pem \
-CAcreateserial \
-out server_certificate.pem \
-days 365 \
-extensions v3_req \
-extfile ssl.conf
# 生成客户端私钥(client_key.pem)和证书签名请求(client.csr)
openssl genrsa -out client_key.pem 2048
openssl req -new -key client_key.pem -out client.csr -subj "/CN=rabbitmq-client"
# 使用根 CA 对客户端证书请求进行签名,生成客户端证书(client_certificate.pem)
openssl x509 -req -in client.csr -CA ca_certificate.pem -CAkey ca_key.pem -CAcreateserial -out client_certificate.pem -days 365
生成证书的时候,要把192.168.80.164这个ip地址换成你的虚拟机ip
rabbitmq.conf
listeners.ssl.default = 5671
ssl_options.cacertfile = /etc/rabbitmq/ssl/ca_certificate.pem
ssl_options.certfile = /etc/rabbitmq/ssl/server_certificate.pem
ssl_options.keyfile = /etc/rabbitmq/ssl/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = false
management.ssl.port = 15671
management.ssl.cacertfile = /etc/rabbitmq/ssl/ca_certificate.pem
management.ssl.certfile = /etc/rabbitmq/ssl/server_certificate.pem
management.ssl.keyfile = /etc/rabbitmq/ssl/server_key.pem
docker-compose配置
version: '3'
services:
rabbitmq:
image: rabbitmq:3.9-management # 使用带有管理插件的RabbitMQ镜像
container_name: rabbitmq # 容器名称
ports:
- "5671:5671" # RabbitMQ的AMQP端口
- "15671:15671" # RabbitMQ的管理界面端口
environment:
RABBITMQ_DEFAULT_USER: admin # 设置默认用户名
RABBITMQ_DEFAULT_PASS: password # 设置默认密码
RABBITMQ_CONFIG_FILE: /etc/rabbitmq/rabbitmq.conf
volumes:
- ./config/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
- ./ssl:/etc/rabbitmq/ssl # SSL证书目录
消息队列各种模式
下载库
go get github.com/streadway/amqp
简单模式
一对一的消息传递,一个生产者将消息发送到队列,一个消费者从队列中接收消息。
生产者
- 连接mq
- 创建通道
- 声明队列
- 发送消息
package main
import (
"fmt"
"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"log"
)
func main() {
// 连接 RabbitMQ
conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
if err != nil {
log.Fatalf("无法连接到 RabbitMQ: %v", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("无法打开通道: %v", err)
}
defer ch.Close()
queueName := "xxx"
// 声明队列
q, err := ch.QueueDeclare(
queueName, // 队列名称
false, // 持久性
false, // 自动删除
false, // 排他性
false, // 非阻塞
nil, // 其他参数
)
if err != nil {
log.Fatalf("无法声明队列: %v", err)
}
// 发送消息
body := fmt.Sprintf("msg-%d", 1)
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Body: []byte(body),
})
if err != nil {
logrus.Errorf("发送失败: %v", err)
return
}
fmt.Printf("发送消息: %s\n", body)
}
消费者
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
func main() {
// 连接 RabbitMQ
conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
if err != nil {
log.Fatalf("无法连接到 RabbitMQ: %v", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("无法打开通道: %v", err)
}
defer ch.Close()
queueName := "xxx"
// 声明队列
q, err := ch.QueueDeclare(
queueName, // 队列名称
false, // 持久性
false, // 自动删除
false, // 排他性
false, // 非阻塞
nil, // 其他参数
)
if err != nil {
log.Fatalf("无法声明队列: %v", err)
}
// 接收消息
msgs, err := ch.Consume(
q.Name, // 队列
"", // 消费者
false, // 自动确认
false, // 排他性
false, // 非本地
false, // 非阻塞
nil, // 其他参数
)
if err != nil {
log.Fatalf("无法注册消费者: %v", err)
}
fmt.Println("等待接收消息")
for d := range msgs {
fmt.Printf("收到消息: %s\n", d.Body)
fmt.Println("回复消息")
d.Ack(false)
}
}
SSL安全配置
只需要在连接mq的时候,配置证书和密钥就好了
// 1. 加载客户端证书和密钥(双向认证时需要)
cert, err := tls.LoadX509KeyPair("D:\\IT\\fengfeng\\honey\\honey_app\\deploy\\rabbitMQ\\ssl\\client_certificate.pem", "D:\\IT\\fengfeng\\honey\\honey_app\\deploy\\rabbitMQ\\ssl\\client_key.pem")
if err != nil {
log.Fatalf("加载客户端证书失败: %v", err)
}
// 2. 加载CA证书(验证服务器证书)
caCert, err := os.ReadFile("D:\\IT\\fengfeng\\honey\\honey_app\\deploy\\rabbitMQ\\ssl\\ca_certificate.pem")
if err != nil {
log.Fatalf("读取CA证书失败: %v", err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
// 3. 配置TLS
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert}, // 客户端证书(双向认证时需要)
RootCAs: caCertPool, // 信任的CA
InsecureSkipVerify: false, // 必须验证服务器证书
}
// 连接 RabbitMQ
conn, err := amqp.DialTLS("amqps://admin:password@192.168.80.165:5671/", tlsConfig)
if err != nil {
log.Fatalf("无法连接到 RabbitMQ: %v", err)
}
注意mq的协议是amqps,端口是5671
工作队列模式
生产者生产消息,消息被轮询(Round-Robin) 分配给不同的消费者,避免单个消费者压力过大。
在简单模式的基础上,如果有多个消费者,默认就是工作队列模式
消息基本上会被均分给不同的消费者
如果要配置不同的消费者接收消息的能力,可以进行如下配置
// 消费者1
err = ch.Qos(
1, // prefetchCount: 未确认消息上限
0, // prefetchSize: 不限制消息大小
false, // global: 仅对当前消费者生效
)
// 消费者2
err = ch.Qos(
2, // prefetchCount: 未确认消息上限
0, // prefetchSize: 不限制消息大小
false, // global: 仅对当前消费者生效
)
这样,消费者2就会多接收一点消息
必须在ch.Consume()之前配置
发布订阅模式
生产者生产消息,多个消费者订阅消息
在 RabbitMQ 中,发布订阅模式(Publish/Subscribe Pattern) 是一种消息广播机制,生产者将消息发送到交换器(Exchange),交换器将消息复制到所有绑定的队列,每个队列对应一个消费者
生产者
- 声明fanout类型的交换器
- 将消息发给交换器
package main
import (
"fmt"
"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"log"
)
func main() {
// 连接 RabbitMQ
conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
if err != nil {
log.Fatalf("无法连接到 RabbitMQ: %v", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("无法打开通道: %v", err)
}
defer ch.Close()
exChangeName := "logs"
// 声明 fanout 类型的交换器
err = ch.ExchangeDeclare(
exChangeName, // 交换器名称
"fanout", // 交换器类型
true, // 持久化
false, // 非自动删除
false, // 内部使用
false, // 不等待服务器确认
nil, // 额外参数
)
if err != nil {
log.Fatalf("声明交换器失败 %s", err)
}
for i := 0; i < 20; i++ {
// 发送消息
body := fmt.Sprintf("msg-%d", i)
err = ch.Publish(
exChangeName, // 交换器名称
"", // 路由键(fanout 类型忽略)
false, // mandatory
false, // immediate
amqp.Publishing{
Body: []byte(body),
})
if err != nil {
logrus.Errorf("发送失败: %v", err)
return
}
fmt.Printf("发送消息: %s\n", body)
}
}
消费者
- 声明交换器
- 创建临时队列
- 将队列绑定到交换器
- 消费消息
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
func main() {
// 连接 RabbitMQ
conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
if err != nil {
log.Fatalf("无法连接到 RabbitMQ: %v", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("无法打开通道: %v", err)
}
defer ch.Close()
exChangeName := "logs"
// 声明 fanout 类型的交换器
err = ch.ExchangeDeclare(
exChangeName, // 交换器名称
"fanout", // 交换器类型
true, // 持久化
false, // 非自动删除
false, // 内部使用
false, // 不等待服务器确认
nil, // 额外参数
)
if err != nil {
log.Fatalf("声明交换器失败 %s", err)
}
// 创建临时队列(自动生成唯一名称,连接关闭后自动删除)
q, err := ch.QueueDeclare(
"", // 空名称,让 RabbitMQ 自动生成
false, // 非持久化
true, // 排他性(仅当前连接可见)
true, // 自动删除
false, // 不等待服务器确认
nil, // 额外参数
)
if err != nil {
log.Fatalf("声明临时队列失败 %s", err)
}
// 将队列绑定到交换器(关键步骤)
err = ch.QueueBind(
q.Name, // 队列名称
"", // 路由键(fanout 类型忽略)
exChangeName, // 交换器名称
false, // 不等待服务器确认
nil, // 额外参数
)
// 消费消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标签
true, // 自动确认(简单示例,生产环境建议手动确认)
false, // 非排他性
false, // 非本地消费者
false, // 不等待服务器确认
nil, // 额外参数
)
if err != nil {
log.Fatalf("创建消费者失败 %s", err)
}
fmt.Println("准备接收消息")
for d := range msgs {
log.Printf("[x] 收到消息: %s\n", d.Body)
}
}
这种模式,消费者只能消费它运行之后生产者生产的消息
发布订阅模式 缓存消息
上面的发布订阅模式
如果先运行生产者,再运行消费者,那么消费者是获取不到之前生产者上传的消息的
原因也很简单,因为生产者是直接发给交换器的,交换器只是做分发的,本身是不会存储消息的
改造也很简单
生产者生产消息的时候,提前绑定消费者的队列
消费者消费消息的时候,也绑定到那个队列就好了
生产者
package main
import (
"fmt"
"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"log"
)
func main() {
// 连接 RabbitMQ
conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
if err != nil {
log.Fatalf("无法连接到 RabbitMQ: %v", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("无法打开通道: %v", err)
}
defer ch.Close()
exChangeName := "logs1"
// 声明 fanout 类型的交换器
err = ch.ExchangeDeclare(
exChangeName, // 交换器名称
"fanout", // 交换器类型
true, // 持久化
false, // 非自动删除
false, // 内部使用
false, // 不等待服务器确认
nil, // 额外参数
)
if err != nil {
log.Fatalf("声明交换器失败 %s", err)
}
q1, err := ch.QueueDeclare(
"log_queue_1",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("声明队列失败 %s", err)
}
q2, err := ch.QueueDeclare(
"log_queue_2",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("声明队列失败 %s", err)
}
// 将队列绑定到交换器(关键步骤)
err = ch.QueueBind(
q1.Name, // 队列名称
"", // 路由键(fanout 类型忽略)
exChangeName, // 交换器名称
false, // 不等待服务器确认
nil, // 额外参数
)
err = ch.QueueBind(
q2.Name, // 队列名称
"", // 路由键(fanout 类型忽略)
exChangeName, // 交换器名称
false, // 不等待服务器确认
nil, // 额外参数
)
if err != nil {
log.Fatalf("队列绑定交换器失败 %s", err)
}
for i := 0; i < 20; i++ {
// 发送消息
body := fmt.Sprintf("msg-%d", i)
err = ch.Publish(
exChangeName, // 交换器名称
"", // 路由键(fanout 类型忽略)
false, // mandatory
false, // immediate
amqp.Publishing{
Body: []byte(body),
})
if err != nil {
logrus.Errorf("发送失败: %v", err)
return
}
fmt.Printf("发送消息: %s\n", body)
}
}
消费者
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
func main() {
// 连接 RabbitMQ
conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
if err != nil {
log.Fatalf("无法连接到 RabbitMQ: %v", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("无法打开通道: %v", err)
}
defer ch.Close()
exChangeName := "logs1"
// 声明 fanout 类型的交换器
err = ch.ExchangeDeclare(
exChangeName, // 交换器名称
"fanout", // 交换器类型
true, // 持久化
false, // 非自动删除
false, // 内部使用
false, // 不等待服务器确认
nil, // 额外参数
)
if err != nil {
log.Fatalf("声明交换器失败 %s", err)
}
// 消费消息
msgs, err := ch.Consume(
"log_queue_2", // 队列名称
"", // 消费者标签
true, // 自动确认(简单示例,生产环境建议手动确认)
false, // 非排他性
false, // 非本地消费者
false, // 不等待服务器确认
nil, // 额外参数
)
if err != nil {
log.Fatalf("创建消费者失败 %s", err)
}
fmt.Println("准备接收消息")
for d := range msgs {
log.Printf("[x] 收到消息: %s\n", d.Body)
}
}
路由模式
生产者指定消费者进行消费
基于消息的 “路由键(Routing Key)” 精确匹配消息分发,生产者发送消息时指定路由键,消费者通过绑定队列与路由键的关系接收特定消息。
交换器的类型必须是direct
发送消息的时候指定路由键即可
生产者
package main
import (
"fmt"
"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"log"
)
func main() {
// 连接 RabbitMQ
conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
if err != nil {
log.Fatalf("无法连接到 RabbitMQ: %v", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("无法打开通道: %v", err)
}
defer ch.Close()
exChangeName := "logs_direct"
// 声明 fanout 类型的交换器
err = ch.ExchangeDeclare(
exChangeName, // 交换器名称
"direct", // 交换器类型
true, // 持久化
false, // 非自动删除
false, // 内部使用
false, // 不等待服务器确认
nil, // 额外参数
)
if err != nil {
log.Fatalf("声明交换器失败 %s", err)
}
for i := 0; i < 20; i++ {
// 发送消息
body := fmt.Sprintf("msg-%d", i)
err = ch.Publish(
exChangeName, // 交换器名称
"node001", //
false, // mandatory
false, // immediate
amqp.Publishing{
Body: []byte(body),
})
if err != nil {
logrus.Errorf("发送失败: %v", err)
return
}
fmt.Printf("发送消息: %s\n", body)
}
for i := 0; i < 10; i++ {
// 发送消息
body := fmt.Sprintf("msg-%d", i)
err = ch.Publish(
exChangeName, // 交换器名称
"node002", //
false, // mandatory
false, // immediate
amqp.Publishing{
Body: []byte(body),
})
if err != nil {
logrus.Errorf("发送失败: %v", err)
return
}
fmt.Printf("发送消息: %s\n", body)
}
}
消费者
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
func main() {
// 连接 RabbitMQ
conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
if err != nil {
log.Fatalf("无法连接到 RabbitMQ: %v", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("无法打开通道: %v", err)
}
defer ch.Close()
exChangeName := "logs_direct"
// 声明 fanout 类型的交换器
err = ch.ExchangeDeclare(
exChangeName, // 交换器名称
"direct", // 交换器类型
true, // 持久化
false, // 非自动删除
false, // 内部使用
false, // 不等待服务器确认
nil, // 额外参数
)
if err != nil {
log.Fatalf("声明交换器失败 %s", err)
}
// 创建临时队列(自动生成唯一名称,连接关闭后自动删除)
q, err := ch.QueueDeclare(
"", // 空名称,让 RabbitMQ 自动生成
false, // 非持久化
true, // 排他性(仅当前连接可见)
true, // 自动删除
false, // 不等待服务器确认
nil, // 额外参数
)
if err != nil {
log.Fatalf("声明临时队列失败 %s", err)
}
// 将队列绑定到交换器(关键步骤)
err = ch.QueueBind(
q.Name, // 队列名称
"node002", // 路由键(fanout 类型忽略)
exChangeName, // 交换器名称
false, // 不等待服务器确认
nil, // 额外参数
)
// 消费消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标签
true, // 自动确认(简单示例,生产环境建议手动确认)
false, // 非排他性
false, // 非本地消费者
false, // 不等待服务器确认
nil, // 额外参数
)
if err != nil {
log.Fatalf("创建消费者失败 %s", err)
}
fmt.Println("准备接收消息")
for d := range msgs {
log.Printf("[x] 收到消息: %s\n", d.Body)
}
}
主题模式
基于路由键的 “模糊匹配” 分发,支持通配符,比路由模式更灵活
绑定队列时可使用通配符:
*:匹配一个单词(如order.*匹配order.create,但不匹配order.create.success)。#:匹配零个或多个单词(如order.#匹配order.create、order.create.success)。
生产者
package main
import (
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明主题类型的交换器
err = ch.ExchangeDeclare(
"logs_topic", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
// 设置路由键,默认为"anonymous.info"
body := bodyFrom(os.Args)
routingKey := severityFrom(os.Args)
// 发布消息到交换器
err = ch.Publish(
"logs_topic", // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s:%s", routingKey, body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 3) || os.Args[2] == "" {
s = "hello"
} else {
s = strings.Join(args[2:], " ")
}
return s
}
func severityFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "anonymous.info"
} else {
s = os.Args[1]
}
return s
}
消费者
package main
import (
"log"
"os"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明主题类型的交换器
err = ch.ExchangeDeclare(
"logs_topic", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
// 声明临时队列
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 获取命令行参数作为绑定键
if len(os.Args) < 2 {
log.Printf("Usage: %s [binding_key]...", os.Args[0])
os.Exit(0)
}
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, "logs_topic", s)
err = ch.QueueBind(
q.Name, // queue name
s, // routing key
"logs_topic", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s: %s", d.RoutingKey, d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
先跑消费者
go run 14.主题模式-消费者.go "kernal.*" # 接收内核相关的所有服务信息
go run 14.主题模式-消费者.go "*.error" # 接收所有服务的error信息
go run 14.主题模式-消费者.go "#" # 接收所有
然后跑生产者
go run 13.主题模式-生产者.go "auth.error" "认证服务错误"
go run 13.主题模式-生产者.go "kernal.error" "系统内核错误"
go run 13.主题模式-生产者.go "auth.warning" "认证警告"
死信队列
处理 “无法被正常消费” 的消息(如消费超时、消费失败且重试次数耗尽),将其转移到专门的 “死信队列”,便于后续排查问题。
- 为普通队列设置死信交换器(
x-dead-letter-exchange)和死信路由键(x-dead-letter-routing-key)。 - 当消息满足死信条件(如被拒绝
basic.reject且requeue=false、TTL 过期、队列满)时,会被路由到死信队列。
正常生产者
- 先声明普通交换器
- 声明普通队列,通过额外参数设置死信交换器
- 绑定普通队列到普通交换器
- 发送消息到普通队列
package main
import (
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明普通交换器
err = ch.ExchangeDeclare(
"normal_exchange", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
// 声明普通队列并设置死信交换器
args := amqp.Table{
"x-dead-letter-exchange": "dlx_exchange",
"x-dead-letter-routing-key": "dlx_key",
}
_, err = ch.QueueDeclare(
"normal_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
args, // arguments
)
failOnError(err, "Failed to declare a queue")
// 绑定普通队列到普通交换器
err = ch.QueueBind(
"normal_queue", // queue name
"normal_routing_key", // routing key
"normal_exchange", // exchange
false,
nil,
)
failOnError(err, "Failed to bind a queue")
// 发送消息到普通队列
body := bodyFrom(os.Args)
err = ch.Publish(
"normal_exchange", // exchange
"normal_routing_key", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
正常消费者
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明普通交换器
err = ch.ExchangeDeclare(
"normal_exchange", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
// 声明普通队列并设置死信交换器
args := amqp.Table{
"x-dead-letter-exchange": "dlx_exchange",
"x-dead-letter-routing-key": "dlx_key",
}
_, err = ch.QueueDeclare(
"normal_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
args, // arguments
)
failOnError(err, "Failed to declare a queue")
// 绑定普通队列到普通交换器
err = ch.QueueBind(
"normal_queue", // queue name
"normal_routing_key", // routing key
"normal_exchange", // exchange
false,
nil,
)
failOnError(err, "Failed to bind a queue")
// 消费普通队列中的消息
msgs, err := ch.Consume(
"normal_queue", // queue
"", // consumer
false, // auto-ack设为false,手动处理确认
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] Received %s", d.Body)
// 拒绝消息,不重新入队,触发死信机制
d.Reject(false)
log.Printf(" [x] Rejected message and sent to DLX")
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
死信消费者
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明死信交换器
err = ch.ExchangeDeclare(
"dlx_exchange", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
// 声明死信队列
_, err = ch.QueueDeclare(
"dlx_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 绑定死信队列到死信交换器
err = ch.QueueBind(
"dlx_queue", // queue name
"dlx_key", // routing key
"dlx_exchange", // exchange
false,
nil,
)
failOnError(err, "Failed to bind a queue")
// 消费死信队列中的消息
msgs, err := ch.Consume(
"dlx_queue", // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] DLX Received %s", d.Body)
}
}()
log.Printf(" [*] Waiting for DLX messages. To exit press CTRL+C")
<-forever
}
延迟队列模式
消息发送后不会立即被消费,而是延迟指定时间后才被处理
上面讲了死信队列,那么生产者可以让消息过期,这样消息就进入了死信队列,而且是按时间顺序进入的
这样消费者消费死信队列的数据即可
生产者
package main
import (
"log"
"time"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %v", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明死信交换器
dlxExchange := "dlx_exchange"
err = ch.ExchangeDeclare(
dlxExchange, // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
// 声明死信队列
dlQueueName := "dl_queue"
_, err = ch.QueueDeclare(
dlQueueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 绑定死信队列到死信交换器
err = ch.QueueBind(
dlQueueName, // queue name
"dl_key", // routing key
dlxExchange, // exchange
false,
nil,
)
failOnError(err, "Failed to bind a queue")
// 声明延迟队列(设置x-dead-letter-exchange和x-message-ttl参数)
delayQueueName := "delay_queue"
_, err = ch.QueueDeclare(
delayQueueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
amqp.Table{
"x-dead-letter-exchange": dlxExchange,
"x-dead-letter-routing-key": "dl_key",
"x-message-ttl": 5000, // 5秒后消息过期
}, // arguments
)
failOnError(err, "Failed to declare a queue")
// 发送消息到延迟队列
body := "Hello, delayed message!"
err = ch.Publish(
"", // exchange
delayQueueName, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
log.Println("消息已发送到延迟队列,5秒后将进入死信队列")
time.Sleep(2 * time.Second) // 等待消息发送完成
}
消费者
package main
import (
"github.com/streadway/amqp"
"log"
"time"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %v", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明死信队列(消费者只需要关注死信队列)
queueName := "dl_queue"
_, err = ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 消费死信队列中的消息
msgs, err := ch.Consume(
queueName, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf(" [x] Received %s", d.Body)
log.Printf("当前时间: %s", time.Now().Format("2006-01-02 15:04:05"))
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
rabbitMQ其他功能
优先级队列
声明优先级队列
在队列声明时通过x-max-priority参数设置队列支持的最大优先级
发送带优先级的消息
在amqp.Publishing中设置Priority字段指定消息的优先级
消费者会优先收到优先级较高的消息
生产者
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
"mq_study/core"
)
func main() {
ch := core.InitMQ()
//声明优先级队列
q, err := ch.QueueDeclare(
"yxj", false, false, false, false,
amqp.Table{
"x-max-priority": 10, // 设置最大优先级
},
)
if err != nil {
log.Fatal(err)
}
// 发送不同优先级的消息
messages := []struct {
body string
priority uint8
}{
{"这是低优先级消息 (1)", 1},
{"这是中优先级消息 (5)", 5},
{"这是高优先级消息 (10)", 10},
{"这是另一个低优先级消息 (2)", 2},
{"这是另一个高优先级消息 (9)", 9},
}
for _, msg := range messages {
err = ch.Publish(
"", // 交换机
q.Name, // 路由键(队列名称)
false, // 非强制
false, // 非立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg.body),
Priority: msg.priority, // 设置消息优先级
})
if err != nil {
fmt.Println("消息发送失败", err)
continue
}
log.Printf("已发送: %s 优先级: %d", msg.body, msg.priority)
}
}
消费者,和之前消费者一样
package main
import (
"fmt"
"log"
"mq_study/core"
)
func main() {
ch := core.InitMQ()
// 4.消费消息
consumer, err := ch.Consume("yxj", "", false, false, false, false, nil)
if err != nil {
log.Fatal(err)
}
for msg := range consumer {
fmt.Println("消息内容", string(msg.Body))
//msg.Reject()
msg.Ack(false)
}
}
回调队列
生产者生产消息,消费者消费消息,然后把结果再返回给回调队列
生产者
package main
import (
"fmt"
"github.com/google/uuid"
"github.com/streadway/amqp"
"log"
"mq_study/core"
)
func main() {
ch := core.InitMQ()
//声明普通队列
q, err := ch.QueueDeclare(
"request_queue", false, false, false, false, nil)
if err != nil {
log.Fatal(err)
}
// 声明回调队列
callbackQueue, err := ch.QueueDeclare(
"", // 空名称表示让RabbitMQ自动生成队列名
false, // 非持久化(因为响应消息不需要长期保存)
true, // 自动删除(当消费者断开连接时)
true, // 排他性(只允许当前连接访问)
false, // 非阻塞
nil,
)
msgs, err := ch.Consume(
callbackQueue.Name, // 回调队列名
"", // 消费者标签
true, // 自动确认
false, // 非排他性
false, // 不阻塞
false, // 无参数
nil,
)
if err != nil {
log.Fatal(err)
}
for i := 0; i < 5; i++ {
msg := fmt.Sprintf("消息 %d", i)
msgID := uuid.New().String() // 消息id,用于匹配回调队列中的消息
err = ch.Publish(
"", // 交换机
q.Name, // 路由键(队列名称)
false, // 非强制
false, // 非立即
amqp.Publishing{
Body: []byte(msg),
CorrelationId: msgID,
ReplyTo: callbackQueue.Name,
})
if err != nil {
fmt.Println("消息发送失败", err)
continue
}
log.Printf("已发送: %s 消息id %s", msg, msgID)
}
for d := range msgs {
fmt.Printf("收到回调消息 %s %s\n", d.Body, d.CorrelationId)
}
}
消费者
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
"mq_study/core"
)
func main() {
ch := core.InitMQ()
// 4.消费消息
consumer, err := ch.Consume("request_queue", "x1", true, false, false, false, nil)
if err != nil {
log.Fatal(err)
}
for msg := range consumer {
fmt.Println("消息内容", string(msg.Body), msg.CorrelationId)
m := fmt.Sprintf("这是回来的消息 %s", msg.CorrelationId)
err = ch.Publish("", msg.ReplyTo, false, false, amqp.Publishing{
Body: []byte(m),
CorrelationId: msg.CorrelationId,
})
if err != nil {
fmt.Println("发送响应消息失败", err)
continue
}
fmt.Println("发送响应消息", m)
}
}
流量控制
生产者发送消息,如果队列消息已经积压了,则生产者慢点发
消费者按照自己能力消费消息
生产者 通过 ch.QueueInspect(q.Name) 获取队列状态
消费者通过ch.Qos() 设置消费的能力
package main
import (
"fmt"
"log"
"mq_study/core"
"time"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
// 1. 使用 QoS 进行流量控制(最常用方式)
func consumerWithQoS(consumerID string, count int) {
ch := core.InitMQ()
// 声明队列
q, err := ch.QueueDeclare(
"flow_control_queue",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
// 设置 QoS (Quality of Service)
// 第一个参数: prefetchCount - 每次从队列获取的最大消息数
// 第二个参数: prefetchSize - 每次获取的最大字节数,0表示不限制
// 第三个参数: global - 是否对整个连接生效,false表示只对当前通道生效
err = ch.Qos(
count, // 每次最多处理条消息,处理完再获取新的
0, // 不限制字节数
false,
)
failOnError(err, "Failed to set QoS")
// 消费消息
msgs, err := ch.Consume(
q.Name,
consumerID,
false, // 关闭自动确认,手动确认消息处理完成
false,
false,
false,
nil,
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("消费者 %s 收到消息: %s", consumerID, d.Body)
// 模拟处理耗时
time.Sleep(500 * time.Millisecond)
// 手动确认消息已处理
d.Ack(false)
log.Printf("消费者 %s 确认消息处理完成", consumerID)
}
}()
log.Printf("消费者 %s (QoS控制) 启动,等待消息...", consumerID)
<-forever
}
// 基于队列长度的流量控制
func producerWithQueueLengthControl() {
ch := core.InitMQ()
// 声明队列
q, err := ch.QueueDeclare(
"flow_control_queue",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
maxQueueLength := 100 // 队列最大长度阈值
for i := 1; i <= 1000; i++ {
// 获取队列状态
queueStatus, err := ch.QueueInspect(q.Name)
failOnError(err, "Failed to inspect queue")
// 如果队列长度超过阈值,等待
if queueStatus.Messages >= maxQueueLength {
log.Printf("队列长度 %d 超过阈值 %d,等待...", queueStatus.Messages, maxQueueLength)
time.Sleep(2 * time.Second)
i-- // 重试当前消息
continue
}
body := fmt.Sprintf("这是第 %d 条消息", i)
err = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf("已发送消息 %d (当前队列长度: %d)", i, queueStatus.Messages)
}
log.Println("所有消息发送完成")
}
func main() {
// 启动消费者(带QoS控制)
go consumerWithQoS("consumer_1", 2)
go consumerWithQoS("consumer_2", 1)
// 启动生产者(选择一种流量控制方式)
go producerWithQueueLengthControl()
// 保持程序运行
time.Sleep(5 * time.Minute)
}
消息过期
之前讲延迟队列的时候,队列可以设置整体的消息过期时间
其实消息也可以设置过期时间
核心就是
amqp.Publishing{
Body: []byte(msg.body),
Expiration: fmt.Sprintf("%d", msg.ttl), // 消息过期时间(毫秒)
})
持久化
需要设置队列和消息的持久化
声明持久化队列
// 声明一个持久化的队列
q, err := ch.QueueDeclare(
"persistent_queue", // 队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否等待服务器响应
nil, // 额外参数
)
声明交换器持久化
ch.ExchangeDeclare(
exchangeName, // name
"direct", // type
true, // 是否持久化
false, //
false, //
false, //
nil, //
)
发送持久化消息
// 要发送的消息
body := "Hello, persistent message!"
// 发布消息到队列,设置为持久化
err = ch.Publish(
"", // 交换器
q.Name, // 路由键(队列名称)
false, // 是否强制
false, // 是否立即
amqp.Publishing{
ContentType: "text/plain",
// 重要:将消息标记为持久化(2表示持久化)
DeliveryMode: amqp.Persistent,
Body: []byte(body),
})
高可用配置
之前我们的rabbitMQ是跑在单机的
像这样的中间件服务,高可用是非常重要的
单节点的情况下,rabbitMQ容器挂了,整个依赖消息队列的业务都会受影响
这个在生产环境环境是万万接收不了的
这里就在一个机器上跑三个mq容器在模拟高可用配置
version: '3'
services:
rabbitmq1:
image: rabbitmq:3.9-management
container_name: rabbitmq1
ports:
- "5672:5672" # mq端口,只映射一个
- "15672:15672" # RabbitMQ的管理界面端口
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: password
RABBITMQ_ERLANG_COOKIE: 'TDONUYAMEVUIWQTTDRHM'
hostname: rabbitmq1
volumes:
- ./rabbitmq1_data:/var/lib/rabbitmq
networks:
mq_network:
ipv4_address: 10.1.0.2
rabbitmq2:
image: rabbitmq:3.9-management
container_name: rabbitmq2
hostname: rabbitmq2
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: password
RABBITMQ_ERLANG_COOKIE: 'TDONUYAMEVUIWQTTDRHM'
volumes:
- ./rabbitmq2_data:/var/lib/rabbitmq
networks:
mq_network:
ipv4_address: 10.1.0.3
rabbitmq3:
image: rabbitmq:3.9-management
container_name: rabbitmq3
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: password
RABBITMQ_ERLANG_COOKIE: 'TDONUYAMEVUIWQTTDRHM'
hostname: rabbitmq3
volumes:
- ./rabbitmq3_data:/var/lib/rabbitmq
networks:
mq_network:
ipv4_address: 10.1.0.4
networks:
mq_network:
driver: bridge
ipam:
driver: default
config:
- subnet: 10.1.0.0/24
执行
# 先停止 rabbitmq2 应用
docker exec rabbitmq2 rabbitmqctl stop_app
# 重置节点
docker exec rabbitmq2 rabbitmqctl reset
# 重新加入集群(确保主机名正确)
docker exec rabbitmq2 rabbitmqctl join_cluster rabbit@rabbitmq1
# 启动应用
docker exec rabbitmq2 rabbitmqctl start_app
docker exec rabbitmq3 rabbitmqctl stop_app
docker exec rabbitmq3 rabbitmqctl reset
docker exec rabbitmq3 rabbitmqctl join_cluster rabbit@rabbitmq1
docker exec rabbitmq3 rabbitmqctl start_app
然后在主节点的bash中,镜像队列就好了
rabbitmqctl set_policy ha-queue-two '^' '{"ha-mode":"exactly","ha-params":3,"ha-sync-mode":"automatic"}'
在界面上看到三个节点在就说明ok了

其中一个挂了,另外的就会顶上
参数详解
队列的参数
func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
声明队列的时候,参数很多,到底每个参数表示什么意思呢
- **"**name ":队列名称(name)
- 这是你要声明的队列的名称
- 如果为空字符串,RabbitMQ 会自动生成一个随机名称
- durable:持久化(durable)
true表示队列是持久化的,在 broker 重启后队列依然存在false表示队列是非持久化的,broker 重启后队列会被删除- 注意:这只保证队列本身的持久化,不保证消息的持久化
- autoDelete:自动删除(autoDelete)
true表示当最后一个消费者断开连接后,队列会自动删除false表示队列不会自动删除,即使没有消费者连接
- exclusive:排他性(exclusive)
true表示队列是排他的,只能被当前连接使用,连接关闭后队列会被删除false表示队列可以被多个连接共享
- noWait :不等待(noWait)
true表示不需要等待服务器的确认,会立即返回false表示会等待服务器的确认后才返回
- args :额外参数(args)
- 用于指定队列的额外属性,如消息 TTL、最大长度等
nil表示使用默认设置
声明队列的时候,相同名称的队列,如果重复声明,必须保证所有配置一样
设置队列消息长度
args := amqp.Table{
"x-max-length": 100, // 最大消息数100
"x-overflow": "reject-publish", // drop-head丢弃旧消息,reject-publish超过限制时拒绝新消息
},
消息的参数
ch.Publish(
"", // exchange 交换器的名称
"xxx", // key 路由键,如果exchange="" 则路由键=队列名
true, // mandatory 如果消息无法被路由到任何队列(没有匹配的绑定规则),RabbitMQ 会将消息返回给生产者。
false, // immediate 已废弃 历史作用:如果消息路由到的队列没有消费者,消息会被返回给生产者。
amqp.Publishing{
Body: []byte("你好"),
})
如果配置了mandatory=true,就可以在NotifyReturn这个队列中获取没有匹配到路由的消息
returnChan := make(chan amqp.Return, 10)
ch.NotifyReturn(returnChan)
for returnMsg := range returnChan {
log.Printf("消息被退回:")
log.Printf(" 回复码: %d", returnMsg.ReplyCode)
log.Printf(" 回复文本: %s", returnMsg.ReplyText)
log.Printf(" 交换机: %s", returnMsg.Exchange)
log.Printf(" 路由键: %s", returnMsg.RoutingKey)
log.Printf(" 消息体: %s", string(returnMsg.Body))
}
Publishing 字段详解
type Publishing struct {
// 应用程序或交换机特定的字段,headers类型交换机会检查此字段
// 用于存储自定义键值对元数据,可用于headers交换机的路由规则匹配
Headers Table
// 消息属性
ContentType string // MIME内容类型(如"text/plain"、"application/json")
ContentEncoding string // MIME内容编码(如"gzip"、"base64",无编码则为空)
DeliveryMode uint8 // 投递模式:0或1为非持久化,2为持久化(需配合持久化队列)
Priority uint8 // 消息优先级(0-9,需队列支持x-max-priority参数)
CorrelationId string // 关联标识符,用于匹配请求与响应(尤其在RPC场景)
ReplyTo string // 回复地址,指定接收响应的回调队列(常用于RPC)
Expiration string // 消息过期时间(毫秒字符串,如"60000"表示60秒)
MessageId string // 消息唯一标识符(业务层面,用于追踪或去重)
Timestamp time.Time // 消息创建的时间戳
Type string // 消息类型名称(自定义,用于区分消息分类,如"order_created")
UserId string // 发送消息的用户ID(需RabbitMQ已存在的用户)
AppId string // 发送消息的应用程序ID(标识消息来源应用)
// 消息的实际业务数据负载
Body []byte
}
消费者的参数
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {}
- queue:字符串类型,指定要消费的队列名称。
- consumer:字符串类型,消费者的标识名称。可以自定义命名,用于在 RabbitMQ 管理界面中识别不同的消费者。
- autoAck:布尔类型,是否自动确认消息。
true:消息被消费者接收后会自动确认(从队列中移除),无论业务处理是否成功。false:需要手动调用Delivery.Ack()方法确认消息,确保业务处理完成后再删除消息。
- exclusive:布尔类型,是否设置为排他性消费者。
true:该消费者是队列的唯一消费者,其他消费者无法消费该队列。false:允许多个消费者同时消费该队列(消息会轮询分配给不同消费者)。
- noLocal:布尔类型,是否禁止接收本连接发布的消息,需要借助direct交换器
true:消费者不会收到由当前连接发布的消息。false:可以接收当前连接发布的消息(通常设为false)。
- noWait:布尔类型,是否非阻塞等待服务器响应。
true:方法会立即返回,不等待服务器确认(可能存在潜在错误)。false:等待服务器确认后再返回(推荐使用)。
- args:
Table类型(类似字典),额外的参数配置。通常用于传递 RabbitMQ 扩展属性(如消息优先级、过期时间等),一般情况下可传nil。
msg.Ack(false)表示收到此消息,此消息会从队列里面删除
如果不执行ack,也不执行reject,那么下次运行消费者,此消息会被投递给消费者
msg.Reject(true),表示拒绝消息,并且让消息重新入队
msg.Reject(false),表示拒绝消息,此消息会从队列中删除
如果消费者接收消息的处理任务存在失败的风险,那么建议手动确认消息