我有一个Wildfly23集群,它有两个 node (node-1、node-2),运行的是独立全HA配置文件. 两个集群 node bootstrap 并相互正确通信(据我所知).

我的目的是从 node 1发送一条关于主题的JMS消息,并让 node 1和 node 2上的消息驱动Bean(MDB)使用该消息.

MDB代码:

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Message;
import javax.jms.MessageListener;

@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"), 
        @ActivationConfigProperty(propertyName = "destinationLookup", 
            propertyValue = "java:/jms/topic/myTopic"), 
        @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "1")
})
public class ClusteredEventListener implements MessageListener {

    @Override
    public void onMessage(final Message message) {
        // consume message
    }

}

消息发布者代码:

import java.io.Serializable;

import javax.annotation.Resource;
import javax.ejb.Startup;
import javax.enterprise.context.ApplicationScoped;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

@Startup
@ApplicationScoped
public class ClusteredEventSender {

    @Resource(lookup = "java:/jms/topic/myTopic")
    private Topic topic;

    @Resource(mappedName = "java:/ConnectionFactory")
    private TopicConnectionFactory connectionFactory;

    public void broadcast(final Serializable event) {
        try {
            try (TopicConnection connection = this.connectionFactory.createTopicConnection()) { 
                try (TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE)) {
                    try (MessageProducer messageProducer = session.createPublisher(this.topic)) { 
                        final ObjectMessage message = session.createObjectMessage(event);
                        messageProducer.send(message);
                    }
                }
            }
        } catch (final JMSException e) {
            log.error("Could not broadcast event to topic: " + event, e);
        }
    }

}

来自Standalone.xml的代码片段:

<subsystem xmlns="urn:jboss:domain:messaging-activemq:13.0">
    <server name="default">
        ...
        <jms-topic name="myTopic" entries="java:/jms/topic/myTopic"/>
        ...
    </server>
</subsystem>

Outcome is that the messages are only consumed on the node that generated them. The other node does not receive any message.

Experiments

我try 使用具有持久订阅的java:jboss/exported/jms/RemoteConnectionFactory,每个 node 使用唯一的clientIDsubscriptionName,并使用用户"jmsuser", 并使用Java:JBoss/EXPORTED(java:jboss/exported/jms/topic/myTopic)中的主题,更改/扩展MDB上的注释,如下所示:

...
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "java:jboss/exported/jms/topic/myTopic"),
@ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "Durable"), 
@ActivationConfigProperty(propertyName = "subscriptionName", propertyValue = "subscription-${jboss.node.name}"), 
@ActivationConfigProperty(propertyName = "clientID", propertyValue = "node-${jboss.node.name}"), 
@ActivationConfigProperty(propertyName = "connectionFactoryLookup", propertyValue = "java:jboss/exported/jms/RemoteConnectionFactory"), 
@ActivationConfigProperty(propertyName = "user", propertyValue = "jmsuser"), 
@ActivationConfigProperty(propertyName = "password", propertyValue = "jmsuser")
...

注意:"jmsuser"是使用批处理脚本Add-user.bat添加的,该脚本位于WildFly/bin目录中.它被分配了角色"Guest".客户角色在持久队列方面得到了扩展. 注释中的属性替换(以使clientIDsubscriptionName中的${jboss.node.name}正常工作) 已在Standalone.xml中激活:

<subsystem xmlns="urn:jboss:domain:ee:6.0">
    ...
    <annotation-property-replacement>true</annotation-property-replacement>
    ...
</subsystem>
...
<subsystem xmlns="urn:jboss:domain:messaging-activemq:13.0">
    <server name="default">
        ...
        <security-setting name="#">
            ...
            <role name="guest" delete-non-durable-queue="true"
               create-non-durable-queue="true"
               delete-durable-queue="true"
               create-durable-queue="true"
               consume="true"
               send="true" />
            ...
        </security-setting>
        ...
        <jms-topic name="myTopic" entries="java:jboss/exported/jms/topic/myTopic"/>
        ...
    </server>
</subsystem>
...

