kafka零基础入门课程
之前我们学习了rabbitMQ消息队列 那是一个很常用的消息队列中间件,本教程学习另一个广泛用于日志收集的消息中间件 它的名字就是kafka kafka的优势 只要是消息队列,都能实现解耦、异

kafka零基础入门课程

发布时间:2025-08-15 (2025-08-15)

之前我们学习了rabbitMQ消息队列

那是一个很常用的消息队列中间件,本教程学习另一个广泛用于日志收集的消息中间件

它的名字就是kafka

kafka的优势

只要是消息队列,都能实现解耦、异步、削峰

但是kafka的优势非常特别

  1. 高吞吐量

    Kafka的高吞吐量表现堪称惊人。单机每秒处理几十上百万的消息量,即使存储了TB级别的消息,它依然能够保持稳定的性能

  2. 高性能

    Kafka的高性能不仅体现在吞吐量上,还体现在它可以支持大量的客户端连接。单节点可以支持上千个客户端,并保证零停机和零数据丢失

因为kafka出色的高吞吐能力,所以kafka非常适合用来做日志收集

例如在我的威胁诱捕矩阵项目中,日志收集就是发送到kafka中,再通过logstash从kafka发送到es中

kafka单节点安装

不认证、不加密的单节点kafka安装

后期把kafka讲的差不多了,再回过头来安装(认证+加密+分布式)kafka集群

在早期的 Kafka 版本中,ZooKeeper 是 Kafka 集群不可或缺的一部分

kafka和zookeeper是什么关系

在 Kafka 2.8.0 版本之前(不包含 2.8.0),Kafka 必须依赖 ZooKeeper 才能运行,且 ZooKeeper 在当时是实现 Kafka 高可用的核心组件之一。

具体来说,早期版本的 Kafka 架构设计中,很多核心功能的实现完全依赖于 ZooKeeper:

  • 集群的元数据(如 broker 列表、主题分区信息等)完全存储在 ZooKeeper 中
  • 控制器的选举过程由 ZooKeeper 协调完成
  • 分区副本的状态同步、故障检测等逻辑依赖 ZooKeeper 的监听机制
  • 消费者组的重平衡(rebalance)过程也依赖 ZooKeeper 进行协调

在新版本中,可以使用KRaft模式实现kafka的高可用,终于可以不用再安装zookeeper了

KRaft具体原理有兴趣的可以看看这篇文章

https://cloud.tencent.com/developer/article/2508468

安装

使用docker-compose安装,非常简单

version: '3.8'
services:
  kafka:
    image: bitnami/kafka:3.9.0  #
    container_name: kafka
    restart: unless-stopped  # 除非手动停止,否则容器退出时自动重启
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      # 节点角色:同时作为控制器(controller)和 broker
      # 在KRaft模式下,节点可以是纯控制器、纯broker或混合角色
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # 节点ID:在集群中唯一标识当前节点,取值范围1-255
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@10.3.0.1:9093
      # 指定控制器使用的监听器名称(需与下方LISTENERS配置对应)
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 指定broker之间通信使用的监听器名称
      # 必须与ADVERTISED_LISTENERS中定义的监听器名称一致
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      # 配置Kafka监听的网络接口和端口
      # 格式:监听器名称://绑定地址:端口
      # INTERNAL: 用于内部通信和客户端连接的监听器,绑定所有网络接口(:9092)
      # CONTROLLER: 用于控制器之间通信的监听器,绑定所有网络接口(:9093)
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,CONTROLLER://:9093
      # 配置对外公告的监听器地址(客户端实际连接的地址)
      # 当客户端连接Kafka时,Kafka会返回此地址给客户端
      # 这里设置为宿主机的IP:端口(需根据实际环境修改10.3.0.1为宿主机IP)
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://10.3.0.1:9092
      # 配置监听器的安全协议映射
      # 格式:监听器名称:协议类型
      # PLAINTEXT表示不加密传输(适用于开发环境)
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT
    ports:
      - "9092:9092"  # 客户端连接端口
      - "9093:9093"  # 控制器通信端口

解释一下配置中的疑点:

其中9092是 Kafka 的客户端通信端口,主要用于外部客户端(如生产者、消费者)与 Kafka broker 之间的通信

9093是 Kafka 的控制器通信端口,仅用于 KRaft 模式下控制器之间的内部通信

go kafka消费者生产者

生产者

package main

