rabbitMQ消息队列基础课程
在我的新课程中,使用到了rabbitMQ消息队列 使用消息队列是为了将我们的服务进行解耦 并且支持高并发 消息队列的优势 解耦 例如在我的威胁诱捕项目中,系统服务和矩阵管理服务只需要发布
rabbitMQ消息队列基础课程
发布时间:2025-07-31 (2025-07-31)

在我的新课程中,使用到了rabbitMQ消息队列

使用消息队列是为了将我们的服务进行解耦

并且支持高并发

消息队列的优势

解耦

例如在我的威胁诱捕项目中,系统服务和矩阵管理服务只需要发布订阅消息

每个节点去订阅消息即可,这样三个服务之间就完全解耦了

异步

使用消息队列还有一个特别明显的好处,那就是可以将同步的逻辑异步化

削峰

在未使用消息队列的系统中,系统面对突发大流量会导致系统崩溃。

在使用消息队列之后,可以将大量消息积压在消息队列中,下游服务慢慢的从消息队列中消费即可

只要是消息队列,上面的优势都有,只是具体的技能点不一样

rabbitMQ安装

rabbitMQ的安装非常简单

version: '3'  # 使用Docker Compose版本3.8

services:
  rabbitmq:
    image: rabbitmq:3.9-management  # 使用带有管理插件的RabbitMQ镜像
    container_name: rabbitmq  # 容器名称
    ports:
      - "5672:5672"  # RabbitMQ的AMQP端口
      - "15672:15672"  # RabbitMQ的管理界面端口
    environment:
      RABBITMQ_DEFAULT_USER: admin  # 设置默认用户名
      RABBITMQ_DEFAULT_PASS: password  # 设置默认密码

5672是AMQP的端口,也就是我们使用代码去和消息队列交互的端口

15672是web端口,我们可以在web界面上管理消息队列

安全配置

上面的配置,很容易通过网络抓包,就能看到消息内容

很多时候需要使用tls进行消息加密

配置稍微麻烦一点点,大家如果配置过http的ssl,下面的操作应该就能理解了

编写一个shell脚本,用来生成ca证书、服务端证书、客户端证书

ca.sh

# 创建根证书颁发机构(CA)
openssl genrsa -out ca_key.pem 2048
openssl req -x509 -new -nodes -key ca_key.pem -days 3650 -out ca_certificate.pem -subj "/CN=MyCA"
# 创建服务端私钥
openssl genrsa -out server_key.pem 2048

# 1. 创建配置文件 ssl.conf
cat > ssl.conf <<EOF
[req]
req_extensions = v3_req
distinguished_name = req_distinguished_name

[req_distinguished_name]

[v3_req]
basicConstraints = CA:FALSE
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
subjectAltName = @alt_names

[alt_names]
IP.1 = 192.168.80.164
DNS.1 = rabbitmq-server
EOF

# 2. 生成服务器证书请求(包含SAN)
openssl req -new -key server_key.pem -out server.csr -subj "/CN=rabbitmq-server" -config ssl.conf

# 3. 使用CA签名(包含SAN扩展)
openssl x509 -req -in server.csr \
    -CA ca_certificate.pem \
    -CAkey ca_key.pem \
    -CAcreateserial \
    -out server_certificate.pem \
    -days 365 \
    -extensions v3_req \
    -extfile ssl.conf

# 生成客户端私钥(client_key.pem)和证书签名请求(client.csr)
openssl genrsa -out client_key.pem 2048
openssl req -new -key client_key.pem -out client.csr -subj "/CN=rabbitmq-client"
# 使用根 CA 对客户端证书请求进行签名,生成客户端证书(client_certificate.pem)
openssl x509 -req -in client.csr -CA ca_certificate.pem -CAkey ca_key.pem -CAcreateserial -out client_certificate.pem -days 365

生成证书的时候,要把192.168.80.164这个ip地址换成你的虚拟机ip

rabbitmq.conf

listeners.ssl.default = 5671
ssl_options.cacertfile = /etc/rabbitmq/ssl/ca_certificate.pem
ssl_options.certfile = /etc/rabbitmq/ssl/server_certificate.pem
ssl_options.keyfile = /etc/rabbitmq/ssl/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = false

management.ssl.port = 15671
management.ssl.cacertfile = /etc/rabbitmq/ssl/ca_certificate.pem
management.ssl.certfile = /etc/rabbitmq/ssl/server_certificate.pem
management.ssl.keyfile = /etc/rabbitmq/ssl/server_key.pem

docker-compose配置

version: '3' 

services:
  rabbitmq:
    image: rabbitmq:3.9-management  # 使用带有管理插件的RabbitMQ镜像
    container_name: rabbitmq  # 容器名称
    ports:
      - "5671:5671"  # RabbitMQ的AMQP端口
      - "15671:15671"  # RabbitMQ的管理界面端口
    environment:
      RABBITMQ_DEFAULT_USER: admin  # 设置默认用户名
      RABBITMQ_DEFAULT_PASS: password  # 设置默认密码
      RABBITMQ_CONFIG_FILE: /etc/rabbitmq/rabbitmq.conf
    volumes:
      - ./config/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - ./ssl:/etc/rabbitmq/ssl          # SSL证书目录

消息队列各种模式

下载库

go get github.com/streadway/amqp

简单模式

一对一的消息传递,一个生产者将消息发送到队列,一个消费者从队列中接收消息。

生产者

  1. 连接mq
  2. 创建通道
  3. 声明队列
  4. 发送消息
package main

import (
  "fmt"
  "github.com/sirupsen/logrus"
  "github.com/streadway/amqp"
  "log"
)

func main() {
  // 连接 RabbitMQ
  conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
  if err != nil {
    log.Fatalf("无法连接到 RabbitMQ: %v", err)
  }
  defer conn.Close()

  // 创建通道
  ch, err := conn.Channel()
  if err != nil {
    log.Fatalf("无法打开通道: %v", err)
  }
  defer ch.Close()

  queueName := "xxx"

  // 声明队列
  q, err := ch.QueueDeclare(
    queueName, // 队列名称
    false,     // 持久性
    false,    // 自动删除
    false,    // 排他性
    false,    // 非阻塞
    nil,      // 其他参数
  )
  if err != nil {
    log.Fatalf("无法声明队列: %v", err)
  }

  // 发送消息
  body := fmt.Sprintf("msg-%d", 1)
  err = ch.Publish(
    "",        // exchange
    q.Name, // routing key
    false,     // mandatory
    false,     // immediate
    amqp.Publishing{
      Body: []byte(body),
    })
  if err != nil {
    logrus.Errorf("发送失败: %v", err)
    return
  }
  fmt.Printf("发送消息: %s\n", body)
}

消费者

package main

import (
  "fmt"
  "github.com/streadway/amqp"
  "log"
)

