安装

composer require hyperf/amqp

默认配置

配置类型默认值备注
hoststringlocalhostHost
portint5672端口号
userstringguest用户名
passwordstringguest密码
vhoststring/vhost
concurrent.limitint0同时消费的数量
poolobject连接池配置
paramsobject基本配置

配置

<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/

return [
'default' => [
'host' => env('AMQP_HOST', 'localhost'),
'port' => env('AMQP_PORT', 5672),
'user' =>env('AMQP_USER','admin'),
'password' => env('AMQP_PASSWORD','admin123'),
'vhost' => env('AMQP_VHOST', '/'),
'concurrent' => [
'limit' => 1,
],
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'heartbeat' => -1,
],
'params' => [
'insist' => false,
'login_method' => 'AMQPLAIN',
'login_response' => null,
'locale' => 'en_US',
'connection_timeout' => 3.0,
'read_write_timeout' => 6.0,
'context' => null,
'keepalive' => false,
'heartbeat' => 3,
],
],
];

消费者

<?php

declare(strict_types=1);

namespace App\Amqp\Consumer;

use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;

/**
* @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf",maxConsumption=4, name ="DemoConsumer", nums=1, enable=true)
*/
class DemoConsumer extends ConsumerMessage
{
//消费
public function consume($data): string
{
var_dump($data);
sleep(3);
var_dump(Result::ACK);
return Result::ACK;
}
}

生产者

<?php

declare(strict_types=1);

namespace App\Amqp\Producer;

use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerMessage;

/**
* @Producer(exchange="hyperf", routingKey="hyperf")
*/
class DemoProducer extends ProducerMessage
{
public function __construct($data)
{
$this->payload = $data;
}
}

发表评论

电子邮件地址不会被公开。 必填项已用*标注