我正在使用 Kafka 模式注册表(Schema Registry)来验证模式。问题是,即使我设置了正确的模式,仍然会收到错误 `Broker: Broker failed to verify record`。
验证选项已设置为 true,因此目前会在代理(broker)端校验消息 Value 的模式。
我设置的模式和发送的数据如下:
{
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": false,
"description": "Sample schema to help you get started.",
"properties": {
"test": {
"type": "string"
}
},
"title": "schema_test",
"type": "object"
}
{"test": "test1"}
此外,我使用 Go 发送数据,发送数据的代码如下所示:
// main
// main hands one sample JSON record to kafka.ProduceData for the
// "schema_test" topic.
func main() {
	const topic = "schema_test"
	payload := []byte(`{"test": "test1"}`)
	kafka.ProduceData(topic, payload)
}
// kafka
// ProduceData sends data as the value of a single message to topic and then
// waits up to 15 seconds for outstanding deliveries to complete.
//
// The producer is created from the configuration returned by ReadConfig. If
// the producer cannot be created, the process exits with status 1. Enqueue and
// delivery failures are printed to stdout; nothing is returned to the caller.
//
// NOTE(review): data is sent unmodified. When broker-side schema validation is
// enabled, the broker presumably expects the value to be framed in the Schema
// Registry wire format (magic byte plus schema ID prefix) rather than raw
// JSON, which would explain "Broker failed to verify record" — confirm against
// the Confluent documentation and serialize the payload accordingly upstream.
func ProduceData(topic string, data []byte) {
	conf := ReadConfig()
	p, err := kafka.NewProducer(&conf)
	if err != nil {
		fmt.Printf("Failed to create producer: %s", err)
		os.Exit(1)
	}
	defer p.Close()

	// Drain the producer's event channel so delivery reports and client-level
	// errors are reported. The loop ends when the Events channel is closed
	// (presumably by p.Close — TODO confirm).
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
					continue
				}
				fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n",
					*ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
			case kafka.Error:
				// Client-level errors (e.g. broker connectivity) arrive here
				// rather than as delivery reports.
				fmt.Printf("Producer error: %v\n", ev)
			}
		}
	}()

	msg := &kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          data,
	}
	// Produce only enqueues the message; an error here means it was never
	// queued, so there is nothing to flush.
	if err := p.Produce(msg, nil); err != nil {
		fmt.Printf("Failed to enqueue message: %v\n", err)
		return
	}

	// Wait for all messages to be delivered; Flush reports how many events
	// are still outstanding when the timeout (in milliseconds) expires.
	if remaining := p.Flush(15 * 1000); remaining > 0 {
		fmt.Printf("%d event(s) still outstanding after flush timeout\n", remaining)
	}
}