func main() {
  // 连接 RabbitMQ
  conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
  if err != nil {
    log.Fatalf("无法连接到 RabbitMQ: %v", err)
  }
  defer conn.Close()

  // 创建通道
  ch, err := conn.Channel()
  if err != nil {
    log.Fatalf("无法打开通道: %v", err)
  }
  defer ch.Close()

  queueName := "xxx"

  // 声明队列
  q, err := ch.QueueDeclare(
    queueName, // 队列名称
    false,     // 持久性
    false,     // 自动删除
    false,     // 排他性
    false,     // 非阻塞
    nil,       // 其他参数
  )
  if err != nil {
    log.Fatalf("无法声明队列: %v", err)
  }

  // 接收消息
  msgs, err := ch.Consume(
    q.Name, // 队列
    "",     // 消费者
    false,  // 自动确认
    false,  // 排他性
    false,  // 非本地
    false,  // 非阻塞
    nil,    // 其他参数
  )
  if err != nil {
    log.Fatalf("无法注册消费者: %v", err)
  }

  fmt.Println("等待接收消息")
  for d := range msgs {
    fmt.Printf("收到消息: %s\n", d.Body)
    fmt.Println("回复消息")
    d.Ack(false)
  }
}

SSL安全配置

只需要在连接mq的时候,配置证书和密钥就好了

  // 1. 加载客户端证书和密钥(双向认证时需要)
  cert, err := tls.LoadX509KeyPair("D:\\IT\\fengfeng\\honey\\honey_app\\deploy\\rabbitMQ\\ssl\\client_certificate.pem", "D:\\IT\\fengfeng\\honey\\honey_app\\deploy\\rabbitMQ\\ssl\\client_key.pem")
  if err != nil {
    log.Fatalf("加载客户端证书失败: %v", err)
  }

  // 2. 加载CA证书(验证服务器证书)
  caCert, err := os.ReadFile("D:\\IT\\fengfeng\\honey\\honey_app\\deploy\\rabbitMQ\\ssl\\ca_certificate.pem")
  if err != nil {
    log.Fatalf("读取CA证书失败: %v", err)
  }
  caCertPool := x509.NewCertPool()
  caCertPool.AppendCertsFromPEM(caCert)

  // 3. 配置TLS
  tlsConfig := &tls.Config{
    Certificates:       []tls.Certificate{cert}, // 客户端证书(双向认证时需要)
    RootCAs:            caCertPool,              // 信任的CA
    InsecureSkipVerify: false,                   // 必须验证服务器证书
  }

  // 连接 RabbitMQ
  conn, err := amqp.DialTLS("amqps://admin:password@192.168.80.165:5671/", tlsConfig)
  if err != nil {
    log.Fatalf("无法连接到 RabbitMQ: %v", err)
  }

注意mq的协议是amqps,端口是5671

工作队列模式

生产者生产消息,消息被轮询(Round-Robin) 分配给不同的消费者,避免单个消费者压力过大。

在简单模式的基础上,如果有多个消费者,默认就是工作队列模式

消息基本上会被均分给不同的消费者

如果要配置不同的消费者接收消息的能力,可以进行如下配置

// 消费者1
err = ch.Qos(
  1,     // prefetchCount: 未确认消息上限
  0,     // prefetchSize: 不限制消息大小
  false, // global: 仅对当前消费者生效
)
// 消费者2
err = ch.Qos(
  2,     // prefetchCount: 未确认消息上限
  0,     // prefetchSize: 不限制消息大小
  false, // global: 仅对当前消费者生效
)

这样,消费者2就会多接收一点消息

必须在ch.Consume()之前配置

发布订阅模式

生产者生产消息,多个消费者订阅消息

在 RabbitMQ 中,发布订阅模式(Publish/Subscribe Pattern) 是一种消息广播机制,生产者将消息发送到交换器(Exchange),交换器将消息复制到所有绑定的队列,每个队列对应一个消费者

生产者

  1. 声明fanout类型的交换器
  2. 将消息发给交换器
package main

import (
  "fmt"
  "github.com/sirupsen/logrus"
  "github.com/streadway/amqp"
  "log"
)

func main() {
  // 连接 RabbitMQ
  conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
  if err != nil {
    log.Fatalf("无法连接到 RabbitMQ: %v", err)
  }
  defer conn.Close()

  // 创建通道
  ch, err := conn.Channel()
  if err != nil {
    log.Fatalf("无法打开通道: %v", err)
  }
  defer ch.Close()

  exChangeName := "logs"

  // 声明 fanout 类型的交换器
  err = ch.ExchangeDeclare(
    exChangeName, // 交换器名称
    "fanout",     // 交换器类型
    true,         // 持久化
    false,        // 非自动删除
    false,        // 内部使用
    false,        // 不等待服务器确认
    nil,          // 额外参数
  )
  if err != nil {
    log.Fatalf("声明交换器失败 %s", err)
  }

  for i := 0; i < 20; i++ {
    // 发送消息
    body := fmt.Sprintf("msg-%d", i)
    err = ch.Publish(
      exChangeName, //  交换器名称
      "",           // 路由键(fanout 类型忽略)
      false,        // mandatory
      false,        // immediate
      amqp.Publishing{
        Body: []byte(body),
      })
    if err != nil {
      logrus.Errorf("发送失败: %v", err)
      return
    }
    fmt.Printf("发送消息: %s\n", body)
  }
}

消费者

  1. 声明交换器
  2. 创建临时队列
  3. 将队列绑定到交换器
  4. 消费消息
package main

import (
  "fmt"
  "github.com/streadway/amqp"
  "log"
)

func main() {
  // 连接 RabbitMQ
  conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
  if err != nil {
    log.Fatalf("无法连接到 RabbitMQ: %v", err)
  }
  defer conn.Close()

  // 创建通道
  ch, err := conn.Channel()
  if err != nil {
    log.Fatalf("无法打开通道: %v", err)
  }
  defer ch.Close()

  exChangeName := "logs"

  // 声明 fanout 类型的交换器
  err = ch.ExchangeDeclare(
    exChangeName, // 交换器名称
    "fanout",     // 交换器类型
    true,         // 持久化
    false,        // 非自动删除
    false,        // 内部使用
    false,        // 不等待服务器确认
    nil,          // 额外参数
  )
  if err != nil {
    log.Fatalf("声明交换器失败 %s", err)
  }

  // 创建临时队列(自动生成唯一名称,连接关闭后自动删除)
  q, err := ch.QueueDeclare(
    "",    // 空名称,让 RabbitMQ 自动生成
    false, // 非持久化
    true,  // 排他性(仅当前连接可见)
    true,  // 自动删除
    false, // 不等待服务器确认
    nil,   // 额外参数
  )
  if err != nil {
    log.Fatalf("声明临时队列失败 %s", err)
  }

  // 将队列绑定到交换器(关键步骤)
  err = ch.QueueBind(
    q.Name,       // 队列名称
    "",           // 路由键(fanout 类型忽略)
    exChangeName, // 交换器名称
    false,        // 不等待服务器确认
    nil,          // 额外参数
  )

  // 消费消息
  msgs, err := ch.Consume(
    q.Name, // 队列名称
    "",     // 消费者标签
    true,   // 自动确认(简单示例,生产环境建议手动确认)
    false,  // 非排他性
    false,  // 非本地消费者
    false,  // 不等待服务器确认
    nil,    // 额外参数
  )
  if err != nil {
    log.Fatalf("创建消费者失败 %s", err)
  }

  fmt.Println("准备接收消息")
  for d := range msgs {
    log.Printf("[x] 收到消息: %s\n", d.Body)
  }

}

