码迷,mamicode.com
首页 > 其他好文 > 详细

Hyperf AMQP -- 简单实例

时间:2020-09-16 12:43:45      阅读:131      评论:0      收藏:0      [点我收藏+]

标签:exchange   host   this   moc   result   参数   使用   esc   data   

# producer
php bin/hyperf.php gen:amqp-producer DemoProducer

# consumer
php bin/hyperf.php gen:amqp-consumer DemoConsumer

# 使用 command 调用 DemoProducer 进行验证
php bin/hyperf.php gen:command TestCommand

producer 发个消息:

  • 设置 command 的名字: parent::__construct('t');
  • 使用 @Inject() 注解注入
  • 发消息, 一行搞定: $this->producer->produce(new DemoProducer('test' . date('Y-m-d H:i:s')));
<?php

declare(strict_types=1);

namespace App\Command;

use App\Amqp\Producer\DemoProducer;
use Hyperf\Amqp\Producer;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Hyperf\Di\Annotation\Inject;
use Psr\Container\ContainerInterface;

/**
 * Demo console command that publishes one test message through the AMQP producer.
 *
 * The command is registered under the name "t" (see the constructor).
 *
 * @Command
 */
class TestCommand extends HyperfCommand
{
    /**
     * Container handed to the constructor; stored but not otherwise used in this class.
     *
     * @var ContainerInterface
     */
    protected $container;

    /**
     * AMQP producer, populated via annotation-based injection.
     *
     * @Inject()
     * @var Producer
     */
    protected $producer;

    /**
     * Stores the container and registers the command under the name "t".
     */
    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;

        parent::__construct('t');
    }

    /**
     * Sets the description text of this command.
     */
    public function configure()
    {
        $this->setDescription('Hyperf Demo Command');
    }

    /**
     * Publishes a DemoProducer message whose payload is "test" followed by the current timestamp.
     */
    public function handle()
    {
        $this->producer->produce(new DemoProducer('test' . date('Y-m-d H:i:s')));
    }
}

  

愉快的玩耍起来:

# produce
php bin/hyperf.php t

# consume
php bin/hyperf.php start # 会使用 swoole process 启动 DemoConsumer

# 也可以访问 rabbitmq admin 控制台
http://localhost:15672

撸一撸 rabbitmq 官网 tutorial

跟着 rabbitmq 官网 tutorial, 见识一下 hyperf 中的 amqp 有多简单

// consumer
/**
 * "Hello world" consumer: reads from the "hello" queue on the fanout exchange "hello".
 *
 * @Consumer()
 */
class DemoConsumer extends ConsumerMessage
{
    protected $exchange = 'hello';
    protected $type = Type::FANOUT;
    protected $queue = 'hello';

    /**
     * Dumps the received payload and acknowledges the message.
     *
     * @param mixed $data payload of the consumed message
     * @return string always Result::ACK
     */
    public function consume($data): string
    {
        var_dump($data);
        return Result::ACK;
    }
}

// producer
/**
 * "Hello world" producer message: targets the fanout exchange "hello" with routing key "hello".
 *
 * @Producer()
 */
class DemoProducer extends ProducerMessage
{
    protected $exchange = 'hello';
    protected $type = Type::FANOUT;
    protected $routingKey = 'hello';

    /**
     * @param mixed $data value stored as the message payload
     */
    public function __construct($data)
    {
        $this->payload = $data;
    }
}
work queue
设置一下 nums 参数, 就可以多进程.


// Consumer
/**
 * Work-queue consumer: reads from the "task" queue on the fanout exchange "task".
 *
 * The annotation sets nums=2; per the accompanying article this runs the consumer
 * in multiple processes.
 *
 * @Consumer(nums=2)
 */
class DemoConsumer extends ConsumerMessage
{
    protected $exchange = 'task';
    protected $type = Type::FANOUT;
    protected $queue = 'task';

    /**
     * Dumps the received payload and acknowledges the message.
     *
     * @param mixed $data payload of the consumed message
     * @return string always Result::ACK
     */
    public function consume($data): string
    {
        var_dump($data);
        return Result::ACK;
    }
}

// producer
/**
 * Work-queue producer message: targets the fanout exchange "task" with routing key "task".
 *
 * @Producer()
 */
