标签:exchange host this moc result 参数 使用 esc data
# producer php bin/hyperf.php gen:amqp-producer DemoProducer # consumer php bin/hyperf.php gen:amqp-consumer DemoConsumer # 使用 command 盗用 DemoProducer 进行验证 php bin/hyperf.php gen:command TestCommand
producer 发个消息:
parent::__construct(‘t‘);@Inject() 注解注入$this->producer->produce(new DemoProducer(‘test‘. date(‘Y-m-d H:i:s‘)));<?php
declare(strict_types=1);
namespace App\Command;
use App\Amqp\Producer\DemoProducer;
use Hyperf\Amqp\Producer;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Hyperf\Di\Annotation\Inject;
use Psr\Container\ContainerInterface;
/**
* @Command
*/
class TestCommand extends HyperfCommand
{
/**
* @var ContainerInterface
*/
protected $container;
/**
* @Inject()
* @var Producer
*/
protected $producer;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
parent::__construct(‘t‘);
}
public function configure()
{
$this->setDescription(‘Hyperf Demo Command‘);
}
public function handle()
{
$this->producer->produce(new DemoProducer(‘test‘. date(‘Y-m-d H:i:s‘)));
}
}
愉快的玩耍起来:
# produce
php bin/hyperf.php t
# consume
php bin/hyperf.php start # 会使用 swoole process 启动 DemoConsumer
# 也可以访问 rabbitmq admin 控制台
http://localhost:15672
跟着 rabbitmq 官网 tutorial, 见识一下 hyperf 中的 amqp 有多简单
// consumer
/**
* @Consumer()
*/
class DemoConsumer extends ConsumerMessage
{
protected $exchange = ‘hello‘;
protected $type = Type::FANOUT;
protected $queue = ‘hello‘;
public function consume($data): string
{
var_dump($data);
return Result::ACK;
}
}
// producer
/**
* @Producer()
*/
class DemoProducer extends ProducerMessage
{
protected $exchange = ‘hello‘;
protected $type = Type::FANOUT;
protected $routingKey = ‘hello‘;
public function __construct($data)
{
$this->payload = $data;
}
}
work queue
设置一下 nums 参数, 就可以多进程.
// Consumer
/**
* @Consumer(nums=2)
*/
class DemoConsumer extends ConsumerMessage
{
protected $exchange = ‘task‘;
protected $type = Type::FANOUT;
protected $queue = ‘task‘;
public function consume($data): string
{
var_dump($data);
return Result::ACK;
}
}
// producer
/**
* @Producer()
*/
class DemoProducer extends ProducerMessage
{
protected $exchange = ‘task‘;
protected $type = Type::FANOUT;
protected $routingKey = ‘task‘;
public function __construct($data)
{
$this->payload = $data;
}
}
pub/sub
和上面的 hello world 一致
routing
终于看到 routing_key 的作用了
// consumer
/**
* @Consumer()
*/
class DemoConsumer extends ConsumerMessage
{
protected $exchange = ‘routing‘;
protected $type = Type::DIRECT;
// 这个 consumer 只消费 error 级别的日志
protected $queue = ‘routing.error‘;
protected $routingKey = ‘error‘;
public function consume($data): string
{
var_dump($data);
return Result::ACK;
}
}
/**
* @Consumer()
*/
class Demo2Consumer extends ConsumerMessage
{
protected $exchange = ‘routing‘;
protected $type = Type::DIRECT;
// 这个 consumer 消费所有级别的日志
protected $queue = ‘routing.all‘;
protected $routingKey = [
‘info‘,
‘warning‘,
‘error‘,
];
public function consume($data): string
{
var_dump($data);
return Result::ACK;
}
}
// producer
/**
* @Producer()
*/
class DemoProducer extends ProducerMessage
{
protected $exchange = ‘routing‘;
protected $type = Type::DIRECT;
public function __construct($data, $routingKey)
{
$this->routingKey = $routingKey;
$this->payload = $data;
}
}
// produce
$this->producer->produce(new DemoProducer(‘info‘. date(‘Y-m-d H:i:s‘), ‘info‘));
$this->producer->produce(new DemoProducer(‘warning‘. date(‘Y-m-d H:i:s‘), ‘warning‘));
$this->producer->produce(new DemoProducer(‘error‘. date(‘Y-m-d H:i:s‘), ‘error‘));
var_dump(‘done‘);
topics
和的, 和上面的 routing 差不多
// consume
/**
* @Consumer()
*/
class DemoConsumer extends ConsumerMessage
{
protected $exchange = ‘topics‘;
protected $type = Type::TOPIC;
protected $queue = ‘topics.t1‘;
// protected $routingKey = ‘#‘; // all
// protected $routingKey = ‘kern.*‘;
// protected $routingKey = ‘*.critical‘;
// protected $routingKey = ‘kern.critical‘;
protected $routingKey = [
‘kern.*‘,
‘*.critical‘,
];
public function consume($data): string
{
var_dump($data);
return Result::ACK;
}
}
// produce
/**
* @Producer()
*/
class DemoProducer extends ProducerMessage
{
protected $exchange = ‘topics‘;
protected $type = Type::TOPIC;
public function __construct($data, $routingKey)
{
$this->routingKey = $routingKey;
$this->payload = $data;
}
}
标签:exchange host this moc result 参数 使用 esc data
原文地址:https://www.cnblogs.com/kinwing/p/13605680.html