码迷,mamicode.com
首页 > 其他好文 > 详细

DRPC详解

时间:2015-12-28 00:49:14      阅读:275      评论:0      收藏:0      [点我收藏+]

标签:

什么是DRPC?

分布式RPC(distributed RPC,DRPC)用于对Storm上大量的函数调用进行并行计算。

对于每一次函数调用,Storm集群上运行的拓扑接收调用函数的参数信息作为输入流,并将计算结果作为输出流发射出去。

 一句话概括:Storm进行计算,根据客户端提交的请求参数,而返回Storm计算的结果。

 DRPC通过DRPC Server来实现,DRPC Server的整体工作过程如下:

接收到一个RPC调用请求;

发送请求到Storm上的拓扑;

从Storm上接收计算结果;

将计算结果返回给客户端。

 

注:在client客户端看来,一个DRPC调用看起来和一般的RPC调用没什么区别

 

工作流程

技术分享

Client向DRPC Server发送被调用执行的DRPC函数名称及参数;

Storm上的topology通过DRPCSpout实现这一函数,从DPRC Server接收到函数调用流;

   DRPC Server会为每次函数调用生成唯一的id;

Storm上运行的topology开始计算结果,最后通过一个ReturnResults的Bolt连接到DRPC     Server,发送指定id的计算结果;

 

DRPC Server通过使用之前为每个函数调用生成的id,将结果关联到对应的发起调用的client,将计算结果返回给client。

 

DRPC包括服务端和客户端两部分:

1)服务端
服务端由四部分组成:包括一个DRPC Server, 一个 DPRC Spout,一个Topology和一个ReturnResult。


在实际使用中,主要有三个步骤:

a.启动Storm中的DRPC Server;

   首先,修改Storm/conf/storm.yaml中的drpc server地址;需要注意的是:必须修改所有Nimbus和supervisor上的配置文件,设置drpc server地址。否则在运行过程中可能无法返回结果。

  然后,通过 storm drpc命令启动drpc server。

b.创建一个DRPC 的Topology,提交到storm中运行。

  该Toplogy和普通的Topology稍有不同,可以通过两种方式创建:

  创建方法一:直接使用 Storm 提供的LinearDRPCTopologyBuilder。 (不过该方法在0.82版本中显示为已过期,不建议使用)

 1 package com.storm.drpc;
 2 
 3 import backtype.storm.Config;
 4 import backtype.storm.LocalCluster;
 5 import backtype.storm.LocalDRPC;
 6 import backtype.storm.StormSubmitter;
 7 import backtype.storm.drpc.LinearDRPCTopologyBuilder;
 8 import backtype.storm.topology.BasicOutputCollector;
 9 import backtype.storm.topology.OutputFieldsDeclarer;
10 import backtype.storm.topology.base.BaseBasicBolt;
11 import backtype.storm.tuple.Fields;
12 import backtype.storm.tuple.Tuple;
13 import backtype.storm.tuple.Values;
14 
15 public class BasicDRPCTopology {
16     
17    /**
18     *写多客户端的数据要做什么处理 ,就可以
19     *
20     */
21   public static class ExclaimBolt extends BaseBasicBolt {
22     private static final long serialVersionUID = 1L;
23 
24     @Override
25     public void execute(Tuple tuple, BasicOutputCollector collector) {
26       String input = tuple.getString(1);
27       
28       //第一列request请求ID
29       collector.emit(new Values(tuple.getValue(0), input + "!"));
30       
31     }
32 
33     @Override
34     public void declareOutputFields(OutputFieldsDeclarer declarer) {
35       declarer.declare(new Fields("id", "result"));
36     }
37   }
38 
39   public static void main(String[] args) throws Exception {
40      
41     /**
42         构建spout;
43         向DRPC Server返回结果;
44         为Bolt提供函数用于对tuples进行处理。
45      */
46     LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
47     //没有数据源,只是封装了一个算法模块
48     builder.addBolt(new ExclaimBolt(), 3);
49 
50     Config conf = new Config();
51 
52     if (args == null || args.length == 0) {
53         
54      //本地DRPC模式
55       LocalDRPC drpc = new LocalDRPC();
56       LocalCluster cluster = new LocalCluster();
57 
58       cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
59 
60       //客户端的代码
61       for (String word : new String[]{ "hello", "goodbye" }) {
62 
63           System.err.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
64       }
65 
66       cluster.shutdown();
67       drpc.shutdown();
68     }
69     else {
70         
71       //远程DRPC提交,远程提交必须先启动服务
72       conf.setNumWorkers(3);
73       StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
74     }
75   }
76 }

 

  创建方法二:

