
Storm groupings: introduction and usage

Date: 2019-06-21 18:45:23


I. Overview of stream groupings

Stream groupings come in seven kinds: shuffle grouping, fields grouping, all grouping, global grouping, none grouping, direct grouping, and custom grouping.

II. The groupings in detail

  1. Shuffle grouping: tuples emitted by the spout are distributed across the bolt's tasks at random, such that each task receives roughly the same number of tuples. There is no routing pattern you can rely on.

  2. Fields grouping: the stream is partitioned by the specified field(s). For example, if grouped on a "user-id" field, tuples with the same "user-id" always go to the same task, while tuples with different "user-id" values may go to different tasks.

  3. All grouping: every tuple is replicated to all tasks of the bolt. Use this grouping with care, since it multiplies the traffic by the task count.

  4. Global grouping: when a chain of bolts has transformed the data and the last bolt must consolidate everything the upstream bolts produced, global grouping routes the entire stream to a single task — precisely, the task with the lowest task ID.

  5. Direct grouping: in spirit the inverse of global grouping, this is a special kind of grouping where the producer of a tuple decides which task of the consumer receives it.

  6. Local or shuffle grouping: suppose the upstream component (a spout or bolt) and the downstream bolt have their tasks spread over several workers. With a plain shuffle or fields grouping, a tuple may be routed to a task in a different worker — a different JVM process — and must travel over TCP/RPC. But when the upstream task and a downstream task live in the same process, that network hop is unnecessary. This grouping therefore shuffles tuples only among the downstream tasks in the same worker when there are any, and falls back to an ordinary shuffle otherwise, cutting network traffic and improving Storm's throughput.

  7. None grouping: you don't care how the stream is grouped. Currently this is equivalent to shuffle grouping. Eventually, though, Storm will push bolts with none groupings down to execute in the same thread as the bolt or spout they subscribe to, when possible.

  8. Custom grouping: you implement the routing yourself via CustomStreamGrouping; in most cases this is unnecessary.
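The per-key determinism of fields grouping can be pictured as hashing the grouping field modulo the number of target tasks. The sketch below is a mental model in plain Java, not Storm's actual implementation (Storm hashes the full set of grouping fields internally):

```java
import java.util.Arrays;
import java.util.List;

public class FieldsRoutingSketch {

    // Deterministically pick a target task for a key: same key -> same task.
    static int chooseTask(Object key, int numTasks) {
        return Math.floorMod(key.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        List<String> userIds = Arrays.asList("u-1", "u-2", "u-1", "u-3", "u-1");
        int numTasks = 2;
        for (String id : userIds) {
            System.out.println(id + " -> task " + chooseTask(id, numTasks));
        }
        // Every "u-1" maps to the same task; different ids may land on different tasks.
    }
}
```

`floorMod` (rather than `%`) keeps the result non-negative even for negative hash codes.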

III. Implementations of each grouping

1. Shuffle grouping

=========================================  Topology  ===============================================
public class ShuffleGroupingTopology {
    
    private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingTopology.class);
    
    public static void main(String[] args) {
        
        TopologyBuilder builder = new TopologyBuilder();
        
        builder.setSpout("ShuffleGroupingSpout", new ShuffleGroupingSpout(),1);
        builder.setBolt("ShuffleGroupingBolt",new ShuffleGroupingBolt(),2).shuffleGrouping("ShuffleGroupingSpout");
        builder.setBolt("ShuffleGroupingBolt2",new ShuffleGroupingBolt2(),2).shuffleGrouping("ShuffleGroupingBolt");
        
        Config config = new Config();
        config.setNumWorkers(3);
        try {
            StormSubmitter.submitTopology("ShuffleGroupingTopology", config, builder.createTopology());
            log.warn("==================================================");
            log.warn("the topology {} is submitted.","ShuffleGroupingTopology");
            log.warn("==================================================");
        } catch (Exception e) {
            e.printStackTrace();
        } 
    }
}
=====================================   Spout  =========================================
public class ShuffleGroupingSpout extends BaseRichSpout {

    private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingSpout.class);

    private SpoutOutputCollector collector;
    private TopologyContext context;
    private AtomicInteger ai;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.context = context;
        this.ai = new AtomicInteger();
        log.warn("ShuffleGroupingSpout ------> open:hashcode:{} ---->taskId:{}", this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void nextTuple() {
        int i = this.ai.getAndIncrement();
        if (i < 10) {
            log.warn("ShuffleGroupingSpout ------> nextTuple:hashcode:{} ---->taskId:{} ----->value:{}", this.hashCode(), context.getThisTaskId(), i);
            collector.emit(new Values(i));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("i"));
    }

    @Override
    public void close() {
        log.warn("ShuffleGroupingSpout ------> close:hashcode:{} ---->taskId:{}", this.hashCode(), context.getThisTaskId());
    }
}
====================================== Bolt ============================================
public class ShuffleGroupingBolt extends BaseBasicBolt {

    private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingBolt.class);

    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn("ShuffleGroupingBolt ------> prepare:hashcode:{} ----> taskId:{}", this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        Integer i = input.getIntegerByField("i");
        collector.emit(new Values(i * 10));
        log.warn("ShuffleGroupingBolt ------> execute:hashcode:{} ---->taskId:{} ---->value:{}", this.hashCode(), context.getThisTaskId(), i);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("result"));
    }

    @Override
    public void cleanup() {
        log.warn("ShuffleGroupingBolt ------> cleanup:hashcode:{} ---->taskId:{}", this.hashCode(), context.getThisTaskId());
    }
}
====================================== Bolt2 ============================================
public class ShuffleGroupingBolt2 extends BaseBasicBolt {

    private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingBolt2.class);

    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn("ShuffleGroupingBolt2 ------> prepare:hashcode:{} ----> taskId:{}", this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        Integer i = input.getIntegerByField("result");
        log.warn("ShuffleGroupingBolt2 ------> execute:hashcode:{} ---->taskId:{} ---->result:{}", this.hashCode(), context.getThisTaskId(), i);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to do
    }

    @Override
    public void cleanup() {
        log.warn("ShuffleGroupingBolt2 ------> cleanup:hashcode:{} ---->taskId:{}", this.hashCode(), context.getThisTaskId());
    }
}

2. Fields grouping

//======================================  Topology ============================================
public class FieldsShuffleTopology {
  private static final Logger log = LoggerFactory.getLogger(FieldsShuffleTopology.class);
    
    public static void main(String[] args) {
        
        TopologyBuilder builder = new TopologyBuilder();
        
        builder.setSpout("FieldsShuffleSpout", new FieldsShuffleSpout(),1);
        builder.setBolt("FieldsShuffleUpperBolt",new FieldsShuffleUpperBolt(),2).shuffleGrouping("FieldsShuffleSpout");
        builder.setBolt("FieldsShuffleFinalBolt",new FieldsShuffleFinalBolt(),2).fieldsGrouping("FieldsShuffleUpperBolt", new Fields("upperName"));
        
        Config config = new Config();
        config.setNumWorkers(3);
        try {
            StormSubmitter.submitTopology("FieldsShuffleTopology", config, builder.createTopology());
            log.warn("==================================================");
            log.warn("the topology {} is submitted.","FieldsShuffleTopology");
            log.warn("==================================================");
        } catch (Exception e) {
            e.printStackTrace();
        } 
    }
}

======================================  Spout  ============================================

public class FieldsShuffleSpout extends BaseRichSpout {
    private static final Logger log = LoggerFactory.getLogger(FieldsShuffleSpout.class);

    private SpoutOutputCollector collector;
    private TopologyContext context;
    private List<String> list;
    private int index;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.context = context;
        this.index = 0;
        this.list = Arrays.asList("Hello", "Hello", "Hello", "Hello", "Hello", "Hello", "World", "World");
        log.warn("FieldsShuffleSpout open:hashcode{} taskId:{}", this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void nextTuple() {
        if (index < list.size()) {
            String s = list.get(index++);
            log.warn("FieldsShuffleSpout nextTuple:hashcode:{} taskId:{} value:{}", this.hashCode(),
                    context.getThisTaskId(), s);
            collector.emit(new Values(s));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("i"));
    }

    @Override
    public void close() {
        log.warn("FieldsShuffleSpout close:hashcode:{} taskId:{} ", this.hashCode(), context.getThisTaskId());
    }
}

======================================  Bolt ==========================================
public class FieldsShuffleUpperBolt extends BaseBasicBolt{

    private static final Logger log = LoggerFactory.getLogger(FieldsShuffleUpperBolt.class);
    
    private  TopologyContext context;
    
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn("FieldsShuffleUpperBolt ------> prepare:hashcode:{} ----> taskId:{} ",this.hashCode(),context.getThisTaskId());
        
    }
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String s = input.getStringByField("i");
        collector.emit(new Values(s.toUpperCase()));
        log.warn("FieldsShuffleUpperBolt ------> execute:hashcode:{} ---->taskId:{} ---->String:{}",this.hashCode(),context.getThisTaskId(),s);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("upperName"));
    }
    
    @Override
    public void cleanup() {
        log.warn("FieldsShuffleUpperBolt ------> cleanup:hashcode:{} ---->taskId:{} ",this.hashCode(),context.getThisTaskId());
    }
}

======================================  Bolt2 ==========================================
public class FieldsShuffleFinalBolt extends BaseBasicBolt{
    
    private static final Logger log = LoggerFactory.getLogger(FieldsShuffleFinalBolt.class);
    
    private  TopologyContext context;
    
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn("FieldsShuffleFinalBolt ------> prepare:hashcode:{} ----> taskId:{} ",this.hashCode(),context.getThisTaskId());
        
    }
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String s = input.getStringByField("upperName");
        log.warn("FieldsShuffleFinalBolt ------> execute:hashcode:{} ---->taskId:{} ---->String:{}",this.hashCode(),context.getThisTaskId(),s);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //nothing to do
    }
    
    @Override
    public void cleanup() {
        log.warn("FieldsShuffleFinalBolt ------> cleanup:hashcode:{} ---->taskId:{} ",this.hashCode(),context.getThisTaskId());
    }
}

3. All grouping

  Each tuple is replicated to all tasks of the downstream bolt. Use this grouping with care.

 

 

public class AllGroupTopology {
    
    private static final Logger log = LoggerFactory.getLogger(AllGroupTopology.class);
    
    public static void main(String[] args) {
        
        TopologyBuilder builder = new TopologyBuilder();
        
        builder.setSpout("AllGrouppingSpout", new AllGrouppingSpout(),1);
        builder.setBolt("AllBolt1",new AllBolt1(),1).shuffleGrouping("AllGrouppingSpout");
        builder.setBolt("AllBolt2",new AllBolt2(),1).shuffleGrouping("AllGrouppingSpout");
         
        Config config = new Config();
        config.setNumWorkers(3);
        try {
            StormSubmitter.submitTopology("AllGroupTopology", config, builder.createTopology());
            log.warn("==================================================");
            log.warn("the topology {} is submitted.","AllGroupTopology");
            log.warn("==================================================");
        } catch (Exception e) {
            e.printStackTrace();
        } 
    }
}
======================================  AllGrouppingSpout ==========================================
public class AllGrouppingSpout extends BaseRichSpout {
    
    private static final Logger log = LoggerFactory.getLogger(AllGrouppingSpout.class);

    private SpoutOutputCollector collector;
    private TopologyContext context;
    private List<String> list;
    private int index;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.context = context;
        this.index = 0;
        this.list = Arrays.asList("Hello", "Hello", "Hello", "Hello", "Hello", "Hello", "World", "World");
        log.warn("AllGrouppingSpout open:hashcode{} taskId:{}", this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void nextTuple() {
        if (index < list.size()) {
            String s = list.get(index++);
            log.warn("AllGrouppingSpout nextTuple:hashcode:{} taskId:{} value:{}", this.hashCode(),
                    context.getThisTaskId(), s);
            collector.emit(new Values(s));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("name"));
    }
}
======================================  AllBolt1  ==========================================
public class AllBolt1 extends BaseBasicBolt{

    private static final Logger log = LoggerFactory.getLogger(AllBolt1.class);
    
    private  TopologyContext context;
    
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn("AllBolt1 ------> prepare:hashcode:{} ----> taskId:{} ",this.hashCode(),context.getThisTaskId());
    }
    
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String s = input.getStringByField("name");
        collector.emit(new Values(s.toUpperCase()));
        log.warn("AllBolt1 execute:hashcode:{} ---->taskId:{} ---->String:{}",this.hashCode(),context.getThisTaskId(),s);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //nothing to do 
    }
}
======================================  AllBolt2  ==========================================

public class AllBolt2 extends BaseBasicBolt{

    private static final Logger log = LoggerFactory.getLogger(AllBolt2.class);
    
    private  TopologyContext context;
    
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn("AllBolt2 ------> prepare:hashcode:{} ----> taskId:{} ",this.hashCode(),context.getThisTaskId());
    }
    
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String s = input.getStringByField("name");
        collector.emit(new Values(s.toUpperCase()));
        log.warn("AllBolt2 execute :hashcode:{} ---->taskId:{} ---->String:{}",this.hashCode(),context.getThisTaskId(),s);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //nothing to do 
    }
}

// The logs show that both bolts received all 8 tuples from the spout: the shuffle grouping did not split the data between them. This is because each bolt subscribes to the spout's stream independently — a grouping only decides how tuples are divided among the tasks of a single bolt, so every subscribing bolt gets its own full copy of the stream.

 

 

4. Global grouping

  When a chain of bolts has transformed the data, the last bolt often needs to consolidate everything the upstream bolts produced; global grouping does exactly that. Precisely, the whole stream is assigned to the task with the lowest task ID.

  Goal: shuffle the numbers across the NumberDoubleBolt tasks, then consolidate them in the final bolt.

 

public class GlobalGroupingTopology {
    
    private final static Logger log = LoggerFactory.getLogger(GlobalGroupingTopology.class);
    
    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("NumberGenerateSpout", new NumberGenerateSpout(), 1);
        builder.setBolt("NumberDoubleBolt", new NumberDoubleBolt(), 2).shuffleGrouping("NumberGenerateSpout");
        builder.setBolt("NumberPrintBolt", new NumberPrintBolt(), 2).globalGrouping("NumberDoubleBolt");

        Config config = new Config();
        config.setNumWorkers(4);
        try {
            StormSubmitter.submitTopology("GlobalGroupingTopology", config, builder.createTopology());
            log.warn("==================================================");
            log.warn("the topology {} is submitted.", "GlobalGroupingTopology");
            log.warn("==================================================");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
// ==============================  spout =======================================
public class NumberGenerateSpout extends BaseRichSpout{
    
    private SpoutOutputCollector collector;
    
    private AtomicInteger counter;
    
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.counter = new AtomicInteger(0);
        
    }

    @Override
    public void nextTuple() {
        while(counter.get()< 10){
            collector.emit(new Values(counter.getAndIncrement()));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("i"));
    }
}

// =========================== bolt ====================================
public class NumberDoubleBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        Integer value = input.getIntegerByField("i");
        collector.emit(new Values(value * 2, 10));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("i", "constant"));
    }
}
// ========================== bolt ====================================
public class NumberPrintBolt extends BaseBasicBolt {

    private final static Logger logger = LoggerFactory.getLogger(NumberPrintBolt.class);

    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        logger.warn("============== prepare TaskID:{}", context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        Integer i = input.getIntegerByField("i");
        Integer constant = input.getIntegerByField("constant");
        logger.warn("taskID:{},instanceID:{},i:{},constant:{}", context.getThisTaskId(), this, i, constant);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to do
    }
}
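The "lowest task ID" rule that global grouping applies can be illustrated outside Storm: given the consumer's task list, every tuple goes to the minimum ID. This is a toy sketch of the routing decision, not Storm's own code:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class GlobalRoutingSketch {

    // Global grouping sends every tuple to the task with the lowest task ID.
    static int chooseGlobalTask(List<Integer> targetTasks) {
        return Collections.min(targetTasks);
    }

    public static void main(String[] args) {
        List<Integer> tasks = Arrays.asList(7, 5, 6);   // hypothetical task IDs
        System.out.println("all tuples go to task " + chooseGlobalTask(tasks));  // task 5
    }
}
```

So even with NumberPrintBolt's parallelism set to 2, only one of its two tasks will ever log tuples.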

 

5. Direct grouping

  A special kind of grouping, in spirit the inverse of global grouping: the producer of a tuple decides which task of the consumer receives it.

public class DirectGroupingTopology {
    
    private static final Logger log = LoggerFactory.getLogger(DirectGroupingTopology.class);
    
    public static void main(String[] args) {
        
        TopologyBuilder builder = new TopologyBuilder();
        
        builder.setSpout("NumberGenerateSpout", new NumberGenerateSpout(),1);
        builder.setBolt("NumberDoubleBolt",new NumberDoubleBolt(),2).directGrouping("NumberGenerateSpout");
        
        Config config = new Config();
        config.setNumWorkers(3);
        try {
            StormSubmitter.submitTopology("DirectGroupingTopology", config, builder.createTopology());
            log.warn("==================================================");
            log.warn("the topology {} is submitted.","DirectGroupingTopology");
            log.warn("==================================================");
            
        } catch (Exception e) {
            e.printStackTrace();
        } 
    }
}

// ============================= spout ===============================
public class NumberGenerateSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private AtomicInteger counter;
    private int destTaskID;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        counter = new AtomicInteger(0);
        // route every tuple to the downstream task with the highest task ID
        List<Integer> tasks = context.getComponentTasks("NumberDoubleBolt");
        destTaskID = tasks.stream().mapToInt(Integer::intValue).max().getAsInt();
    }

    @Override
    public void nextTuple() {
        while (counter.get() < 10) {
            collector.emitDirect(destTaskID, new Values(counter.getAndIncrement()));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(true, new Fields("i"));   // declare a direct stream
    }
}
// =============================== bolt ==============================================
public class NumberDoubleBolt extends BaseBasicBolt {

    private static final Logger log = LoggerFactory.getLogger(NumberDoubleBolt.class);

    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn(" =================== taskId:{}", context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        Integer value = input.getIntegerByField("i");
        log.warn("taskID:{},instanceID:{},value:{}", context.getThisTaskId(), this, value);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to do
    }
}

 

 

6. Local or shuffle grouping

 

public class LocalLogTopology {
    
    private static final Logger log = LoggerFactory.getLogger(LocalLogTopology.class);
    
    public static void main(String[] args) {
        
        TopologyBuilder builder = new TopologyBuilder();
        
        builder.setSpout("LogSpout", new LogSpout(),1);
        builder.setBolt("LogParseBolt",new LogParseBolt(),1).localOrShuffleGrouping("LogSpout");
        builder.setBolt("LogPrintBolt",new LogPrintBolt(),2).localOrShuffleGrouping("LogParseBolt");
        
        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(4);
        try {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalLogTopology", config, builder.createTopology());
            
            log.warn("==================================================");
            log.warn("the topology {} is submitted.","LocalLogTopology");
            log.warn("==================================================");
            
            TimeUnit.SECONDS.sleep(120);
            
            cluster.killTopology("LocalLogTopology");
            cluster.shutdown();
            
        } catch (Exception e) {
            e.printStackTrace();
        } 
    }
}
// =============================== spout ==============================
public class LogSpout extends BaseRichSpout{
    
    private SpoutOutputCollector collector;
    
    private List<String> list;
    
    private int index;
    
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.index = 0;
        this.list = Arrays.asList("JAVA,COLLECTION","JAVA,IO","JAVA,THREAD","JAVA,LAMBDA","BIG_DATA,STORM",
                "BIG_DATA,KAFKA","BIG_DATA,HADOOP","BIG_DATA,FLUME","BIG_DATA,KAFKA","C,c");
        
    }
    
    @Override
    public void nextTuple() {
        while(index < list.size()){
            collector.emit(new Values(list.get(index++)));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("entity"));
    }
}
//============================== bolt =====================================
public class LogParseBolt extends BaseBasicBolt{

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String entity = input.getStringByField("entity");
        List<String> list = Splitter.on(",").splitToList(entity);
        
        collector.emit(new Values(list.get(0),list.get(1))); 
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("category","item"));
    }

}

//============================= bolt ===========================================
public class LogPrintBolt extends BaseBasicBolt {

    private static final Logger LOG = LoggerFactory.getLogger(LogPrintBolt.class);

    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        LOG.info("==========================================");
        LOG.info("prepare taskID:{}", context.getThisTaskId());
        LOG.info("==========================================");
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String category = input.getStringByField("category");
        String item = input.getStringByField("item");
        LOG.info("==========================================");
        LOG.info("execute:category:{},item:{},taskID:{}", category, item, context.getThisTaskId());
        LOG.info("==========================================");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to do
    }
}
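LogParseBolt above uses Guava's Splitter to break a "CATEGORY,ITEM" entity apart. If you prefer to avoid the Guava dependency, the JDK's `String.split` does the same job; a small standalone sketch (not the code the post itself uses):

```java
public class EntityParseSketch {

    // Split "CATEGORY,ITEM" into its two parts, like LogParseBolt does with Guava's Splitter.
    static String[] parse(String entity) {
        // limit = 2 so an item containing further commas stays in one piece
        return entity.split(",", 2);
    }

    public static void main(String[] args) {
        String[] parts = parse("BIG_DATA,STORM");
        System.out.println(parts[0] + " / " + parts[1]);  // BIG_DATA / STORM
    }
}
```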

 

 

7. None grouping

  You don't care how the stream is grouped. Currently this is equivalent to shuffle grouping; eventually, Storm will push bolts with none groupings down to execute in the same thread as the bolt or spout they subscribe to, when possible.

 

 

8. Custom grouping

  

public class LocalLogTopology {
    
    private static final Logger log = LoggerFactory.getLogger(LocalLogTopology.class);
    
    public static void main(String[] args) {
        
        TopologyBuilder builder = new TopologyBuilder();
        
        builder.setSpout("LogSpout", new LogSpout(),1);
        builder.setBolt("LogParseBolt",new LogParseBolt(),1).localOrShuffleGrouping("LogSpout");
        builder.setBolt("LogPrintBolt",new LogPrintBolt(),2).customGrouping("LogParseBolt", new HighTaskIDGrouping());
        
        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(4);
        try {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalLogTopology", config, builder.createTopology());
            
            log.warn("==================================================");
            log.warn("the topology {} is submitted.","LocalLogTopology");
            log.warn("==================================================");
            
            TimeUnit.SECONDS.sleep(120);
            
            cluster.killTopology("LocalLogTopology");
            cluster.shutdown();
            
        } catch (Exception e) {
            e.printStackTrace();
        } 
    }
}
// =============================== spout ==============================
public class LogSpout extends BaseRichSpout{
    
    private SpoutOutputCollector collector;
    
    private List<String> list;
    
    private int index;
    
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.index = 0;
        this.list = Arrays.asList("JAVA,COLLECTION","JAVA,IO","JAVA,THREAD","JAVA,LAMBDA","BIG_DATA,STORM",
                "BIG_DATA,KAFKA","BIG_DATA,HADOOP","BIG_DATA,FLUME","BIG_DATA,KAFKA","C,c");
        
    }
    
    @Override
    public void nextTuple() {
        while(index < list.size()){
            collector.emit(new Values(list.get(index++)));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("entity"));
    }
}
//============================== bolt =====================================
public class LogParseBolt extends BaseBasicBolt{

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String entity = input.getStringByField("entity");
        List<String> list = Splitter.on(",").splitToList(entity);
        
        collector.emit(new Values(list.get(0),list.get(1))); 
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("category","item"));
    }

}

//============================= custom grouping ===========================================

public class HighTaskIDGrouping implements CustomStreamGrouping{
    
    private int taskID;
    
    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        // targetTasks: the task IDs of all downstream tasks
        ArrayList<Integer> tasks = new ArrayList<>(targetTasks);
        Collections.sort(tasks);        // sort ascending
        this.taskID = tasks.get(tasks.size() -1);
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
         
        return Arrays.asList(taskID);
    }

}
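Because HighTaskIDGrouping's routing decision doesn't depend on any Storm state, the same logic can be exercised on its own. Below is a plain-Java mirror of it (hypothetical class name, for illustration only — it drops the Storm interfaces):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class HighTaskIdSketch {

    private int taskID;

    // Mirrors HighTaskIDGrouping.prepare: remember the highest downstream task ID.
    void prepare(List<Integer> targetTasks) {
        ArrayList<Integer> tasks = new ArrayList<>(targetTasks);
        Collections.sort(tasks);                  // ascending
        this.taskID = tasks.get(tasks.size() - 1);
    }

    // Mirrors chooseTasks: every tuple goes to that single task.
    List<Integer> chooseTasks() {
        return Arrays.asList(taskID);
    }

    public static void main(String[] args) {
        HighTaskIdSketch grouping = new HighTaskIdSketch();
        grouping.prepare(Arrays.asList(4, 9, 7)); // hypothetical task IDs
        System.out.println(grouping.chooseTasks());  // [9]
    }
}
```

The same effect could be had with `Collections.max(targetTasks)`; sorting is kept here only to stay faithful to the original class.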

 



Original post: https://www.cnblogs.com/MrRightZhao/p/11066077.html
