I am using this local environment:

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.5.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://broker:9092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_DEBUG: 'true'

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.5.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.5.1
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

I created the following key schema:

{
  "$id": "http://schema-registry:8081/schemas/ids/1",
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "Location",
  "type": "object",
  "properties": {
    "profileId": {
        "type":"string"
    }
  }
}

I use this file, schema-key-registry.json, to publish it:

{
  "schemaType":"JSON",
  "schema":"{\"$id\":\"http://schema-registry:8081/schemas/ids/1\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"title\":\"Location\",\"type\":\"object\",\"properties\":{\"profileId\":{\"type\":\"string\"}}}"
}

And the following value schema:

{
  "$id": "http://schema-registry:8081/schemas/ids/1",
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "Location",
  "type": "object",
  "properties": {
    "profileId": {
      "type": "string",
      "description": "The id of the location."
    },
    "latitude": {
      "type": "number",
      "minimum": -90,
      "maximum": 90,
      "description": "The location's latitude."
    },
    "longitude": {
      "type": "number",
      "minimum": -180,
      "maximum": 180,
      "description": "The location's longitude."
    }
  }
}

I use this file, schema-value-registry.json, to publish it:

{
  "schemaType":"JSON",
  "schema":"{\"$id\":\"http://schema-registry:8081/schemas/ids/1\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"title\":\"Location\",\"type\":\"object\",\"properties\":{\"profileId\":{\"type\":\"string\",\"description\":\"The id of the location.\"},\"latitude\":{\"type\":\"number\",\"minimum\":-90,\"maximum\":90,\"description\":\"The location's latitude.\"},\"longitude\":{\"type\":\"number\",\"minimum\":-180,\"maximum\":180,\"description\":\"The location's longitude.\"}}}"
}

Then I register both of them:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data "$(cat schema-key-registry.json)" \
  http://localhost:8081/subjects/locations-key/versions
{"id":1}

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data "$(cat schema-value-registry.json)" \
  http://localhost:8081/subjects/locations/versions
{"id":2}

Producing messages works, and it fails as expected when a message violates the schema.

kafka-json-schema-console-producer \
--bootstrap-server broker:9092 \
--property schema.registry.url=http://schema-registry:8081 \
--property value.schema.id=2 \
--property key.schema.id=1 \
--property key.separator='|' \
--property parse.key=true \
--topic locations
{"profileId":"asdfghjkl"}|{"profileId":"asdfghjkl","latitude":90.000,"longitude":-180.000}
{"profileId":"asdfghjkl"}|{"profileId":"asdfghjkl","latitude":90.000,"longitude":-179}

Consuming works as well:

kafka-json-schema-console-consumer \
--bootstrap-server broker:9092  \
--from-beginning \
--property schema.registry.url=http://localhost:8081 \
--property print.key=true \
--property key.separator='|' \
--topic locations 
{"profileId":"asdfghjkl"}|{"profileId":"asdfghjkl","latitude":90.000,"longitude":-180.000}
{"profileId":"asdfghjkl"}|{"profileId":"asdfghjkl","latitude":90.000,"longitude":-179}

Using ksqlDB, I created a table:

ksql> CREATE TABLE loc WITH (
>KAFKA_TOPIC = 'locations',
>KEY_FORMAT = 'JSON_SR',
>KEY_SCHEMA_ID = 1,
>VALUE_FORMAT = 'JSON_SR',
>VALUE_SCHEMA_ID = 2
>);

It looks like this:

ksql> describe loc;

Name                 : LOC
 Field     | Type
-------------------------------------------------------------
 ROWKEY    | STRUCT<profileId VARCHAR(STRING)> (primary key)
 profileId | VARCHAR(STRING)
 latitude  | DOUBLE
 longitude | DOUBLE
-------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

When I try to run a push query, I get the following error:

ksql> SELECT * FROM loc EMIT CHANGES;
+-------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+
|ROWKEY                                                                                                 |profileId                                                                                              |latitude                                                                                               |longitude                                                                                              |
+-------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=locations, partition=0, offset=3, stacktrace=io.confluent.ksql.serde.KsqlSerializationException: Error serializing message to topic: _confluent-ksql-default_transient_transient_LOC_4102565114369480088_1698924612467-KsqlTopic-Reduce-changelog. Mismatching schema.
Hint: You probably forgot to add VALUE_SCHEMA_ID when creating the source.
        at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:56)
        at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
        at io.confluent.ksql.serde.unwrapped.UnwrappedSerializer.serialize(UnwrappedSerializer.java:56)
        at io.confluent.ksql.serde.unwrapped.UnwrappedSerializer.serialize(UnwrappedSerializer.java:31)
        at io.confluent.ksql.serde.GenericSerializer.serialize(GenericSerializer.java:62)
        at io.confluent.ksql.logging.processing.LoggingSerializer.serialize(LoggingSerializer.java:47)
        at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:174)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.keyBytes(MeteredKeyValueStore.java:431)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$6(MeteredKeyValueStore.java:330)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:877)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:330)
        at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:131)
        at org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.put(KeyValueStoreWrapper.java:86)
        at org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:124)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
        at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:152)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
        at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:792)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:877)
        at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:792)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:723)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1747)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
Caused by: org.apache.kafka.connect.errors.DataException: Mismatching schema.
        at io.confluent.connect.json.JsonSchemaData.fromConnectData(JsonSchemaData.java:524)
        at io.confluent.connect.json.JsonSchemaConverter.fromConnectData(JsonSchemaConverter.java:94)
        at io.confluent.connect.json.JsonSchemaConverter.fromConnectData(JsonSchemaConverter.java:85)
        at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:53)
        ... 33 more

What am I doing wrong?

Recommended answer

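Your schema with id=2 is missing the -value suffix in its subject name (it was registered under locations rather than locations-value), so you probably have a different schema with id=3 registered for the value, hence the error "Mismatching schema".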
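
To make the subject naming concrete, here is a minimal shell sketch. It assumes the default TopicNameStrategy, under which the serializers look up the subject `<topic>-key` or `<topic>-value`. The helper names `subject_name` and `register_schema` are hypothetical, and the registry URL is simply the one used in the question.

```shell
#!/bin/sh
# Default TopicNameStrategy: subject = "<topic>-key" or "<topic>-value".
subject_name() {
  # $1 = topic name, $2 = "key" or "value"
  printf '%s-%s\n' "$1" "$2"
}

# Hypothetical helper: register a request-body file under the derived subject.
# Assumes Schema Registry is reachable at $SR_URL (defaults to the question's URL).
register_schema() {
  # $1 = topic name, $2 = "key" or "value", $3 = request body file
  curl -s -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data "$(cat "$3")" \
    "${SR_URL:-http://localhost:8081}/subjects/$(subject_name "$1" "$2")/versions"
}

# Print the subjects that the locations topic is expected to use.
subject_name locations key
subject_name locations value

# Against a running registry (not executed here):
#   register_schema locations value schema-value-registry.json
#   curl -s http://localhost:8081/subjects                                 # list all subjects
#   curl -s http://localhost:8081/subjects/locations-value/versions/latest # shows the id in use
```

Listing the subjects is a quick way to check whether both locations and locations-value exist, and which schema id each one actually points to.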

Note: kafka-json-schema-console-producer will automatically register the schema for you.

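Also, if the key really is just "one string field", you don't actually need a schema/object for it; just use the value itself. That also avoids the weirdness of key comparison in KSQL/Kafka Streams (JSON can include whitespace, so {"id":"foo"} will not land in the same partition as { "id" : "foo" }; using just "foo" would be much better). For example, you can pass --key-serializer in the console producer to use StringSerializer.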
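
The whitespace point can be illustrated with a small shell sketch. It only compares the serialized key text and does not reproduce Kafka's partitioner; the underlying assumption is that partitioning and key comparison operate on the serialized key bytes, so keys that differ in any byte are treated as different keys. The helper name `same_key_bytes` is hypothetical.

```shell
#!/bin/sh
# Succeeds only when the two serialized keys are character-for-character identical.
same_key_bytes() {
  [ "$1" = "$2" ]
}

compact='{"id":"foo"}'
spaced='{ "id" : "foo" }'

# Logically equal JSON, but the serialized forms differ.
if same_key_bytes "$compact" "$spaced"; then
  echo "same key bytes"
else
  echo "different key bytes (${#compact} vs ${#spaced} characters)"
fi

# A plain string key has only one representation.
if same_key_bytes 'foo' 'foo'; then
  echo "plain string keys match"
fi
```

With a plain string key there is no formatting to vary, which is why the answer suggests dropping the JSON wrapper around a single field.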
