轻量级消息队列-nsq
NSQ是Go语言编写的,开源的分布式消息队列中间件,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。 NSQ 作为轻量级消息队列,无需复杂依赖、部署 简单,是中小项目的优选 nsq部署
轻量级消息队列-nsq
发布时间:2026-03-26 (22天前)

NSQ是Go语言编写的,开源的分布式消息队列中间件,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。

NSQ 作为轻量级消息队列,无需复杂依赖、部署 简单,是中小项目的优选

nsq部署

NSQ 只有两个核心组件:

  • nsqlookupd:服务发现节点(必须先启动,如果是单机用,这个可以不用跑)
  • nsqd:业务消息节点(生产 / 消费消息)
  • nsqadmin:Web 管理面板(可选)

使用docker可以很方便的把nsq跑起来

version: "3"

services:
  nsqd:
    image: nsqio/nsq
    ports:
      - "4150:4150"   # TCP 端口
      - "4151:4151"   # HTTP 端口
    restart: always
    command: /nsqd

  nsqadmin:
    image: nsqio/nsq
    restart: always
    ports:
      - "4171:4171"
    command: /nsqadmin --nsqd-http-address=nsqd:4151

如果要用可执行文件部署,可以在官方地址下载

https://nsq.io/deployment/installing.html

go语言使用nsq

Go 是 NSQ 的原生语言,SDK 官方维护,使用极简

go get github.com/nsqio/go-nsq

生产者-消费者模型

消费者

package main

import (
  "github.com/nsqio/go-nsq"
  "log"
  "os"
  "os/signal"
  "syscall"
)

// 消息处理函数
func handleMessage(message *nsq.Message) error {
  log.Printf("收到消息:%s", string(message.Body))
  // 手动确认消息(必须调用,否则NSQ会认为消费失败并重试)
  message.Finish()
  return nil
}

func main() {
  config := nsq.NewConfig()
  topic := "test_topic"
  channel := "test_channel" // 消费组

  // 1. 创建消费者
  consumer, err := nsq.NewConsumer(topic, channel, config)
  if err != nil {
    log.Fatal("消费者创建失败:", err)
  }

  // 2. 绑定消息处理函数
  consumer.AddHandler(nsq.HandlerFunc(handleMessage))

  // 3. 连接nsqd
  err = consumer.ConnectToNSQD("192.168.80.166:4150")
  if err != nil {
    log.Fatal("连接nsqd失败:", err)
  }

  // 优雅退出
  sigChan := make(chan os.Signal, 1)
  signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
  <-sigChan

  consumer.Stop()
  log.Println("消费者已退出")
}

生产者

package main

import (
  "github.com/nsqio/go-nsq"
  "log"
)

func main() {
  // 1. 创建NSQ配置
  config := nsq.NewConfig()

  // 2. 创建生产者(连接nsqd的TCP端口:4150)
  producer, err := nsq.NewProducer("192.168.80.166:4150", config)
  if err != nil {
    log.Fatal("生产者创建失败:", err)
  }
  defer producer.Stop()

  // 3. 发送消息到 test_topic 主题
  topic := "test_topic"
  err = producer.Publish(topic, []byte("Hello NSQ! 这是一条Go发送的消息"))
  if err != nil {
    log.Fatal("消息发送失败:", err)
  }

  log.Println("消息发送成功!")
}

广播模式

如果有多个消费者,绑定不同的channel,那么每个消费者都可以完整的消费消息

任务分发模式

如果多个消费者绑定相同的channel,那么就会采用负载均衡模式,消息会均衡的分摊给每一个消费者

Message方法

message.Finish() 确认消息

message.Requeue(delay time.Duration) 消息重新入队,可指定延迟时间再重试

message.RequeueWithoutBackoff(delay time.Duration) 消息重新入队,不触发避退

message.Attempts 消息重试次数

message.ID 消息id

指数避退

// 消息处理函数
func handleMessage(message *nsq.Message) error {
  log.Printf("收到消息:%s %d", string(message.Body), message.Attempts)
  // 手动确认消息(必须调用,否则NSQ会认为消费失败并重试)
  //message.Finish()
  message.Requeue(3 * time.Second)

  return nil
}

会按照2s,4s,8s,16s, 32s的重试,重试5次

可以通过修改MaxAttempts 参数控制最大重试次数

config.MaxAttempts = 10 // 修改最大重试次数

如果不想指数避退,使用message.RequeueWithoutBackoff即可

还有一个注意点就是,在不调用message.DisableAutoResponse的情况下,return nil也相当于消费了消息

// 消息处理函数
func handleMessage(message *nsq.Message) error {
  log.Printf("收到消息:%s %d", string(message.Body), message.Attempts)
  // 自动确认消息
  return nil
}

如果不想自动确认消息,那么这条消息会在60s后自动投递

