composer require hyperf/amqp
配置 | 类型 | 默认值 | 备注 |
--- | --- | --- | --- |
host | string | localhost | Host |
port | int | 5672 | 端口号 |
user | string | guest | 用户名 |
password | string | guest | 密码 |
vhost | string | / | vhost |
concurrent.limit | int | 0 | 同时消费的数量 |
pool | object | | 连接池配置 |
params | object | | 基本配置 |
配置
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
// Settings for the 'default' AMQP connection. Each env() call reads an
// environment override and falls back to the second argument.
return [
    'default' => [
        'host' => env('AMQP_HOST', 'localhost'),
        // The port is documented as an int; cast so it stays one no matter
        // what type env() hands back for AMQP_PORT.
        'port' => (int) env('AMQP_PORT', 5672),
        // Fall back to the documented defaults (guest/guest) instead of
        // shipping ad-hoc credentials in the config file. Real credentials
        // belong in AMQP_USER / AMQP_PASSWORD.
        'user' => env('AMQP_USER', 'guest'),
        'password' => env('AMQP_PASSWORD', 'guest'),
        'vhost' => env('AMQP_VHOST', '/'),
        // Number of messages consumed at the same time.
        'concurrent' => [
            'limit' => 1,
        ],
        // Connection pool configuration.
        'pool' => [
            'min_connections' => 1,
            'max_connections' => 10,
            'connect_timeout' => 10.0,
            'wait_timeout' => 3.0,
            'heartbeat' => -1,
        ],
        // Basic connection parameters.
        'params' => [
            'insist' => false,
            'login_method' => 'AMQPLAIN',
            'login_response' => null,
            'locale' => 'en_US',
            'connection_timeout' => 3.0,
            'read_write_timeout' => 6.0,
            'context' => null,
            'keepalive' => false,
            'heartbeat' => 3,
        ],
    ],
];
消费者
<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
/**
 * Demo consumer whose annotation names "hyperf" as exchange, routing key and queue.
 *
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf",maxConsumption=4, name ="DemoConsumer", nums=1, enable=true)
 */
class DemoConsumer extends ConsumerMessage
{
    /**
     * Consume one message: dump the payload, pause for three seconds,
     * dump the acknowledgement result and return it.
     *
     * @param mixed $data payload passed in for consumption
     * @return string always Result::ACK
     */
    public function consume($data): string
    {
        var_dump($data);
        sleep(3);

        $outcome = Result::ACK;
        var_dump($outcome);

        return $outcome;
    }
}
生产者
<?php
declare(strict_types=1);
namespace App\Amqp\Producer;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerMessage;
/**
 * Demo producer message whose annotation names "hyperf" as exchange and routing key.
 *
 * @Producer(exchange="hyperf", routingKey="hyperf")
 */
class DemoProducer extends ProducerMessage
{
    /**
     * Store the supplied data as this message's payload.
     *
     * @param mixed $data value assigned to the payload property
     */
    public function __construct($data)
    {
        $this->payload = $data;
    }
}