在我的例子中,我使用JmsOutound Gateway进行MQ连接--inputChannel是ExecutorChannel.

目前,ServiceActivator没有任何输出通道. 网关接口方法返回CompletableFuture对象.

@MessagingGateway
public interface SimpleGateway {

@gateway(requestChannel = "mqRequestChannel")
CompletableFuture sendAndReceiveMqMessage(String message);
}

configuration class

@bean
public MessageChannel mqRequestChannel(AsyncTaskExecutor taskExecutor) {
return new ExecutorChannel(taskExecutor);
}

@ServiceActivator(inputChannel = "mqRequestChannel")
@Bean
public JmsOutboundGateway jmsOutboundGateway(... ) {
    JmsOutboundGateway gateway = new JmsOutboundGateway();
    gateway.setConnectionFactory(mqConnectionFactory);
    gateway.setRequestDestinationName(requestDestination);
    gateway.setReplyDestinationName(responseDestination);
    gateway.setAsync(true);
     ....
     ....
    return gateway;
}
@Autowired
private SimpleGateway simpleGateway;

 CompletableFuture<String> mqResponse = simpleGateway.sendAndReceiveMqMessage("sdsadas");

我试着使用递归,但它不是最好的解决方案.

    private void callGateway(int retry, int requestNum){

        String dummyOfs = "dsfsdf";

            CompletableFuture<String> response = simpleGateway.sendAndReceiveMqMessage(dummyOfs);

        nCoreResponse.exceptionally(throwable -> { LOG.info("error occured");
        LOG.info("[Error section CURRENT THREAD in gateway class call ]: {} , requestNum = {}, retry = {}", Thread.currentThread(), requestNum, retry );
        if(MAX_RETRY > retry){
            Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
            callGateway(retry + 1, requestNum);
        }

        return null;});

        response.thenAccept((String someString) ->
    {
        LOG.info("[success ]: {} , returned value: {}", Thread.currentThread(), someString);
    });
    };
}

我还在考虑将重试推送到不同的线程池.

但对我来说,重要的是设置重试次数和后退时间,而且如果请求超时,则不会触发或停止重试.

推荐答案

CompletableFuture左右没有重试API.至少在我看来是这样.根据您的gateway.setAsync(true)ExecutorChannel,从消息传递网关返回的CompletableFuture和来自JMS的回复之间没有任何联系.在完成Future之后,JmsOutboundGateway仍然会从它对replyChannel标头的调用中产生实际的结果,并且只有在它被包装到另一个CompletableFuture之后,才能满足您的消息传递网关的期望.ExecutorChannel也是一种开销,因为来自消息传递网关方法的CompletableFuture返回类型使用TaskExecutor作为异步请求回复.因此,在请求和应答之间确实有大量的线程切换.

这可能不是你想要的,但我可以建议你把你的sendAndReceiveMqMessage()换成Mono.在那里,您可以使用retry()API.对于下游流量来说,这并不重要,它仍然将是async.最后,如果这是您的API使用者所需要的,则可以将这Mono的成功结果转换为CompletableFuture(请参见Mono.toFuture()).

Java相关问答推荐

在Java 8之后,HashMap的最坏情况下时间复杂度仍然是O(n)而不是O(log n)?

无法找到符号错误—Java—封装

H2弹簧靴试验跌落台

Javascript在边界中心调整ImageView大小

如何在访问完所有文件后加入所有线程?

在for—each循环中的AnimationTimer中的if语句'

@ IdClass with @ Inheritance(策略= InheritanceType. SINGLE_TABLE)

确定Java中Math.Ranb()输出的上限

Java记录的不同序列化/反序列化

如何判断一个矩阵是否为有框矩阵?

如何在运行时动态创建表(使用Java、JPA、SprringBoot)

Chunk(Int)已弃用并标记为要删除

SpringBoot Kafka自动配置-适用于SASL_PLAYTEXT的SSLBundle 包,带SCRAM-SHA-512

如何只修改父类ChroniclerView位置0处的第一个嵌套ChroniclerView(child)元素?

Java连接池无法正常工作

如何在一行中使用Dijkstra中的Java Stream

根本不显示JavaFX阿拉伯字母

对角线填充二维数组

JavaFX,GridPane:在GridPane的列中生成元素将保持所有列的宽度

URI构造函数错误?