import (
  "fmt"
  "log"

  "github.com/IBM/sarama"
)

func main() {
  // 配置生产者
  config := sarama.NewConfig()
  config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本确认
  config.Producer.Retry.Max = 3                    // 重试次数
  config.Producer.Return.Successes = true          // 返回成功的消息

  // 创建同步生产者(也可创建异步生产者)
  producer, err := sarama.NewSyncProducer([]string{"10.3.0.1:9092"}, config)
  if err != nil {
    log.Fatalf("Failed to create producer: %s", err)
  }
  defer producer.Close()

  // 发送消息
  topic := "test-topic"
  for i := 0; i < 5; i++ {
    msg := &sarama.ProducerMessage{
      Topic: topic,
      Value: sarama.StringEncoder(fmt.Sprintf("Hello Sarama! Message %d", i)),
    }

    // 同步发送,返回分区和偏移量
    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
      log.Printf("Failed to send message: %s", err)
    } else {
      fmt.Printf("Message sent to partition %d, offset %d\n", partition, offset)
    }
  }
}

消费者

package main

import (
  "fmt"
  "github.com/IBM/sarama"
  "log"
)

func main() {
  // 配置消费者
  config := sarama.NewConfig()
  config.Consumer.Return.Errors = true
  config.Consumer.Offsets.Initial = sarama.OffsetOldest // 从最早的消息开始消费

  // 2. 创建基础消费者(非消费组模式)
  consumer, err := sarama.NewConsumer([]string{"10.3.0.1:9092"}, config)
  if err != nil {
    log.Fatalf("Failed to create consumer: %s", err)
  }
  defer consumer.Close()

  // 3. 指定要消费的主题和分区
  topic := "test-topic"
  partition := int32(0) // 分区编号(从0开始)

  // 4. 获取分区消费者(从指定偏移量开始消费)
  // 第三个参数为起始偏移量:可以是 sarama.OffsetOldest / sarama.OffsetNewest 或具体数值
  pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
  if err != nil {
    log.Fatalf("Failed to consume partition: %s", err)
  }
  defer pc.Close()

  for msg := range pc.Messages() {
    // 成功接收到消息
    fmt.Printf("%s %s %d %d\n", msg.Topic, msg.Value, msg.Partition, msg.Offset)
  }
}

多生产者单消费者

大部分使用kafka的场景就是多个生产者生产消息,往kafka里面发

然后消费者在后面慢慢消费

这种模式不需要做额外配置,生产者只管往这个topic里面发就好了

发布订阅模式

一个生产者,多个消费者

如果是使用默认消费者(就是上节课的消费者),默认也是发布订阅模式

生产者的消息,会广播给所有消费者

消费者组

之前的默认消费者,会有一个很大的问题,它要么只能从最开始的位置消费

要么从最新的位置消费,要么就自己指定偏移去消费

消费者自己是不知道自己消费到哪里了

使用消费者组(Consumer Group) 可以自动实现 “记录消费位置” 的功能,无需手动管理偏移量,这是消费组最核心的优势之一。

消费组会通过 Kafka 集群自动跟踪和存储每个分区的消费偏移量(默认存储在 Kafka 内部的 __consumer_offsets 主题中),并在消费者重启或重平衡(Rebalance)时自动恢复消费位置,无需手动持久化偏移量。

package main

import (
  "context"
  "fmt"
  "github.com/IBM/sarama"
  "log"
  "os"
  "os/signal"
  "syscall"
)

// GroupConsumer 自定义消费者,实现 sarama.ConsumerGroupHandler 接口
type GroupConsumer struct{}

// Setup 在分区分配完成后调用(初始化)
func (c *GroupConsumer) Setup(_ sarama.ConsumerGroupSession) error {
  return nil
}

// Cleanup 在分区被重新分配前调用(清理)
func (c *GroupConsumer) Cleanup(_ sarama.ConsumerGroupSession) error {
  return nil
}

// ConsumeClaim 处理分区消息(核心逻辑)
func (c *GroupConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  // 遍历分区消息
  for msg := range claim.Messages() {
    // 处理消息(实际业务逻辑)
    fmt.Printf("Received message: %s (topic: %s, partition: %d, offset: %d)\n",
      string(msg.Value), msg.Topic, msg.Partition, msg.Offset)

    // 手动提交偏移量(可选,若关闭自动提交则必须手动调用)
    // 注意:需在消息处理成功后再提交,避免消息丢失
    session.MarkMessage(msg, "") // 标记消息为已处理,会在 session 结束时提交
  }
  return nil
}

