我正在使用Kafka模式注册表验证模式.问题是,即使我输入了正确的模式,我仍然得到错误Broker: Broker failed to verify record.

验证被设置为真,这样就可以在当前代理级别判断Value的模式.

我设置的模式和我发送的数据如下.

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "description": "Sample schema to help you get started.",
  "properties": {
    "test": {
      "type": "string"
    }
  },
  "title": "schema_test",
  "type": "object"
}
{"test": "test1"}

此外,我使用GO发送数据,数据代码如下所示.

// main

func main() {
    kafka.ProduceData("schema_test", []byte(`{"test": "test1"}`))
}
// kafka
func ProduceData(topic string, data []byte) {

    conf := ReadConfig()

    p, err := kafka.NewProducer(&conf)

    if err != nil {
        fmt.Printf("Failed to create producer: %s", err)
        os.Exit(1)
    }

    defer p.Close()

    // Go-routine to handle message delivery reports and
    // possibly other event types (errors, stats, etc)
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
                } else {
                    fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n",
                        *ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
                }
            }
        }
    }()

    p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          data,
    }, nil)

    // Wait for all messages to be delivered
    p.Flush(15 * 1000)
}

推荐答案

对于经纪人实际上是如何核实数据的,似乎存在误解.它的工作情况与预期一致.你需要Schema ID分.您只是在没有ID的主题上发送普通JSON.注册表上的模式实际上并不重要,只有它的ID才重要.

来自文档

使经纪人能够验证产生到卡夫卡主题is using a valid schema ID in Schema Registry的数据

更具体地说,您添加到注册表中的模式只是主题上可能存在的许多"版本"中的一个.每个版本都有一个唯一的ID.验证不只使用最新版本;该ID是客户端编码的.

请参阅使用JSON模式生成的融合示例(JSON模式本身应该进行记录验证).

https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/json_producer_example/json_producer_example.go

代理端验证只是为了防止不正确的序列化数据或"毒丸",就像您现在实际上正在做的那样.

Go相关问答推荐

向API网关终结点发出POST请求时出现AWS Lambda With Go:";Rune me.InvalidEntrypoint";错误

禁用Golang中的终端

为什么没有正确生成这些元组?

什么东西逃到了堆里?

如何在Golang中覆盖404

Redis:尽管数据存在,但 rdb.Pipelined 中出现redis:nil错误

Go 1.20 中如何计算连接错误?

如何在 Chi Router 的受保护路由下提供静态文件(尤其是图像)?

这是go语言gin提供的关于TypeEngine和RouterGroup的问题

如何根据中间件的请求设置上下文值?获取 go-staticcheck 问题

如何将 goose 迁移与 pgx 一起使用?

如何使用gosdk在Dynamodb中进行UpdateItem时,将ValueBuilder对象声明为StringSet类型?

GoLang: gocui 边框 colored颜色

如何使用 fyne Go 使用 canvas.NewText() 使文本可滚动

数据流中的无根单元错误,从 Golang 中的 PubSub 到 Bigquery

如何在Go中替换符号并使下一个字母大写

关系不存在 GORM

GRPC 反向代理混淆 GRPC 和 GRPC-Web

Golang 泛型同时具有接口和实现

手动下载并放置一个 golang mod 文件