码迷,mamicode.com
首页 > 编程语言 > 详细

python项目开发:用RabbitMQ实现异步RPC

时间:2019-02-12 13:15:07      阅读:416      评论:0      收藏:0      [点我收藏+]

标签:splay   serve   microsoft   input   none   color   ip地址   closed   sele   

程序要求:

              1. 用Rabbit MQ实现RPC

              1. 可以异步地执行多条命令

              2. 可以对一次性对多个机器执行命令

程序效果:

              ---》run dir host1 host2 。。。。

              ---》get task_id

              ---》taskId:xxxx   host: xxxxxx

              ---》check task_id

              --->打印结果

程序分析:

              为了达到异步地效果,可以使用多线程或协程,即每执行一条命令就启动一条线程或协程。客户端发送命令到队列、从返回队列接收结果分离,不能写到一起。

业务逻辑:

技术图片

代码实现:

README

技术图片
#author:Wu zhiHao

#博客地址:https://www.cnblogs.com/BUPT-MrWu/p/10364619.html

#程序目录框架:
 |--RPC
  |--RPC_server #服务端
   |--bin
    |--start.py #程序入口
   |--core
    |--RpcServer.py #服务端主要逻辑
  |--RPC_client #客户端
   |--bin
    |--start.py #程序入口
   |--core
    |--main.py #程序主要逻辑
   |--modules
    |--RpcClient.py #客户端主要逻辑
   |--conf
    |--settings.py #配置文件
  |--READ_ME

#命令格式:
 1. run command host1 host2..... #执行命令
 2. all_task #获取全部task_id
 3. check task_id #获取命令结果
 4. delete task_id #删除命令结果
View Code

RPC_server\\bin\\start.py

技术图片
import sys,os
BASE_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
sys.path.append(BASE_dir)
from core import RpcServer
if __name__ == __main__:
    obj = RpcServer.RpcServer()
    obj.channel.start_consuming()
View Code

RPC_server\\core\\RpcServer.py

技术图片
import pika
import os
import socket
from conf import settings
class RpcServer(object):
    def __init__(self):
        self.credentials = pika.PlainCredentials(settings.RabbitMq_name,settings.RabbitMq_password) #RabbiMQ用户认证
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                settings.RabbitMq_ip,settings.RabbitMq_port,"/",self.credentials,
            )
        )
        self.My_Ip = self.get_ip() #获取服务端IP地址
        self.channel = self.connection.channel()
        self.result = self.channel.queue_declare(exclusive=True)
        self.queue_name = self.result.method.queue
        self.channel.exchange_declare(
            exchange="Rpc",
            exchange_type="direct",
        )
        self.channel.queue_bind(
            exchange="Rpc",
            queue=self.queue_name,
            routing_key=self.My_Ip,
        )
        self.channel.basic_consume(
            self.on_response,
            queue=self.queue_name,
        )

    def on_response(self,ch,method,properties,body):
        command = body.decode()
        command_result = self.on_request(command)
        self.channel.basic_publish(
            exchange="",
            routing_key=properties.reply_to,
            properties=pika.BasicProperties(
                correlation_id=properties.correlation_id,
            ),
            body=command_result
        )


    def on_request(self,command):
        return os.popen(command).read()

    def get_ip(self):
        computer_name = socket.getfqdn(socket.gethostname(  ))
        computer_Ip = socket.gethostbyname(computer_name)
        return computer_Ip
View Code

RPC_client\\bin\\start.py

技术图片
import sys,os

BASE_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
sys.path.append(BASE_dir)
from core import main
if __name__ == __main__:
    obj = main.run()
    obj.start()
View Code

RPC_client\\core\\main.py

技术图片
import random
import threading
from modules import RpcClient

