I have a Kafka producer that temporarily lost its connection to the brokers. Each topic is defined with 10 partitions. While the connection was down, I saw the following logs in the process running the producer:

%3|1690891270.524|FAIL|rdkafka#producer-1| [thrd:my_ip:9092/bootstrap]: my_ip:9092/bootstrap: Connect to ipv4#my_ip:9092 failed: Connection refused (after 0ms in state CONNECT, 10 identical error(s) suppressed)
%3|1690891273.524|FAIL|rdkafka#producer-1| [thrd:my_ip:9093/bootstrap]: my_ip:9093/bootstrap: Connect to ipv4#my_ip:9093 failed: Connection refused (after 0ms in state CONNECT, 7 identical error(s) suppressed)
%3|1690891273.584|FAIL|rdkafka#producer-1| [thrd:hostname:9094/1001]: hostname:9094/1001: Connect to ipv4#my_ip:9094 failed: Connection refused (after 0ms in state CONNECT, 4 identical error(s) suppressed)
%4|1690891277.639|CLUSTERID|rdkafka#producer-1| [thrd:main]: Broker my_ip:9092/bootstrap reports different ClusterId "TkRgxModQH-mlCkT9mr3lQ" than previously known "o4j9GSQrQ5Svuwvq4uiWQQ": a client must not be simultaneously connected to multiple clusters
%5|1690891278.529|PARTCNT|rdkafka#producer-1| [thrd:main]: Topic my_topic partition count changed from 10 to 1

My producer configuration is as basic as it gets:

    // Minimal confluent-kafka-go producer configuration.
    config := kafka.ConfigMap{
        "security.protocol": "plaintext",
        "bootstrap.servers": my_ip,
    }

    producer, err := kafka.NewProducer(&config)

My docker compose file for Kafka:

  zoo1:
    image: "${IMAGE_ZOOKEEPER}"
    hostname: zoo1
    container_name: zoo1
    ports:
     - "127.0.0.1:2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
    networks:
      - internal_network

  zoo2:
    image: "${IMAGE_ZOOKEEPER}"
    hostname: zoo2
    container_name: zoo2
    ports:
     - "127.0.0.1:2182:2182"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2182
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
    networks:
      - internal_network

  zoo3:
    image: "${IMAGE_ZOOKEEPER}"
    hostname: zoo3
    container_name: zoo3
    ports:
     - "127.0.0.1:2183:2183"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2183
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
    networks:
      - internal_network

  kafka1:
    image: "${IMAGE_KAFKA}"
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:29092,EXTERNAL://niro:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    networks:
      - internal_network

  kafka2:
    image: "${IMAGE_KAFKA}"
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9093:9093"
      - "29093:29093"
    environment:
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29093,EXTERNAL://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:29093,EXTERNAL://niro:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    networks:
      - internal_network

  kafka3:
    image: "${IMAGE_KAFKA}"
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9094:9094"
      - "29094:29094"
    environment:
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29094,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:29094,EXTERNAL://niro:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    networks:
      - internal_network

  kafka-topic-create:
    image: "${IMAGE_KAFKA}"
    hostname: kafka-topic-create
    container_name: kafka-topic-create
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    entrypoint: [ '/bin/sh', '-c' ]
    command: |
      "
      # blocks until kafka is reachable
      kafka-topics --bootstrap-server kafka1:29092 --list

      echo -e 'Creating kafka topics'
      kafka-topics --bootstrap-server kafka1:29092 --create --if-not-exists --topic my_topic --replication-factor 1 --partitions 10

      echo -e 'Successfully created the following topics for Kafka1:'
      kafka-topics --bootstrap-server kafka1:29092 --list

      echo -e 'Kafka2:'
      kafka-topics --bootstrap-server kafka2:29093 --list
      echo -e 'Kafka3:'

      kafka-topics --bootstrap-server kafka3:29094 --list


      "

    environment:
      KAFKA_BROKER_ID: ignored
      KAFKA_ZOOKEEPER_CONNECT: ignored
    networks:
      - internal_network

The images are confluentinc/cp-kafka:7.3.2 and confluentinc/cp-zookeeper:7.3.2.

Why is Kafka changing the partition count on its own, and how can I prevent it? Thanks.

Accepted answer

First, set this environment variable on every broker:

KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
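Applied to the compose file above, the setting goes into each broker's `environment` block — shown here for `kafka1` as a sketch; repeat it for `kafka2` and `kafka3`:

```yaml
  kafka1:
    environment:
      # Disable implicit topic creation so a restarted broker cannot
      # recreate my_topic with the default of 1 partition.
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      # ...keep the existing listener/zookeeper settings as they are...
```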

This should fix your `my_topic partition count changed from 10 to 1` problem.

The reason it changed to 1 is that your brokers are apparently restarting, and 1 is the default partition count for auto-created topics (your previous topic had been deleted).

If you want to know why the topic was deleted, or why Kafka was disconnecting/restarting, you need to debug the broker logs, not your Golang application.

You also don't need three brokers here, especially given that your topic has only one replica... a single broker can perfectly well host a topic with 10 partitions.
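To confirm the topic's actual partition count and replication after a restart, you can run `--describe` from inside the compose network, the same way your `kafka-topic-create` service runs its commands (container and topic names taken from your compose file):

```
kafka-topics --bootstrap-server kafka1:29092 --describe --topic my_topic
```

If this ever reports `PartitionCount: 1` again, the topic was recreated by auto-creation rather than by your setup script.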


For example, maybe your brokers are failing because your machine is running out of memory. Docker for Windows/Mac defaults to only 2 GB of RAM, I believe, while each broker needs at least 1 GB of heap just to start, and then there is ZooKeeper on top of that.
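If memory pressure turns out to be the culprit, the Confluent images let you cap each broker JVM's heap via `KAFKA_HEAP_OPTS`. A sketch for one broker (the sizes below are illustrative, not tuned values):

```yaml
  kafka1:
    environment:
      # Cap the broker JVM so three brokers plus three ZooKeeper nodes
      # fit inside a small Docker Desktop memory budget.
      KAFKA_HEAP_OPTS: "-Xms256m -Xmx512m"
```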