这种模式,消费者只能消费它运行之后生产者生产的消息

发布订阅模式 缓存消息

上面的发布订阅模式

如果先运行生产者,再运行消费者,那么消费者是获取不到之前生产者上传的消息的

原因也很简单,因为生产者是直接发给交换器的,交换器只是做分发的,本身是不会存储消息的

改造也很简单

生产者生产消息的时候,提前绑定消费者的队列

消费者消费消息的时候,也绑定到那个队列就好了

生产者

package main

import (
  "fmt"
  "github.com/sirupsen/logrus"
  "github.com/streadway/amqp"
  "log"
)

func main() {
  // 连接 RabbitMQ
  conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
  if err != nil {
    log.Fatalf("无法连接到 RabbitMQ: %v", err)
  }
  defer conn.Close()

  // 创建通道
  ch, err := conn.Channel()
  if err != nil {
    log.Fatalf("无法打开通道: %v", err)
  }
  defer ch.Close()

  exChangeName := "logs1"

  // 声明 fanout 类型的交换器
  err = ch.ExchangeDeclare(
    exChangeName, // 交换器名称
    "fanout",     // 交换器类型
    true,         // 持久化
    false,        // 非自动删除
    false,        // 内部使用
    false,        // 不等待服务器确认
    nil,          // 额外参数
  )
  if err != nil {
    log.Fatalf("声明交换器失败 %s", err)
  }

  q1, err := ch.QueueDeclare(
    "log_queue_1",
    false,
    false,
    false,
    false,
    nil,
  )
  if err != nil {
    log.Fatalf("声明队列失败 %s", err)
  }
  q2, err := ch.QueueDeclare(
    "log_queue_2",
    false,
    false,
    false,
    false,
    nil,
  )
  if err != nil {
    log.Fatalf("声明队列失败 %s", err)
  }

  // 将队列绑定到交换器(关键步骤)
  err = ch.QueueBind(
    q1.Name,      // 队列名称
    "",           // 路由键(fanout 类型忽略)
    exChangeName, // 交换器名称
    false,        // 不等待服务器确认
    nil,          // 额外参数
  )
  err = ch.QueueBind(
    q2.Name,      // 队列名称
    "",           // 路由键(fanout 类型忽略)
    exChangeName, // 交换器名称
    false,        // 不等待服务器确认
    nil,          // 额外参数
  )
  if err != nil {
    log.Fatalf("队列绑定交换器失败 %s", err)
  }

  for i := 0; i < 20; i++ {
    // 发送消息
    body := fmt.Sprintf("msg-%d", i)
    err = ch.Publish(
      exChangeName, //  交换器名称
      "",           // 路由键(fanout 类型忽略)
      false,        // mandatory
      false,        // immediate
      amqp.Publishing{
        Body: []byte(body),
      })
    if err != nil {
      logrus.Errorf("发送失败: %v", err)
      return
    }
    fmt.Printf("发送消息: %s\n", body)
  }
}

消费者

package main

import (
  "fmt"
  "github.com/streadway/amqp"
  "log"
)

func main() {
  // 连接 RabbitMQ
  conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
  if err != nil {
    log.Fatalf("无法连接到 RabbitMQ: %v", err)
  }
  defer conn.Close()

  // 创建通道
  ch, err := conn.Channel()
  if err != nil {
    log.Fatalf("无法打开通道: %v", err)
  }
  defer ch.Close()

  exChangeName := "logs1"

  // 声明 fanout 类型的交换器
  err = ch.ExchangeDeclare(
    exChangeName, // 交换器名称
    "fanout",     // 交换器类型
    true,         // 持久化
    false,        // 非自动删除
    false,        // 内部使用
    false,        // 不等待服务器确认
    nil,          // 额外参数
  )
  if err != nil {
    log.Fatalf("声明交换器失败 %s", err)
  }

  // 消费消息
  msgs, err := ch.Consume(
    "log_queue_2", // 队列名称
    "",            // 消费者标签
    true,          // 自动确认(简单示例,生产环境建议手动确认)
    false,         // 非排他性
    false,         // 非本地消费者
    false,         // 不等待服务器确认
    nil,           // 额外参数
  )
  if err != nil {
    log.Fatalf("创建消费者失败 %s", err)
  }

  fmt.Println("准备接收消息")
  for d := range msgs {
    log.Printf("[x] 收到消息: %s\n", d.Body)
  }

}

路由模式

生产者指定消费者进行消费

基于消息的 “路由键(Routing Key)” 精确匹配消息分发,生产者发送消息时指定路由键,消费者通过绑定队列与路由键的关系接收特定消息。

交换器的类型必须是direct

发送消息的时候指定路由键即可

生产者

package main

import (
  "fmt"
  "github.com/sirupsen/logrus"
  "github.com/streadway/amqp"
  "log"
)

func main() {
  // 连接 RabbitMQ
  conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
  if err != nil {
    log.Fatalf("无法连接到 RabbitMQ: %v", err)
  }
  defer conn.Close()

  // 创建通道
  ch, err := conn.Channel()
  if err != nil {
    log.Fatalf("无法打开通道: %v", err)
  }
  defer ch.Close()

  exChangeName := "logs_direct"

  // 声明 fanout 类型的交换器
  err = ch.ExchangeDeclare(
    exChangeName, // 交换器名称
    "direct",     // 交换器类型
    true,         // 持久化
    false,        // 非自动删除
    false,        // 内部使用
    false,        // 不等待服务器确认
    nil,          // 额外参数
  )
  if err != nil {
    log.Fatalf("声明交换器失败 %s", err)
  }

  for i := 0; i < 20; i++ {
    // 发送消息
    body := fmt.Sprintf("msg-%d", i)
    err = ch.Publish(
      exChangeName, //  交换器名称
      "node001",           //
      false,        // mandatory
      false,        // immediate
      amqp.Publishing{
        Body: []byte(body),
      })
    if err != nil {
      logrus.Errorf("发送失败: %v", err)
      return
    }
    fmt.Printf("发送消息: %s\n", body)
  }
  for i := 0; i < 10; i++ {
    // 发送消息
    body := fmt.Sprintf("msg-%d", i)
    err = ch.Publish(
      exChangeName, //  交换器名称
      "node002",           //
      false,        // mandatory
      false,        // immediate
      amqp.Publishing{
        Body: []byte(body),
      })
    if err != nil {
      logrus.Errorf("发送失败: %v", err)
      return
    }
    fmt.Printf("发送消息: %s\n", body)
  }
}