class run(object):
    def __init__(self):
        self.client = RpcClient.RpcClient()
        self.information = {}

    def start(self):
        while True:
            try:
                command = input("-->")
                if not command:continue
                t = threading.Thread(target=self.select,args=(command,))
                t.start()
            except Exception as e:
                print(e)

    def select(self,command):
        ‘‘‘解析命令‘‘‘
        try:
            keyword = command.split()[0]
            func = getattr(self,keyword)
            func(command)
        except Exception as e:
            print(e)

    def run(self,command):
        ‘‘‘执行命令‘‘‘
        try:
            task_id = str(random.randint(100,1000))
            self.information[task_id] = {}
            keyword = command.split()[1]
            for host in command.split()[2:]:
                result = self.client.on_request(host,keyword)
                self.information[task_id][host] = [result[0],result[1]]
        except Exception as e:
            print(e)

    def check(self,command):
        ‘‘‘获取命令结果‘‘‘
        try:
            task_id = command.split()[1]
            for host in self.information[task_id]:
                corr_id = self.information[task_id][host][0]
                callback_queue = self.information[task_id][host][1]
                command_result = self.client.get_response(corr_id,callback_queue)
                print("%s:\n%s"%(host,command_result))
        except Exception as e:
            print(e)

    def all_task(self,command):
        ‘‘‘获取全部task_id‘‘‘
        try:
            for task_id in self.information:
                all_host = []
                for host in self.information[task_id]:
                    all_host.append(host)
                print("task_id: %s  host: %s\n"%(task_id,all_host))
        except Exception as e:
            print(e)
    def delete(self,command):
        ‘‘‘删除命令结果‘‘‘
        try:
            for task_id in command.split()[1:]:
                self.information.pop(task_id)
        except Exception as e:
            print(e)
View Code

RPC_client\\conf\\settings.py

技术图片
RabbitMq_name = "XXX" #RabbitMq用户名
RabbitMq_password = "XXX" #rabbitmq用户密码
RabbitMq_ip = "XXX" #RabbitMq端的IP地址
RabbitMq_port = 5672 #RabbitMq端的端口号
View Code

RPC_client\\mudules\\RpcClient.py

技术图片
import pika
import uuid
from conf import settings
class RpcClient(object):
    def __init__(self):
        self.credentials = pika.PlainCredentials(settings.RabbitMq_name,settings.RabbitMq_password) #RabbiMQ用户认证
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                settings.RabbitMq_ip,settings.RabbitMq_port,"/",self.credentials,
            )
        )
        self.channel = self.connection.channel()

    def get_response(self,corr_id,callback_queue):
        ‘‘‘从队列里取值‘‘‘
        self.corr_id = corr_id
        self.response = None
        self.channel.basic_consume(
            self.on_response,
            queue=callback_queue,
        )
        while self.response is None:
            self.connection.process_data_events() #非阻塞版的start_consuming
        return self.response

    def on_response(self,ch,method,properties,body):
        ‘‘‘当队列里有数据时执行‘‘‘
        if self.corr_id == properties.correlation_id:
            self.response = body.decode()

    def on_request(self,host,command):
        ‘‘‘发送命令‘‘‘
        result = self.channel.queue_declare(exclusive=False) #生成另一个queue时,这个queue不会消失
        callback_queue = result.method.queue #返回queue
        corr_id = str(uuid.uuid4()) #验证码
        self.channel.exchange_declare(
            exchange="Rpc",
            exchange_type="direct"
        )
        self.channel.basic_publish(
            exchange="Rpc",
            routing_key=host,
            properties=pika.BasicProperties(
                correlation_id=corr_id,
                reply_to=callback_queue,
            ),
            body=command,
        )
        return corr_id,callback_queue #返回验证值和返回queue
View Code

程序执行实例:

技术图片

技术图片

技术图片

python项目开发:用RabbitMQ实现异步RPC

标签:splay   serve   microsoft   input   none   color   ip地址   closed   sele   

原文地址:https://www.cnblogs.com/BUPT-MrWu/p/10364619.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!