func main() {
  // 配置消费组
  config := sarama.NewConfig()
  config.Version = sarama.V2_8_1_0 // 指定 Kafka 版本(需与集群版本匹配)

  // 偏移量配置:默认自动提交(可改为手动提交)
  config.Consumer.Offsets.AutoCommit.Enable = true      // 开启自动提交
  config.Consumer.Offsets.AutoCommit.Interval = 5000    // 自动提交间隔(毫秒)
  config.Consumer.Offsets.Initial = sarama.OffsetOldest // 初始偏移量(首次消费时)

  // 创建消费组
  groupID := "auto-offset-group" // 消费组唯一标识
  consumerGroup, err := sarama.NewConsumerGroup(
    []string{"10.3.0.1:9092"}, // Kafka 地址
    groupID,
    config,
  )
  if err != nil {
    log.Fatalf("Failed to create consumer group: %v", err)
  }
  defer consumerGroup.Close()

  // 要消费的主题
  topics := []string{"test-topic"}
  consumer := &GroupConsumer{}

  // 启动消费循环
  go func() {
    for {
      // 持续消费(重平衡后会重新调用)
      if err := consumerGroup.Consume(context.Background(), topics, consumer); err != nil {
        log.Printf("Consume error: %v", err)
        return
      }
    }
  }()

  // 等待中断信号
  sigChan := make(chan os.Signal, 1)
  signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
  <-sigChan

  fmt.Println("Shutting down consumer group...")
}

负载均衡模式

如果跑相同的消费者组的消费者

则默认只有其中一个消费者可以消费消息(因为topic只有一个分区)

然后把那个能消费消息的消费者停掉,另一个就可以消费到消息了,这个就是负载均衡模式

当然,广义上的负载均衡模式,应该是一个生产者生产消息,消息被轮询到不同的消费者上

这个需要在创建topic的时候,指定分区数量

package main

import (
  "fmt"
  "github.com/IBM/sarama"
  "log"
  "time"

)

// 检查主题是否存在(旧版API)
func topicExists(client sarama.Client, topic string) (bool, error) {
  // 获取所有主题
  topics, err := client.Topics()
  if err != nil {
    return false, fmt.Errorf("获取主题列表失败: %v", err)
  }

  // 检查目标主题是否在列表中
  for _, t := range topics {
    if t == topic {
      return true, nil
    }
  }
  return false, nil
}

// 创建主题(旧版API)
func createTopicIfNotExists(brokers []string, topic string, partitions int32, replicationFactor int16) error {
  // 配置客户端
  config := sarama.NewConfig()
  config.Net.DialTimeout = 5 * time.Second

  // 创建基础客户端(旧版没有专门的AdminClient)
  client, err := sarama.NewClient(brokers, config)
  if err != nil {
    return fmt.Errorf("创建客户端失败: %v", err)
  }
  defer client.Close()

  // 检查主题是否存在
  exists, err := topicExists(client, topic)
  if err != nil {
    return fmt.Errorf("检查主题存在性失败: %v", err)
  }

  if exists {
    log.Printf("主题 %s 已存在,无需创建", topic)
    return nil
  }

  // 构建创建主题的请求
  request := sarama.CreateTopicsRequest{
    TopicDetails: map[string]*sarama.TopicDetail{
      topic: {
        NumPartitions:     partitions,
        ReplicationFactor: replicationFactor,
        ConfigEntries:     make(map[string]*string),
      },
    },
    Timeout: 10 * time.Second, // 10秒超时
  }

  // 通过客户端发送创建请求(旧版需手动选择broker)
  broker, err := client.Controller()
  if err != nil {
    return fmt.Errorf("无法获取控制器broker")
  }

  // 发送请求
  response, err := broker.CreateTopics(&request)
  if err != nil {
    return fmt.Errorf("发送创建请求失败: %v", err)
  }

  // 检查响应结果
  for topic, err := range response.TopicErrors {
    fmt.Println(topic, err)
  }

  log.Printf("主题 %s 创建成功,分区数: %d,副本数: %d", topic, partitions, replicationFactor)
  return nil
}

