码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMQ基础知识

时间:2018-08-03 00:54:35      阅读:221      评论:0      收藏:0      [点我收藏+]

标签:start   sage   输入   types   结果   queue   阻塞   python   用户   

RabbitMQ

关键在于消息的发布与消费、消息的路由。

在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key,可以视作Queue的name,
消费者将消息发送给Exchange时,一般会指定一个routing key
当binding key 与 routing key 相匹配时,消息就会被路由到对应的Queue中。

Exchange Types
fanout fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
direct direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
topic 与direct类似,但是是模糊匹配,*”用于匹配一个单词,“#”用于匹配多个单词
binding key 类似 *.*.rabbit,routing key 为quick.orange.rabbit的消息会被路由到该Queue
headers headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。


对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

参考:http://www.diggerplus.org/archives/3110


开启RabbitMQ后台管理:
1.在rabbitMQ安装目录下的sbin目录,打开终端执行:rabbitmq-plugins.bat enable rabbitmq_management开启网页管理界面,然后重启rabbitMQ
2.浏览器中输入http://localhost:15672/
3.输入用户名和密码(默认为guest)

 

生产者

import pika

#########  生产者 #########
# 链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
# 创建频道
channel = connection.channel()
# 创建一个队列名叫test
channel.queue_declare(queue=test)

# channel.basic_publish向队列中发送信息
# exchange -- 它使我们能够确切地指定消息应该到哪个队列去。
# routing_key 指定向哪个队列中发送消息
# body是要插入的内容, 字符串格式

while True:  # 循环向队列中发送信息,quit退出程序
    inp = input(">>>").strip()
    if inp == quit:
        break
    channel.basic_publish(exchange=‘‘,
                          routing_key=test,
                          body=inp)
    print("生产者向队列发送信息%s" % inp)

# 缓冲区已经flush而且消息已经确认发送到了RabbitMQ中,关闭链接
connection.close()

# 输出结果
# >> > python
# 生产者向队列发送信息python
# >> > quit

 

消费者

#!/usr/bin/env python 3
import pika

######### 消费者 #########
# 链接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
# 创建频道
channel = connection.channel()
# 如果生产者没有运行创建队列,那么消费者也许就找不到队列了。为了避免这个问题,所有消费者也创建这个队列,如果队列已经存在,则这条无效
channel.queue_declare(queue=test)


# 接收消息需要使用callback这个函数来接收,他会被pika库来调用,接受到的数据都是字节类型的
def callback(ch, method, properties, body):
    """
        ch : 代表 channel
        method :队列名
        properties : 连接rabbitmq时设置的属性
        body : 从队列中取到的内容,获取到的数据时字节类型
    """


    print(" [x] Received %r" % body)
# channel.basic_consume 表示从队列中取数据,如果拿到数据 那么将执行callback函数,callback是回调函数
# no_ack=True 表示消费完这个消息以后不主动把完成状态通知rabbitmq
channel.basic_consume(callback,
                      queue=test,
                      no_ack=True)
print( [*] 等待信息. To exit press CTRL+C)
# 永远循环等待数据处理和callback处理的数据,start_consuming方法会阻塞循环执行
channel.start_consuming()

# 输出结果,一直等待处理队列中的消息,不知终止,除非人为ctrl+c
#  [*]等待消息,To exit press CTRL+C
#  [x] Received b‘python‘

 

消费者acknowledgement消息不丢失的方法

# no_ack = False , 如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。在消费者端做设定条件。
# 生产者,代码同上,未改变
# 消费者代码


import pika
import time

# 链接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
# 创建频道
channel = connection.channel()
# 如果生产者没有运行创建队列,那么消费者创建队列,如果队列已存在,创建队列操作会被忽略
channel.queue_declare(queue=test)


# 回调函数
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(10)
    print(ok)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 当上面消息处理完成后,通知rabbitmq,消息处理完成,不要在发送了


channel.basic_consume(callback,
                      queue=test,
                      no_ack=False)  # 表示消费完这个消息后,主动通知rabbitmq完成状态,如果不通知,rabbitmq会把这条消息重新放回队列中,避免丢失

print( [*] Waiting for messages. To exit press CTRL+C)
channel.start_consuming()

 

RabbitMQ基础知识

标签:start   sage   输入   types   结果   queue   阻塞   python   用户   

原文地址:https://www.cnblogs.com/jec1999/p/9410841.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!