我可以使用融合CLI连接到融合Kafka集群,但无法使用Segentio的Kafka-Go库进行连接. 我得到了下面的错误.

with SASL: SASL handshake failed: EOF

以下是我在围棋中的功能

package consumer

import (
    "context"
    "fmt"
    "log"
    "os"
    "time"

    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl/plain"
)
func Consume(ctx context.Context) {
    // create a new logger that outputs to stdout
    // and has the `kafka reader` prefix
    l := log.New(os.Stdout, "kafka reader: ", 0)
    mechanism := plain.Mechanism{
        Username: "my-api-key",
        Password: "my-api-secret",
    }

    dialer := &kafka.Dialer{
        Timeout:       10 * time.Second,
        DualStack:     true,
        SASLMechanism: mechanism,
    }

    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{brokerAddress}, // brokerAddress given in confluent cloud cluster settings. 
        Topic:   []string{"steps"}[0],
        // assign the logger to the reader
        Logger: l,
        Dialer: dialer,
    })
    for {
        // the `ReadMessage` method blocks until we receive the next event
        msg, err := r.ReadMessage(ctx)
        if err != nil {
            panic("could not read message " + err.Error())
        }
        // after receiving the message, log its value
        fmt.Println("received: ", string(msg.Value))
    }
}

我try 生成新的密钥,使用我的帐户用户名和密码,减少分区,但都不起作用.

推荐答案

您的服务器的TLS版本似乎不被接受,您可以使用MinVersion强制Go-Kafka接受:

dialer := &kafka.Dialer{
        Timeout:       10 * time.Second,
        DualStack:     true,
        SASLMechanism: mechanism,
        TLS: &tls.Config{
            MinVersion: tls.VersionTLS12,
        },
    }

Go相关问答推荐

更改位置级别和时间戳零点Golang

在nixos上找不到XInput2.h头文件的包

如何在gofiber/websocket/v2中设置状态代码和原因

如何修复proxyconnect tcp:tls:第一条记录看起来不像tls握手

如何在 zap 记录器库中使用自定义时间函数

使用 httptest 对 http 请求进行单元测试重试

如何将已知类型转换为指向switch 中类型参数的指针?

io.Reader 无限循环与 fmt.Fscan

我无法使用反向代理更改主机标头

将文本文件放入切片然后进行比较

GoLang: gocui 边框 colored颜色

使用 unsafe.Pointer 将 struct point直接转换为另一个 struct 是否安全?

Gorm 预加载给出了模糊的列错误

如何使用带有方法的字符串枚举作为通用参数?

如何使用带有方法的字符串枚举作为通用参数?

为什么 Go 被认为是部分抢占式的?

Go 加密库创建的 PKCS1 公钥与openssl rsa ...之间的区别

关于GO的几个问题

Go 泛型是否与 LINQ to Objects 等效?

Golang 查询扫描未将查询正确扫描到 struct 中