码迷,mamicode.com
首页 > 其他好文 > 详细

(03)Storm编程案例

时间:2020-02-15 18:46:18      阅读:73      评论:0      收藏:0      [点我收藏+]

标签:exti   uil   gen   submit   输出   ted   初始   context   程序   

  本篇记录一下Storm的编程案例:将手机型号转换成大写,并且加上当前时间,再输出到文件。

  1、所需jar包

  解压安装包apache-storm-0.9.2-incubating.tar.gz,在apache-storm-0.9.2-incubating/lib下

  2、创建一个Spout类

  负责实时读取数据,然后发送给后续的bolt组件进行处理

package demo;

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

//负责实时读取数据,然后发送给后续的bolt组件进行处理
public class DataSouceSpout extends BaseRichSpout {
    
    //模拟一些数据
    private String[] phones = {"iphone","xiaomi","moto","sunsumg","mate","huawei","nokia"};
    //得到上下文的信息
    private SpoutOutputCollector collector;
    
    //消息的处理方法
    public void nextTuple() {
        //模拟从外部读取数据
        Utils.sleep(500);
        //随机得到了一个手机的型号
        int index = new Random().nextInt(7);
        String phone = phones[index];
        //发送给后续的bolt
        this.collector.emit(new Values(phone));
    }

    //对这个spout进行初始化
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    //指定spout组件发送出去的数据的key
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        declare.declare(new Fields("phone-name"));
    }
}

  3、创建一个Bolt类

  将Spout发送过来的数据,转成大写

package demo;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

//将Spout发送过来的数据,转成大写
public class MyBoltA extends BaseBasicBolt {

    //业务的处理逻辑方法
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        //从spout组件中获取数据
        //方式一:
        //String phone = tuple.getStringByField("phone-name");
        //方式二
        String phone = tuple.getString(0);
        //处理数据
        String upperPhone = phone.toUpperCase();
        //将数据发送给下一个组件继续处理
        collector.emit(new Values(upperPhone));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        declare.declare(new Fields("upperphone"));
    }

}

  4、创建另一个Bolt类

  将BoltA发送过来的数据,加上时间,并且写到文件中

package demo;

import java.io.FileWriter;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

//将BoltA发送过来的数据,加上时间,并且写到文件中
public class MyBoltB extends BaseBasicBolt {

    private FileWriter fw = null;
    
    public void prepare(Map stormConf, TopologyContext context) {
        //对FileWriter进行初始化
        try {
            fw = new FileWriter("/usr/local/test/storm/mystormoutput.txt");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        //将boltA发送过来的数据,加上当前的时间
        //获取数据
        String upperPhoneName = tuple.getString(0);
        //业务逻辑
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String result = upperPhoneName +"   " + df.format(new Date());
        //将数据输出到文件系统中
        try {
            fw.write(result + "\n");
            fw.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        //这是最后一个bolt组件
    }

}

  5、创建一个组装类

  组装各个组件,并且提交任务到Storm集群

package demo;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

//组装各个组件,并且提交任务到Storm集群
public class SubmitClient {

    public static void main(String[] args) throws Exception {
        //得到一个topology的构造器
        TopologyBuilder builder = new TopologyBuilder();
        //指定spout
        builder.setSpout("datasource-spout", new DataSouceSpout());
        //指定Bolt组件,还需要指定数据的来源
        builder.setBolt("boltA", new MyBoltA()).shuffleGrouping("datasource-spout");
        builder.setBolt("boltB", new MyBoltB()).shuffleGrouping("boltA");
        //生成一个具体的任务
        StormTopology phoneTopo = builder.createTopology();
        //指明任务的一些参数
        Config config = new Config();
        //希望storm集群分配6个worker来执行任务
        config.setNumWorkers(6);
        //提交任务
        StormSubmitter.submitTopology("mystormdemo", config, phoneTopo);
    }
}

  6、文件打包,发送服务器

  将这四个文件打成 stormDemo.jar,并且上传到Storm的服务器,临时存放在 /usr/local/test/storm

  7、运行程序

  首先启动Zookeeper和Storm,然后执行以下命令提交任务

[root@localhost apache-storm-0.9.2-incubating]# bin/storm jar /usr/local/test/storm/stormDemo.jar demo.SubmitClient

  如下图所示:

技术图片

  8、查看结果

[root@localhost storm]# tail -f /usr/local/test/storm/mystormoutput.txt

  程序执行成功,文件一输出到 mystormoutput.txt

技术图片

 

(03)Storm编程案例

标签:exti   uil   gen   submit   输出   ted   初始   context   程序   

原文地址:https://www.cnblogs.com/javasl/p/12312983.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!