我正在try 创建一个自定义命令,该命令应该使用ActiveMQ Classic队列中的消息.我使用的是php-amqplib,并创建了我的定制连接器和ActiveMQServiceProvider.我在运行sail artisan horizon:consume-activemq
时出现此错误
PhpAmqpLib\Exception\AMQPInvalidFrameException
Invalid frame type 65
...
7 app/Queue/Connectors/ActiveMQConnector.php:15
PhpAmqpLib\Connection\AMQPStreamConnection::__construct()
8 app/Console/Commands/ConsumeActiveMQMessages.php:29
App\Queue\Connectors\ActiveMQConnector::connect()
我查看了Docker日志(log),发现php-amqplib似乎不支持AMQP v1.0:
Connection attempt from non AMQP v1.0 client. AMQP,0,0,9,1
2024-02-22 13:24:44 WARN | Transport Connection to: tcp://192.168.65.1:32999 failed: org.apache.activemq.transport.amqp.AmqpProtocolException: Connection from the client using unsupported AMQP attempted
我是不是理解错了或者配置错了什么?
这些是我的配置:
queue.php
个
'connections' => [
...
'activemq' => [
'driver' => 'activemq',
'host' => env('ACTIVEMQ_HOST', 'localhost'),
'port' => env('ACTIVEMQ_PORT', 61613),
'username' => env('ACTIVEMQ_USERNAME', 'guest'),
'password' => env('ACTIVEMQ_PASSWORD', 'guest'),
'queue' => env('ACTIVEMQ_QUEUE', ''),
'exchange_name' => env('ACTIVEMQ_EXCHANGE_NAME', ''),
],
MyLocal.env.env
ACTIVEMQ_HOST=host.docker.internal
ACTIVEMQ_PORT=5672
ACTIVEMQ_USER=admin
ACTIVEMQ_PASSWORD=admin
ACTIVEMQ_QUEUE=activemqTest
ActiveMQServiceProvider.php
个
<?php
namespace App\Providers;
use App\Queue\Connectors\ActiveMQConnector;
use Illuminate\Queue\QueueManager;
use Illuminate\Support\ServiceProvider;
class ActiveMQServiceProvider extends ServiceProvider
{
/**
* Register services.
*/
public function register(): void
{
}
/**
* Bootstrap services.
*/
public function boot(): void
{
$this->app->make(QueueManager::class)->addConnector('activemq', function () {
return new ActiveMQConnector();
});
}
}
ActiveMQConnector.php
个
<?php
namespace App\Queue\Connectors;
use Illuminate\Queue\Connectors\ConnectorInterface;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ActiveMQConnector implements ConnectorInterface
{
/**
* @throws \Exception
*/
public function connect(array $config)
{
return new AMQPStreamConnection(
$config['host'],
$config['port'],
$config['username'],
$config['password'],
$config['vhost']
);
}
}
ConsumeActiveMQMessages.php
个
<?php
namespace App\Console\Commands;
use App\Queue\Connectors\ActiveMQConnector;
use Illuminate\Console\Command;
use PhpAmqpLib\Message\AMQPMessage;
class ConsumeActiveMQMessages extends Command
{
protected $signature = 'horizon:consume-activemq';
protected $description = 'Consume messages from ActiveMQ and process them within Horizon';
/**
* @throws \Exception
*/
public function handle()
{
$connector = new ActiveMQConnector();
$config = [
'host' => config('queue.connections.activemq.host'),
'port' => config('queue.connections.activemq.port'),
'username' => config('queue.connections.activemq.username'),
'password' =>config('queue.connections.activemq.password'),
'vhost' => config('queue.connections.activemq.vhost') !== null ?config('queue.connections.activemq.vhost') : '/',
];
$connection = $connector->connect($config);
$channel = $connection->channel();
$callback = function (AMQPMessage $message) {
$this->processMessage($message);
};
$channel->basic_consume(config('queue.connections.activemq.queue'), '', false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();
}
protected function processMessage(AMQPMessage $message)
{
$this->info('Received message: ' . $message->getBody());
}
}
Laravel:V10.10 PHP V8.1
我try 更改主机和端口,但什么都没有更改.