之前我们学习了rabbitMQ消息队列
那是一个很常用的消息队列中间件,本教程学习另一个广泛用于日志收集的消息中间件
它的名字就是kafka
kafka的优势
只要是消息队列,都能实现解耦、异步、削峰
但是kafka的优势非常特别
高吞吐量
Kafka的高吞吐量表现堪称惊人。单机每秒处理几十上百万的消息量,即使存储了TB级别的消息,它依然能够保持稳定的性能
高性能
Kafka的高性能不仅体现在吞吐量上,还体现在它可以支持大量的客户端连接。单节点可以支持上千个客户端,并保证零停机和零数据丢失
因为kafka出色的高吞吐能力,所以kafka非常适合用来做日志收集
例如在我的威胁诱捕矩阵项目中,日志收集就是发送到kafka中,再通过logstash从kafka发送到es中

kafka单节点安装
不认证、不加密的单节点kafka安装
后期把kafka讲的差不多了,再回过头来安装(认证+加密+分布式)kafka集群
在早期的 Kafka 版本中,ZooKeeper 是 Kafka 集群不可或缺的一部分
kafka和zookeeper是什么关系
在 Kafka 2.8.0 版本之前(不包含 2.8.0),Kafka 必须依赖 ZooKeeper 才能运行,且 ZooKeeper 在当时是实现 Kafka 高可用的核心组件之一。
具体来说,早期版本的 Kafka 架构设计中,很多核心功能的实现完全依赖于 ZooKeeper:
- 集群的元数据(如 broker 列表、主题分区信息等)完全存储在 ZooKeeper 中
- 控制器的选举过程由 ZooKeeper 协调完成
- 分区副本的状态同步、故障检测等逻辑依赖 ZooKeeper 的监听机制
- 消费者组的重平衡(rebalance)过程也依赖 ZooKeeper 进行协调
在新版本中,可以使用KRaft模式实现kafka的高可用,终于可以不用再安装zookeeper了
KRaft具体原理有兴趣的可以看看这篇文章
https://cloud.tencent.com/developer/article/2508468
安装
使用docker-compose安装,非常简单
version: '3.8'
services:
kafka:
image: bitnami/kafka:3.9.0 #
container_name: kafka
restart: unless-stopped # 除非手动停止,否则容器退出时自动重启
environment:
- KAFKA_ENABLE_KRAFT=yes
# 节点角色:同时作为控制器(controller)和 broker
# 在KRaft模式下,节点可以是纯控制器、纯broker或混合角色
- KAFKA_CFG_PROCESS_ROLES=controller,broker
# 节点ID:在集群中唯一标识当前节点,取值范围1-255
- KAFKA_CFG_NODE_ID=1
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@10.3.0.1:9093
# 指定控制器使用的监听器名称(需与下方LISTENERS配置对应)
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 指定broker之间通信使用的监听器名称
# 必须与ADVERTISED_LISTENERS中定义的监听器名称一致
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
# 配置Kafka监听的网络接口和端口
# 格式:监听器名称://绑定地址:端口
# INTERNAL: 用于内部通信和客户端连接的监听器,绑定所有网络接口(:9092)
# CONTROLLER: 用于控制器之间通信的监听器,绑定所有网络接口(:9093)
- KAFKA_CFG_LISTENERS=INTERNAL://:9092,CONTROLLER://:9093
# 配置对外公告的监听器地址(客户端实际连接的地址)
# 当客户端连接Kafka时,Kafka会返回此地址给客户端
# 这里设置为宿主机的IP:端口(需根据实际环境修改10.3.0.1为宿主机IP)
- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://10.3.0.1:9092
# 配置监听器的安全协议映射
# 格式:监听器名称:协议类型
# PLAINTEXT表示不加密传输(适用于开发环境)
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT
ports:
- "9092:9092" # 客户端连接端口
- "9093:9093" # 控制器通信端口
解释一下配置中的疑点:
其中9092是 Kafka 的客户端通信端口,主要用于外部客户端(如生产者、消费者)与 Kafka broker 之间的通信
9093是 Kafka 的控制器通信端口,仅用于 KRaft 模式下控制器之间的内部通信
go kafka消费者生产者
生产者
package main
import (
"fmt"
"log"
"github.com/IBM/sarama"
)
func main() {
// 配置生产者
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本确认
config.Producer.Retry.Max = 3 // 重试次数
config.Producer.Return.Successes = true // 返回成功的消息
// 创建同步生产者(也可创建异步生产者)
producer, err := sarama.NewSyncProducer([]string{"10.3.0.1:9092"}, config)
if err != nil {
log.Fatalf("Failed to create producer: %s", err)
}
defer producer.Close()
// 发送消息
topic := "test-topic"
for i := 0; i < 5; i++ {
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(fmt.Sprintf("Hello Sarama! Message %d", i)),
}
// 同步发送,返回分区和偏移量
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Printf("Failed to send message: %s", err)
} else {
fmt.Printf("Message sent to partition %d, offset %d\n", partition, offset)
}
}
}
消费者
package main
import (
"fmt"
"github.com/IBM/sarama"
"log"
)
func main() {
// 配置消费者
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest // 从最早的消息开始消费
// 2. 创建基础消费者(非消费组模式)
consumer, err := sarama.NewConsumer([]string{"10.3.0.1:9092"}, config)
if err != nil {
log.Fatalf("Failed to create consumer: %s", err)
}
defer consumer.Close()
// 3. 指定要消费的主题和分区
topic := "test-topic"
partition := int32(0) // 分区编号(从0开始)
// 4. 获取分区消费者(从指定偏移量开始消费)
// 第三个参数为起始偏移量:可以是 sarama.OffsetOldest / sarama.OffsetNewest 或具体数值
pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
if err != nil {
log.Fatalf("Failed to consume partition: %s", err)
}
defer pc.Close()
for msg := range pc.Messages() {
// 成功接收到消息
fmt.Printf("%s %s %d %d\n", msg.Topic, msg.Value, msg.Partition, msg.Offset)
}
}
多生产者单消费者
大部分使用kafka的场景就是多个生产者生产消息,往kafka里面发
然后消费者在后面慢慢消费
这种模式不需要做额外配置,生产者只管往这个topic里面发就好了
发布订阅模式
一个生产者,多个消费者
如果是使用默认消费者(就是上节课的消费者),默认也是发布订阅模式
生产者的消息,会广播给所有消费者
消费者组
之前的默认消费者,会有一个很大的问题,它要么只能从最开始的位置消费
要么从最新的位置消费,要么就自己指定偏移去消费
消费者自己是不知道自己消费到哪里了
使用消费者组(Consumer Group) 可以自动实现 “记录消费位置” 的功能,无需手动管理偏移量,这是消费组最核心的优势之一。
消费组会通过 Kafka 集群自动跟踪和存储每个分区的消费偏移量(默认存储在 Kafka 内部的 __consumer_offsets 主题中),并在消费者重启或重平衡(Rebalance)时自动恢复消费位置,无需手动持久化偏移量。
package main
import (
"context"
"fmt"
"github.com/IBM/sarama"
"log"
"os"
"os/signal"
"syscall"
)
// GroupConsumer 自定义消费者,实现 sarama.ConsumerGroupHandler 接口
type GroupConsumer struct{}
// Setup 在分区分配完成后调用(初始化)
func (c *GroupConsumer) Setup(_ sarama.ConsumerGroupSession) error {
return nil
}
// Cleanup 在分区被重新分配前调用(清理)
func (c *GroupConsumer) Cleanup(_ sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim 处理分区消息(核心逻辑)
func (c *GroupConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// 遍历分区消息
for msg := range claim.Messages() {
// 处理消息(实际业务逻辑)
fmt.Printf("Received message: %s (topic: %s, partition: %d, offset: %d)\n",
string(msg.Value), msg.Topic, msg.Partition, msg.Offset)
// 手动提交偏移量(可选,若关闭自动提交则必须手动调用)
// 注意:需在消息处理成功后再提交,避免消息丢失
session.MarkMessage(msg, "") // 标记消息为已处理,会在 session 结束时提交
}
return nil
}
func main() {
// 配置消费组
config := sarama.NewConfig()
config.Version = sarama.V2_8_1_0 // 指定 Kafka 版本(需与集群版本匹配)
// 偏移量配置:默认自动提交(可改为手动提交)
config.Consumer.Offsets.AutoCommit.Enable = true // 开启自动提交
config.Consumer.Offsets.AutoCommit.Interval = 5000 // 自动提交间隔(毫秒)
config.Consumer.Offsets.Initial = sarama.OffsetOldest // 初始偏移量(首次消费时)
// 创建消费组
groupID := "auto-offset-group" // 消费组唯一标识
consumerGroup, err := sarama.NewConsumerGroup(
[]string{"10.3.0.1:9092"}, // Kafka 地址
groupID,
config,
)
if err != nil {
log.Fatalf("Failed to create consumer group: %v", err)
}
defer consumerGroup.Close()
// 要消费的主题
topics := []string{"test-topic"}
consumer := &GroupConsumer{}
// 启动消费循环
go func() {
for {
// 持续消费(重平衡后会重新调用)
if err := consumerGroup.Consume(context.Background(), topics, consumer); err != nil {
log.Printf("Consume error: %v", err)
return
}
}
}()
// 等待中断信号
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
fmt.Println("Shutting down consumer group...")
}
负载均衡模式
如果跑相同的消费者组的消费者
则默认只有其中一个消费者可以消费消息(因为topic只有一个分区)
然后把那个能消费消息的消费者停掉,另一个就可以消费到消息了,这个就是负载均衡模式
当然,广义上的负载均衡模式,应该是一个生产者生产消息,消息被轮询到不同的消费者上
这个需要在创建topic的时候,指定分区数量
package main
import (
"fmt"
"github.com/IBM/sarama"
"log"
"time"
)
// 检查主题是否存在(旧版API)
func topicExists(client sarama.Client, topic string) (bool, error) {
// 获取所有主题
topics, err := client.Topics()
if err != nil {
return false, fmt.Errorf("获取主题列表失败: %v", err)
}
// 检查目标主题是否在列表中
for _, t := range topics {
if t == topic {
return true, nil
}
}
return false, nil
}
// 创建主题(旧版API)
func createTopicIfNotExists(brokers []string, topic string, partitions int32, replicationFactor int16) error {
// 配置客户端
config := sarama.NewConfig()
config.Net.DialTimeout = 5 * time.Second
// 创建基础客户端(旧版没有专门的AdminClient)
client, err := sarama.NewClient(brokers, config)
if err != nil {
return fmt.Errorf("创建客户端失败: %v", err)
}
defer client.Close()
// 检查主题是否存在
exists, err := topicExists(client, topic)
if err != nil {
return fmt.Errorf("检查主题存在性失败: %v", err)
}
if exists {
log.Printf("主题 %s 已存在,无需创建", topic)
return nil
}
// 构建创建主题的请求
request := sarama.CreateTopicsRequest{
TopicDetails: map[string]*sarama.TopicDetail{
topic: {
NumPartitions: partitions,
ReplicationFactor: replicationFactor,
ConfigEntries: make(map[string]*string),
},
},
Timeout: 10 * time.Second, // 10秒超时
}
// 通过客户端发送创建请求(旧版需手动选择broker)
broker, err := client.Controller()
if err != nil {
return fmt.Errorf("无法获取控制器broker")
}
// 发送请求
response, err := broker.CreateTopics(&request)
if err != nil {
return fmt.Errorf("发送创建请求失败: %v", err)
}
// 检查响应结果
for topic, err := range response.TopicErrors {
fmt.Println(topic, err)
}
log.Printf("主题 %s 创建成功,分区数: %d,副本数: %d", topic, partitions, replicationFactor)
return nil
}
func main() {
brokers := []string{"10.3.0.1:9092"}
topicName := "legacy-topic"
partitionCount := int32(3)
replicationFactor := int16(1)
err := createTopicIfNotExists(brokers, topicName, partitionCount, replicationFactor)
if err != nil {
log.Fatalf("操作失败: %v", err)
}
log.Println("操作完成")
}
然后在同一个消费者组里面的消费者,就会消费特定分区的消息了
如果消费者想消费全量消息,那用不同的消费者组就好了
kafka-map
这个是操作kafka的客户端
一般来说,在企业里面,是不能通过代码去创建topic的,一般是由专门的人去创建topic
docker run -d -p 8080:8080 -e DEFAULT_USERNAME=admin -e DEFAULT_PASSWORD=admin --name kafka-map --restart always dushixiang/kafka-map:latest
这个作者我认识,还给我的书写了书评
用户名密码就是 admin admin
进来之后,可以添加集群
添加集群

添加topic

发布、消费消息


kafka安全配置
配置认证、加密
先生成加密用的公私钥
#!/bin/bash
# 创建证书目录(与 Docker 挂载目录一致)
mkdir -p kafka-certs && cd kafka-certs
# 生成 CA 证书
openssl genrsa -out ca.key 2048
openssl req -new -x509 -key ca.key -out ca.crt -days 3650 \
-subj "/C=CN/ST=Beijing/L=Beijing/O=YourCompany/CN=kafka-ca"
# 生成 Broker 证书(含 SAN 扩展)
cat > san.cnf <<EOF
[req]
distinguished_name = req_distinguished_name
req_extensions = v3_req
prompt = no
[req_distinguished_name]
C = CN
ST = Beijing
L = Beijing
O = YourCompany
CN = kafka-broker
[v3_req]
subjectAltName = @alt_names
[alt_names]
IP.1 = 192.168.80.165
DNS.1 = kafka-broker
IP.2 = 10.3.0.1
DNS.2 = kafka-broker
EOF
openssl genrsa -out kafka.server.key 2048
openssl req -new -key kafka.server.key -out kafka.server.csr -config san.cnf
openssl x509 -req -in kafka.server.csr -CA ca.crt -CAkey ca.key -CAcreateserial \
-out kafka.server.crt -days 3650 -extensions v3_req -extfile san.cnf
# 生成 JKS 格式证书(Bitnami 镜像专用)
openssl pkcs12 -export \
-in kafka.server.crt -inkey kafka.server.key \
-name kafka-ssl -out kafka.keystore.p12 \
-password pass:yourpassword -certfile ca.crt
keytool -importkeystore \
-srckeystore kafka.keystore.p12 -srcstoretype pkcs12 \
-destkeystore kafka.keystore.jks -deststoretype JKS \
-alias kafka-ssl -srcstorepass yourpassword -deststorepass yourpassword
# 生成 Truststore
keytool -keystore kafka.truststore.jks \
-alias ca-cert -import -file ca.crt \
-storepass yourpassword -noprompt
echo "证书已生成到 kafka-certs 目录,请挂载到容器的 /bitnami/kafka/config/certs"
docker-compose.yaml
services:
kafka-server:
image: bitnami/kafka:3.9.0
restart: always
ports:
- "9095:9095"
environment:
# KRaft 基础配置
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_NODE_ID=1
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@10.3.0.1:9093
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# SSL 核心参数
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SSL
- KAFKA_CFG_SSL_PROTOCOL=TLSv1.2
- KAFKA_CFG_SSL_CLIENT_AUTH=none # 生产环境建议改为 required
- KAFKA_CFG_SSL_KEYSTORE_LOCATION=/bitnami/kafka/config/certs/kafka.keystore.jks
- KAFKA_CFG_SSL_KEYSTORE_PASSWORD=yourpassword
- KAFKA_CFG_SSL_KEY_PASSWORD=yourpassword
# 监听器配置(关键!)
- KAFKA_CFG_LISTENERS=INTERNAL://:9092,CONTROLLER://:9093,SSL://:9095 # 移除9094,将INTERNAL绑定到9092
- KAFKA_CFG_ADVERTISED_LISTENERS=SSL://kafka-broker:9095,INTERNAL://10.3.0.1:9092 # 内部服务地址改为9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,SSL:SASL_SSL,INTERNAL:PLAINTEXT # 移除PLAINTEXT相关映射
# SASL 认证(与 SSL 叠加)
- KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
- KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
- KAFKA_CLIENT_USERS=user1
- KAFKA_CLIENT_PASSWORDS=password1
volumes:
- ./kafka/kafka-certs:/bitnami/kafka/config/certs
kafka-map目前是连不上加了密的kafka的
安全模式下的生产者
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"github.com/IBM/sarama"
"log"
"os"
)
func main() {
// Kafka 配置
config := sarama.NewConfig()
// SASL 认证配置
config.Net.SASL.Enable = true
config.Net.SASL.User = "user1"
config.Net.SASL.Password = "password1"
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
// TLS 配置
config.Net.TLS.Enable = true
tlsConfig, err := newTLSConfig()
if err != nil {
log.Fatalf("Failed to create TLS config: %v", err)
}
config.Net.TLS.Config = tlsConfig
// 生产者配置
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Producer.RequiredAcks = sarama.WaitForAll
// 创建生产者
producer, err := sarama.NewSyncProducer([]string{"kafka-broker:9095"}, config)
if err != nil {
log.Fatalf("Failed to create producer: %v", err)
}
defer producer.Close()
// 准备消息
topic := "test-topic"
message := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder("Hello Kafka from Sarama!"),
}
// 发送消息
partition, offset, err := producer.SendMessage(message)
if err != nil {
log.Fatalf("Failed to send message: %v", err)
}
fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)
}
func newTLSConfig() (*tls.Config, error) {
// 1. 加载 CA 证书(用于验证服务端)
caCert, err := os.ReadFile("deploy-auth-ssl/kafka/kafka-certs/ca.crt")
if err != nil {
return nil, fmt.Errorf("failed to read CA cert: %v", err)
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("failed to parse CA cert")
}
// 2. 直接加载 PEM 格式的证书和私钥
cert, err := tls.LoadX509KeyPair(
"deploy-auth-ssl/kafka/kafka-certs/kafka.server.crt", // 从 PKCS12 导出的服务端证书
"deploy-auth-ssl/kafka/kafka-certs/kafka.server.key", // 从 PKCS12 导出的私钥
)
if err != nil {
return nil, fmt.Errorf("failed to load key pair: %v", err)
}
return &tls.Config{
RootCAs: caCertPool,
Certificates: []tls.Certificate{cert},
}, nil
}
安全模式下的消费者
kafka高可用配置
集群配置,部分机器挂了不影响整体可用
已简单模式的kafka为例了
version: '3.8'
services:
kafka-1:
image: bitnami/kafka:3.9.0
container_name: kafka-1
restart: unless-stopped
environment:
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_KRAFT_CLUSTER_ID=abcdef1234567890 # 集群唯一ID
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_NODE_ID=1 # 唯一节点ID
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-1:9093,2@kafka-2:9093
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
# 核心修正:listeners和advertised.listeners使用相同的监听器名称
- KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:9094,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-1:9092,EXTERNAL://10.3.0.1:9092
# 确保协议映射包含所有监听器名称
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
ports:
- "9092:9094" # 外部访问映射到EXTERNAL监听器的9094端口
- "9093:9093" # 控制器端口
networks:
- kafka-network
kafka-2:
image: bitnami/kafka:3.9.0
container_name: kafka-2
restart: unless-stopped
environment:
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_KRAFT_CLUSTER_ID=abcdef1234567890 # 与节点1相同的集群ID
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_NODE_ID=2 # 唯一节点ID
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-1:9093,2@kafka-2:9093
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
# 核心修正:保持监听器名称一致
- KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:9095,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-2:9092,EXTERNAL://10.3.0.1:9094
# 协议映射包含所有监听器名称
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
ports:
- "9094:9095" # 外部访问映射到EXTERNAL监听器的9095端口
- "9095:9093" # 控制器端口(避免冲突)
networks:
- kafka-network
networks:
kafka-network:
driver: bridge
kafka-map里面就这样写就好了