消费者

package main

import (
  "fmt"
  "github.com/streadway/amqp"
  "log"
)

func main() {
  // 连接 RabbitMQ
  conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
  if err != nil {
    log.Fatalf("无法连接到 RabbitMQ: %v", err)
  }
  defer conn.Close()

  // 创建通道
  ch, err := conn.Channel()
  if err != nil {
    log.Fatalf("无法打开通道: %v", err)
  }
  defer ch.Close()

  exChangeName := "logs_direct"

  // 声明 fanout 类型的交换器
  err = ch.ExchangeDeclare(
    exChangeName, // 交换器名称
    "direct",     // 交换器类型
    true,         // 持久化
    false,        // 非自动删除
    false,        // 内部使用
    false,        // 不等待服务器确认
    nil,          // 额外参数
  )
  if err != nil {
    log.Fatalf("声明交换器失败 %s", err)
  }

  // 创建临时队列(自动生成唯一名称,连接关闭后自动删除)
  q, err := ch.QueueDeclare(
    "",    // 空名称,让 RabbitMQ 自动生成
    false, // 非持久化
    true,  // 排他性(仅当前连接可见)
    true,  // 自动删除
    false, // 不等待服务器确认
    nil,   // 额外参数
  )
  if err != nil {
    log.Fatalf("声明临时队列失败 %s", err)
  }

  // 将队列绑定到交换器(关键步骤)
  err = ch.QueueBind(
    q.Name,       // 队列名称
    "node002",           // 路由键(fanout 类型忽略)
    exChangeName, // 交换器名称
    false,        // 不等待服务器确认
    nil,          // 额外参数
  )

  // 消费消息
  msgs, err := ch.Consume(
    q.Name, // 队列名称
    "",     // 消费者标签
    true,   // 自动确认(简单示例,生产环境建议手动确认)
    false,  // 非排他性
    false,  // 非本地消费者
    false,  // 不等待服务器确认
    nil,    // 额外参数
  )
  if err != nil {
    log.Fatalf("创建消费者失败 %s", err)
  }

  fmt.Println("准备接收消息")
  for d := range msgs {
    log.Printf("[x] 收到消息: %s\n", d.Body)
  }

}

主题模式

基于路由键的 “模糊匹配” 分发,支持通配符,比路由模式更灵活

绑定队列时可使用通配符:

  • *:匹配一个单词(如 order.* 匹配 order.create,但不匹配 order.create.success)。
  • #:匹配零个或多个单词(如 order.# 匹配 order.createorder.create.success)。

生产者

package main

import (
  "log"
  "os"
  "strings"

  "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main() {
  conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
  failOnError(err, "Failed to connect to RabbitMQ")
  defer conn.Close()

  ch, err := conn.Channel()
  failOnError(err, "Failed to open a channel")
  defer ch.Close()

  // 声明主题类型的交换器
  err = ch.ExchangeDeclare(
    "logs_topic", // name
    "topic",      // type
    true,         // durable
    false,        // auto-deleted
    false,        // internal
    false,        // no-wait
    nil,          // arguments
  )
  failOnError(err, "Failed to declare an exchange")

  // 设置路由键,默认为"anonymous.info"
  body := bodyFrom(os.Args)
  routingKey := severityFrom(os.Args)

  // 发布消息到交换器
  err = ch.Publish(
    "logs_topic", // exchange
    routingKey,   // routing key
    false,        // mandatory
    false,        // immediate
    amqp.Publishing{
      ContentType: "text/plain",
      Body:        []byte(body),
    })
  failOnError(err, "Failed to publish a message")

  log.Printf(" [x] Sent %s:%s", routingKey, body)
}

func bodyFrom(args []string) string {
  var s string
  if (len(args) < 3) || os.Args[2] == "" {
    s = "hello"
  } else {
    s = strings.Join(args[2:], " ")
  }
  return s
}

func severityFrom(args []string) string {
  var s string
  if (len(args) < 2) || os.Args[1] == "" {
    s = "anonymous.info"
  } else {
    s = os.Args[1]
  }
  return s
}

消费者

package main

import (
  "log"
  "os"

  "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main() {
  conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
  failOnError(err, "Failed to connect to RabbitMQ")
  defer conn.Close()

  ch, err := conn.Channel()
  failOnError(err, "Failed to open a channel")
  defer ch.Close()

  // 声明主题类型的交换器
  err = ch.ExchangeDeclare(
    "logs_topic", // name
    "topic",      // type
    true,         // durable
    false,        // auto-deleted
    false,        // internal
    false,        // no-wait
    nil,          // arguments
  )
  failOnError(err, "Failed to declare an exchange")

  // 声明临时队列
  q, err := ch.QueueDeclare(
    "",    // name
    false, // durable
    false, // delete when unused
    true,  // exclusive
    false, // no-wait
    nil,   // arguments
  )
  failOnError(err, "Failed to declare a queue")

  // 获取命令行参数作为绑定键
  if len(os.Args) < 2 {
    log.Printf("Usage: %s [binding_key]...", os.Args[0])
    os.Exit(0)
  }
  for _, s := range os.Args[1:] {
    log.Printf("Binding queue %s to exchange %s with routing key %s",
      q.Name, "logs_topic", s)
    err = ch.QueueBind(
      q.Name,       // queue name
      s,            // routing key
      "logs_topic", // exchange
      false,
      nil)
    failOnError(err, "Failed to bind a queue")
  }

  msgs, err := ch.Consume(
    q.Name, // queue
    "",     // consumer
    true,   // auto-ack
    false,  // exclusive
    false,  // no-local
    false,  // no-wait
    nil,    // args
  )
  failOnError(err, "Failed to register a consumer")

  forever := make(chan bool)

  go func() {
    for d := range msgs {
      log.Printf(" [x] %s: %s", d.RoutingKey, d.Body)
    }
  }()

  log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
  <-forever
}

先跑消费者

go run 14.主题模式-消费者.go "kernal.*" # 接收内核相关的所有服务信息
go run 14.主题模式-消费者.go "*.error" # 接收所有服务的error信息
go run 14.主题模式-消费者.go "#" # 接收所有

然后跑生产者

go run 13.主题模式-生产者.go "auth.error" "认证服务错误"
go run 13.主题模式-生产者.go "kernal.error" "系统内核错误"
go run 13.主题模式-生产者.go "auth.warning" "认证警告"

死信队列

处理 “无法被正常消费” 的消息(如消费超时、消费失败且重试次数耗尽),将其转移到专门的 “死信队列”,便于后续排查问题。

  • 为普通队列设置死信交换器(x-dead-letter-exchange)和死信路由键(x-dead-letter-routing-key)。
  • 当消息满足死信条件(如被拒绝 basic.reject 且 requeue=false、TTL 过期、队列满)时,会被路由到死信队列。

