码迷,mamicode.com
首页 > 编程语言 > 详细

Python-RabbitMQ消息队列

时间:2018-02-23 23:46:00      阅读:275      评论:0      收藏:0      [点我收藏+]

标签:ima   connect   service   import   sum   语言   erlang   ann   png   

python中的线程queue可以实现不同线程间的通信,进程queue可以实现python不同进程间的通信

RabbitMQ消息队列就相当于中间人,可以实现独立进程间的通信,也可以实现在不同编程语言中进行通信

技术分享图片

windows环境下安装完成RabbitMQ后,输入cmd命令services.msc,然后在服务中开启RabbitMQ的服务,使用RabbitMQ要安装Erlang语言环境

技术分享图片

Ubuntu环境下安装RabbitMQ

sch01ar@ubuntu:~$ sudo apt install rabbitmq-server
sch01ar@ubuntu:~$ sudo rabbitmq-server start

RabbitMQ默认的监听端口为5672

发送消息端

# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘))  # 建立一个socket

channel = connection.channel()  # 声明一个管道,在管道里进行通信

channel.queue_declare(queue=‘q‘)  # 在管道里声明一个名q为queue

channel.basic_publish(  # 发送消息
                      exchange=‘‘,
                      routing_key=‘q‘,  # queue名字
                      body=‘Hello World!‘,  # 要发送的消息
                      )

print(‘数据发送完成‘)
connection.close()  # 关闭队列

接收消息端

# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘))

channel = connection.channel()  # 声明一个管道,在管道里进行通信

channel.queue_declare(queue=‘q‘)  # 多声明一个queue,防止当此程序比发送消息端先启动时报错

def callback(ch, method, properties, body):
    print(ch, method, properties, body)
    # ch为管道的内存地址,method为关于queue队列的一些信息,body为消息内容
    print(‘收到数据:‘, body)

channel.basic_consume(  # basic_consume开始消费消息
                      callback,  # 如果收到消息就调用callback函数来处理消息
                      queue=‘q‘,  # 从q队列里接收消息
                      no_ack=True,  # 是否不确认消息是否处理完,默认为False
                     )

print(‘开始等待消息‘)

channel.start_consuming()  # 开始接收消息,如果没有消息就会卡在这,直到有消息

开启三个接收消息端

技术分享图片

发送消息端发送一个消息,最先开启的接收消息端先收到消息

技术分享图片

发送消息端再发送消息的话,接收消息的就是第二开启的接收消息端,然后是第三个接收消息端,之后再是第一个

RabbitMQ会轮询发消息

RabbitMQ安装目录下的skin目录里rabbitmqctl.bat可以查看当前队列情况

rabbitmqctl.bat list_queues

技术分享图片

接收消息端处理消息时要跟服务器端确认消息处理的情况,以防止接收消息端在处理消息时突然停止运行导致消息丢失

发送消息端

# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘))  # 建立一个socket

channel = connection.channel()  # 声明一个管道,在管道里进行通信

channel.queue_declare(queue=‘q‘)  # 在管道里声明一个名q为queue

channel.basic_publish(  # 发送消息
                      exchange=‘‘,
                      routing_key=‘q‘,  # queue名字
                      body=‘Hello World!‘,  # 要发送的消息
                      )

print(‘数据发送完成‘)
connection.close()  # 关闭队列

接收消息端

# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘))

channel = connection.channel()  # 声明一个管道,在管道里进行通信

channel.queue_declare(queue=‘q‘)  # 多声明一个queue,防止当此程序比生产者程序先启动时报错

def callback(ch, method, properties, body):
    print(ch, method, properties, body)
    # ch为管道的内存地址,method为关于queue队列的一些信息,body为消息内容
    time.sleep(20)
    print(‘收到数据:‘, body)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 跟服务器端确认消息已经处理完成
    print(‘消息处理完成‘)

channel.basic_consume(  # basic_consume开始消费消息
                      callback,  # 如果收到消息就调用callback函数来处理消息
                      queue=‘q‘,  # 从q队列里接收消息
                     )

print(‘开始等待消息‘)

channel.start_consuming()  # 开始接收消息,如果没有消息就会卡在这,直到有消息

开启3个消息接收端,1个发送消息端

技术分享图片

开启3个接收消息端,等待接收消息

技术分享图片

发送消息端发送消息,第一个启动的接收消息端接收到消息

技术分享图片

然后关掉第一个接收消息端,第二个启动的接收消息端收到消息

技术分享图片

然后是第三个,第三个之后还是第一个,除非消息处理完成

消息持久化

如果接收消息端正在接收消息的时候,服务器端(RabbitMQ)断了,接收消息端就会报错,消息就会丢失

技术分享图片

如果不想服务器端突然断开而导致消息丢失,可以使消息持久化

发送消息端

# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘))  # 建立一个socket

channel = connection.channel()  # 声明一个管道,在管道里进行通信

channel.queue_declare(queue=‘q‘, durable=True)  # 在管道里声明一个名q为queue,durable为队列持久化

channel.basic_publish(  # 发送消息
                      exchange=‘‘,
                      routing_key=‘q‘,  # queue名字
                      body=‘Hello World!‘,  # 要发送的消息
                      properties=pika.BasicProperties(delivery_mode=2)  # 使消息持久化
                      )
print(‘数据发送完成‘)
connection.close()  # 关闭队列

接收消息端

# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘))

channel = connection.channel()  # 声明一个管道,在管道里进行通信

channel.queue_declare(queue=‘q‘,durable=True)  # durable为队列持久化

def callback(ch, method, properties, body):
    print(ch, method, properties, body)
    # ch为管道的内存地址,method为关于queue队列的一些信息,body为消息内容
    time.sleep(20)
    print(‘收到数据:‘, body)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 跟服务器端确认消息已经处理完成
    print(‘消息处理完成‘)

channel.basic_consume(  # basic_consume开始消费消息
                      callback,  # 如果收到消息就调用callback函数来处理消息
                      queue=‘q‘,  # 从q队列里接收消息
                     )

print(‘开始等待消息‘)

channel.start_consuming()  # 开始接收消息,如果没有消息就会卡在这,直到有消息

这样的话,即使服务器端断开了,队列和消息也还会在

技术分享图片

如果没有使队列和消息持久化的话,服务器端重启后,队列和消息就没了

技术分享图片

 

Python-RabbitMQ消息队列

标签:ima   connect   service   import   sum   语言   erlang   ann   png   

原文地址:https://www.cnblogs.com/sch01ar/p/8463618.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!