我有一个应用程序,它使用KStream从Kafka读取数据,根据标题过滤数据,并写入KTable.

public Topology buildTopology() {
        KStream<String,String> inputStream = builder.stream("topicname");
        KStream<String,String> filteredStream = inputStream.transformValues(KSExtension::new)
                .filter((key,value) -> value!=null);
        
        kTable = filteredStream.groupByKey()
                .reduce(((value1, value2) -> value2), Materialized.as("ktable"));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        return builder.build();
    }

我正在try 使用TopologyTestDriver为此创建一个单元测试

private TopologyTestDriver td;
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, String> outputTopic;
    private Topology topology;
    private Properties streamConfig;

@BeforeEach
    void setUp() {
        streamConfig = new Properties();
        streamConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "AppId");
        streamConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");
        streamConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        topology = new Topology();
        td = new TopologyTestDriver(topology, streamConfig);
        inputTopic = td.createInputTopic("input-topic", Serdes.String().serializer(), Serdes.String().serializer());
        outputTopic = td.createOutputTopic("output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());
    }
 @Test
    void buildTopology(){
        inputTopic.pipeInput("key1", "value1");
        topology = app.buildTopology();
    }

当我运行测试时,我得到了异常"java.lang.IllegalArgumentException:Un…Theme:Input-Theme"

DEBUG org.apache.kafka.streams.processor.internals.InternalTopologyBuilder - No source topics using pattern subscription found, initializing consumer's subscription collection.

java.lang.IllegalArgumentException: Unknown topic: input-topic
    at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:582)
    at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:945)
    at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:115)
    at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:137)
    at testclassname.buildTopology()

有人能帮我理解一下我在这里错过了什么吗?

推荐答案

我看到您正在创建一个empty Topology用于初始化TopologyTestDriver:

topology = new Topology();
td = new TopologyTestDriver(topology, streamConfig);

当这个空拓Flutter 用于用td = new TopologyTestDriver(topology, streamConfig);实例化TopologyTestDriver时,测试驱动程序不知道任何主题,因为没有有效地构建拓Flutter .

我想这就是为什么,当您try 使用inputTopic.pipeInput("key1", "value1");将输入通过管道传递到"input-topic"时,测试驱动程序抛出IllegalArgumentException,抱怨"Unknown topic: input-topic".


您应该调用您的buildTopology()方法来生成您正在测试的实际拓Flutter ,并在创建TopologyTestDriver时使用它.

确保测试(input-topic,output-topic)中的主题名称与实际应用程序("topicname")中的名称匹配.

@BeforeEach
void setUp() {
    streamConfig = new Properties();
    streamConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "AppId");
    streamConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");
    streamConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    // Create the topology using your actual code
    topology = app.buildTopology();

    // Now create a TopologyTestDriver using the real topology
    td = new TopologyTestDriver(topology, streamConfig);

    // The topic name here should match the actual topic you use in the real topology
    inputTopic = td.createInputTopic("topicname", Serdes.String().serializer(), Serdes.String().serializer());

    // Create output topic if you need it
    // outputTopic = td.createOutputTopic("output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());
}

@Test
void buildTopology(){
    inputTopic.pipeInput("key1", "value1");
    // Your assertions here
}

注意:我从设置中删除了输出主题,因为在您的代码片段中没有指定写入KTable的输出主题.如果您的实际应用程序写入输出主题,则可以将其添加回来.


I have updated the code where I add the name of the ktable store.
How do I test the value was added to the ktable?

You can query the state store that backs the KTable to check its contents.
In Kafka Streams, each KTable is backed by a state store (even a versioned one very recently, Aug. 2023), and you can directly interact with this store in tests.

确保您在拓Flutter 中为您的KTable设置了存储名称:

kTable = filteredStream.groupByKey()
        .reduce(((value1, value2) -> value2), Materialized.as("myKTableStore"));

这里,"myKTableStore"是支持KTable的状态存储的名称.

在您的测试中,您可以从TopologyTestDriver中检索存储并判断特定密钥的值:

@Test
void buildTopology() {
    inputTopic.pipeInput("key1", "value1");

    // Retrieve the state store
    ReadOnlyKeyValueStore<String, String> keyValueStore = 
        td.getKeyValueStore("myKTableStore");

    // Assert that the KTable contains the expected value for the key
    assertEquals("value1", keyValueStore.get("key1"));
}

这样,您就可以验证您的KTable是否包含预期的键-值对.

Note that ReadOnlyKeyValueStore is a part of the Kafka Streams API. Import it as needed.
You can see it used in "Kafka Streams Interactive Queries / Querying local key-value stores"

https://docs.confluent.io/platform/current/_images/streams-interactive-queries-api-01.png


如何将标题输入到测试中的输入主题?我找不到其他 Select .我在这里过滤标题值inputStream.transformValues(KSExtension::new)

In Kafka Streams' TopologyTestDriver, the ability to directly add headers to the TestInputTopic is somewhat limited.
However, you can use the lower-level pipeInput() method that allows you to pass a ConsumerRecord object, which can have headers.

您将需要手动构建ConsumerRecord,然后使用它:

@Test
void buildTopology() {
    // Create a Headers object and add your custom headers
    Headers headers = new RecordHeaders();
    headers.add(new RecordHeader("myHeaderKey", "myHeaderValue".getBytes()));

    // Create a ConsumerRecord with headers
    ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
            "topicname", // topic
            0, // partition
            0, // offset
            "key1".getBytes(), // key
            "value1".getBytes(), // value
            headers // headers
    );

    // Pipe the record into TopologyTestDriver
    td.pipeInput(record);

    // The rest of your test
}

确保将"topicname"替换为您在拓Flutter 中实际阅读的主题的名称,并根据测试需要调整键、值和标题.

这应该允许您在测试记录中包括标头,然后应该由您的transformValues操作按预期进行处理.

Java相关问答推荐

Spring安全实现多个SQL表身份验证

Java 22模式匹配不适用于记录模式匹配.给出汇编问题

Java WireMock定义存根在Cucumber并行执行的多线程测试中失败

在spring—data中自动发现native—sql查询期间遇到重复的SQL别名[id]

替换com. sun. jndi. dns. DnsContextFactory Wildfly23 JDK 17

@org.springframework.beans.factory.annotation.Autowired(required=true)-注入点有以下注释:-SpringBoot

Java FX中的河内之塔游戏-在游戏完全解决之前什么都不会显示

Spring Boot Maven包

为什么S的文档中说常量方法句柄不能在类的常量池中表示?

Java ArrayList的整数和数组的泛型

MimeMessage emlMessage=new MimeMessage(Session,emlInputStream);抛出InvocationTargetException

在Oracle JDBC连接中,连接失效和身份验证失效是什么意思?

通过Java列表中的某些字段搜索值

JavaFX:无论何时显示应用程序,如何更改组件/ node 位置?

当我在Java中有一个Synchronized块来递增int时,必须声明一个变量Volatile吗?

为什么Spring要更改Java版本配置以及如何正确设置?

由于版本不匹配,从Java 8迁移到Java 17和Spring 6 JUnit4失败

try 添加;按流派搜索;在Web应用程序上,但没有;I don’我不知道;It’这个代码错了

无泄漏函数的Java DRY

使用DynamoDB增强客户端时未更新属性