我有一个Wildfly23集群,它有两个 node (node-1、node-2),运行的是独立全HA配置文件. 两个集群 node bootstrap 并相互正确通信(据我所知).
我的目的是从 node 1发送一条关于主题的JMS消息,并让 node 1和 node 2上的消息驱动Bean(MDB)使用该消息.
MDB代码:
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Message;
import javax.jms.MessageListener;
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
@ActivationConfigProperty(propertyName = "destinationLookup",
propertyValue = "java:/jms/topic/myTopic"),
@ActivationConfigProperty(propertyName = "maxSession", propertyValue = "1")
})
public class ClusteredEventListener implements MessageListener {
@Override
public void onMessage(final Message message) {
// consume message
}
}
消息发布者代码:
import java.io.Serializable;
import javax.annotation.Resource;
import javax.ejb.Startup;
import javax.enterprise.context.ApplicationScoped;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
@Startup
@ApplicationScoped
public class ClusteredEventSender {
@Resource(lookup = "java:/jms/topic/myTopic")
private Topic topic;
@Resource(mappedName = "java:/ConnectionFactory")
private TopicConnectionFactory connectionFactory;
public void broadcast(final Serializable event) {
try {
try (TopicConnection connection = this.connectionFactory.createTopicConnection()) {
try (TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE)) {
try (MessageProducer messageProducer = session.createPublisher(this.topic)) {
final ObjectMessage message = session.createObjectMessage(event);
messageProducer.send(message);
}
}
}
} catch (final JMSException e) {
log.error("Could not broadcast event to topic: " + event, e);
}
}
}
来自Standalone.xml的代码片段:
<subsystem xmlns="urn:jboss:domain:messaging-activemq:13.0">
<server name="default">
...
<jms-topic name="myTopic" entries="java:/jms/topic/myTopic"/>
...
</server>
</subsystem>
Outcome is that the messages are only consumed on the node that generated them. The other node does not receive any message.个
Experiments个
我try 使用具有持久订阅的java:jboss/exported/jms/RemoteConnectionFactory
,每个 node 使用唯一的clientID
和subscriptionName
,并使用用户"jmsuser",
并使用Java:JBoss/EXPORTED(java:jboss/exported/jms/topic/myTopic
)中的主题,更改/扩展MDB上的注释,如下所示:
...
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "java:jboss/exported/jms/topic/myTopic"),
@ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "Durable"),
@ActivationConfigProperty(propertyName = "subscriptionName", propertyValue = "subscription-${jboss.node.name}"),
@ActivationConfigProperty(propertyName = "clientID", propertyValue = "node-${jboss.node.name}"),
@ActivationConfigProperty(propertyName = "connectionFactoryLookup", propertyValue = "java:jboss/exported/jms/RemoteConnectionFactory"),
@ActivationConfigProperty(propertyName = "user", propertyValue = "jmsuser"),
@ActivationConfigProperty(propertyName = "password", propertyValue = "jmsuser")
...
注意:"jmsuser"是使用批处理脚本Add-user.bat添加的,该脚本位于WildFly/bin目录中.它被分配了角色"Guest".客户角色在持久队列方面得到了扩展.
注释中的属性替换(以使clientID
和subscriptionName
中的${jboss.node.name}
正常工作)
已在Standalone.xml中激活:
<subsystem xmlns="urn:jboss:domain:ee:6.0">
...
<annotation-property-replacement>true</annotation-property-replacement>
...
</subsystem>
...
<subsystem xmlns="urn:jboss:domain:messaging-activemq:13.0">
<server name="default">
...
<security-setting name="#">
...
<role name="guest" delete-non-durable-queue="true"
create-non-durable-queue="true"
delete-durable-queue="true"
create-durable-queue="true"
consume="true"
send="true" />
...
</security-setting>
...
<jms-topic name="myTopic" entries="java:jboss/exported/jms/topic/myTopic"/>
...
</server>
</subsystem>
...
修改后的消息发布者代码:
public class ClusteredEventSender {
@Resource(lookup = "java:jboss/exported/jms/topic/myTopic")
private Topic topic;
@Resource(lookup = "java:jboss/exported/jms/RemoteConnectionFactory")
private TopicConnectionFactory connectionFactory;
public void broadcast(final Serializable event) {
try {
try (TopicConnection connection = this.connectionFactory.createTopicConnection("jmsuser", "jmsuser")) {
try (TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE)) {
try (MessageProducer messageProducer = session.createPublisher(this.topic)) {
final ObjectMessage message = session.createObjectMessage(event);
messageProducer.send(message);
}
}
}
} catch (final JMSException e) {
log.error("Could not broadcast event to topic: " + event, e);
}
}
}
Experiments outcome: The connection to the RemoteConnectionFactory works, but nevertheless the behaviour remains the same as before.个
My questions are:个 如何在WildFly集群中实现使用JMS/ActiveMQ的发布/订阅?消息驱动的Bean是什么样子,消息是如何发送的?需要什么配置?