码迷,mamicode.com
首页 > 其他好文 > 详细

rabbitmq

时间:2020-11-18 12:27:34      阅读:7      评论:0      收藏:0      [点我收藏+]

标签:持久   方法   pre   对象   back   live   sage   etc   block   

  1. rabbitmq安装

    • 使用docker搜索、拉取镜像、运行为容器

      docker search rabbitmq
      docker pull rabbitmq   若不指定版本,默认拉取最新的版本
      docker run -d --name rabbit -p 5672:5672 -p 15672:15672 --hostname my-rabbit -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123 458123c67b79 最后为rabbit的镜像ID
      
  2. RabbitMQ概述

    • 有一些业务逻辑是不能立刻完成,会阻塞程序,类似于发送短信、邮件,这些都需要服务器给第三方发起请求,不能立刻得到结果。这就会造成用户操作过程停滞,所以解决办法就是将这些耗时操作,放到队列中去异步执行。
    • 有三个重要的组成部分,生产者、队列、消费者。生产者就是web后端程序,队列使用rabbitmq,消费者就是向第三方发起请求。
  3. RabbitMQ整体架构

    • rabbitmq-server内部包括两部分:
      • 交换机 将生产者传递过来的消息,根据自身的要求,转发给队列
      • 队列(先进先出) 队列与消费者事先有协议,消费者会从固定的队列中取得要执行的任务。
  4. 生产者的实现

    • 整个过程需要5步
    import pika
    
    # 1. 获得与rabbitmq代理连接对象
    connection_host = ‘192.168.1.38‘
    connection_credentials = pika.PlainCredentials(‘root‘, ‘123‘)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=connection_host, credentials=connection_credentials))
    
    # 2. 通过连接对象,获得channel(管道)对象,用于操作rabbitmq
    channel = connection.channel()
    
    # 3. 创建名字my_queue的消息队列,如果存在不创建
    channel.queue_declare(‘my_queue‘)
    
    # 4. 发送消息到rabbitmq中名字为my_queue的队列中
    channel.basic_publish(exchange=‘‘, routing_key=‘my_queue‘, body=‘hello word‘)
    
    # 5. 关闭和rabbitmq代理的连接
    connection.close()
    
    print(‘消息发送完毕!‘)
    
  5. 消费者的实现

    • 最后一步开启监听后,队列中一旦加入任务,就会被消费者取走,并执行处理函数。
    import pika
    
    # 1. 获取与rabbitmq的连接对象
    connection_host = ‘192.168.1.38‘
    connection_credentials = pika.PlainCredentials(‘root‘, ‘123‘)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=connection_host, credentials=connection_credentials))
    
    # 2. 通过连接对象获得channel对象,用于操作rabbitmq
    channel = connection.channel()
    
    # 3. 创建名字为my_queue的消息队列,如果不存在就创建
    channel.queue_declare(queue=‘my_queue‘)
    
    
    # 4. 按照rabbitmq要求定义消息处理函数
    def callback(ch, method, properties, body):
        print(‘接收到的消息是:‘, body)
        # 此刻接收到的消息是二进制格式
    
    # 5. 关联队列,并设置队列中的消息处理函数
    # channel.basic_consume(callback, queue=‘my_queue‘, no_ack=True)  以前版本的写法,后改为下面方式
    channel.basic_consume(‘my_queue‘, callback, False)
    
    # 6. 启动并开始处理消息,该程序会一直运行,进行监听
    channel.start_consuming()
    
    
  6. 任务队列

    task.py文件中内容
    
    import time
    
    def send_email():
        print(‘开始发送邮件‘)
        time.sleep(3)
        print(‘邮件发送完毕‘)
    
    
    def send_message():
        print(‘开始发送短信‘)
        time.sleep(3)
        print(‘短信发送完毕‘)
        
        
    consumer.py文件中内容
    task_list = {
        ‘email‘: task.send_email,
        ‘message‘: task.send_message
    }
    # 任务后不可以加(),否则会立即执行
    # 4. 按照rabbitmq要求定义消息处理函数
    def callback(ch, method, properties, body):
        task_name = body.decode()
        # 判断生产者发送的消息是否在消费者中注册过,如果没有注册过就提示错误
        if task_name not in task_list:
            print(‘error:{}任务没有注册‘.format(task_name))
            return
        task_list[task_name]()
    
    producter.py文件中的内容
    # 4. 发送消息到rabbitmq中名字为my_queue的队列中
    channel.basic_publish(exchange=‘‘, routing_key=‘work_queue‘, body=‘message‘)
    只需要改变body中的内容,就可以生产出不同的任务到队列中去
    
  7. 消息确认机制

    1. 原因:会出现的意外情况:消费者取到任务以后,并未执行完成任务,就死了。

    2. rabbitmq默认会在将消息发送给消费者以后,会将任务从队列中删掉。

    3. 使用消息确认机制,若消费者意外死亡,则不能给队列反馈,队列就不会删除被该消费者取走的任务。

    4. 实现方法:

      def callback(ch, method, properties, body):
          task_name = body.decode()
          # 判断生产者发送的消息是否在消费者中注册过,如果没有注册过就提示错误
          if task_name not in task_list:
              print(‘error:{}任务没有注册‘.format(task_name))
              return
          task_list[task_name]()
          # 消息处理完成后,确认消息
          ch.basic_ack(delivery_tag=method.delivery_tag)
          
         
      channel.basic_consume(‘work_queue‘, callback, True)
      
  8. 循环调度机制

    1. 默认是循环调度机制,就是消费之轮流去队列中取任务
  9. 公平调度机制---在consumer文件中设置公平调度

    1. 消息确认机制打开

    2. 设置公平调度机制,消费者不确认,就不要再给该消费之任务了,因为他目前的耗时任务还在执行,可以把任务给其他已经确认了的消费者。

      在消费者中设置公平调度机制
      channel.basic_qos(prefetch_count=1)
      
  10. 队列及其中的消息持久化---在product文件中设置队列及消息持久化

    1. 重启rabbitmq服务器会使队列和任务消失

    2. 解决方法:

      • 在product文件中,生成队列的代码中加参数

        # 3. 创建名字my_queue的消息队列,如果存在不创建
        channel.queue_declare(‘work_queue‘, durable=True)
        durable=True就可以保持,队列的持久化
        
        # 4. 发送消息到rabbitmq中名字为my_queue的队列中
        channel.basic_publish(exchange=‘‘, routing_key=‘work_queue‘, body=‘message‘, properties=pika.BasicProperties(delivery_mode=2))
        # properties=pika.BasicProperties(delivery_mode=2)可以保持消息持久化
        
        
  11. 交换机的三种模式

    生产者
    import pika
    
    # 1. 获得与rabbitmq代理连接对象
    connection_host = ‘192.168.1.38‘
    connection_credentials = pika.PlainCredentials(‘root‘, ‘123‘)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=connection_host, credentials=connection_credentials))
    
    # 2. 通过连接对象,获得channel(管道)对象,用于操作rabbitmq
    channel = connection.channel()
    
    # 3. 创建名字my_queue的消息队列,如果存在不创建
    channel.queue_declare(‘work_queue‘, durable=True)
    
    # 4. 发送消息到rabbitmq中名字为my_queue的队列中
    channel.basic_publish(exchange=‘‘, routing_key=‘work_queue‘, body=‘message‘, properties=pika.BasicProperties(delivery_mode=2))
    # 只要生产者,发送不同的body,就会消费者中的处理函数,就会调用不同的task
    
    # 5. 关闭和rabbitmq代理的连接
    connection.close()
    
    print(‘消息发送完毕!‘)
    
    
    
    消费者
    import pika
    import task
    
    task_list = {
        ‘email‘: task.send_email,
        ‘message‘: task.send_message
    }
    # 任务后不可以加(),否则会立即执行
    
    # 1. 获取与rabbitmq的连接对象
    connection_host = ‘192.168.1.38‘
    connection_credentials = pika.PlainCredentials(‘root‘, ‘123‘)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=connection_host, credentials=connection_credentials))
    
    # 2. 通过连接对象获得channel对象,用于操作rabbitmq
    channel = connection.channel()
    
    # 3. 创建名字为my_queue的消息队列,如果不存在就创建
    channel.queue_declare(queue=‘work_queue‘)
    
    
    # 4. 按照rabbitmq要求定义消息处理函数
    def callback(ch, method, properties, body):
        task_name = body.decode()
        # 判断生产者发送的消息是否在消费者中注册过,如果没有注册过就提示错误
        if task_name not in task_list:
            print(‘error:{}任务没有注册‘.format(task_name))
            return
        task_list[task_name]()
        # 消息处理完成后,确认消息
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    # 设置公平调度机制
    channel.basic_qos(prefetch_count=1)
    
    # 5. 关联队列,并设置队列中的消息处理函数
    # channel.basic_consume(callback, queue=‘my_queue‘, no_ack=True)  以前版本的写法,后改为下面方式
    channel.basic_consume(‘work_queue‘, callback, False)
    
    # 6. 启动并开始处理消息
    channel.start_consuming()
    
    
    任务
    import time
    
    def send_email():
        print(‘开始发送邮件‘)
        time.sleep(1)
        print(‘邮件发送完毕‘)
    
    
    def send_message():
        print(‘开始发送短信‘)
        time.sleep(1)
        print(‘短信发送完毕‘)

rabbitmq

标签:持久   方法   pre   对象   back   live   sage   etc   block   

原文地址:https://www.cnblogs.com/yuyangbk/p/13963363.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!