NSQ是Go语言编写的,开源的分布式消息队列中间件,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。
NSQ 作为轻量级消息队列,无需复杂依赖、部署 简单,是中小项目的优选
nsq部署
NSQ 只有两个核心组件:
nsqlookupd:服务发现节点(必须先启动,如果是单机用,这个可以不用跑)nsqd:业务消息节点(生产 / 消费消息)nsqadmin:Web 管理面板(可选)
使用docker可以很方便的把nsq跑起来
version: "3"
services:
nsqd:
image: nsqio/nsq
ports:
- "4150:4150" # TCP 端口
- "4151:4151" # HTTP 端口
restart: always
command: /nsqd
nsqadmin:
image: nsqio/nsq
restart: always
ports:
- "4171:4171"
command: /nsqadmin --nsqd-http-address=nsqd:4151
如果要用可执行文件部署,可以在官方地址下载
https://nsq.io/deployment/installing.html
go语言使用nsq
Go 是 NSQ 的原生语言,SDK 官方维护,使用极简
go get github.com/nsqio/go-nsq
生产者-消费者模型
消费者
package main
import (
"github.com/nsqio/go-nsq"
"log"
"os"
"os/signal"
"syscall"
)
// 消息处理函数
func handleMessage(message *nsq.Message) error {
log.Printf("收到消息:%s", string(message.Body))
// 手动确认消息(必须调用,否则NSQ会认为消费失败并重试)
message.Finish()
return nil
}
func main() {
config := nsq.NewConfig()
topic := "test_topic"
channel := "test_channel" // 消费组
// 1. 创建消费者
consumer, err := nsq.NewConsumer(topic, channel, config)
if err != nil {
log.Fatal("消费者创建失败:", err)
}
// 2. 绑定消息处理函数
consumer.AddHandler(nsq.HandlerFunc(handleMessage))
// 3. 连接nsqd
err = consumer.ConnectToNSQD("192.168.80.166:4150")
if err != nil {
log.Fatal("连接nsqd失败:", err)
}
// 优雅退出
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
consumer.Stop()
log.Println("消费者已退出")
}
生产者
package main
import (
"github.com/nsqio/go-nsq"
"log"
)
func main() {
// 1. 创建NSQ配置
config := nsq.NewConfig()
// 2. 创建生产者(连接nsqd的TCP端口:4150)
producer, err := nsq.NewProducer("192.168.80.166:4150", config)
if err != nil {
log.Fatal("生产者创建失败:", err)
}
defer producer.Stop()
// 3. 发送消息到 test_topic 主题
topic := "test_topic"
err = producer.Publish(topic, []byte("Hello NSQ! 这是一条Go发送的消息"))
if err != nil {
log.Fatal("消息发送失败:", err)
}
log.Println("消息发送成功!")
}
广播模式
如果有多个消费者,绑定不同的channel,那么每个消费者都可以完整的消费消息
任务分发模式
如果多个消费者绑定相同的channel,那么就会采用负载均衡模式,消息会均衡的分摊给每一个消费者
Message方法
message.Finish() 确认消息
message.Requeue(delay time.Duration) 消息重新入队,可指定延迟时间再重试
message.RequeueWithoutBackoff(delay time.Duration) 消息重新入队,不触发避退
message.Attempts 消息重试次数
message.ID 消息id
指数避退
// 消息处理函数
func handleMessage(message *nsq.Message) error {
log.Printf("收到消息:%s %d", string(message.Body), message.Attempts)
// 手动确认消息(必须调用,否则NSQ会认为消费失败并重试)
//message.Finish()
message.Requeue(3 * time.Second)
return nil
}
会按照2s,4s,8s,16s, 32s的重试,重试5次
可以通过修改MaxAttempts 参数控制最大重试次数
config.MaxAttempts = 10 // 修改最大重试次数
如果不想指数避退,使用message.RequeueWithoutBackoff即可
还有一个注意点就是,在不调用message.DisableAutoResponse的情况下,return nil也相当于消费了消息
// 消息处理函数
func handleMessage(message *nsq.Message) error {
log.Printf("收到消息:%s %d", string(message.Body), message.Attempts)
// 自动确认消息
return nil
}
如果不想自动确认消息,那么这条消息会在60s后自动投递
// 消息处理函数
func handleMessage(message *nsq.Message) error {
log.Printf("收到消息:%s %d", string(message.Body), message.Attempts)
message.DisableAutoResponse()
return nil
}
config配置
修改消费者消费能力
MaxInFlight 表示消费者一次性最多从 nsqd 拿多少条消息,需要搭配AddConcurrentHandlers
config := nsq.NewConfig()
config.MaxInFlight = 10 // 设置消费者的消费能力
consumer.AddHandler(nsq.HandlerFunc(handleMessage)) // 单协程消费
consumer.AddConcurrentHandlers(nsq.HandlerFunc(handleMessage), 10) // 开多少个协程消费
协程数最好等于MaxInFlight
修改最大重试次数
MaxAttempts 一条消息最多重试多少次就放弃,默认5次
消息超时时间
MsgTimeout 消息发给消费者后,多久不确认就认为超时并重试,默认60s
默认重试延迟
DefaultRequeueDelay / MaxRequeueDelay
默认90s,可以通过RequeueWithoutBackoff()可以覆盖
退避设置
// 退避策略:消费失败后,等待重试的算法规则
// 默认:exponential(指数退避,失败次数越多,等待时间越长)
BackoffStrategy BackoffStrategy `opt:"backoff_strategy" default:"exponential"`
// 最大退避时长:消费失败后,最长等待多久再重试(到达这个时间就不再增长)
// 设为 0 表示完全关闭退避机制
// 默认:2分钟(2m)
MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`
// 退避乘数(基础单位时间):计算指数退避等待时间的基础单位
// 指数退避公式:等待时间 = (2 ^ 失败次数) * 该值
// 默认:1秒(1s)
BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`
nsq生产级部署
上面的部署都是开发环境使用的,如果是生产环境,建议按照下面的部署方式部署nsq
nsq增加授权认证
如果需要开放使用,但是数据不是敏感数据,那就配置个授权认证就好了
version: "3"
services:
nsqd:
image: nsqio/nsq
depends_on:
- nsq-auth
ports:
- "4150:4150"
restart: always
command: /nsqd --auth-http-address="nsq-auth:1325"
nsq-auth:
image: zhimiao/nsq-auth:latest
restart: always
command: --secret="Password"
密码只需要在config里面配置就OK了
config := nsq.NewConfig()
config.AuthSecret = "Password"
nsq TLS配置 单向
先生成私钥和证书
openssl req -x509 -newkey rsa:2048 -nodes -keyout server.key -out server.crt -days 3650 -subj "/CN=nsqd"
把生成的server.crt server.key放入tls目录
docker-compose配置
version: "3"
services:
nsqd:
image: nsqio/nsq
depends_on:
- nsq-auth
ports:
- "4150:4150"
- "4151:4151"
restart: always
volumes:
- ./tls:/tls # 证书挂载
command: /nsqd --auth-http-address="nsq-auth:1325" --tls-required=true --tls-cert=/tls/server.crt --tls-key=/tls/server.key --verbose
nsq-auth:
image: zhimiao/nsq-auth:latest
restart: always
command: --secret="Password"
代码里面只需要加上
config := nsq.NewConfig()
config.TlsV1 = true
config.AuthSecret = "Password"
config.TlsConfig = &tls.Config{
InsecureSkipVerify: true, // 自签名证书必须加
}
nsq TLS配置 向tls
单向tls是客户端现在服务端
双向tls不久客户端验证服务端,服务端也要验证客户端
生成证书的时候,需要把服务端的ip带上去
openssl req -x509 -newkey rsa:2048 -nodes -keyout server.key -out server.crt -days 3650 -subj "/CN=192.168.80.188" -extensions EXT -config <(
echo "[req]";
echo "distinguished_name = req_distinguished_name";
echo "[req_distinguished_name]";
echo "[EXT]";
echo "subjectAltName=IP:192.168.80.188";
echo "keyUsage=digitalSignature";
echo "extendedKeyUsage=serverAuth,clientAuth";
)
docker-compose配置
version: "3"
services:
nsqd:
image: nsqio/nsq
depends_on:
- nsq-auth
ports:
- "4150:4150"
- "4151:4151"
restart: always
volumes:
- ./tls:/tls
command: /nsqd --auth-http-address="nsq-auth:1325" --tls-required=true --tls-client-auth-policy=require --tls-cert=/tls/server.crt --tls-key=/tls/server.key --verbose
nsq-auth:
image: zhimiao/nsq-auth:latest
restart: always
command: --secret="Password"
代码配置
// 1. 创建NSQ配置
config := nsq.NewConfig()
config.TlsV1 = true
config.AuthSecret = "Password"
cert, err := tls.LoadX509KeyPair("./nsq3/tls/server.crt", "./nsq3/tls/server.key")
if err != nil {
log.Fatal(err)
}
// 加载证书池(信任自己)
caCert, _ := os.ReadFile("./nsq3/tls/server.crt")
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caCert)
// TLS 配置
config.TlsConfig = &tls.Config{
Certificates: []tls.Certificate{cert}, // 客户端证书 = 服务端证书
RootCAs: pool,
}
nsq集群配置
如果需要让nsq高可用,那么就得用nsq的集群配置
需要引入nsqlookupd做服务发现,然后多个nsqd做节点
version: "3"
services:
# 1. 服务发现(集群必须)
nsqlookupd:
image: nsqio/nsq
ports:
- "4160:4160"
- "4161:4161"
restart: always
command: /nsqlookupd
# 2. 消息节点 1
nsqd1:
image: nsqio/nsq
depends_on:
- nsqlookupd
ports:
- "4150:4150"
- "4151:4151"
restart: always
volumes:
- ./data1:/data
command: /nsqd --broadcast-address=192.168.80.188 --lookupd-tcp-address=nsqlookupd:4160 --data-path=/data
# 3. 消息节点 2
nsqd2:
image: nsqio/nsq
depends_on:
- nsqlookupd
ports:
- "4152:4150"
- "4153:4151"
restart: always
volumes:
- ./data2:/data
command: /nsqd --broadcast-address=192.168.80.188 --broadcast-tcp-port=4152 --broadcast-http-port=4153 --lookupd-tcp-address=nsqlookupd:4160 --data-path=/data
# 4. Web 控制台
nsqadmin:
image: nsqio/nsq
depends_on:
- nsqlookupd
ports:
- "4171:4171"
restart: always
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
生产者只能向nsqd发消息
消费者可以消费nsqd的消息,也可以消费nsqlookup的
package main
import (
"github.com/nsqio/go-nsq"
"log"
"os"
"os/signal"
"syscall"
"time"
)
func handleMessage(message *nsq.Message) error {
log.Printf("✅ 收到消息:%s", string(message.Body))
time.Sleep(1 * time.Second)
message.Finish()
return nil
}
func main() {
config := nsq.NewConfig()
config.MaxInFlight = 10
consumer, err := nsq.NewConsumer("test_topic", "test_channel", config)
if err != nil {
log.Fatal(err)
}
consumer.AddConcurrentHandlers(nsq.HandlerFunc(handleMessage), 10)
// ==============================
// 🔥 集群模式:连接 lookupd(4161)
// ==============================
err = consumer.ConnectToNSQLookupd("192.168.80.188:4161")
if err != nil {
log.Fatal("连接集群失败:", err)
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
consumer.Stop()
log.Println("🚪 消费者已退出")
}
nsq 集群+授权+TLS配置
version: "3"
services:
# 1. 服务发现(集群必须)
nsqlookupd:
image: nsqio/nsq
ports:
- "4160:4160"
- "4161:4161"
restart: always
command: /nsqlookupd
# 2. 消息节点 1
nsqd1:
image: nsqio/nsq
depends_on:
- nsqlookupd
ports:
- "4150:4150"
- "4151:4151"
restart: always
volumes:
- ./data1:/data
- ./tls:/tls
command: /nsqd --broadcast-address=192.168.80.188 --lookupd-tcp-address=nsqlookupd:4160 --data-path=/data --auth-http-address="nsq-auth:1325" --tls-required=true --tls-client-auth-policy=require --tls-cert=/tls/server.crt --tls-key=/tls/server.key --verbose
# 3. 消息节点 2
nsqd2:
image: nsqio/nsq
depends_on:
- nsqlookupd
ports:
- "4152:4150"
- "4153:4151"
restart: always
volumes:
- ./data2:/data
- ./tls:/tls
command: /nsqd --broadcast-address=192.168.80.188 --broadcast-tcp-port=4152 --broadcast-http-port=4153 --lookupd-tcp-address=nsqlookupd:4160 --data-path=/data --auth-http-address="nsq-auth:1325" --tls-required=true --tls-client-auth-policy=require --tls-cert=/tls/server.crt --tls-key=/tls/server.key --verbose
# 4. Web 控制台
nsqadmin:
image: nsqio/nsq
depends_on:
- nsqlookupd
ports:
- "4171:4171"
restart: always
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
nsq-auth:
image: zhimiao/nsq-auth:latest
restart: always
command: --secret="Password"
生产者和消费者就参考tls配置