我正在try 创建一个自定义命令,该命令应该使用ActiveMQ Classic队列中的消息.我使用的是php-amqplib,并创建了我的定制连接器和ActiveMQServiceProvider.我在运行sail artisan horizon:consume-activemq时出现此错误

 PhpAmqpLib\Exception\AMQPInvalidFrameException 

  Invalid frame type 65

...

  7   app/Queue/Connectors/ActiveMQConnector.php:15
      PhpAmqpLib\Connection\AMQPStreamConnection::__construct()

  8   app/Console/Commands/ConsumeActiveMQMessages.php:29
      App\Queue\Connectors\ActiveMQConnector::connect()

我查看了Docker日志(log),发现php-amqplib似乎不支持AMQP v1.0:

Connection attempt from non AMQP v1.0 client. AMQP,0,0,9,1
2024-02-22 13:24:44  WARN | Transport Connection to: tcp://192.168.65.1:32999 failed: org.apache.activemq.transport.amqp.AmqpProtocolException: Connection from the client using unsupported AMQP attempted

我是不是理解错了或者配置错了什么?

这些是我的配置:

queue.php

    'connections' => [

...
        'activemq' => [
            'driver' => 'activemq',
            'host' => env('ACTIVEMQ_HOST', 'localhost'),
            'port' => env('ACTIVEMQ_PORT', 61613),
            'username' => env('ACTIVEMQ_USERNAME', 'guest'),
            'password' => env('ACTIVEMQ_PASSWORD', 'guest'),
            'queue' => env('ACTIVEMQ_QUEUE', ''),
            'exchange_name' => env('ACTIVEMQ_EXCHANGE_NAME', ''),
        ],

MyLocal.env.env

ACTIVEMQ_HOST=host.docker.internal
ACTIVEMQ_PORT=5672
ACTIVEMQ_USER=admin
ACTIVEMQ_PASSWORD=admin
ACTIVEMQ_QUEUE=activemqTest

ActiveMQServiceProvider.php

<?php

namespace App\Providers;

use App\Queue\Connectors\ActiveMQConnector;
use Illuminate\Queue\QueueManager;
use Illuminate\Support\ServiceProvider;

class ActiveMQServiceProvider extends ServiceProvider
{
    /**
     * Register services.
     */
    public function register(): void
    {
    }

    /**
     * Bootstrap services.
     */
    public function boot(): void
    {
        $this->app->make(QueueManager::class)->addConnector('activemq', function () {
            return new ActiveMQConnector();
        });
    }
}

ActiveMQConnector.php

<?php

namespace App\Queue\Connectors;

use Illuminate\Queue\Connectors\ConnectorInterface;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ActiveMQConnector implements ConnectorInterface
{
    /**
     * @throws \Exception
     */
    public function connect(array $config)
    {
        return new AMQPStreamConnection(
            $config['host'],
            $config['port'],
            $config['username'],
            $config['password'],
            $config['vhost']
        );
    }
}

ConsumeActiveMQMessages.php

<?php

namespace App\Console\Commands;

use App\Queue\Connectors\ActiveMQConnector;
use Illuminate\Console\Command;
use PhpAmqpLib\Message\AMQPMessage;

class ConsumeActiveMQMessages extends Command
{
    protected $signature = 'horizon:consume-activemq';

    protected $description = 'Consume messages from ActiveMQ and process them within Horizon';

    /**
     * @throws \Exception
     */
    public function handle()
    {
        $connector = new ActiveMQConnector();
        $config = [
            'host' => config('queue.connections.activemq.host'),
            'port' => config('queue.connections.activemq.port'),
            'username' => config('queue.connections.activemq.username'),
            'password' =>config('queue.connections.activemq.password'),
            'vhost' => config('queue.connections.activemq.vhost') !== null ?config('queue.connections.activemq.vhost') : '/',
        ];

        $connection = $connector->connect($config);

        $channel = $connection->channel();

        $callback = function (AMQPMessage $message) {
            $this->processMessage($message);
        };

        $channel->basic_consume(config('queue.connections.activemq.queue'), '', false, true, false, false, $callback);

        while ($channel->is_consuming()) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }

    protected function processMessage(AMQPMessage $message)
    {
        $this->info('Received message: ' . $message->getBody());
    }
}

Laravel:V10.10 PHP V8.1

我try 更改主机和端口,但什么都没有更改.

推荐答案

ActiveMQ(classic 和Artemis)使用ISO标准AMQP规范AMQP 1.0,而php-amqplib似乎只支持0.9.1草案标准,因此您无法使用该客户端连接到ActiveMQ.如果你需要使用AMQP 1.0运行时,你需要找到一个支持AMQP 1.0的php客户端.

这两个代理也都支持STOMP协议,我猜有适用于PHP的STOMP客户端.

Php相关问答推荐

如果活动订阅者的购物车中有订阅项目,则在WooCommerce签出中显示通知

在PHP中替换数组中的文本并同时使用其他函数

将变体设置字段添加到WooCommerce中的特定变量产品

Laravel关系-为用户获取属于组织的所有团队

如何在PHP中出现弃用警告时触发错误?

如何实现对数据库表中多列的唯一约束?

在Symfony 6.4中将工作流动态添加到服务

使用CODIGNITER3中的OR和AND子句进行查询

在个人资料页面上显示个人资料图像

在带有livewire 3的laravel中使用规则方法时,无法进行实时验证

函数存储到变量中

Rappasoft Datatables V2(我想在另一个表具有相同值列时显示数据)

更改WooCommerce checkout 中的错误消息以获得不可用的送货方式

更改特定产品标签的 Woocommerce 产品名称

从 WooCommerce 购物车中删除总计部分,同时保留小计行

如何使用 PHP 将带有图像的文本上传到 Linkedin?

htaccess 重定向子域错误的 url

使用用户元数据填写 woocommerce checkout 字段

使用 RSA 2048 公钥验证 RSA PKCS#1v1.5 SHA 256 签名

如何限制for循环php中的条目数