// 消息处理函数
func handleMessage(message *nsq.Message) error {
  log.Printf("收到消息:%s %d", string(message.Body), message.Attempts)
  message.DisableAutoResponse()
  return nil
}

config配置

修改消费者消费能力

MaxInFlight 表示消费者一次性最多从 nsqd 拿多少条消息,需要搭配AddConcurrentHandlers

config := nsq.NewConfig()
config.MaxInFlight = 10 // 设置消费者的消费能力
consumer.AddHandler(nsq.HandlerFunc(handleMessage)) // 单协程消费
consumer.AddConcurrentHandlers(nsq.HandlerFunc(handleMessage), 10) // 开多少个协程消费

协程数最好等于MaxInFlight

修改最大重试次数

MaxAttempts 一条消息最多重试多少次就放弃,默认5次

消息超时时间

MsgTimeout 消息发给消费者后,多久不确认就认为超时并重试,默认60s

默认重试延迟

DefaultRequeueDelay / MaxRequeueDelay

默认90s,可以通过RequeueWithoutBackoff()可以覆盖

退避设置

// 退避策略:消费失败后,等待重试的算法规则
// 默认:exponential(指数退避,失败次数越多,等待时间越长)
BackoffStrategy BackoffStrategy `opt:"backoff_strategy" default:"exponential"`

// 最大退避时长:消费失败后,最长等待多久再重试(到达这个时间就不再增长)
// 设为 0 表示完全关闭退避机制
// 默认:2分钟(2m)
MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`

// 退避乘数(基础单位时间):计算指数退避等待时间的基础单位
// 指数退避公式:等待时间 = (2 ^ 失败次数) * 该值
// 默认:1秒(1s)
BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`

nsq生产级部署

上面的部署都是开发环境使用的,如果是生产环境,建议按照下面的部署方式部署nsq

nsq增加授权认证

如果需要开放使用,但是数据不是敏感数据,那就配置个授权认证就好了

version: "3"
services:
  nsqd:
    image: nsqio/nsq
    depends_on:
      - nsq-auth
    ports:
      - "4150:4150"
    restart: always
    command: /nsqd --auth-http-address="nsq-auth:1325"
  nsq-auth:
    image: zhimiao/nsq-auth:latest
    restart: always
    command: --secret="Password"

密码只需要在config里面配置就OK了

config := nsq.NewConfig()
config.AuthSecret = "Password"

nsq TLS配置 单向

先生成私钥和证书

openssl req -x509 -newkey rsa:2048 -nodes -keyout server.key -out server.crt -days 3650 -subj "/CN=nsqd"

把生成的server.crt server.key放入tls目录

docker-compose配置

version: "3"

services:
  nsqd:
    image: nsqio/nsq
    depends_on:
      - nsq-auth
    ports:
      - "4150:4150"
      - "4151:4151"
    restart: always
    volumes:
      - ./tls:/tls  # 证书挂载
    command: /nsqd --auth-http-address="nsq-auth:1325" --tls-required=true --tls-cert=/tls/server.crt --tls-key=/tls/server.key --verbose

  nsq-auth:
    image: zhimiao/nsq-auth:latest
    restart: always
    command: --secret="Password"

代码里面只需要加上

config := nsq.NewConfig()
config.TlsV1 = true
config.AuthSecret = "Password"
config.TlsConfig = &tls.Config{
  InsecureSkipVerify: true, // 自签名证书必须加
}

nsq TLS配置 向tls

单向tls是客户端现在服务端

双向tls不久客户端验证服务端,服务端也要验证客户端

生成证书的时候,需要把服务端的ip带上去

openssl req -x509 -newkey rsa:2048 -nodes -keyout server.key -out server.crt -days 3650 -subj "/CN=192.168.80.188" -extensions EXT -config <(
echo "[req]";
echo "distinguished_name = req_distinguished_name";
echo "[req_distinguished_name]";
echo "[EXT]";
echo "subjectAltName=IP:192.168.80.188";
echo "keyUsage=digitalSignature";
echo "extendedKeyUsage=serverAuth,clientAuth";
)

docker-compose配置

version: "3"

services:
  nsqd:
    image: nsqio/nsq
    depends_on:
      - nsq-auth
    ports:
      - "4150:4150"
      - "4151:4151"
    restart: always
    volumes:
      - ./tls:/tls
    command: /nsqd --auth-http-address="nsq-auth:1325" --tls-required=true --tls-client-auth-policy=require --tls-cert=/tls/server.crt --tls-key=/tls/server.key --verbose

  nsq-auth:
    image: zhimiao/nsq-auth:latest
    restart: always
    command: --secret="Password"

代码配置

// 1. 创建NSQ配置
config := nsq.NewConfig()

config.TlsV1 = true
config.AuthSecret = "Password"
cert, err := tls.LoadX509KeyPair("./nsq3/tls/server.crt", "./nsq3/tls/server.key")
if err != nil {
  log.Fatal(err)
}