func main() {
  brokers := []string{"10.3.0.1:9092"}
  topicName := "legacy-topic"
  partitionCount := int32(3)
  replicationFactor := int16(1)

  err := createTopicIfNotExists(brokers, topicName, partitionCount, replicationFactor)
  if err != nil {
    log.Fatalf("操作失败: %v", err)
  }
  log.Println("操作完成")
}

然后在同一个消费者组里面的消费者,就会消费特定分区的消息了

如果消费者想消费全量消息,那用不同的消费者组就好了

kafka-map

这个是操作kafka的客户端

一般来说,在企业里面,是不能通过代码去创建topic的,一般是由专门的人去创建topic

docker run -d -p 8080:8080 -e DEFAULT_USERNAME=admin  -e DEFAULT_PASSWORD=admin --name kafka-map  --restart always dushixiang/kafka-map:latest

这个作者我认识,还给我的书写了书评

用户名密码就是 admin admin

进来之后,可以添加集群

添加集群

添加topic

发布、消费消息

kafka安全配置

配置认证、加密

先生成加密用的公私钥

#!/bin/bash

# 创建证书目录(与 Docker 挂载目录一致)
mkdir -p kafka-certs && cd kafka-certs

# 生成 CA 证书
openssl genrsa -out ca.key 2048
openssl req -new -x509 -key ca.key -out ca.crt -days 3650 \
  -subj "/C=CN/ST=Beijing/L=Beijing/O=YourCompany/CN=kafka-ca"

# 生成 Broker 证书(含 SAN 扩展)
cat > san.cnf <<EOF
[req]
distinguished_name = req_distinguished_name
req_extensions = v3_req
prompt = no

[req_distinguished_name]
C = CN
ST = Beijing
L = Beijing
O = YourCompany
CN = kafka-broker

[v3_req]
subjectAltName = @alt_names

[alt_names]
IP.1 = 192.168.80.165
DNS.1 = kafka-broker
IP.2 = 10.3.0.1
DNS.2 = kafka-broker
EOF

openssl genrsa -out kafka.server.key 2048
openssl req -new -key kafka.server.key -out kafka.server.csr -config san.cnf
openssl x509 -req -in kafka.server.csr -CA ca.crt -CAkey ca.key -CAcreateserial \
  -out kafka.server.crt -days 3650 -extensions v3_req -extfile san.cnf

# 生成 JKS 格式证书(Bitnami 镜像专用)
openssl pkcs12 -export \
  -in kafka.server.crt -inkey kafka.server.key \
  -name kafka-ssl -out kafka.keystore.p12 \
  -password pass:yourpassword -certfile ca.crt

keytool -importkeystore \
  -srckeystore kafka.keystore.p12 -srcstoretype pkcs12 \
  -destkeystore kafka.keystore.jks -deststoretype JKS \
  -alias kafka-ssl -srcstorepass yourpassword -deststorepass yourpassword

# 生成 Truststore
keytool -keystore kafka.truststore.jks \
  -alias ca-cert -import -file ca.crt \
  -storepass yourpassword -noprompt

echo "证书已生成到 kafka-certs 目录,请挂载到容器的 /bitnami/kafka/config/certs"

docker-compose.yaml

services:

  kafka-server:
    image: bitnami/kafka:3.9.0
    restart: always
    ports:
      - "9095:9095"
    environment:
      # KRaft 基础配置
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@10.3.0.1:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER

      # SSL 核心参数
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SSL
      - KAFKA_CFG_SSL_PROTOCOL=TLSv1.2
      - KAFKA_CFG_SSL_CLIENT_AUTH=none  # 生产环境建议改为 required
      - KAFKA_CFG_SSL_KEYSTORE_LOCATION=/bitnami/kafka/config/certs/kafka.keystore.jks
      - KAFKA_CFG_SSL_KEYSTORE_PASSWORD=yourpassword
      - KAFKA_CFG_SSL_KEY_PASSWORD=yourpassword

      # 监听器配置(关键!)
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,CONTROLLER://:9093,SSL://:9095  # 移除9094,将INTERNAL绑定到9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=SSL://kafka-broker:9095,INTERNAL://10.3.0.1:9092  # 内部服务地址改为9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,SSL:SASL_SSL,INTERNAL:PLAINTEXT  # 移除PLAINTEXT相关映射

      # SASL 认证(与 SSL 叠加)
      - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_CLIENT_USERS=user1
      - KAFKA_CLIENT_PASSWORDS=password1

    volumes:
      - ./kafka/kafka-certs:/bitnami/kafka/config/certs