正常生产者

  1. 先声明普通交换器
  2. 声明普通队列,通过额外参数设置死信交换器
  3. 绑定普通队列到普通交换器
  4. 发送消息到普通队列
package main

import (
  "log"
  "os"
  "strings"

  "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main() {
  conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
  failOnError(err, "Failed to connect to RabbitMQ")
  defer conn.Close()

  ch, err := conn.Channel()
  failOnError(err, "Failed to open a channel")
  defer ch.Close()

  // 声明普通交换器
  err = ch.ExchangeDeclare(
    "normal_exchange", // name
    "direct",          // type
    true,              // durable
    false,             // auto-deleted
    false,             // internal
    false,             // no-wait
    nil,               // arguments
  )
  failOnError(err, "Failed to declare an exchange")

  // 声明普通队列并设置死信交换器
  args := amqp.Table{
    "x-dead-letter-exchange":    "dlx_exchange",
    "x-dead-letter-routing-key": "dlx_key",
  }
  _, err = ch.QueueDeclare(
    "normal_queue", // name
    true,           // durable
    false,          // delete when unused
    false,          // exclusive
    false,          // no-wait
    args,           // arguments
  )
  failOnError(err, "Failed to declare a queue")

  // 绑定普通队列到普通交换器
  err = ch.QueueBind(
    "normal_queue",       // queue name
    "normal_routing_key", // routing key
    "normal_exchange",    // exchange
    false,
    nil,
  )
  failOnError(err, "Failed to bind a queue")

  // 发送消息到普通队列
  body := bodyFrom(os.Args)
  err = ch.Publish(
    "normal_exchange",    // exchange
    "normal_routing_key", // routing key
    false,                // mandatory
    false,                // immediate
    amqp.Publishing{
      ContentType: "text/plain",
      Body:        []byte(body),
    })
  failOnError(err, "Failed to publish a message")

  log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
  var s string
  if (len(args) < 2) || os.Args[1] == "" {
    s = "hello"
  } else {
    s = strings.Join(args[1:], " ")
  }
  return s
}

正常消费者

package main

import (
  "log"

  "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main() {
  conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
  failOnError(err, "Failed to connect to RabbitMQ")
  defer conn.Close()

  ch, err := conn.Channel()
  failOnError(err, "Failed to open a channel")
  defer ch.Close()

  // 声明普通交换器
  err = ch.ExchangeDeclare(
    "normal_exchange", // name
    "direct",          // type
    true,              // durable
    false,             // auto-deleted
    false,             // internal
    false,             // no-wait
    nil,               // arguments
  )
  failOnError(err, "Failed to declare an exchange")

  // 声明普通队列并设置死信交换器
  args := amqp.Table{
    "x-dead-letter-exchange":    "dlx_exchange",
    "x-dead-letter-routing-key": "dlx_key",
  }
  _, err = ch.QueueDeclare(
    "normal_queue", // name
    true,           // durable
    false,          // delete when unused
    false,          // exclusive
    false,          // no-wait
    args,           // arguments
  )
  failOnError(err, "Failed to declare a queue")

  // 绑定普通队列到普通交换器
  err = ch.QueueBind(
    "normal_queue",       // queue name
    "normal_routing_key", // routing key
    "normal_exchange",    // exchange
    false,
    nil,
  )
  failOnError(err, "Failed to bind a queue")

  // 消费普通队列中的消息
  msgs, err := ch.Consume(
    "normal_queue", // queue
    "",             // consumer
    false,          // auto-ack设为false,手动处理确认
    false,          // exclusive
    false,          // no-local
    false,          // no-wait
    nil,            // args
  )
  failOnError(err, "Failed to register a consumer")

  forever := make(chan bool)

  go func() {
    for d := range msgs {
      log.Printf(" [x] Received %s", d.Body)
      // 拒绝消息,不重新入队,触发死信机制
      d.Reject(false)
      log.Printf(" [x] Rejected message and sent to DLX")
    }
  }()

  log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
  <-forever
}

死信消费者

package main

import (
  "log"

  "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main() {
  conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
  failOnError(err, "Failed to connect to RabbitMQ")
  defer conn.Close()

  ch, err := conn.Channel()
  failOnError(err, "Failed to open a channel")
  defer ch.Close()

  // 声明死信交换器
  err = ch.ExchangeDeclare(
    "dlx_exchange", // name
    "direct",       // type
    true,           // durable
    false,          // auto-deleted
    false,          // internal
    false,          // no-wait
    nil,            // arguments
  )
  failOnError(err, "Failed to declare an exchange")

  // 声明死信队列
  _, err = ch.QueueDeclare(
    "dlx_queue", // name
    true,        // durable
    false,       // delete when unused
    false,       // exclusive
    false,       // no-wait
    nil,         // arguments
  )
  failOnError(err, "Failed to declare a queue")

  // 绑定死信队列到死信交换器
  err = ch.QueueBind(
    "dlx_queue",    // queue name
    "dlx_key",      // routing key
    "dlx_exchange", // exchange
    false,
    nil,
  )
  failOnError(err, "Failed to bind a queue")

  // 消费死信队列中的消息
  msgs, err := ch.Consume(
    "dlx_queue", // queue
    "",          // consumer
    true,        // auto-ack
    false,       // exclusive
    false,       // no-local
    false,       // no-wait
    nil,         // args
  )
  failOnError(err, "Failed to register a consumer")

  forever := make(chan bool)

  go func() {
    for d := range msgs {
      log.Printf(" [x] DLX Received %s", d.Body)
    }
  }()

  log.Printf(" [*] Waiting for DLX messages. To exit press CTRL+C")
  <-forever
}

延迟队列模式

消息发送后不会立即被消费,而是延迟指定时间后才被处理

上面讲了死信队列,那么生产者可以让消息过期,这样消息就进入了死信队列,而且是按时间顺序进入的

这样消费者消费死信队列的数据即可

生产者

package main

import (
  "log"
  "time"

  "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %v", msg, err)
  }
}

