标签:master
mkdir -p /home/hadoop/zookeeper/da ta
cd /home/hadoop/zookeeper/data
dataDir=/home/hadoop/zookeeper/data server.1=10.10.113.41:2888:3888 server.2=10.10.113.42:2888:3888 server.3=10.10.113.43:2888:3888
#zookeeper export ZOOKEEPER==/home/hadoop/zookeeper PATH=$PATH:$ZOOKEEPER/bin
zkServer.sh start
PATH=$PATH:/home/hadoop/storm
storm.zookeeper.servers: #zk地址 - "10.10.113.41" - "10.10.113.42" - "10.10.113.43" nimbus.host: "10.10.113.41" #master 节点地址 supervisor.slots.ports: #worker端口 我这里 所有的节点配置文件都是一样的,因为master 节点不参与计算所以这块配置上也可以。 - 6700 - 6701 - 6702 - 6703 storm.local.dir: "/home/hadoop/storm/data" #数据存放地址
nohup bin/storm nimbus >/dev/null 2>&1 & #启动主节点 nohup bin/storm ui >/dev/null 2>&1 & #启动stormUI nohup bin/storm logviewer >/dev/null 2>&1 & #启动logviewer 功能
nohup bin/storm supervisor >/dev/null 2>&1 & nohup bin/storm logviewer >/dev/null 2>&1 &
ui.port=8089
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.3</version> </dependency>
packagecn.oraclers.storm;
importbacktype.storm.Config;
importbacktype.storm.StormSubmitter;
importbacktype.storm.generated.AlreadyAliveException;
importbacktype.storm.generated.InvalidTopologyException;
importbacktype.storm.spout.SpoutOutputCollector;
importbacktype.storm.task.OutputCollector;
importbacktype.storm.task.TopologyContext;
importbacktype.storm.topology.OutputFieldsDeclarer;
importbacktype.storm.topology.TopologyBuilder;
importbacktype.storm.topology.base.BaseRichBolt;
importbacktype.storm.topology.base.BaseRichSpout;
importbacktype.storm.tuple.Fields;
importbacktype.storm.tuple.Tuple;
importbacktype.storm.tuple.Values;
importbacktype.storm.utils.Utils;
importjava.util.HashMap;
importjava.util.Map;
importjava.util.Random;
publicclassWordCount{
publicstaticclassSpoutSourceextendsBaseRichSpout{
Mapmap;
TopologyContexttopologyContext;
SpoutOutputCollectorspoutOutputCollector;
Randomrandom;
@Override
publicvoidopen(Mapmap,TopologyContexttopologyContext,SpoutOutputCollectorspoutOutputCollector){
map=map;
topologyContext=topologyContext;
spoutOutputCollector=spoutOutputCollector;
random=random;
}
String[]sentences=newString[]{"thecowjumpedoverthemoon","anappleadaykeepsthedoctoraway",
"fourscoreandsevenyearsago","snowwhiteandthesevendwarfs","iamattwowithnature"};
@Override
publicvoidnextTuple(){
Utils.sleep(1000);
for(Stringsentence:sentences){
spoutOutputCollector.emit(newValues(sentence));
}
}
@Override
publicvoiddeclareOutputFields(OutputFieldsDeclareroutputFieldsDeclarer){
outputFieldsDeclarer.declare(newFields("sentence"));
}
}
publicstaticclassSplitBoltSourceextendsBaseRichBolt{
Mapmap;
TopologyContexttopologyContext;
OutputCollectoroutputCollector;
@Override
publicvoidprepare(Mapmap,TopologyContexttopologyContext,OutputCollectoroutputCollector){
map=map;
topologyContext=topologyContext;
outputCollector=outputCollector;
}
@Override
publicvoidexecute(Tupletuple){
Stringsentence=tuple.getStringByField("sentence");
String[]words=sentence.split("");
for(Stringword:words){
this.outputCollector.emit(newValues(word));
}
}
@Override
publicvoiddeclareOutputFields(OutputFieldsDeclareroutputFieldsDeclarer){
outputFieldsDeclarer.declare(newFields("word"));
}
}
publicstaticclassSumBoltSourceextendsBaseRichBolt{
Mapmap;
TopologyContexttopologyContext;
OutputCollectoroutputCollector;
@Override
publicvoidprepare(Mapmap,TopologyContexttopologyContext,OutputCollectoroutputCollector){
this.map=map;
this.topologyContext=topologyContext;
this.outputCollector=outputCollector;
}
Map<String,Integer>mapCount=newHashMap<String,Integer>();
@Override
publicvoidexecute(Tupletuple){
Stringword=tuple.getStringByField("word");
Integercount=mapCount.get(word);
if(count==null){
count=0;
}
count++;
mapCount.put(word,count);
outputCollector.emit(newValues(word,count));
}
@Override
publicvoiddeclareOutputFields(OutputFieldsDeclareroutputFieldsDeclarer){
outputFieldsDeclarer.declare(newFields("word","count"));
}
}
publicstaticvoidmain(String[]args)throwsAlreadyAliveException,InvalidTopologyException{
TopologyBuilderbuilder=newTopologyBuilder();
builder.setSpout("data_source",newSpoutSource());
builder.setBolt("bolt_split",newSplitBoltSource()).shuffleGrouping("data_source");
builder.setBolt("bolt_sum",newSplitBoltSource()).fieldsGrouping("bolt_split",newFields("word"));
try{
ConfigstormConf=newConfig();
stormConf.setDebug(true);
StormSubmitter.submitTopology("Clustertopology",stormConf,builder.createTopology());
}catch(AlreadyAliveExceptione){
e.printStackTrace();
}catch(InvalidTopologyExceptione){
e.printStackTrace();
}
}
}
./storm jar storm jar sd-1.0-SNAPSHOT.jar cn.oraclers.storm.WordCount
本文出自 “8155900” 博客,请务必保留此出处http://8165900.blog.51cto.com/8155900/1721542
标签:master
原文地址:http://8165900.blog.51cto.com/8155900/1721542