标签:
RPC:Remote Procedure Call
DRPC:Distributed RPC


其中蓝色的bolt是需要用户自己定义的。
|
1
2
3
|
drpc.servers: - "vm1" - "vm2" |
注意:drpc.servers:前面不能有空格(yaml文件格式)
(2)、在上面配置的那些机器上运行DRPC Server
|
1
|
storm drpc |
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package com.test.storm.bolt;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseBasicBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;public class MyBolt extends BaseBasicBolt{ private static final long serialVersionUID = 1L; @Override public void execute(Tuple input, BasicOutputCollector collector) { try { //input里有两个字段:{request:6170525749586968710,args:hello} //request字段是Long类型, args是String类型 Long id = input.getLong(0); System.out.println("request id: " + id); String args = input.getString(1); System.out.println("args: " + args); String result = args.toUpperCase(); //如果这个bolt是最有一个用户的bolt,则必须是两个字段id、result //如果是中间bolt,则第一个字段必须是id //第一个字段是request的id,第二个字段是处理后的结果 collector.emit(new Values(id, result)); } catch (Exception e) { e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { //如果这个bolt是最有一个用户的bolt,则必须是两个字段,建议是id、result(经测试,字段名字可以随意) //如果是中间bolt,则第一个字段必须是id //后面的内置bolt会根据字段位置获取值,0、1 declarer.declare(new Fields("id", "result")); }} |
代码中需要注意的地方和解释。
2)构建topology
|
1
2
3
|
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder(FUNC_NAME);builder.addBolt(new MyBolt());StormTopology drpcTopology = builder.createLocalTopology(drpcServer); |
本地模式会在一个进程中模式DRPC Server,不需要绑定到端口,必须使用LocalDRPC对象才能调用方法,本地模式仅供测试用。
LocalDRPCTest.java:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package com.test.storm.drpc;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.LocalDRPC;import backtype.storm.drpc.LinearDRPCTopologyBuilder;import backtype.storm.generated.StormTopology;import com.test.storm.bolt.MyBolt;public class LocalDRPCTest { private static final String FUNC_NAME = "upper"; public static void main(String[] args) { LocalDRPC drpcServer = new LocalDRPC(); LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder(FUNC_NAME); builder.addBolt(new MyBolt()); StormTopology drpcTopology = builder.createLocalTopology(drpcServer); LocalCluster cluster = new LocalCluster(); Config config = new Config(); config.setDebug(true); cluster.submitTopology("drpcupper", config, drpcTopology); String result = drpcServer.execute(FUNC_NAME, "hello"); System.out.println("result: " + result); drpcServer.shutdown(); cluster.shutdown(); }} |
RemoteDRPCTest.java如下:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
package com.test.storm.drpc;import backtype.storm.Config;import backtype.storm.StormSubmitter;import backtype.storm.drpc.LinearDRPCTopologyBuilder;import com.test.storm.bolt.MyBolt;public class RemoteDRPCTest { private static final String FUNC_NAME = "upper"; public static void main(String[] args) throws Exception { LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder(FUNC_NAME); builder.addBolt(new MyBolt()); StormSubmitter.submitTopology("drpcupper", new Config(), builder.createRemoteTopology()); }} |
在真实的Storm集群上运行,需要如下三个步骤:
(1)如上面(2、DRPC Server)说明的那样配置并运行DRPC Server
(2)提交DRPC topology到Storm集群上
|
1
|
storm jar drpc.jar com.test.storm.drpc.RemoteDRPCTest |
(3)客户端调用程序
DRPCClientTest:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
package com.test.storm.drpc;import backtype.storm.utils.DRPCClient;public class DRPCClientTest { public static void main(String[] args) throws Exception { DRPCClient client = new DRPCClient("vm1", 3772); String result = client.execute("upper", "hellmmo"); System.out.println(result); }} |
一次可以部署多个DRPC:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
package com.test.storm.drpc;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.LocalDRPC;import backtype.storm.drpc.LinearDRPCTopologyBuilder;import backtype.storm.generated.StormTopology;import com.test.storm.bolt.MyBolt;public class LocalDRPCMutipleTest { public static void main(String[] args) { LocalDRPC drpcServer = new LocalDRPC(); LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("upper"); builder.addBolt(new MyBolt()); StormTopology drpcTopology = builder.createLocalTopology(drpcServer); LinearDRPCTopologyBuilder builder2 = new LinearDRPCTopologyBuilder("upper2"); builder2.addBolt(new MyBolt()); StormTopology drpcTopology2 = builder2.createLocalTopology(drpcServer); LocalCluster cluster = new LocalCluster(); Config config = new Config(); config.setDebug(true); cluster.submitTopology("drpcupper", config, drpcTopology); cluster.submitTopology("drpcupper2", config, drpcTopology2); String result = drpcServer.execute("upper", "hello"); System.out.println("result1: " + result); String result2 = drpcServer.execute("upper2", "hello"); System.out.println("result2: " + result); drpcServer.shutdown(); cluster.shutdown(); }} |
标签:
原文地址:http://www.cnblogs.com/lishouguang/p/4566908.html