func main() {
  conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
  failOnError(err, "Failed to connect to RabbitMQ")
  defer conn.Close()

  ch, err := conn.Channel()
  failOnError(err, "Failed to open a channel")
  defer ch.Close()

  // 声明死信交换器
  dlxExchange := "dlx_exchange"
  err = ch.ExchangeDeclare(
    dlxExchange, // name
    "direct",    // type
    true,        // durable
    false,       // auto-deleted
    false,       // internal
    false,       // no-wait
    nil,         // arguments
  )
  failOnError(err, "Failed to declare an exchange")

  // 声明死信队列
  dlQueueName := "dl_queue"
  _, err = ch.QueueDeclare(
    dlQueueName, // name
    true,        // durable
    false,       // delete when unused
    false,       // exclusive
    false,       // no-wait
    nil,         // arguments
  )
  failOnError(err, "Failed to declare a queue")

  // 绑定死信队列到死信交换器
  err = ch.QueueBind(
    dlQueueName, // queue name
    "dl_key",    // routing key
    dlxExchange, // exchange
    false,
    nil,
  )
  failOnError(err, "Failed to bind a queue")

  // 声明延迟队列(设置x-dead-letter-exchange和x-message-ttl参数)
  delayQueueName := "delay_queue"
  _, err = ch.QueueDeclare(
    delayQueueName, // name
    true,           // durable
    false,          // delete when unused
    false,          // exclusive
    false,          // no-wait
    amqp.Table{
      "x-dead-letter-exchange":    dlxExchange,
      "x-dead-letter-routing-key": "dl_key",
      "x-message-ttl":             5000, // 5秒后消息过期
    }, // arguments
  )
  failOnError(err, "Failed to declare a queue")

  // 发送消息到延迟队列
  body := "Hello, delayed message!"
  err = ch.Publish(
    "",             // exchange
    delayQueueName, // routing key
    false,          // mandatory
    false,          // immediate
    amqp.Publishing{
      ContentType: "text/plain",
      Body:        []byte(body),
    })
  failOnError(err, "Failed to publish a message")

  log.Printf(" [x] Sent %s", body)
  log.Println("消息已发送到延迟队列,5秒后将进入死信队列")

  time.Sleep(2 * time.Second) // 等待消息发送完成
}

消费者

package main

import (
  "github.com/streadway/amqp"
  "log"
  "time"
)

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %v", msg, err)
  }
}

func main() {
  conn, err := amqp.Dial("amqp://admin:password@10.3.0.8:5672/")
  failOnError(err, "Failed to connect to RabbitMQ")
  defer conn.Close()

  ch, err := conn.Channel()
  failOnError(err, "Failed to open a channel")
  defer ch.Close()

  // 声明死信队列(消费者只需要关注死信队列)
  queueName := "dl_queue"
  _, err = ch.QueueDeclare(
    queueName, // name
    true,      // durable
    false,     // delete when unused
    false,     // exclusive
    false,     // no-wait
    nil,       // arguments
  )
  failOnError(err, "Failed to declare a queue")

  // 消费死信队列中的消息
  msgs, err := ch.Consume(
    queueName, // queue
    "",        // consumer
    true,      // auto-ack
    false,     // exclusive
    false,     // no-local
    false,     // no-wait
    nil,       // args
  )
  failOnError(err, "Failed to register a consumer")

  var forever chan struct{}

  go func() {
    for d := range msgs {
      log.Printf(" [x] Received %s", d.Body)
      log.Printf("当前时间: %s", time.Now().Format("2006-01-02 15:04:05"))
    }
  }()

  log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
  <-forever
}

rabbitMQ其他功能

优先级队列

声明优先级队列

在队列声明时通过x-max-priority参数设置队列支持的最大优先级

发送带优先级的消息

amqp.Publishing中设置Priority字段指定消息的优先级

消费者会优先收到优先级较高的消息

生产者

package main

import (
  "fmt"
  "github.com/streadway/amqp"
  "log"
  "mq_study/core"
)

func main() {
  ch := core.InitMQ()

  //声明优先级队列
  q, err := ch.QueueDeclare(
    "yxj", false, false, false, false,
    amqp.Table{
      "x-max-priority": 10, // 设置最大优先级
    },
  )
  if err != nil {
    log.Fatal(err)
  }
  // 发送不同优先级的消息
  messages := []struct {
    body     string
    priority uint8
  }{
    {"这是低优先级消息 (1)", 1},
    {"这是中优先级消息 (5)", 5},
    {"这是高优先级消息 (10)", 10},
    {"这是另一个低优先级消息 (2)", 2},
    {"这是另一个高优先级消息 (9)", 9},
  }

  for _, msg := range messages {
    err = ch.Publish(
      "",     // 交换机
      q.Name, // 路由键(队列名称)
      false,  // 非强制
      false,  // 非立即
      amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte(msg.body),
        Priority:    msg.priority, // 设置消息优先级
      })
    if err != nil {
      fmt.Println("消息发送失败", err)
      continue
    }
    log.Printf("已发送: %s 优先级: %d", msg.body, msg.priority)
  }
}

消费者,和之前消费者一样

package main

import (
  "fmt"
  "log"
  "mq_study/core"
)

func main() {
  ch := core.InitMQ()

  // 4.消费消息
  consumer, err := ch.Consume("yxj", "", false, false, false, false, nil)
  if err != nil {
    log.Fatal(err)
  }
  for msg := range consumer {
    fmt.Println("消息内容", string(msg.Body))
    //msg.Reject()
    msg.Ack(false)
  }
}

回调队列

生产者生产消息,消费者消费消息,然后把结果再返回给回调队列

生产者

package main

import (
  "fmt"
  "github.com/google/uuid"
  "github.com/streadway/amqp"
  "log"
  "mq_study/core"
)

func main() {
  ch := core.InitMQ()

  //声明普通队列
  q, err := ch.QueueDeclare(
    "request_queue", false, false, false, false, nil)
  if err != nil {
    log.Fatal(err)
  }

  // 声明回调队列
  callbackQueue, err := ch.QueueDeclare(
    "",    // 空名称表示让RabbitMQ自动生成队列名
    false, // 非持久化(因为响应消息不需要长期保存)
    true,  // 自动删除(当消费者断开连接时)
    true,  // 排他性(只允许当前连接访问)
    false, // 非阻塞
    nil,
  )
  msgs, err := ch.Consume(
    callbackQueue.Name,  // 回调队列名
    "", // 消费者标签
    true,                // 自动确认
    false,               // 非排他性
    false,               // 不阻塞
    false,               // 无参数
    nil,
  )
  if err != nil {
    log.Fatal(err)
  }

  for i := 0; i < 5; i++ {
    msg := fmt.Sprintf("消息 %d", i)
    msgID := uuid.New().String() // 消息id,用于匹配回调队列中的消息
    err = ch.Publish(
      "",     // 交换机
      q.Name, // 路由键(队列名称)
      false,  // 非强制
      false,  // 非立即
      amqp.Publishing{
        Body:          []byte(msg),
        CorrelationId: msgID,
        ReplyTo:       callbackQueue.Name,
      })
    if err != nil {
      fmt.Println("消息发送失败", err)
      continue
    }
    log.Printf("已发送: %s 消息id %s", msg, msgID)
  }
  for d := range msgs {
    fmt.Printf("收到回调消息 %s %s\n", d.Body, d.CorrelationId)
  }
}

消费者

package main

import (
  "fmt"
  "github.com/streadway/amqp"
  "log"
  "mq_study/core"
)

