Asynq 是一个非常流行且强大的分布式任务队列库。它基于 Redis 构建,设计灵感很大程度上来自于 Ruby 的 Sidekiq,旨在处理异步任务和后台作业。
如果你的项目里面恰好用到了redis,并且需要异步处理任务(如发送邮件、图像处理、数据分析),Asynq 是一个极佳的选择。
核心架构与原理
Asynq 的运行机制非常直观:它采用生产者-消费者模型。
- Client (生产者): 将任务封装并推送到 Redis 队列中。
- Redis: 作为消息代理(Broker),存储任务元数据和优先级队列。
- Server (消费者): 从队列中拉取任务,并根据注册的处理函数执行逻辑。
主要特性
Asynq 之所以受欢迎,是因为它在保证简单易用的同时,提供了企业级的特性:
- 任务持久化: 任务存储在 Redis 中,即使应用崩溃,任务也不会丢失。
- 重试机制: 支持自定义重试次数和指数退避(Exponential Backoff)策略。
- **优先级队列:**可以为不同任务设置权重(例如:
critical: 6,default: 3,low: 1)。 - 延时任务: 支持在未来的某个特定时间点执行任务。
- 周期性任务: 内置 Cron 表达式支持,可以像配置定时任务一样执行重复作业。
- **可视化监控:**提供
asynqmon工具(Web UI 和 CLI),可以直观查看队列状态、失败任务和吞吐量。 - 优雅停机: 能够安全地等待正在运行的任务完成后再关闭服务。

