redis,rabbitmq

 

1. redis

 

2. rabbitmq

 

Install the Python RabbitMQ module:

 

pip install pika

 

  • The simplest message queue communication


 

The producer publishes messages to the hello queue; the consumer receives messages from the hello queue and processes them.

 

producer:

#!/usr/bin/env python
import pika

# connect to RabbitMQ on localhost and open a channel
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

# make sure the queue exists before publishing to it
channel.queue_declare(queue='hello')

# publish via the default exchange; the routing key is the queue name
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

  

 

 consumer:

 

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

  

 

Now we can try out our programs in a terminal. First, let's send a message using our send.py program:

 $ python send.py
 [x] Sent 'Hello World!'

The producer program send.py will stop after every run. Let's receive it:

 $ python receive.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'



  • work queue


 

In this mode, RabbitMQ distributes the messages produced by P among multiple consumers; by default it dispatches them round-robin, giving each consumer the next message in turn, as the sketch below shows.
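
A minimal sketch (not part of the original post) to watch the round-robin behaviour: start two copies of the consumer from the first example, then run the loop below, which publishes several numbered messages to the hello queue; the two consumers print them alternately.

#!/usr/bin/env python
# Sketch: publish several numbered messages to the 'hello' queue so that
# two running copies of the consumer above receive them in turn (round-robin).
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

for i in range(1, 7):
    message = 'task %d' % i
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=message)
    print(" [x] Sent %r" % message)

connection.close()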

 

A common requirement is the following:

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.

In other words, queue durability and message durability.

 

Queue durability:

channel.queue_declare(queue='task_queue', durable=True)   # declare the queue as durable. RabbitMQ does not allow redefining the parameters of an existing queue; to make a queue durable you must declare a new one.

 

Message durability:

 

 

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

 

 

 

Message acknowledgments:

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately removes it from memory. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

 

 

Message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the no_ack=True flag. It's time to remove this flag and send a proper acknowledgment from the worker, once we're done with a task.

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))   # simulate one second of work per dot in the message
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)   # acknowledge only after the task is finished

channel.basic_consume(callback,
                      queue='hello')

 

 

 

Fair dispatch:

 

 

If RabbitMQ simply handed out messages in order without considering consumer load, a consumer on a low-spec machine could easily accumulate a backlog it cannot work through while a better-provisioned consumer stays mostly idle. To solve this, set prefetch=1 on each consumer, which tells RabbitMQ not to send that consumer a new message until it has finished processing its current one.

This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.

 

channel.basic_qos(prefetch_count=1)

 

 

A code example with queue durability, message durability, message acknowledgments, and fair dispatch:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

 

 

 

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

 

 

  • Publish/subscribe:

 

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing.

 

There are several types of exchange, and each type uses different rules to route messages to queues. The types are:

 

fanout: every queue bound to this exchange receives the message
direct: the exchange and the routing key together determine which queues receive the message

topic: every queue whose binding key matches the routing key (here the binding key can be a pattern) receives the message, allowing finer-grained filtering than direct

 

 

1) fanout:

 

Publisher:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

 

Subscriber:

 

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

# let the server pick a queue name; the exclusive queue is deleted when the connection closes
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
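
To try this out (the script names below are chosen here for illustration and are not part of the original post), save the publisher and subscriber as emit_log.py and receive_logs.py, start the subscriber in two terminals, then run the publisher once. Because the exchange is fanout, every bound queue, and therefore every running subscriber, receives its own copy of the same message:

 $ python receive_logs.py          # terminal 1
 $ python receive_logs.py          # terminal 2
 $ python emit_log.py "info: Hello World!"
 [x] Sent 'info: Hello World!'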

 

 

2) direct:

Publisher:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

 

Subscriber:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# bind the queue once for every requested severity
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
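
For example (script names chosen here for illustration, not part of the original post), a subscriber started with only the error binding key receives messages published with routing_key 'error' and nothing else, while one started with info warning error receives all three severities:

 $ python receive_logs_direct.py error
 $ python emit_log_direct.py error "Run. Run. Or it will explode."
 [x] Sent 'error':'Run. Run. Or it will explode.'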

 

 

3) topic:


 

 

Publisher:

 

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

 

Subscriber:

 

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
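
In a topic exchange the binding key is a dot-separated pattern: * (star) matches exactly one word and # (hash) matches zero or more words. For example (script names chosen here for illustration, not part of the original post), a subscriber bound with "kern.*" receives every message whose routing key starts with kern., and one bound with "#" receives everything:

 $ python receive_logs_topic.py "kern.*"
 $ python emit_log_topic.py kern.critical "A critical kernel error"
 [x] Sent 'kern.critical':'A critical kernel error'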

 

Original post: http://www.cnblogs.com/pengxuann/p/6060734.html