class DemoProducer extends ProducerMessage
{
    protected $exchange = 'task';
    protected $type = Type::FANOUT;
    protected $routingKey = 'task';

    /**
     * @param mixed $data value stored as the message payload
     */
    public function __construct($data)
    {
        $this->payload = $data;
    }
}
pub/sub
和上面的 hello world 一致

routing
终于看到 routing_key 的作用了


// consumer
/**
 * Routing example consumer bound to the direct exchange "routing" with routing key "error".
 *
 * @Consumer()
 */
class DemoConsumer extends ConsumerMessage
{
    protected $exchange = 'routing';
    protected $type = Type::DIRECT;
    // This consumer only consumes error-level logs.
    protected $queue = 'routing.error';
    protected $routingKey = 'error';

    /**
     * Dumps the received payload and acknowledges the message.
     *
     * @param mixed $data payload of the consumed message
     * @return string always Result::ACK
     */
    public function consume($data): string
    {
        var_dump($data);
        return Result::ACK;
    }
}

/**
 * Routing example consumer bound to the direct exchange "routing" with the
 * routing keys "info", "warning" and "error".
 *
 * @Consumer()
 */
class Demo2Consumer extends ConsumerMessage
{
    protected $exchange = 'routing';
    protected $type = Type::DIRECT;
    // This consumer consumes logs of every level.
    protected $queue = 'routing.all';
    protected $routingKey = [
        'info',
        'warning',
        'error',
    ];

    /**
     * Dumps the received payload and acknowledges the message.
     *
     * @param mixed $data payload of the consumed message
     * @return string always Result::ACK
     */
    public function consume($data): string
    {
        var_dump($data);
        return Result::ACK;
    }
}

// producer
/**
 * Routing example producer message: targets the direct exchange "routing";
 * the routing key is chosen per message by the caller.
 *
 * @Producer()
 */
class DemoProducer extends ProducerMessage
{
    protected $exchange = 'routing';
    protected $type = Type::DIRECT;

    /**
     * @param mixed $data value stored as the message payload
     * @param mixed $routingKey value stored as this message's routing key
     */
    public function __construct($data, $routingKey)
    {
        $this->routingKey = $routingKey;
        $this->payload = $data;
    }
}

// produce
// Publish one message per severity; each message uses its severity as the routing key.
$this->producer->produce(new DemoProducer('info' . date('Y-m-d H:i:s'), 'info'));
$this->producer->produce(new DemoProducer('warning' . date('Y-m-d H:i:s'), 'warning'));
$this->producer->produce(new DemoProducer('error' . date('Y-m-d H:i:s'), 'error'));
var_dump('done');
topics
好的, 和上面的 routing 差不多


// consume
/**
 * Topics example consumer: reads from the "topics.t1" queue on the topic exchange
 * "topics", bound with the patterns "kern.*" and "*.critical".
 *
 * @Consumer()
 */
class DemoConsumer extends ConsumerMessage
{
    protected $exchange = 'topics';
    protected $type = Type::TOPIC;
    protected $queue = 'topics.t1';
    // Alternative bindings to try instead of the array below:
    // protected $routingKey = '#'; // all
    // protected $routingKey = 'kern.*';
    // protected $routingKey = '*.critical';
    // protected $routingKey = 'kern.critical';
    protected $routingKey = [
        'kern.*',
        '*.critical',
    ];

    /**
     * Dumps the received payload and acknowledges the message.
     *
     * @param mixed $data payload of the consumed message
     * @return string always Result::ACK
     */
    public function consume($data): string
    {
        var_dump($data);
        return Result::ACK;
    }
}

// produce
/**
 * Topics example producer message: targets the topic exchange "topics";
 * the routing key is chosen per message by the caller.
 *
 * @Producer()
 */
class DemoProducer extends ProducerMessage
{
    protected $exchange = 'topics';
    protected $type = Type::TOPIC;

    /**
     * @param mixed $data value stored as the message payload
     * @param mixed $routingKey value stored as this message's routing key
     */
    public function __construct($data, $routingKey)
    {
        $this->routingKey = $routingKey;
        $this->payload = $data;
    }
}

  

Hyperf AMQP -- 简单实例

标签:exchange   host   this   moc   result   参数   使用   esc   data   

原文地址:https://www.cnblogs.com/kinwing/p/13605680.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!