我已经通过Josh Long YouTube视频了解到了Spring集成,我认为它将非常适合我们的一个业务 case .

我们想要做的在纸面上很简单:数据复制.我们有一个定制的消息存储,它的数据被定期轮询,我们想要将有效负载(以json格式的实体)保存到我们的表中.

我们的第一个流轮询数据,按实体类型对数据进行分组,然后再按类型分派到不同的子流.

IntegrationFlows
    .from(jdbcPollingChannelAdapter(queueName), p -> p.poller(m -> m.fixedDelay(250, TimeUnit.MILLISECONDS)))
    .split()
    .enrichHeaders(e -> e
        .headerExpression("type", "payload.type")
        .headerExpression("messageStoreId", "payload.id")
    )
    .aggregate(a -> a.outputProcessor(messageGroupProcessor))
    .gateway(c -> c.route("headers['type']"), /*c -> c.replyChannel("gatewayReplyChannel")*/)
    .log(Level.INFO, "After route...", "'-'")
    .get();

对于我们的每个实体,我们都有一个流,它将向上插入将被批处理的数据,如果成功,它将更新消息库,以通知行已被"处理".

public static IntegrationFlow batch(EntityType entityType, JdbcOutboundGateway upsertOutboundGateway, DataSource dataSource) {
    return IntegrationFlows
        .from(entityType.name())
        .enrichHeaders(e -> e.header("errorChannel", "batchInsertErrorChannel"))
        .split()
        .enrichHeaders(e -> e.headerExpression("messageStoreId", "payload.id", true))
        .transform(CommandMessageStore::payload)
        .transform(Transformers.fromJson(entityType.getClazz()))
        .handle((GenericHandler<? extends MessageDTO>) (payload, headers) -> {
            payload.setMessageStoreId(headers.get("messageStoreId", Long.class));
            return payload;
        })
        .aggregate()
        .handle(upsertOutboundGateway)
        .enrichHeaders(e -> e.header("status", "PROCESSED"))
        .handle((message, headers) -> headers.get("messageStoreIds", List.class))
        .handle(updateMessageStoreOutboundGateway(dataSource))
        .get();
}

public static JdbcOutboundGateway updateMessageStoreOutboundGateway(DataSource dataSource) {
    var jdbc = new JdbcOutboundGateway(dataSource,
        "update message_store set status = ?, error_message = ?, processed_date = now()::timestamp where id = ?");
    jdbc.setRequestPreparedStatementSetter((ps, message) -> {
        ps.setString(1, message.getHeaders().get("status", String.class));
        ps.setString(2, message.getHeaders().get("errorMessage", String.class));
        ps.setLong(3, (Long) message.getPayload());
    });
    return jdbc;
}

这在"快乐之路"中真的很管用.

问题是,有时,我们插入的数据并不是那么好,可能存在外键完整性问题.如果是这种情况,我们想要做的是逐行读取并插入它们.当然,出现外键错误的行将再次失败,但至少会插入其他行.对于成功插入的行,消息存储库将更新为"已处理"状态,对于其他行,将更新为"错误"状态.

因此,场景是在外键约束出现时转到错误通道,并执行单个插入流(同样,针对每个实体).它看起来是这样的:

@Bean
IntegrationFlow batchInsertErrorFlow() {
    return IntegrationFlows
        .from("batchInsertErrorChannel")
        .enrichHeaders(h -> h
            .headerExpression("type", "payload.failedMessage.headers['type']")
            //.replyChannelExpression("payload.cause.failedMessage.headers['replyChannel']") 
            //--> error: Reply message received but the receiving thread has exited due to an exception while sending the request message
            //.replyChannel("gatewayReplyChannel") --> error: StackOverflow
        )
        .transform("payload.cause.failedMessage.payload")
        .gateway(c -> c.route("headers['type'] + '_SINGLE'"))
        .get();
}

public static StandardIntegrationFlow single(EntityType entityType, JdbcOutboundGateway upsertOutboundGateway, DataSource dataSource) {
    return IntegrationFlows
        .from(entityType.name() + "_SINGLE")
        .split()
        .enrichHeaders(e -> e
            .headerExpression("messageStoreIds", "T(java.util.List).of(payload.messageStoreId)")
        )
        .handle(upsertOutboundGateway, e -> e.advice(singleInsertExpressionAdvice)) // I have to use an advice here, enriching the headers do not work
        .enrichHeaders(e -> e.header("status", "PROCESSED"))
        .handle((message, headers) -> headers.get("messageStoreIds", List.class))
        .handle(updateMessageStoreOutboundGateway(dataSource))
        .get();
}

@Bean
public Advice singleInsertExpressionAdvice() {
    var advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setFailureChannelName("singleInsertErrorChannel");
    advice.setOnFailureExpressionString("headers['messageStoreIds']");
    advice.setTrapException(true);
    return advice;
}

@Bean
IntegrationFlow singleInsertErrorFlow(JdbcMessageHandler processFinishedHandler, DataSource dataSource) {
    return IntegrationFlows
        .from("singleInsertErrorChannel")
        .log(Level.INFO, "singleInsertErrorChannel-1", "'-'")
        .enrichHeaders(e -> e
            .headerExpression("errorMessage", "payload.cause.message")
            .header("status", "ERROR")
        )
        .transform("payload.evaluationResult")
        .handle(processFinishedHandler) // does the same thing as updateMessageStoreOutboundGateway()
        .get();
}