直接使用 Storm 提供的通用TopologyBuilder。 不过需要自己手动加上开始的DRPCSpout和结束的ReturnResults。
其实Storm 提供的LinearDRPCTopologyBuilder也是通过这种封装而来的。

 

 

 1 package com.storm.drpc;
 2 
 3 import backtype.storm.Config;
 4 import backtype.storm.LocalCluster;
 5 import backtype.storm.LocalDRPC;
 6 import backtype.storm.StormSubmitter;
 7 import backtype.storm.drpc.DRPCSpout;
 8 import backtype.storm.drpc.ReturnResults;
 9 import backtype.storm.generated.AlreadyAliveException;
10 import backtype.storm.generated.InvalidTopologyException;
11 import backtype.storm.topology.BasicOutputCollector;
12 import backtype.storm.topology.OutputFieldsDeclarer;
13 import backtype.storm.topology.TopologyBuilder;
14 import backtype.storm.topology.base.BaseBasicBolt;
15 import backtype.storm.tuple.Fields;
16 import backtype.storm.tuple.Tuple;
17 import backtype.storm.tuple.Values;
18 
19 
20 public class ManualDRPC {
21   public static class ExclamationBolt extends BaseBasicBolt {
22 
23     private static final long serialVersionUID = 1L;
24 
25     @Override
26     public void declareOutputFields(OutputFieldsDeclarer declarer) {
27       declarer.declare(new Fields("result", "return-info"));
28     }
29 
30     @Override
31     public void execute(Tuple tuple, BasicOutputCollector collector) {
32       String arg = tuple.getString(0);
33       Object retInfo = tuple.getValue(1);
34       collector.emit(new Values(arg + "!!!", retInfo));
35     }
36 
37   }
38 
39   public static void main(String[] args) {
40 
41     TopologyBuilder builder = new TopologyBuilder();
42     LocalDRPC drpc = new LocalDRPC();
43 
44     
45   //远程DRPC提交,远程提交必须先启动服务
46     if (args.length > 0) {
47             //开始的Spout
48             DRPCSpout spout = new DRPCSpout("exclamation");
49             builder.setSpout("drpc", spout);
50             //真正处理的Bolt
51             builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
52             //结束的ReturnResults
53             builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
54 
55             Config conf = new Config();
56             try {
57                 StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
58             } catch (AlreadyAliveException e) {
59                 e.printStackTrace();
60             } catch (InvalidTopologyException e) {
61                 e.printStackTrace();
62             }
63     }else {
64 
65     
66         //本地DRPC提交
67         DRPCSpout spout = new DRPCSpout("exclamation", drpc);
68         builder.setSpout("drpc", spout);
69         builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
70         builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
71 
72         LocalCluster cluster = new LocalCluster();
73         Config conf = new Config();
74         cluster.submitTopology("exclaim", conf, builder.createTopology());
75         
76          System.err.println(drpc.execute("exclamation", "aaa"));
77          System.err.println(drpc.execute("exclamation", "bbb"));
78     }
79     
80   }
81 }

 

 

 

客户端调用代码:

 1 package com.storm.drpc;
 2 
 3 import org.apache.thrift7.TException;
 4 
 5 import backtype.storm.generated.DRPCExecutionException;
 6 import backtype.storm.utils.DRPCClient;
 7 /**
 8  * DRPC客户端调用代码
 9  */
10 public class MyDRPCclient {
11 
12     /**
13      指定DRPC地址和端口,storm.yaml文件的配置:
14     drpc.servers: 
15        -  "drpcserver1" 
16        -  "drpcserver12"
17      */    
18     public static void main(String[] args) {
19         
20         DRPCClient client = new DRPCClient("192.168.1.107", 3772);
21         try {
22             String result = client.execute("exclamation", "hello,world");
23             
24             System.out.println(result);
25         } catch (TException e) {
26             e.printStackTrace();
27         } catch (DRPCExecutionException e) {
28             e.printStackTrace();
29         } 
30 
31     }
32 
33 }

 

 在Storm执行 storm jar   ./storm_drpc.jar   com.storm.drpc.ManualDRPC    testDrpc

 

运行结果  :  hello,world !!!

 

DRPC详解

标签:

原文地址:http://www.cnblogs.com/thinkpad/p/5081244.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!