我已经通过Josh Long YouTube视频了解到了Spring集成,我认为它将非常适合我们的一个业务 case .
我们想要做的在纸面上很简单:数据复制.我们有一个定制的消息存储,它的数据被定期轮询,我们想要将有效负载(以json格式的实体)保存到我们的表中.
我们的第一个流轮询数据,按实体类型对数据进行分组,然后再按类型分派到不同的子流.
IntegrationFlows
.from(jdbcPollingChannelAdapter(queueName), p -> p.poller(m -> m.fixedDelay(250, TimeUnit.MILLISECONDS)))
.split()
.enrichHeaders(e -> e
.headerExpression("type", "payload.type")
.headerExpression("messageStoreId", "payload.id")
)
.aggregate(a -> a.outputProcessor(messageGroupProcessor))
.gateway(c -> c.route("headers['type']"), /*c -> c.replyChannel("gatewayReplyChannel")*/)
.log(Level.INFO, "After route...", "'-'")
.get();
对于我们的每个实体,我们都有一个流,它将向上插入将被批处理的数据,如果成功,它将更新消息库,以通知行已被"处理".
public static IntegrationFlow batch(EntityType entityType, JdbcOutboundGateway upsertOutboundGateway, DataSource dataSource) {
return IntegrationFlows
.from(entityType.name())
.enrichHeaders(e -> e.header("errorChannel", "batchInsertErrorChannel"))
.split()
.enrichHeaders(e -> e.headerExpression("messageStoreId", "payload.id", true))
.transform(CommandMessageStore::payload)
.transform(Transformers.fromJson(entityType.getClazz()))
.handle((GenericHandler<? extends MessageDTO>) (payload, headers) -> {
payload.setMessageStoreId(headers.get("messageStoreId", Long.class));
return payload;
})
.aggregate()
.handle(upsertOutboundGateway)
.enrichHeaders(e -> e.header("status", "PROCESSED"))
.handle((message, headers) -> headers.get("messageStoreIds", List.class))
.handle(updateMessageStoreOutboundGateway(dataSource))
.get();
}
public static JdbcOutboundGateway updateMessageStoreOutboundGateway(DataSource dataSource) {
var jdbc = new JdbcOutboundGateway(dataSource,
"update message_store set status = ?, error_message = ?, processed_date = now()::timestamp where id = ?");
jdbc.setRequestPreparedStatementSetter((ps, message) -> {
ps.setString(1, message.getHeaders().get("status", String.class));
ps.setString(2, message.getHeaders().get("errorMessage", String.class));
ps.setLong(3, (Long) message.getPayload());
});
return jdbc;
}
这在"快乐之路"中真的很管用.
问题是,有时,我们插入的数据并不是那么好,可能存在外键完整性问题.如果是这种情况,我们想要做的是逐行读取并插入它们.当然,出现外键错误的行将再次失败,但至少会插入其他行.对于成功插入的行,消息存储库将更新为"已处理"状态,对于其他行,将更新为"错误"状态.
因此,场景是在外键约束出现时转到错误通道,并执行单个插入流(同样,针对每个实体).它看起来是这样的:
@Bean
IntegrationFlow batchInsertErrorFlow() {
return IntegrationFlows
.from("batchInsertErrorChannel")
.enrichHeaders(h -> h
.headerExpression("type", "payload.failedMessage.headers['type']")
//.replyChannelExpression("payload.cause.failedMessage.headers['replyChannel']")
//--> error: Reply message received but the receiving thread has exited due to an exception while sending the request message
//.replyChannel("gatewayReplyChannel") --> error: StackOverflow
)
.transform("payload.cause.failedMessage.payload")
.gateway(c -> c.route("headers['type'] + '_SINGLE'"))
.get();
}
public static StandardIntegrationFlow single(EntityType entityType, JdbcOutboundGateway upsertOutboundGateway, DataSource dataSource) {
return IntegrationFlows
.from(entityType.name() + "_SINGLE")
.split()
.enrichHeaders(e -> e
.headerExpression("messageStoreIds", "T(java.util.List).of(payload.messageStoreId)")
)
.handle(upsertOutboundGateway, e -> e.advice(singleInsertExpressionAdvice)) // I have to use an advice here, enriching the headers do not work
.enrichHeaders(e -> e.header("status", "PROCESSED"))
.handle((message, headers) -> headers.get("messageStoreIds", List.class))
.handle(updateMessageStoreOutboundGateway(dataSource))
.get();
}
@Bean
public Advice singleInsertExpressionAdvice() {
var advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setFailureChannelName("singleInsertErrorChannel");
advice.setOnFailureExpressionString("headers['messageStoreIds']");
advice.setTrapException(true);
return advice;
}
@Bean
IntegrationFlow singleInsertErrorFlow(JdbcMessageHandler processFinishedHandler, DataSource dataSource) {
return IntegrationFlows
.from("singleInsertErrorChannel")
.log(Level.INFO, "singleInsertErrorChannel-1", "'-'")
.enrichHeaders(e -> e
.headerExpression("errorMessage", "payload.cause.message")
.header("status", "ERROR")
)
.transform("payload.evaluationResult")
.handle(processFinishedHandler) // does the same thing as updateMessageStoreOutboundGateway()
.get();
}
问题是,每次它进入错误通道时,我都不知道如何回答初始流的网关,所以它挂起了.我知道是因为"后根..."在这种情况下,永远不会显示日志(log).因此,数据被成功处理,但不再有轮询.
我try 了很多东西,有些在代码中有注释.
我遗漏了什么,以便我可以在错误流完成时回复网关?
UPDATE (SOLVED)个
阿特姆指出了我的流程中的不同缺陷.这就是我改变的:
- 在
batch
流中,我不再通过头设置错误通道,而是使用建议(就像在single
流中一样) - 在
batchInsertErrorFlow
分中,我加了.replyChannelExpression("payload.failedMessage.headers['replyChannel']")
分 -
singleInsertErrorFlow
人的情况也是如此 - 还是在
singleInsertErrorFlow
的时候,我不再使用JdbcMessageHandler
,而是使用JdbcOutboundGateway
,就像在所有其他流程中一样.
在所有这些之后,当upsert中出现错误时,我能够在我的第一个流中看到我的日志(log)"After route",当然,还可以继续轮询数据.