我有一个ActiveMQ Artemis服务器,工作正常.现在我想在服务器上为使用pub—sub的 node 应用程序启用AMQP.虽然我的 node pub子能够连接到它,但即使连接是有效的,它们也不会发布或接收.为什么我的wine 吧不发送为什么我的潜艇不接收?&

我将遵循publishersubscriber的例子来描述AMQP Rhea on Github.两个都可以连接到localhost:5672.

下面是我的ActiveMQ Artemis JMS服务器实现.请注意,我添加了2个端口和addAcceptorConfiguration(一个用于Artemis,另一个用于AMQP的默认端口).

//this is my jmsserver
/*
 * This Java source file was generated by the Gradle 'init' task.
 */
package testSupport.artemis.server;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;

public class JMSServer {
    private static int qId = 0;

    private ActiveMQServer server;
    private String errMsg = "";

    /**
     * Factory method to create an instance of a JMS Server
     * @param topics list of topics to add to the server
     * @return
     */
    public static JMSServer createJMSServer(List<String> topics) {
        JMSServer s = new JMSServer();
        s.start();
        if (topics != null) {
            s.setTopics(topics);
        }
        return s;
    }

    /**
     * Factory method to create an instance of a JMS Server
     * @return
     */
    public static JMSServer createJMSServer() {
        return createJMSServer(null);
    }

    /**
     * Updates the server config with settings required to connect invm or from
     * another process on localhost
     * 
     * @param config
     */
    public static void updateConfig(Configuration config) {
        try {
            config.setPersistenceEnabled(false)
                  .setSecurityEnabled(false)
                  .addAcceptorConfiguration("tcp", "tcp://localhost:61616")
                  .addAcceptorConfiguration("amqp", "tcp://localhost:5672");

            // SpringBoot may have already created an invm connector at 0, so only add one if it doesnt exist
            if (config.getAcceptorConfigurations().stream().noneMatch(poo -> poo.getParams().containsKey("serverId") &&
                    Integer.valueOf(poo.getParams().get("serverId").toString()) == 0)) {
                config.addAcceptorConfiguration("invm", "vm://0");
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /**
     * Default Constructor
     */
    public JMSServer() {
        try {
            Configuration config = new ConfigurationImpl();
            updateConfig(config);
            server = ActiveMQServers.newActiveMQServer(config);
        } catch (Exception ex) {
            errMsg = ex + ": " + ex.getMessage();
            ex.printStackTrace();
        }
    }

    /**
     * Start the JMS Server
     * @return
     */
    public boolean start() {
        boolean success = false;
        try {
            server.start();

            for (int i = 0; i < 50; i++) {
                Thread.sleep(100);
                if (server.isActive()) {
                    success = true;
                    break;
                }
            }
        } catch (Exception ex) {
            errMsg = ex + ": " + ex.getMessage();
            ex.printStackTrace();
        }

        return success;
    }

    /**
     * Stop the JMS Server
     */
    public void stop() {
        try {
            server.stop();
        } catch (Exception ex) {
            errMsg = ex + ": " + ex.getMessage();
            ex.printStackTrace();
        }
    }

    /**
     * Get the Error Message
     * @return
     */
    public String getErrMsg() {
        return errMsg;
    }

    /**
     * Set a list of topics to add to this server
     * @param topics
     * @return
     */
    public boolean setTopics(List<String> topics) {
        boolean success = true;

        if (!server.isActive()) {
            errMsg = "Topics cannot be set until the server has been started.";
            return false;
        }

        // add the topics
        for (String t : topics) {
            try {
                SimpleString addr = SimpleString.toSimpleString(t);
                QueueConfig qcfg = QueueConfig.builderWith(qId, addr, addr).autoCreated(false)
                        .autoDelete(false)
                        .durable(true)
                        .build();

                server.getQueueFactory().createQueueWith(qcfg);
                server.addAddressInfo(new AddressInfo(addr, RoutingType.MULTICAST));
                qId++;
            } catch (Exception ex) {
                errMsg = ex + ": " + ex.getMessage();
                success = false;
            }
        }
        return success;
    }

}

Update: 因此, comments 建议我必须在类路径中添加artemis-amqp-protocol.在Gradle中,我确保包括artemis-amqp-protocolartemis-jms-server以及匹配的版本,如下所示:

dependencies {
    implementation 'org.apache.activemq:artemis-jms-server:2.22.0'
    implementation 'org.apache.activemq:artemis-amqp-protocol:2.22.0'
    // other dependencies here...
}

推荐答案

您必须确保artemis-amqp-protocol jar在您的类路径上,否则代理将无法支持AMQP.如果是,那么你应该会看到一条日志(log)消息,上面写着:

AMQ221043:找到方案模块:[artemis—amqp—protocol].添加协议支持:AMQP.

然后稍后你会看到这样的东西:

AMQ221020:在本地主机:5672处启动了EPOLL接受器,用于方案[AMQP].

如果你没有看到这些(或非常接近这些),那么代理将不支持AMQP.

Java相关问答推荐

Android -如何修复Java.time.zone. ZoneRulesExcept:未知时区ID:Europe/Kyiv

Java中是否有某种类型的池可以避免重复最近的算术运算?

如何在运行时动态创建表(使用Java、JPA、SprringBoot)

蒙蒂霍尔比赛结果不正确

将响应转换为带值的键

Spring和可编辑";where";@Query

搜索列表返回多个频道

如何从日志(log)行中删除包名称?

如何使用路径过渡方法使 node 绕圆旋转?

当构造函数创建一个新实例时,Java为什么需要&new";

为什么这种递归会有这样的行为?

如果执行@BeForeEach#repository.save(),则测试中的UnitTest最终UUID会发生更改

将BlockingQueue+守护程序线程替换为执行器

如何在特定关键字后提取与模式匹配的多个值?

spring 更新多项管理关系

将@Transactional添加到Spring框架中链下的每个方法会产生什么效果?

try 添加;按流派搜索;在Web应用程序上,但没有;I don’我不知道;It’这个代码错了

Java 21保护模式的穷尽性

java构造函数中的冻结操作何时发生?

移动二维数组的行