在我的例子中,我使用JmsOutound Gateway进行MQ连接--inputChannel是ExecutorChannel.
目前,ServiceActivator没有任何输出通道. 网关接口方法返回CompletableFuture对象.
@MessagingGateway
public interface SimpleGateway {
@gateway(requestChannel = "mqRequestChannel")
CompletableFuture sendAndReceiveMqMessage(String message);
}
configuration class
@bean
public MessageChannel mqRequestChannel(AsyncTaskExecutor taskExecutor) {
return new ExecutorChannel(taskExecutor);
}
@ServiceActivator(inputChannel = "mqRequestChannel")
@Bean
public JmsOutboundGateway jmsOutboundGateway(... ) {
JmsOutboundGateway gateway = new JmsOutboundGateway();
gateway.setConnectionFactory(mqConnectionFactory);
gateway.setRequestDestinationName(requestDestination);
gateway.setReplyDestinationName(responseDestination);
gateway.setAsync(true);
....
....
return gateway;
}
@Autowired
private SimpleGateway simpleGateway;
CompletableFuture<String> mqResponse = simpleGateway.sendAndReceiveMqMessage("sdsadas");
我试着使用递归,但它不是最好的解决方案.
private void callGateway(int retry, int requestNum){
String dummyOfs = "dsfsdf";
CompletableFuture<String> response = simpleGateway.sendAndReceiveMqMessage(dummyOfs);
nCoreResponse.exceptionally(throwable -> { LOG.info("error occured");
LOG.info("[Error section CURRENT THREAD in gateway class call ]: {} , requestNum = {}, retry = {}", Thread.currentThread(), requestNum, retry );
if(MAX_RETRY > retry){
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
callGateway(retry + 1, requestNum);
}
return null;});
response.thenAccept((String someString) ->
{
LOG.info("[success ]: {} , returned value: {}", Thread.currentThread(), someString);
});
};
}
我还在考虑将重试推送到不同的线程池.
但对我来说,重要的是设置重试次数和后退时间,而且如果请求超时,则不会触发或停止重试.