func main() {
  ch := core.InitMQ()

  // 4.消费消息
  consumer, err := ch.Consume("request_queue", "x1", true, false, false, false, nil)
  if err != nil {
    log.Fatal(err)
  }
  for msg := range consumer {
    fmt.Println("消息内容", string(msg.Body), msg.CorrelationId)
    m := fmt.Sprintf("这是回来的消息 %s", msg.CorrelationId)
    err = ch.Publish("", msg.ReplyTo, false, false, amqp.Publishing{
      Body:          []byte(m),
      CorrelationId: msg.CorrelationId,
    })
    if err != nil {
      fmt.Println("发送响应消息失败", err)
      continue
    }
    fmt.Println("发送响应消息", m)

  }
}

流量控制

生产者发送消息,如果队列消息已经积压了,则生产者慢点发

消费者按照自己能力消费消息

生产者 通过 ch.QueueInspect(q.Name) 获取队列状态

消费者通过ch.Qos() 设置消费的能力

package main

import (
  "fmt"
  "log"
  "mq_study/core"
  "time"

  "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

// 1. 使用 QoS 进行流量控制(最常用方式)
func consumerWithQoS(consumerID string, count int) {
  ch := core.InitMQ()
  // 声明队列
  q, err := ch.QueueDeclare(
    "flow_control_queue",
    true,
    false,
    false,
    false,
    nil,
  )
  failOnError(err, "Failed to declare a queue")

  // 设置 QoS (Quality of Service)
  // 第一个参数: prefetchCount - 每次从队列获取的最大消息数
  // 第二个参数: prefetchSize - 每次获取的最大字节数,0表示不限制
  // 第三个参数: global - 是否对整个连接生效,false表示只对当前通道生效
  err = ch.Qos(
    count, // 每次最多处理条消息,处理完再获取新的
    0,     // 不限制字节数
    false,
  )
  failOnError(err, "Failed to set QoS")

  // 消费消息
  msgs, err := ch.Consume(
    q.Name,
    consumerID,
    false, // 关闭自动确认,手动确认消息处理完成
    false,
    false,
    false,
    nil,
  )
  failOnError(err, "Failed to register a consumer")

  forever := make(chan bool)

  go func() {
    for d := range msgs {
      log.Printf("消费者 %s 收到消息: %s", consumerID, d.Body)

      // 模拟处理耗时
      time.Sleep(500 * time.Millisecond)

      // 手动确认消息已处理
      d.Ack(false)
      log.Printf("消费者 %s 确认消息处理完成", consumerID)
    }
  }()

  log.Printf("消费者 %s (QoS控制) 启动,等待消息...", consumerID)
  <-forever
}

// 基于队列长度的流量控制
func producerWithQueueLengthControl() {
  ch := core.InitMQ()

  // 声明队列
  q, err := ch.QueueDeclare(
    "flow_control_queue",
    true,
    false,
    false,
    false,
    nil,
  )
  failOnError(err, "Failed to declare a queue")

  maxQueueLength := 100 // 队列最大长度阈值

  for i := 1; i <= 1000; i++ {
    // 获取队列状态
    queueStatus, err := ch.QueueInspect(q.Name)
    failOnError(err, "Failed to inspect queue")

    // 如果队列长度超过阈值,等待
    if queueStatus.Messages >= maxQueueLength {
      log.Printf("队列长度 %d 超过阈值 %d,等待...", queueStatus.Messages, maxQueueLength)
      time.Sleep(2 * time.Second)
      i-- // 重试当前消息
      continue
    }

    body := fmt.Sprintf("这是第 %d 条消息", i)
    err = ch.Publish(
      "",
      q.Name,
      false,
      false,
      amqp.Publishing{
        DeliveryMode: amqp.Persistent,
        ContentType:  "text/plain",
        Body:         []byte(body),
      })
    failOnError(err, "Failed to publish a message")

    log.Printf("已发送消息 %d (当前队列长度: %d)", i, queueStatus.Messages)
  }

  log.Println("所有消息发送完成")
}

func main() {
  // 启动消费者(带QoS控制)
  go consumerWithQoS("consumer_1", 2)
  go consumerWithQoS("consumer_2", 1)

  // 启动生产者(选择一种流量控制方式)
  go producerWithQueueLengthControl()

  // 保持程序运行
  time.Sleep(5 * time.Minute)
}

消息过期

之前讲延迟队列的时候,队列可以设置整体的消息过期时间

其实消息也可以设置过期时间

核心就是

amqp.Publishing{
  Body:         []byte(msg.body),
  Expiration:   fmt.Sprintf("%d", msg.ttl), // 消息过期时间(毫秒)
})

持久化

需要设置队列和消息的持久化

声明持久化队列

// 声明一个持久化的队列
  q, err := ch.QueueDeclare(
    "persistent_queue", // 队列名称
    true,               // 是否持久化
    false,              // 是否自动删除
    false,              // 是否排他
    false,              // 是否等待服务器响应
    nil,                // 额外参数
  )

声明交换器持久化

ch.ExchangeDeclare(
  exchangeName, // name
  "direct",     // type
  true,         // 是否持久化
  false,        //
  false,        //
  false,        //
  nil,          //
)

发送持久化消息

// 要发送的消息
body := "Hello, persistent message!"
// 发布消息到队列,设置为持久化
err = ch.Publish(
  "",     // 交换器
  q.Name, // 路由键(队列名称)
  false,  // 是否强制
  false,  // 是否立即
  amqp.Publishing{
    ContentType: "text/plain",
    // 重要:将消息标记为持久化(2表示持久化)
    DeliveryMode: amqp.Persistent,
    Body:         []byte(body),
  })

高可用配置

之前我们的rabbitMQ是跑在单机的

像这样的中间件服务,高可用是非常重要的

单节点的情况下,rabbitMQ容器挂了,整个依赖消息队列的业务都会受影响

这个在生产环境环境是万万接收不了的

这里就在一个机器上跑三个mq容器在模拟高可用配置

version: '3'

services:
  rabbitmq1:
    image: rabbitmq:3.9-management
    container_name: rabbitmq1
    ports:
      - "5672:5672"  # mq端口,只映射一个
      - "15672:15672"  # RabbitMQ的管理界面端口
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
      RABBITMQ_ERLANG_COOKIE: 'TDONUYAMEVUIWQTTDRHM'
    hostname: rabbitmq1
    volumes:
      - ./rabbitmq1_data:/var/lib/rabbitmq
    networks:
      mq_network:
        ipv4_address: 10.1.0.2
  rabbitmq2:
    image: rabbitmq:3.9-management
    container_name: rabbitmq2
    hostname: rabbitmq2
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
      RABBITMQ_ERLANG_COOKIE: 'TDONUYAMEVUIWQTTDRHM'
    volumes:
      - ./rabbitmq2_data:/var/lib/rabbitmq
    networks:
      mq_network:
        ipv4_address: 10.1.0.3
  rabbitmq3:
    image: rabbitmq:3.9-management
    container_name: rabbitmq3
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
      RABBITMQ_ERLANG_COOKIE: 'TDONUYAMEVUIWQTTDRHM'
    hostname: rabbitmq3
    volumes:
      - ./rabbitmq3_data:/var/lib/rabbitmq
    networks:
      mq_network:
        ipv4_address: 10.1.0.4