// 加载证书池(信任自己)
caCert, _ := os.ReadFile("./nsq3/tls/server.crt")
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caCert)

// TLS 配置
config.TlsConfig = &tls.Config{
  Certificates: []tls.Certificate{cert}, // 客户端证书 = 服务端证书
  RootCAs:      pool,
}

nsq集群配置

如果需要让nsq高可用,那么就得用nsq的集群配置

需要引入nsqlookupd做服务发现,然后多个nsqd做节点

version: "3"

services:
  # 1. 服务发现(集群必须)
  nsqlookupd:
    image: nsqio/nsq
    ports:
      - "4160:4160"
      - "4161:4161"
    restart: always
    command: /nsqlookupd

  # 2. 消息节点 1
  nsqd1:
    image: nsqio/nsq
    depends_on:
      - nsqlookupd
    ports:
      - "4150:4150"
      - "4151:4151"
    restart: always
    volumes:
      - ./data1:/data
    command: /nsqd --broadcast-address=192.168.80.188 --lookupd-tcp-address=nsqlookupd:4160 --data-path=/data

  # 3. 消息节点 2
  nsqd2:
    image: nsqio/nsq
    depends_on:
      - nsqlookupd
    ports:
      - "4152:4150"
      - "4153:4151"
    restart: always
    volumes:
      - ./data2:/data
    command: /nsqd  --broadcast-address=192.168.80.188 --broadcast-tcp-port=4152 --broadcast-http-port=4153 --lookupd-tcp-address=nsqlookupd:4160 --data-path=/data

  # 4. Web 控制台
  nsqadmin:
    image: nsqio/nsq
    depends_on:
      - nsqlookupd
    ports:
      - "4171:4171"
    restart: always
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161

生产者只能向nsqd发消息

消费者可以消费nsqd的消息,也可以消费nsqlookup的

package main

import (
  "github.com/nsqio/go-nsq"
  "log"
  "os"
  "os/signal"
  "syscall"
  "time"
)

func handleMessage(message *nsq.Message) error {
  log.Printf("✅ 收到消息:%s", string(message.Body))
  time.Sleep(1 * time.Second)
  message.Finish()
  return nil
}

func main() {
  config := nsq.NewConfig()
  config.MaxInFlight = 10

  consumer, err := nsq.NewConsumer("test_topic", "test_channel", config)
  if err != nil {
    log.Fatal(err)
  }

  consumer.AddConcurrentHandlers(nsq.HandlerFunc(handleMessage), 10)

  // ==============================
  // 🔥 集群模式:连接 lookupd(4161)
  // ==============================
  err = consumer.ConnectToNSQLookupd("192.168.80.188:4161")
  if err != nil {
    log.Fatal("连接集群失败:", err)
  }

  sigChan := make(chan os.Signal, 1)
  signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
  <-sigChan

  consumer.Stop()
  log.Println("🚪 消费者已退出")
}

nsq 集群+授权+TLS配置

version: "3"

services:
  # 1. 服务发现(集群必须)
  nsqlookupd:
    image: nsqio/nsq
    ports:
      - "4160:4160"
      - "4161:4161"
    restart: always
    command: /nsqlookupd

  # 2. 消息节点 1
  nsqd1:
    image: nsqio/nsq
    depends_on:
      - nsqlookupd
    ports:
      - "4150:4150"
      - "4151:4151"
    restart: always
    volumes:
      - ./data1:/data
      - ./tls:/tls
    command: /nsqd --broadcast-address=192.168.80.188 --lookupd-tcp-address=nsqlookupd:4160 --data-path=/data --auth-http-address="nsq-auth:1325" --tls-required=true --tls-client-auth-policy=require --tls-cert=/tls/server.crt --tls-key=/tls/server.key --verbose

  # 3. 消息节点 2
  nsqd2:
    image: nsqio/nsq
    depends_on:
      - nsqlookupd
    ports:
      - "4152:4150"
      - "4153:4151"
    restart: always
    volumes:
      - ./data2:/data
      - ./tls:/tls
    command: /nsqd  --broadcast-address=192.168.80.188 --broadcast-tcp-port=4152 --broadcast-http-port=4153 --lookupd-tcp-address=nsqlookupd:4160 --data-path=/data --auth-http-address="nsq-auth:1325" --tls-required=true --tls-client-auth-policy=require --tls-cert=/tls/server.crt --tls-key=/tls/server.key --verbose

  # 4. Web 控制台
  nsqadmin:
    image: nsqio/nsq
    depends_on:
      - nsqlookupd
    ports:
      - "4171:4171"
    restart: always
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161

  nsq-auth:
    image: zhimiao/nsq-auth:latest
    restart: always
    command: --secret="Password"

生产者和消费者就参考tls配置