码迷,mamicode.com
首页 > 其他好文 > 详细

AMQP

时间:2014-06-20 13:42:54      阅读:227      评论:0      收藏:0      [点我收藏+]

标签:php amqp   amqpconnection   amqpchannel   amqpexchange   amqpqueue   

<?php 
/*
 * 消息队列
 * @access Queue.php
 * @author liuhaipeng <liuhaipeng@dhgate.com>
 * @create 2013-10-17 上午10:42:11
 * @version 1.0
 * @copyright Copyright &copy; 2013, DHgate.com
 */


class Scrm_Queue {
protected static $connect = NULL;
protected static $channel = NULL;
protected static $exchangeName = NULL;
protected static $queue = NULL;
protected static $queueName = NULL;

public function __construct($config, $queueName = NULL, $exchangeName = NULL) {
self::$queueName = $queueName;
self::$exchangeName = $exchangeName;

// 创建amqpconnection实例
$connect = new AMQPConnection($config);
if (! $connect->connect()) {
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($connect);

self::$connect = &$connect;
self::$channel = &$channel;
}

public function __destruct() {
self::$connect->disconnect();
}

/**
* 写入消息队列
* @param unknown $message
*/
public function setQueue($message = array(), $routing_key = ‘key_1‘) {
// 创建交换机
$exchange = new AMQPExchange(self::$channel);
$exchange->setName(self::$exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); // 交换器进行持久化,即 RabbitMQ 重启后会自动重建
$exchange->declare();

// 创建队列
$queue = new AMQPQueue(self::$channel);
$queue->setName(self::$queueName);
$queue->setFlags(AMQP_DURABLE);
// $queue->declare(); // AMQP 1.2.0 由 declare() 改为 declareQueue()

// 绑定交换机与队列,并指定路由键
$queue->bind($exchange->getName(), $routing_key);

$message = json_encode($message);
return $exchange->publish($message, $routing_key);
}

/**
* 读取消息队列中未处理的消息
* @return multitype:mixed
*/
public function getQueue() {
$queue = new AMQPQueue(self::$channel);
$queue->setName(self::$queueName);
$queue->setFlags(AMQP_DURABLE); // 持久化
// $messageCount = $queue->declare();

$result = array();
$i = 0;
// AMQP_NOPARAM / AMQP_AUTOACK
while($message = $queue->get(AMQP_NOPARAM)) {
$getBody = $message->getBody();
$result[$i] = json_decode($getBody, true);
$queue->ack($message->getDeliveryTag());

$i++;
if ($i >= 500) break;
}
return $result;
}
}

AMQP,布布扣,bubuko.com

AMQP

标签:php amqp   amqpconnection   amqpchannel   amqpexchange   amqpqueue   

原文地址:http://blog.csdn.net/peng905/article/details/28425699

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!