库的下载
在 Go 项目里使用 Asynq,需要先把库下载到当前项目。下面这条命令会把 github.com/hibiken/asynq 添加到 go.mod,后面代码里才能 import "github.com/hibiken/asynq"。
如果你是零基础,可以把 go get 理解成“给项目安装一个第三方工具包”。安装完成后,我们就可以使用 Asynq 提供的 NewClient、NewServer、NewTask 等函数。
go get github.com/hibiken/asynq
需要redis4.0以上的版本,我这里直接装了redis8.6.2的版本
不想折腾docker的,直接下这个,然后本地跑就行
https://github.com/redis-windows/redis-windows/releases/tag/8.6.2
核心功能实战
学习 Asynq 的时候,不要一开始就把它想得太复杂。你可以先把它理解成一个“任务代办系统”:
- 生产者负责把任务写到 Redis,类似“把一张待办事项纸条放进箱子里”。
- 消费者负责从 Redis 取任务并执行,类似“工作人员从箱子里拿纸条,然后开始干活”。
- 任务类型就是任务名称,例如
email:welcome表示“发送欢迎邮件”。 - Payload就是任务参数,例如用户 ID、邮箱、订单号等。
下面的每个示例都建议按这个顺序学习:先看这个功能解决什么问题,再看消费者如何处理任务,最后看生产者如何把任务放进队列。
普通asynq
这是最基础的 Asynq 用法。普通任务适合处理“现在不想在接口里立刻做,但希望后台尽快完成”的事情。
比如用户注册成功后,接口需要马上返回“注册成功”,但发送欢迎邮件可能比较慢。如果在接口里直接发邮件,用户就要一直等;如果用 Asynq,就可以先把“发送欢迎邮件”这个任务丢进 Redis,然后接口立刻返回,后台消费者慢慢处理。
这个例子里面:
- 生产者创建一个
email:welcome任务,并把用户 ID 和邮箱放进 payload。 - 消费者注册
email:welcome的处理函数。 - 只要消费者运行着,任务入队后就会被尽快取出来执行。
消费者
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:6379"
const typeEmailWelcome = "email:welcome"
type EmailWelcomePayload struct {
UserID int `json:"user_id"`
Email string `json:"email"`
}
func main() {
server := asynq.NewServer(
asynq.RedisClientOpt{Addr: redisAddr},
asynq.Config{
// Concurrency 表示最多同时执行多少个任务。
Concurrency: 5,
},
)
mux := asynq.NewServeMux()
mux.HandleFunc(typeEmailWelcome, handleEmailWelcomeTask)
log.Println("普通消费者启动,监听 default 队列")
if err := server.Run(mux); err != nil {
log.Fatal(err)
}
}
func handleEmailWelcomeTask(ctx context.Context, task *asynq.Task) error {
var payload EmailWelcomePayload
if err := json.Unmarshal(task.Payload(), &payload); err != nil {
// 返回错误会触发 asynq 的失败处理;如果设置了重试次数,会进入重试队列。
return err
}
fmt.Printf("发送欢迎邮件: user_id=%d email=%s\n", payload.UserID, payload.Email)
return nil
}
生产者
package main
import (
"encoding/json"
"fmt"
"log"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:6379"
const typeEmailWelcome = "email:welcome"
type EmailWelcomePayload struct {
UserID int `json:"user_id"`
Email string `json:"email"`
}
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
defer client.Close()
payload, err := json.Marshal(EmailWelcomePayload{
UserID: 1001,
Email: "user@example.com",
})
if err != nil {
log.Fatal(err)
}
// 普通任务:Enqueue 后,只要有消费者监听 default 队列,就会尽快执行。
task := asynq.NewTask(typeEmailWelcome, payload)
info, err := client.Enqueue(task)
if err != nil {
log.Fatal(err)
}
fmt.Printf("普通任务已入队: id=%s queue=%s type=%s\n", info.ID, info.Queue, info.Type)
}
如果有多个消费者,多个消费者之间是竞争状态
也就是说,同一个任务只会被其中一个消费者抢到并执行,不会每个消费者都执行一遍。这一点和“广播消息”不一样,Asynq 更像是“谁有空谁来干这件事”。
超时和截止时间
有些任务可能会执行很久,比如调用第三方接口、处理大文件、生成报表。如果任务一直卡住,不做任何限制,就可能占着消费者的并发名额不释放,影响其他任务执行。
Asynq 提供了两种常用的时间限制:
Timeout:限制“单次任务最多执行多久”。例如最多执行 3 秒,超过就取消。Deadline:限制“最晚必须在某个时间点前完成”。例如 10 秒后就是截止时间,到点还没完成就取消。
这里要特别理解 context:消费者函数里拿到的 ctx 就是任务的上下文。如果超时或到达截止时间,ctx.Done() 会收到信号,说明任务应该停止了。
消费者
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:6379"
func main() {
server := asynq.NewServer(
asynq.RedisClientOpt{Addr: redisAddr},
asynq.Config{Concurrency: 2},
)
mux := asynq.NewServeMux()
mux.HandleFunc("time:timeout", slowHandler)
mux.HandleFunc("time:deadline", slowHandler)
log.Println("超时/截止时间消费者启动")
if err := server.Run(mux); err != nil {
log.Fatal(err)
}
}
func slowHandler(ctx context.Context, task *asynq.Task) error {
fmt.Printf("开始慢任务: type=%s payload=%s\n", task.Type(), task.Payload())
select {
case <-time.After(5 * time.Second):
fmt.Println("慢任务执行完成")
return nil
case <-ctx.Done():
// Timeout 或 Deadline 到期时会走到这里;返回 ctx.Err() 会触发失败/重试逻辑。
fmt.Printf("慢任务被取消: err=%v\n", ctx.Err())
return ctx.Err()
}
}
生产者
package main
import (
"fmt"
"log"
"time"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:6379"
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
defer client.Close()
timeoutTask := asynq.NewTask("time:timeout", []byte("slow job"))
timeoutInfo, err := client.Enqueue(
timeoutTask,
// Timeout 限制单次处理最长耗时;超过后 context 会被取消。
asynq.Timeout(3*time.Second),
asynq.MaxRetry(1),
)
if err != nil {
log.Fatal(err)
}
fmt.Printf("超时任务已入队: id=%s timeout=%s\n", timeoutInfo.ID, timeoutInfo.Timeout)
deadlineTask := asynq.NewTask("time:deadline", []byte("must finish before deadline"))
deadline := time.Now().Add(10 * time.Second)
deadlineInfo, err := client.Enqueue(
deadlineTask,
// Deadline 指定最晚完成时间;到期后 context 会被取消。
asynq.Deadline(deadline),
asynq.MaxRetry(1),
)
if err != nil {
log.Fatal(err)
}
fmt.Printf("截止时间任务已入队: id=%s deadline=%s\n", deadlineInfo.ID, deadline.Format(time.RFC3339))
}
重试和失败处理
后台任务经常会失败,但有些失败只是临时的。比如网络抖动、第三方服务短暂不可用、Redis/MySQL 瞬间连接失败。这类错误不应该马上放弃,而应该过一会儿再试。
Asynq 的重试机制就是为了解决这个问题:消费者只要返回非 nil 的 error,Asynq 就认为任务失败。如果这个任务还没超过最大重试次数,就会被放进 retry 队列,等一段时间后再次执行。
这个例子里面:
- 生产者通过
asynq.MaxRetry(3)设置最多重试 3 次。 - 消费者故意返回
errors.New("模拟业务失败"),用来演示失败后如何重试。 RetryDelayFunc用来控制每次失败后等多久再重试。
对零基础同学来说,可以记住一句话:返回 nil 表示成功,返回 error 表示失败并可能重试。
消费者
package main
import (
"context"
"errors"
"fmt"
"log"
"time"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:6379"
func main() {
server := asynq.NewServer(
asynq.RedisClientOpt{Addr: redisAddr},
asynq.Config{
Concurrency: 1,
RetryDelayFunc: func(n int, err error, task *asynq.Task) time.Duration {
// n 是第几次重试,从 1 开始。
// 这里为了演示方便,固定 5 秒后重试;生产环境通常使用指数退避。
fmt.Printf("任务失败,准备第 %d 次重试: type=%s err=%v\n", n, task.Type(), err)
return 5 * time.Second
},
},
)
mux := asynq.NewServeMux()
mux.HandleFunc("retry:demo", func(ctx context.Context, task *asynq.Task) error {
fmt.Printf("处理重试任务: payload=%s\n", task.Payload())
// 返回非 nil error 就会触发重试;重试耗尽后任务会进入 archived。
return errors.New("模拟业务失败")
})
log.Println("重试消费者启动")
if err := server.Run(mux); err != nil {
log.Fatal(err)
}
}
生产者
package main
import (
"fmt"
"log"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:6379"
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
defer client.Close()
task := asynq.NewTask("retry:demo", []byte("this task will fail several times"))
// MaxRetry(3) 表示失败后最多重试 3 次;加上首次执行,最多会处理 4 次。
info, err := client.Enqueue(task, asynq.MaxRetry(3))
if err != nil {
log.Fatal(err)
}
fmt.Printf("重试示例任务已入队: id=%s max_retry=%d\n", info.ID, info.MaxRetry)
}
优先级任务
不是所有后台任务都一样重要。比如“服务器宕机告警”应该尽快处理,而“每天生成报表”可以晚一点处理。这时候就可以使用优先级队列。
Asynq 的优先级不是给单个任务设置一个数字,而是把任务放进不同队列,例如:
critical:高优先级队列,适合告警、支付、重要通知。default:普通队列,适合大多数业务任务。low:低优先级队列,适合统计、报表、清理任务。
消费者通过 Config.Queues 给队列设置权重。权重越高,被消费者取到的概率越大。注意它不是绝对排序,不是说 critical 全部执行完才执行 low,而是“高优先级更容易被取到”。
消费者
package main
import (
"context"
"fmt"
"log"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:6379"
func main() {
server := asynq.NewServer(
asynq.RedisClientOpt{Addr: redisAddr},
asynq.Config{
Concurrency: 3,
Queues: map[string]int{
// 权重越高,被取到的概率越大;不是严格排序,而是加权调度。
"critical": 6,
"default": 3,
"low": 1,
},
},
)
mux := asynq.NewServeMux()
mux.HandleFunc("priority:demo", func(ctx context.Context, task *asynq.Task) error {
fmt.Printf("处理优先级任务: payload=%s\n", task.Payload())
return nil
})
log.Println("优先级消费者启动,监听 critical/default/low 队列")
if err := server.Run(mux); err != nil {
log.Fatal(err)
}
}
生产者
package main
import (
"fmt"
"log"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:6379"
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
defer client.Close()
tasks := []struct {
name string
queue string
body string
}{
{name: "低优先级报表", queue: "low", body: "daily report"},
{name: "默认任务", queue: "default", body: "normal email"},
{name: "高优先级告警", queue: "critical", body: "server down"},
}
for _, item := range tasks {
task := asynq.NewTask("priority:demo", []byte(item.body))
// Queue 指定任务进入哪个队列。
// 消费者通过 Config.Queues 给不同队列设置权重。
info, err := client.Enqueue(task, asynq.Queue(item.queue))
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s 已入队: id=%s queue=%s\n", item.name, info.ID, info.Queue)
}
}
延时任务
延时任务适合“现在创建任务,但不要马上执行”的场景。
常见例子:
- 用户下单后 30 分钟未支付,自动取消订单。
- 注册成功 10 分钟后,发送新手引导消息。
- 活动开始前 5 分钟,发送提醒通知。
Asynq 里面主要有两种写法:
ProcessIn(10*time.Second):从现在开始,10 秒后执行。ProcessAt(time):在指定时间点执行。
这类任务入队后,并不是立刻被消费者执行,而是先放在 Redis 的计划任务区域里。等时间到了,Asynq 会把它移动到可执行队列,消费者才能取到。
消费者
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:6379"
func main() {
server := asynq.NewServer(
asynq.RedisClientOpt{Addr: redisAddr},
asynq.Config{Concurrency: 5},
)
mux := asynq.NewServeMux()
mux.HandleFunc("schedule:delay", printScheduledTask)
mux.HandleFunc("schedule:at", printScheduledTask)
log.Println("延迟/指定时间消费者启动")
if err := server.Run(mux); err != nil {
log.Fatal(err)
}
}
func printScheduledTask(ctx context.Context, task *asynq.Task) error {
fmt.Printf("执行定时类任务: now=%s type=%s payload=%s\n", time.Now().Format(time.RFC3339), task.Type(), task.Payload())
return nil
}
生产者
package main
import (
"fmt"
"log"
"time"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:6379"
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
defer client.Close()
// ProcessIn 表示从现在开始延迟多久执行。
delayTask := asynq.NewTask("schedule:delay", []byte("run after 10 seconds"))
delayInfo, err := client.Enqueue(delayTask, asynq.ProcessIn(10*time.Second))
if err != nil {
log.Fatal(err)
}
fmt.Printf("延迟任务已入队: id=%s next_process_at=%s\n", delayInfo.ID, delayInfo.NextProcessAt.Format(time.RFC3339))
// ProcessAt 表示在指定时间执行。
at := time.Now().Add(30 * time.Second)
atTask := asynq.NewTask("schedule:at", []byte("run at a fixed time"))
atInfo, err := client.Enqueue(atTask, asynq.ProcessAt(at))
if err != nil {
log.Fatal(err)
}
fmt.Printf("指定时间任务已入队: id=%s process_at=%s\n", atInfo.ID, at.Format(time.RFC3339))
}
唯一任务
唯一任务用来解决“同一件事不要重复入队”的问题。
比如用户连续点击两次按钮,如果每次都创建一个任务,后台可能会重复发送两封邮件、重复同步两次订单、重复生成两份报表。Unique 和 TaskID 都可以用来减少这种重复。
它们的区别可以这样理解:
Unique:按任务内容去重。相同的任务类型、payload 和队列,在指定时间内只能入队一次。TaskID:由你自己指定任务 ID。只要这个 ID 已经存在,再次入队就会冲突。
对业务开发来说,如果你有明确的业务唯一键,例如订单号、用户 ID、账单 ID,通常 TaskID 更直观;如果只是希望短时间内相同内容不要重复提交,可以用 Unique。
消费者
package main
import (
"context"
"fmt"
"log"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:6379"
func main() {
server := asynq.NewServer(
asynq.RedisClientOpt{Addr: redisAddr},
asynq.Config{Concurrency: 5},
)
mux := asynq.NewServeMux()
mux.HandleFunc("unique:demo", printTask)
mux.HandleFunc("order:sync", printTask)
log.Println("唯一任务/TaskID 消费者启动")
if err := server.Run(mux); err != nil {
log.Fatal(err)
}
}
func printTask(ctx context.Context, task *asynq.Task) error {
fmt.Printf("执行任务: type=%s payload=%s\n", task.Type(), task.Payload())
return nil
}
生产者
package main
import (
"errors"
"fmt"
"log"
"time"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:6379"
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
defer client.Close()
uniqueTask := asynq.NewTask("unique:demo", []byte("same user same action"))
for i := 0; i < 2; i++ {
// Unique 会在指定时间内防止相同 type + payload + queue 的任务重复入队。
info, err := client.Enqueue(uniqueTask, asynq.Unique(5*time.Minute))
if err != nil {
if errors.Is(err, asynq.ErrDuplicateTask) {
fmt.Println("重复任务被 Unique 拦截")
continue
}
log.Fatal(err)
}
fmt.Printf("唯一任务已入队: id=%s\n", info.ID)
}
// TaskID 指定业务侧任务 ID;重复 ID 入队也会失败,适合防止同一个订单重复处理。
orderTask := asynq.NewTask("order:sync", []byte("order_10001"))
info, err := client.Enqueue(orderTask, asynq.TaskID("order-sync-10001"))
if err != nil {
if errors.Is(err, asynq.ErrTaskIDConflict) {
fmt.Println("指定 TaskID 已存在,跳过入队")
return
}
log.Fatal(err)
}
fmt.Printf("指定 TaskID 任务已入队: id=%s\n", info.ID)
}
周期性任务
周期性任务适合“每隔一段时间自动执行一次”的场景,也就是我们平时说的定时任务。
常见例子:
- 每天凌晨生成日报。
- 每 10 分钟同步一次外部数据。
- 每小时清理一次过期缓存。
Asynq 的周期任务由两个角色配合完成:
- 调度器 Scheduler:负责按照时间规则创建任务,并把任务放进 Redis。
- 消费者 Server:负责真正执行任务。
所以严格来说,周期任务不是“只需要消费者就好”。消费者只能执行任务,不能自己按周期创建任务;调度器才是负责“每隔多久创建一次任务”的组件。
学习时可以把 Scheduler 理解成“闹钟”。闹钟每 10 秒响一次,每响一次就往任务队列里放一张纸条;消费者看到纸条后,才开始真正干活。
消费者
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:6379"
func main() {
server := asynq.NewServer(
asynq.RedisClientOpt{Addr: redisAddr},
asynq.Config{Concurrency: 3},
)
mux := asynq.NewServeMux()
mux.HandleFunc("cron:demo", func(ctx context.Context, task *asynq.Task) error {
fmt.Printf("执行周期任务: now=%s payload=%s\n", time.Now().Format(time.RFC3339), task.Payload())
return nil
})
log.Println("周期任务消费者启动")
if err := server.Run(mux); err != nil {
log.Fatal(err)
}
}
调度器
package main
import (
"log"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:6379"
func main() {
scheduler := asynq.NewScheduler(
asynq.RedisClientOpt{Addr: redisAddr},
&asynq.SchedulerOpts{},
)
// Cron 表达式:每分钟执行一次。
// 也可以使用 "@every 10s" 这种 interval 写法,适合本地快速演示。
entryID, err := scheduler.Register("@every 10s", asynq.NewTask("cron:demo", []byte("tick")))
if err != nil {
log.Fatal(err)
}
log.Printf("周期任务已注册: entry_id=%s", entryID)
// Run 会阻塞当前进程;它只负责按周期把任务入队,真正执行仍需要消费者。
if err := scheduler.Run(); err != nil {
log.Fatal(err)
}
}
中间件
中间件不是 Asynq 独有的概念,很多 Web 框架也有中间件。它的作用是:在真正的业务处理函数前后,统一加一层逻辑。
比如你不想在每个任务处理函数里都手写日志、耗时统计、错误记录,就可以用中间件统一处理。
这个例子里面:
- 任务开始前记录日志。
- 调用真正的业务处理函数
next.ProcessTask(ctx, task)。 - 如果业务返回错误,就记录失败日志。
- 如果业务成功,就记录成功日志和耗时。
对初学者来说,可以把中间件理解成“外包装”。业务函数负责做具体事情,中间件负责在外面统一包一层公共逻辑。
消费者
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:6379"
func main() {
server := asynq.NewServer(
asynq.RedisClientOpt{Addr: redisAddr},
asynq.Config{Concurrency: 5},
)
mux := asynq.NewServeMux()
mux.Use(loggingMiddleware)
mux.HandleFunc("middleware:demo", func(ctx context.Context, task *asynq.Task) error {
fmt.Printf("业务处理: type=%s payload=%s\n", task.Type(), task.Payload())
return nil
})
log.Println("中间件消费者启动")
if err := server.Run(mux); err != nil {
log.Fatal(err)
}
}
func loggingMiddleware(next asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error {
start := time.Now()
log.Printf("任务开始: type=%s", task.Type())
err := next.ProcessTask(ctx, task)
if err != nil {
log.Printf("任务失败: type=%s cost=%s err=%v", task.Type(), time.Since(start), err)
return err
}
log.Printf("任务成功: type=%s cost=%s", task.Type(), time.Since(start))
return nil
})
}
生产者
package main
import (
"fmt"
"log"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:6379"
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
defer client.Close()
task := asynq.NewTask("middleware:demo", []byte("observe me"))
info, err := client.Enqueue(task)
if err != nil {
log.Fatal(err)
}
fmt.Printf("中间件示例任务已入队: id=%s\n", info.ID)
}
可视化监控
任务队列运行起来以后,光看命令行日志有时候不够直观。比如你可能想知道:
- 当前队列里还有多少任务没执行。
- 哪些任务失败了。
- 哪些任务正在等待重试。
- 每个队列的处理速度怎么样。
asynqmon 就是 Asynq 官方提供的监控工具。它会启动一个 Web 页面,连接到 Redis,然后把任务队列状态展示出来。对学习和排查问题都很有帮助。
下面的 Docker 命令会启动一个 asynqmon 容器,并把页面暴露到本机 8080 端口。启动后可以在浏览器访问:http://127.0.0.1:8080
docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon --redis-addr=host.docker.internal:6379
与 RabbitMQ、Kafka 的区别
虽然它们都能传递消息,但定位完全不同:
| 特性 | Asynq (Task Queue) | RabbitMQ (Message Broker) | Kafka (Event Streaming) |
|---|---|---|---|
| 核心抽象 | 任务 (Task):做一件具体的事 | 消息 (Message):路由与分发 | 流 (Stream):持久化的日志流 |
| 存储介质 | Redis | 内存/磁盘 | 磁盘 (高性能顺序写) |
| 重试/延时 | 内置:极其简单的 API 支持 | 需插件或死信队列实现 | 需自行实现逻辑 |
| 主要用途 | 异步业务处理(发邮件、报表) | 系统解耦、削峰填谷 | 大数据处理、日志聚合、溯源 |
| 复杂度 | 低:Go 原生感强,运维简单 | 中:需要维护集群和 Exchange | 高:需要维护 Zookeeper/KRaft |