修改后的消息发布者代码:

public class ClusteredEventSender {

    @Resource(lookup = "java:jboss/exported/jms/topic/myTopic")
    private Topic topic;

    @Resource(lookup = "java:jboss/exported/jms/RemoteConnectionFactory")
    private TopicConnectionFactory connectionFactory;

    public void broadcast(final Serializable event) {
        try {
            try (TopicConnection connection = this.connectionFactory.createTopicConnection("jmsuser", "jmsuser")) {
                try (TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE)) {
                    try (MessageProducer messageProducer = session.createPublisher(this.topic)) { 
                        final ObjectMessage message = session.createObjectMessage(event);
                        messageProducer.send(message);
                    }
                }
            }
        } catch (final JMSException e) {
            log.error("Could not broadcast event to topic: " + event, e);
        }
    }

}

Experiments outcome: The connection to the RemoteConnectionFactory works, but nevertheless the behaviour remains the same as before.

My questions are:个 如何在WildFly集群中实现使用JMS/ActiveMQ的发布/订阅?消息驱动的Bean是什么样子,消息是如何发送的?需要什么配置?

推荐答案

感谢@ehsavoie的暗示,我们设法解决了这个问题.简而言之,解决方案:

  • 没有必要使用java:jboss/exported/jms/topic/myTopicjava:jboss/exported/jms/RemoteConnectionFactory.subscriptionNameclientIduserpassword的任何@ActivationConfigProperty条目也不是.问题中描述的所有实验都是死胡同.
  • 创建从 node 1到 node 2的连接器(如注释中所建议的)具有一定的效果,因为从那时起, node 1能够向 node 2发送事件.然而,由于我们使用的是JGroups发现(如在Full-ha中一样),这仅仅是发现没有正确工作的线索.
  • After changing the cluster password from the default "CHANGE ME!!" (duh!) 和 adding the attribute jms-connection-factory="java:jboss/DefaultJMSConnectionFactory" to the <default-bindings> in our st和alone.xml everything worked as expected. Those two little rascals slipped away during setup.

因此,相关的St和alone.xml代码片断如下:

<cluster password="something else than CHANGE ME!!"/>

<default-bindings ... jms-connection-factory="java:jboss/DefaultJMSConnectionFactory" ... />

Update

为了更好地定位:上述代码片段的位置如下:

<server xmlns="urn:jboss:domain:16.0">
    ...
    <profile>
        ...
        <subsystem xmlns="urn:jboss:domain:messaging-activemq:13.0">
            <server name="default">
                <cluster .../>
                ...
            </server>
        </subsystem>
        ...
        <subsystem xmlns="urn:jboss:domain:ee:6.0">
            <default-bindings .../>
        </subsystem>
        ...
    </profile>
    ...
</server>

Java相关问答推荐

Java 22模式匹配不适用于记录模式匹配.给出汇编问题

当一个链表中间有一个循环时,它的松散部分会发生什么?

Listview—在Android Java中正确链接项目时出错

如何在Javascript中设置文本区域圆角的样式

参数值[...]与预期类型java.util.Date不匹配

流迭代列表<;对象>;上的NoSuchElementException

使用Mockito进行的Junit测试失败

与Spring Boot相关的实体未正确保存

try 判断可选参数是否为空时出现空类型安全警告

自定义批注的外推属性值

搜索列表返回多个频道

Groovy/Java:匹配带引号的命令选项

在Java中将.GRF转换为图像文件

为什么JavaFX MediaPlayer音频播放在Windows和Mac上运行良好,但在Linux(POPOS/Ubuntu)上却有问题?

在具有Quarkus Panache的PostgreSQL中将JSON数据存储为JSONB时,会将其存储为转义字符串

当使用不同的参数类型调用时,为什么围绕Objects.equals的类型安全包装不会失败?

如何在Selenium上继续使用最新的WebDriver版本

如何在单元测试中获得我的装饰Mapstruct映射器的实例?

Java 21保护模式的穷尽性

ExecutorService:如果我向Executor提交了太多任务,会发生什么?