码迷,mamicode.com
首页 > 其他好文 > 详细

封装rabbitmq

时间:2019-08-31 16:56:46      阅读:71      评论:0      收藏:0      [点我收藏+]

标签:测试   direct   rect   cti   接收   今天   color   roc   队列   

今天又抽时间用 PHP 封装了 RabbitMQ,使用的框架是 Yaf。

Consumer如下:

<?php

namespace RabbitMq;
/**
 * Blocking RabbitMQ consumer built on the php-amqp extension.
 *
 * Typical use: construct (which connects), call createChannel() to set up the
 * channel and the durable direct exchange, then call createQueue() to declare
 * and bind the queue and start receiving messages. createQueue() does not
 * return under normal operation.
 */
class Consumer
{
    /** @var string Name of the direct exchange the queue is bound to. */
    public $exchange_name = "exchange_1";
    /** @var string Routing key used when binding the queue to the exchange. */
    public $route_name = "route_1";
    /** @var string Name of the queue messages are read from. */
    public $queue_name = "queue_1";
    /** @var \AMQPConnection|null Broker connection, set by init(). */
    public $conn = null;
    /** @var \AMQPChannel|null Channel, set by createChannel(). */
    public $channel = null;
    /** @var \AMQPExchange|null Exchange object, set by createChannel(). */
    public $exchange = null;
    /** @var \AMQPQueue|null Queue object, set by createQueue(). */
    public $queue = null;

    /**
     * Overrides the default names where a non-empty value is given, then connects.
     *
     * @param string $exchange_name Exchange name; "" keeps the default.
     * @param string $route_name    Routing key; "" keeps the default.
     * @param string $queue_name    Queue name; "" keeps the default.
     *
     * @throws \RuntimeException When the broker connection cannot be established.
     */
    public function __construct(string $exchange_name = "", string $route_name = "", string $queue_name = "")
    {
        // Compare against "" explicitly: a plain truthiness test would also
        // discard the valid name "0".
        if ($exchange_name !== "") {
            $this->exchange_name = $exchange_name;
        }
        if ($route_name !== "") {
            $this->route_name = $route_name;
        }
        if ($queue_name !== "") {
            $this->queue_name = $queue_name;
        }
        $this->init();
    }

    /**
     * Opens the broker connection using MqConfig::$config.
     *
     * @throws \RuntimeException When the connection is not established.
     */
    public function init()
    {
        $this->conn = new \AMQPConnection(MqConfig::$config);
        // Do not rely on connect()'s return value (it is not guaranteed to be
        // a boolean in every php-amqp release); ask the connection for its
        // state instead.
        $this->conn->connect();
        if (!$this->conn->isConnected()) {
            // Throw instead of die() so callers can handle or log the failure.
            throw new \RuntimeException("Cannot connect to the broker!");
        }
    }

    /**
     * Creates the channel and declares the durable direct exchange.
     */
    public function createChannel()
    {
        $this->channel = new \AMQPChannel($this->conn);

        $this->exchange = new \AMQPExchange($this->channel);
        $this->exchange->setName($this->exchange_name);
        $this->exchange->setType(AMQP_EX_TYPE_DIRECT);
        $this->exchange->setFlags(AMQP_DURABLE);
        // Declare the exchange so binding works on a broker where it does not
        // exist yet.
        $this->exchange->declareExchange();
    }

    /**
     * Declares the durable queue, binds it to the exchange and consumes
     * messages in blocking mode, echoing each body and acknowledging it.
     *
     * The connection is closed if the receive loop is left by an exception.
     */
    public function createQueue()
    {
        // The queue is built on the channel; create it on demand so calling
        // this method first does not fail on a null channel.
        if ($this->channel === null) {
            $this->createChannel();
        }

        $this->queue = new \AMQPQueue($this->channel);
        $this->queue->setName($this->queue_name);
        $this->queue->setFlags(AMQP_DURABLE);
        // Declare the queue before binding; binding an undeclared queue fails.
        $this->queue->declareQueue();

        // Bind the queue to the exchange under the routing key.
        echo 'Queue Bind: ' . $this->queue->bind($this->exchange_name, $this->route_name) . "\n";

        echo "接收到的消息:\n";
        try {
            while (true) {
                $this->queue->consume(function ($envelope, $queue) {
                    $msg = $envelope->getBody();
                    echo $msg . "\n";
                    // Manual acknowledgement. Passing AMQP_AUTOACK as the
                    // second argument to consume() would acknowledge
                    // automatically instead.
                    $queue->ack($envelope->getDeliveryTag());
                });
            }
        } finally {
            // Previously placed after the infinite loop and therefore never
            // executed.
            $this->conn->disconnect();
        }
    }
}
Publisher如下:
<?php

