
Spark Example: A Modified WordCount

Posted: 2014-10-07 14:59:43


Java code:

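Note: when packaging the job, do not bundle any dependency JARs. The Spark, Hadoop, and Scala classes it uses are supplied by the cluster at run time.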

package org.apache.spark.examples;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public final class JavaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {

        // Both the input path and the output path are required.
        if (args == null || args.length < 2) {
            System.err.println("Usage: JavaWordCount <inputfile> <output>");
            System.exit(1);
        }

        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        // Read the input as an RDD of lines, requesting at least one partition.
        JavaRDD<String> lines = ctx.textFile(args[0], 1);

        // Split every line into words on single space characters.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = -5362343744041430068L;

            @Override
            public Iterable<String> call(String s) {
                return Arrays.asList(SPACE.split(s));
            }
        });

        // Pair each word with an initial count of 1.
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Sum the counts of identical words.
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 3338227964635666047L;

            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

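        // Collect the results on the driver and print them (suitable for small outputs only).
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }

        // Output option 1: save as a text file:
        //counts.saveAsTextFile(args[1]);

        // Output option 2: save to HDFS through a Hadoop output format.
        // TextOutputFormat writes keys and values via toString(), one tab-separated pair per line.
        counts.saveAsHadoopFile(args[1], Text.class, IntWritable.class, TextOutputFormat.class);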

        ctx.stop();
    }
}
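
To see what the three transformations above compute without starting a cluster, the same logic can be sketched in plain Java. This is only an illustration and not part of the original post: the class name LocalWordCount and the method countWords are hypothetical, and an in-memory list stands in for the RDD. It reuses the single-space pattern from the job, so it also shows how that pattern treats consecutive spaces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical, Spark-free stand-in for the job above (illustration only).
public final class LocalWordCount {
    // Same delimiter as the Spark job: exactly one space character.
    private static final Pattern SPACE = Pattern.compile(" ");

    /** Counts words in the given lines, mimicking flatMap, mapToPair and reduceByKey. */
    public static Map<String, Integer> countWords(List<String> lines) {
        // "flatMap": every line contributes its words to one flat list.
        List<String> words = new ArrayList<String>();
        for (String line : lines) {
            words.addAll(Arrays.asList(SPACE.split(line)));
        }

        // "mapToPair" + "reduceByKey": each word counts as 1 and the
        // counts for the same word are summed. TreeMap keeps keys sorted.
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (String word : words) {
            Integer previous = counts.get(word);
            counts.put(word, previous == null ? 1 : previous + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The second line contains two consecutive spaces on purpose.
        Map<String, Integer> counts = countWords(Arrays.asList("a b a", "b  c"));
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
    }
}
```

Because the pattern matches a single space, the double space in "b  c" produces an empty string between the two delimiters, and that empty string is counted as a word. If that is not wanted, one option is to split on "\\s+" and skip empty tokens.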

Script for running the job with Spark on YARN:

#!/bin/sh

echo $SPARK_JAR
spark-submit \
  --class org.apache.spark.examples.JavaWordCount \
  --master yarn-client \
  --num-executors 3 \
  --driver-memory 400m \
  --executor-memory 500m \
  --executor-cores 1 \
  /usr/local/spark/spark-example-1.0.0-SNAPSHOT.jar \
  hdfs://cluster1:9000/tmp/1.txt \
  hdfs://cluster1:9000/out/spark/wordcount

 

Execution result: [screenshot not preserved]

 


Original article: http://www.cnblogs.com/muzhongjiang/p/4009208.html
