我正在try 通过使用Apache Camel的处理器聚合来自2个终端的数据,为此,我首先阅读了相关的Camel文档:

https://camel.apache.org/components/3.20.x/eips/aggregate-eip.html

并定义一个简单的聚合策略来简单地连接两个字段(在合并成功后,我将把它转换成我的数据模型).然而,几乎所有的示例都使用相同的数据源,并在拆分后聚合数据,等等.但我需要从2个端点获取数据,然后聚合这些数据.为简单起见,我try 从这2个数据源聚合2个字符串字段.之后,我会将汇总的数据传递给一个卡夫卡主题.

几乎所有的例子中,只有一种方法读取数据,例如from(direct: endpoint).但是,我们是否需要2来从端点或处理器读取多个数据呢?

我也try 使用enrich方法,如下所示,但我无法使用此方法聚合这2个数据:

from("timer:myTimer?period=300000") // trigger every 5 minutes
    .multicast(GroupedExchangeAggregationStrategy()).parallelProcessing()
    .enrich("direct:customerProcessor")
    .enrich("direct:orderProcessor")
    .to(kafka("customerOrderTopic"))
    .end()

那么,我如何使用Aggregate来实现这一点(或者,如果您有一个使用Enrich的 idea )?

这里是另一个例子,但就我所见,它也首先读取数据,然后仅对该数据应用聚合策略.然后将其传递到第二阶段.但我希望从两个来源读取数据,然后将其传递给聚合策略.我是不是错过了一些要点?

https://www.masterspringboot.com/camel/how-to-aggregate-messages-in-camel/

推荐答案

聚合EIP是在需要聚合来自同一使用者终结点的消息时使用的.

在您的例子中,因为您需要聚合来自2个不同来源的消息,所以我宁愿像您一样使用2个Enrich EIP,但使用第二个with an aggregation strategy来指定应该如何聚合消息.

类似于:

from("timer:myTimer?period=1000") 
        .enrich("direct:customerProcessor")
        .enrich("direct:orderProcessor", (oldExchange, newExchange) -> {
                String oldBody = oldExchange.getIn().getBody(String.class);
                String newBody = newExchange.getIn().getBody(String.class);
                oldExchange.getIn().setBody(oldBody + "-" + newBody);
                return oldExchange;
        })
        .to("stream:out");
from("direct:customerProcessor")
        .setBody().constant("My customerProcessor body");
from("direct:orderProcessor")
        .setBody().constant("My orderProcessor body");

结果:

My customerProcessor body-My orderProcessor body

Java相关问答推荐

如何用Java表示C++类以通过FFI使用?

从头开始使用Jgit初始化InMemoryRepository

Java 22模式匹配不适用于记录模式匹配.给出汇编问题

是否需要关闭Executors返回的执行器.newVirtualThreadPerTaskExecutor()?

找到允许的最大底片

Java Stream,需要更新列表对象列表

在for—each循环中的AnimationTimer中的if语句'

路径映射未发生

错误:在Liferay7.4中找不到符号导入com.liferay.portal.kernel.uuid.PortalUUID;";

Hibernate 6支持Joda DateTime吗?

我可以在MacOS上使用什么Java函数来在适当的设备上以适当的音量播放适当的alert 声音?

我如何解释这个错误?必需类型:供应商R,提供:收集器对象,捕获?,java.util.List java.lang.Object>>

SpringBoot Kafka自动配置-适用于SASL_PLAYTEXT的SSLBundle 包,带SCRAM-SHA-512

从ActiveMQ Classic迁移到ActiveMQ Artemis需要进行哪些客户端更改?

Java中将文本拆分为数字或十进制数字和字符串

S,要对Java复制构造函数深度克隆所有属性进行单元测试,最可靠的方法是什么?

本机方法(JNI)总是编译的吗?

根据应用程序 Select 的语言检索数据

HBox内部的左对齐按钮(如果重要的话,在页码内)

为什么Instant没有从UTC转换为PostgreSQL的时区?