kafka-map目前是连不上加了密的kafka的

安全模式下的生产者

package main

import (
  "crypto/tls"
  "crypto/x509"
  "fmt"
  "github.com/IBM/sarama"
  "log"
  "os"
)

func main() {
  // Kafka 配置
  config := sarama.NewConfig()

  // SASL 认证配置
  config.Net.SASL.Enable = true
  config.Net.SASL.User = "user1"
  config.Net.SASL.Password = "password1"
  config.Net.SASL.Mechanism = sarama.SASLTypePlaintext

  // TLS 配置
  config.Net.TLS.Enable = true
  tlsConfig, err := newTLSConfig()
  if err != nil {
    log.Fatalf("Failed to create TLS config: %v", err)
  }
  config.Net.TLS.Config = tlsConfig

  // 生产者配置
  config.Producer.Return.Successes = true
  config.Producer.Return.Errors = true
  config.Producer.RequiredAcks = sarama.WaitForAll

  // 创建生产者
  producer, err := sarama.NewSyncProducer([]string{"kafka-broker:9095"}, config)
  if err != nil {
    log.Fatalf("Failed to create producer: %v", err)
  }
  defer producer.Close()

  // 准备消息
  topic := "test-topic"
  message := &sarama.ProducerMessage{
    Topic: topic,
    Value: sarama.StringEncoder("Hello Kafka from Sarama!"),
  }

  // 发送消息
  partition, offset, err := producer.SendMessage(message)
  if err != nil {
    log.Fatalf("Failed to send message: %v", err)
  }

  fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)
}

func newTLSConfig() (*tls.Config, error) {
  // 1. 加载 CA 证书(用于验证服务端)
  caCert, err := os.ReadFile("deploy-auth-ssl/kafka/kafka-certs/ca.crt")
  if err != nil {
    return nil, fmt.Errorf("failed to read CA cert: %v", err)
  }
  caCertPool := x509.NewCertPool()
  if !caCertPool.AppendCertsFromPEM(caCert) {
    return nil, fmt.Errorf("failed to parse CA cert")
  }

  // 2. 直接加载 PEM 格式的证书和私钥
  cert, err := tls.LoadX509KeyPair(
    "deploy-auth-ssl/kafka/kafka-certs/kafka.server.crt", // 从 PKCS12 导出的服务端证书
    "deploy-auth-ssl/kafka/kafka-certs/kafka.server.key", // 从 PKCS12 导出的私钥
  )
  if err != nil {
    return nil, fmt.Errorf("failed to load key pair: %v", err)
  }

  return &tls.Config{
    RootCAs:      caCertPool,
    Certificates: []tls.Certificate{cert},
  }, nil
}

安全模式下的消费者

kafka高可用配置

集群配置,部分机器挂了不影响整体可用

已简单模式的kafka为例了

version: '3.8'
services:
  kafka-1:
    image: bitnami/kafka:3.9.0
    container_name: kafka-1
    restart: unless-stopped
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_KRAFT_CLUSTER_ID=abcdef1234567890  # 集群唯一ID
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_NODE_ID=1                       # 唯一节点ID
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-1:9093,2@kafka-2:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL

      # 核心修正:listeners和advertised.listeners使用相同的监听器名称
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:9094,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-1:9092,EXTERNAL://10.3.0.1:9092

      # 确保协议映射包含所有监听器名称
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
    ports:
      - "9092:9094"  # 外部访问映射到EXTERNAL监听器的9094端口
      - "9093:9093"  # 控制器端口
    networks:
      - kafka-network

  kafka-2:
    image: bitnami/kafka:3.9.0
    container_name: kafka-2
    restart: unless-stopped
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_KRAFT_CLUSTER_ID=abcdef1234567890  # 与节点1相同的集群ID
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_NODE_ID=2                       # 唯一节点ID
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-1:9093,2@kafka-2:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL

      # 核心修正:保持监听器名称一致
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:9095,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-2:9092,EXTERNAL://10.3.0.1:9094

      # 协议映射包含所有监听器名称
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
    ports:
      - "9094:9095"  # 外部访问映射到EXTERNAL监听器的9095端口
      - "9095:9093"  # 控制器端口(避免冲突)
    networks:
      - kafka-network

networks:
  kafka-network:
    driver: bridge

kafka-map里面就这样写就好了