networks:
  mq_network:
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: 10.1.0.0/24

执行

# 先停止 rabbitmq2 应用
docker exec rabbitmq2 rabbitmqctl stop_app
# 重置节点
docker exec rabbitmq2 rabbitmqctl reset
# 重新加入集群(确保主机名正确)
docker exec rabbitmq2 rabbitmqctl join_cluster rabbit@rabbitmq1
# 启动应用
docker exec rabbitmq2 rabbitmqctl start_app

docker exec rabbitmq3 rabbitmqctl stop_app
docker exec rabbitmq3 rabbitmqctl reset
docker exec rabbitmq3 rabbitmqctl join_cluster rabbit@rabbitmq1
docker exec rabbitmq3 rabbitmqctl start_app

然后在主节点的bash中,镜像队列就好了

rabbitmqctl set_policy ha-queue-two '^' '{"ha-mode":"exactly","ha-params":3,"ha-sync-mode":"automatic"}'

在界面上看到三个节点在就说明ok了

其中一个挂了,另外的就会顶上

参数详解

队列的参数

func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)

声明队列的时候,参数很多,到底每个参数表示什么意思呢

  1. **"**name ":队列名称(name)
    • 这是你要声明的队列的名称
    • 如果为空字符串,RabbitMQ 会自动生成一个随机名称
  2. durable:持久化(durable)
    • true 表示队列是持久化的,在 broker 重启后队列依然存在
    • false 表示队列是非持久化的,broker 重启后队列会被删除
    • 注意:这只保证队列本身的持久化,不保证消息的持久化
  3. autoDelete:自动删除(autoDelete)
    • true 表示当最后一个消费者断开连接后,队列会自动删除
    • false 表示队列不会自动删除,即使没有消费者连接
  4. exclusive:排他性(exclusive)
    • true 表示队列是排他的,只能被当前连接使用,连接关闭后队列会被删除
    • false 表示队列可以被多个连接共享
  5. noWait :不等待(noWait)
    • true 表示不需要等待服务器的确认,会立即返回
    • false 表示会等待服务器的确认后才返回
  6. args :额外参数(args)
    • 用于指定队列的额外属性,如消息 TTL、最大长度等
    • nil 表示使用默认设置

声明队列的时候,相同名称的队列,如果重复声明,必须保证所有配置一样

设置队列消息长度

args := amqp.Table{
    "x-max-length": 100,        // 最大消息数100
    "x-overflow":   "reject-publish", // drop-head丢弃旧消息,reject-publish超过限制时拒绝新消息  
},

消息的参数

ch.Publish(
  "",    // exchange 交换器的名称
  "xxx", // key 路由键,如果exchange="" 则路由键=队列名
  true,  // mandatory 如果消息无法被路由到任何队列(没有匹配的绑定规则),RabbitMQ 会将消息返回给生产者。
  false, // immediate 已废弃 历史作用:如果消息路由到的队列没有消费者,消息会被返回给生产者。
  amqp.Publishing{
    Body: []byte("你好"),
  })

如果配置了mandatory=true,就可以在NotifyReturn这个队列中获取没有匹配到路由的消息

returnChan := make(chan amqp.Return, 10)
  ch.NotifyReturn(returnChan)
  for returnMsg := range returnChan {
    log.Printf("消息被退回:")
    log.Printf("  回复码: %d", returnMsg.ReplyCode)
    log.Printf("  回复文本: %s", returnMsg.ReplyText)
    log.Printf("  交换机: %s", returnMsg.Exchange)
    log.Printf("  路由键: %s", returnMsg.RoutingKey)
    log.Printf("  消息体: %s", string(returnMsg.Body))
  }

Publishing 字段详解

type Publishing struct {
  // 应用程序或交换机特定的字段,headers类型交换机会检查此字段
  // 用于存储自定义键值对元数据,可用于headers交换机的路由规则匹配
  Headers Table

  // 消息属性
  ContentType     string    // MIME内容类型(如"text/plain"、"application/json")
  ContentEncoding string    // MIME内容编码(如"gzip"、"base64",无编码则为空)
  DeliveryMode    uint8     // 投递模式:0或1为非持久化,2为持久化(需配合持久化队列)
  Priority        uint8     // 消息优先级(0-9,需队列支持x-max-priority参数)
  CorrelationId   string    // 关联标识符,用于匹配请求与响应(尤其在RPC场景)
  ReplyTo         string    // 回复地址,指定接收响应的回调队列(常用于RPC)
  Expiration      string    // 消息过期时间(毫秒字符串,如"60000"表示60秒)
  MessageId       string    // 消息唯一标识符(业务层面,用于追踪或去重)
  Timestamp       time.Time // 消息创建的时间戳
  Type            string    // 消息类型名称(自定义,用于区分消息分类,如"order_created")
  UserId          string    // 发送消息的用户ID(需RabbitMQ已存在的用户)
  AppId           string    // 发送消息的应用程序ID(标识消息来源应用)

  // 消息的实际业务数据负载
  Body []byte
}

消费者的参数

func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {}
  1. queue:字符串类型,指定要消费的队列名称。
  2. consumer:字符串类型,消费者的标识名称。可以自定义命名,用于在 RabbitMQ 管理界面中识别不同的消费者。
  3. autoAck:布尔类型,是否自动确认消息。
    • true:消息被消费者接收后会自动确认(从队列中移除),无论业务处理是否成功。
    • false:需要手动调用 Delivery.Ack() 方法确认消息,确保业务处理完成后再删除消息。
  4. exclusive:布尔类型,是否设置为排他性消费者。
    • true:该消费者是队列的唯一消费者,其他消费者无法消费该队列。
    • false:允许多个消费者同时消费该队列(消息会轮询分配给不同消费者)。
  5. noLocal:布尔类型,是否禁止接收本连接发布的消息,需要借助direct交换器
    • true:消费者不会收到由当前连接发布的消息。
    • false:可以接收当前连接发布的消息(通常设为 false)。
  6. noWait:布尔类型,是否非阻塞等待服务器响应。
    • true:方法会立即返回,不等待服务器确认(可能存在潜在错误)。
    • false:等待服务器确认后再返回(推荐使用)。
  7. argsTable 类型(类似字典),额外的参数配置。通常用于传递 RabbitMQ 扩展属性(如消息优先级、过期时间等),一般情况下可传 nil

msg.Ack(false)表示收到此消息,此消息会从队列里面删除

如果不执行ack,也不执行reject,那么下次运行消费者,此消息会被投递给消费者

msg.Reject(true),表示拒绝消息,并且让消息重新入队

msg.Reject(false),表示拒绝消息,此消息会从队列中删除

如果消费者接收消息的处理任务存在失败的风险,那么建议手动确认消息

管理界面