问题是,每次它进入错误通道时,我都不知道如何回答初始流的网关,所以它挂起了.我知道是因为"后根..."在这种情况下,永远不会显示日志(log).因此,数据被成功处理,但不再有轮询.

我try 了很多东西,有些在代码中有注释.

我遗漏了什么,以便我可以在错误流完成时回复网关?


UPDATE (SOLVED)

阿特姆指出了我的流程中的不同缺陷.这就是我改变的:

  1. batch流中,我不再通过头设置错误通道,而是使用建议(就像在single流中一样)
  2. batchInsertErrorFlow分中,我加了.replyChannelExpression("payload.failedMessage.headers['replyChannel']")
  3. singleInsertErrorFlow人的情况也是如此
  4. 还是在singleInsertErrorFlow的时候,我不再使用JdbcMessageHandler,而是使用JdbcOutboundGateway,就像在所有其他流程中一样.

在所有这些之后,当upsert中出现错误时,我能够在我的第一个流中看到我的日志(log)"After route",当然,还可以继续轮询数据.

推荐答案

以下是演示用例以及如何处理错误的基本单元测试:

@SpringJUnitConfig
public class So77955301Tests {
    
    @Autowired
    Function<String, String> flowGateway;
    
    @Test
    void errorFlowRepliesToGateway() {
        assertThat(this.flowGateway.apply("test")).isEqualTo("error flow reply");
    }
    
    @Configuration
    @EnableIntegration
    static class TestConfiguration {

        @Bean
        IntegrationFlow flowWithErrorReply(ExpressionEvaluatingRequestHandlerAdvice singleInsertExpressionAdvice) {
            return IntegrationFlow.from(Function.class)
                    .<Object>handle((p, h) -> {
                        throw new RuntimeException("intentional");
                    }, e -> e.advice(singleInsertExpressionAdvice))
                    .get();
        }

        @Bean
        public ExpressionEvaluatingRequestHandlerAdvice singleInsertExpressionAdvice() {
            var advice = new ExpressionEvaluatingRequestHandlerAdvice();
            advice.setFailureChannelName("singleInsertErrorFlow.input");
            advice.setTrapException(true);
            return advice;
        }

        @Bean
        IntegrationFlow singleInsertErrorFlow() {
            return f -> f
                .enrichHeaders(e -> e.replyChannelExpression("payload.failedMessage.headers[replyChannel]"))
                .transform(payload -> "error flow reply");
    }
        
    }

}

关键部分是replyChannelExpression("payload.failedMessage.headers[replyChannel]").

从该ExpressionEvaluatingRequestHandlerAdvice发送到错误通道的ErrorMessage没有回复所需的所有报头.因此,我们需要从failedMessage中提取它们.

另外,你的.handle(processFinishedHandler)可能是单向的,因为你说大门是挂着的.因此,为了能够生成对网关的回复(我相信是.gateway(c -> c.route("headers['type'] + '_SINGLE'"))),您需要将您的singleInsertErrorFlow设置为依赖的,并处理来自失败消息的标头.

我认为我们可以在框架中使其自动化,以丰富这一点:

        MessagingException messagingException =
                new MessageHandlingExpressionEvaluatingAdviceException(message, "Handler Failed",
                        unwrapThrowableIfNecessary(exception), evalResult);
        ErrorMessage errorMessage = new ErrorMessage(messagingException);
        this.messagingTemplate.send(this.failureChannel, errorMessage);

对于来自message的头文件,我们刚刚失败了,但这将只会使它进入当前的6.3版本.

Java相关问答推荐

那么比较似乎不是词典学的,尽管doctor 这么说

Android视图覆盖不阻止点击它后面的控件

我想了解Java中的模块化.编译我的应用程序时,我有一个ResolutionException

从技术上讲,OPC UA客户端是否可以通过转发代理将请求通过 tunel 发送到OPC UA服务器?

更新GWT 2.5.1到2.11.0和sencha GXT 3.1.1到4.1时出现错误

无法在Java中处理PayPal支付响应

Java连接池无法正常工作

try 使用类来包含JSON响应

Spring动态反序列化JSON可以是列表,也可以只是一个对象

如何生成指定范围内的11位序列号?

Spring Validator批注不起作用

如何在右击时 Select 新行?

为什么Instant没有从UTC转换为PostgreSQL的时区?

保持标题窗格的箭头可见,即使设置为不可折叠

[jdk21][Foreign Function&;Memory API]MemoryLayout::varHandle通过可变数组进行 struct 化的问题

Java编译器是否进行了持续的折叠优化,以及如何进行判断?

JavaFX中ListView中的问题

在外部类和内部类之间,当调用外部类内部或外部的主方法时,它们的静态初始化程序的运行顺序不同

Jackson YAML:支持锚点扩展/覆盖

有没有办法仅将 JComboBox 中的选定项目居中(因此保持组合框中的所有项目左对齐)