namespace RabbitMq;
/**
 * RabbitMQ publisher built on the php-amqp extension.
 *
 * Typical use: construct (which connects), call createChannel() to obtain the
 * exchange object, then call publishMsg() to send the demo messages.
 * This class does not declare the exchange; it must already exist on the broker.
 */
class Publisher
{
    /** @var string Name of the exchange messages are published to. */
    public $exchange_name = "exchange_1";
    /** @var string Routing key attached to every published message. */
    public $route_name = "route_1";
    /** @var \AMQPConnection|null Broker connection, set by init(). */
    public $conn = null;
    /** @var \AMQPChannel|null Channel, set by createChannel(). */
    public $channel = null;
    /** @var \AMQPExchange|null Exchange object, set by createChannel(). */
    public $exchange = null;

    /**
     * Overrides the default names where a non-empty value is given, then connects.
     *
     * @param string $exchange_name Exchange name; "" keeps the default.
     * @param string $route_name    Routing key; "" keeps the default.
     *
     * @throws \RuntimeException When the broker connection cannot be established.
     */
    public function __construct(string $exchange_name = "", string $route_name = "")
    {
        // Compare against "" explicitly: a plain truthiness test would also
        // discard the valid name "0".
        if ($exchange_name !== "") {
            $this->exchange_name = $exchange_name;
        }
        if ($route_name !== "") {
            $this->route_name = $route_name;
        }
        $this->init();
    }

    /**
     * Opens the broker connection using MqConfig::$config.
     *
     * @throws \RuntimeException When the connection is not established.
     */
    public function init()
    {
        $this->conn = new \AMQPConnection(MqConfig::$config);
        // Do not rely on connect()'s return value (it is not guaranteed to be
        // a boolean in every php-amqp release); ask the connection for its
        // state instead.
        $this->conn->connect();
        if (!$this->conn->isConnected()) {
            // Throw instead of die() so callers can handle or log the failure.
            throw new \RuntimeException("Cannot connect to the broker!");
        }
    }

    /**
     * Creates the channel and an exchange object pointing at $exchange_name.
     */
    public function createChannel()
    {
        $this->channel = new \AMQPChannel($this->conn);
        $this->exchange = new \AMQPExchange($this->channel);
        $this->exchange->setName($this->exchange_name);
    }

    /**
     * Publishes five test messages, one per second, then disconnects.
     *
     * The connection is closed even when publishing throws.
     */
    public function publishMsg()
    {
        // The exchange object is required for publishing; create it on demand
        // so calling this method first does not fail on a null exchange.
        if ($this->exchange === null) {
            $this->createChannel();
        }

        try {
            for ($i = 0; $i < 5; ++$i) {
                sleep(1);
                // 24-hour clock: the former 12-hour "h" format without a
                // meridiem made the timestamp ambiguous.
                $message = "测试消息,你好啊!" . date("H:i:s");
                echo "发送消息:哈哈哈:" . $this->exchange->publish($message, $this->route_name) . "\n";
            }
        } finally {
            $this->conn->disconnect();
        }
    }
}

简单调用:

调用consumer:

// The constructor connects to the broker using the default exchange, route and queue names.
$consumer = new \RabbitMq\Consumer();
// Creates the channel and the exchange object that createQueue() works with.
$consumer->createChannel();
// Binds the queue and then blocks in the message-receive loop.
$consumer->createQueue();

调用Publisher:

// The constructor connects to the broker using the default exchange and route names.
$publisher = new \RabbitMq\Publisher();
// Creates the channel and the exchange object used for publishing.
$publisher->createChannel();
// Sends five test messages, one second apart, then disconnects.
$publisher->publishMsg();

 

封装rabbitmq

标签:测试   direct   rect   cti   接收   今天   color   roc   队列   

原文地址:https://www.cnblogs.com/allen-spot/p/11439471.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!