标签:
标签: spark
修改scala版本
项目创建完成后默认使用的是scala的2.11.7 版本。要手动将版本换成2.10.X。在项目名称右击选择properties,在弹出窗口点击,scala Compiler,在右侧窗口,选中Use Project settings, 将scala Installation 修改为Latest 2.10 bundle(dynamic).点击apply,点击ok。scala版本变成2.10.6。
找到依赖的spark jar文件并导入到eclipse中。
所依赖的jar文件是
spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar。
在项目名称上右击,选择build path ->configure build path。在弹出框中点击library,点击右侧的addExternalJARs,然后选择
park-assembly-1.6.0-hadoop2.6.0.jar点击打开,然后点击ok。
在src上右击new ->package 填入package的name为com.dt.spark。
在包的名字上右击选择new ->scala class 。在弹出框中Name 中,在增加WordCount。点击finish。
在方法内部讲关键字class 改成object ,然后创建main方法。
package com.dt.sparkimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.rdd.RDDobject WordCount{def main(args: Array[String]): Unit ={/**第1步,创建spark的配置对象sparkconf,设置spark运行时的配置信息,如通过setMaster设置程序连接的spark* 集群的master的URL,如果设置为local则在本地运行。* */val conf = new SparkConf()conf.setAppName("The first spark app")conf.setMaster("local")/**第2步,创建SparkContext对象,SparkContext是spark程序所有功能的唯一入口,其作用是初始化spark应用程序的* 核心组件,包括DAGScheduler,TaskScheduler,SchedulerBackend* */val sc = new SparkContext(conf)/**第3步,根据数据源(HDFS,HBase,Local FS)通过SparkContext来创建RDD* 数据被RDD划分为一系列的Partitions,分配到每个partition的数据属于一个Task的处理范畴* */val lines = sc.textFile("G://datarguru spark//tool//spark-1.4.0-bin-hadoop2.6//README.md", 1) //读取本地文件并设置一个partition/**第4步,对初始的RDD进行Transformation级别的处理,如map、filter高阶函数编程,进行具体计算**/val words = lines.flatMap{ line => line.split(" ")}//对每行字符串进行单词拆分,并把所有拆分结果通过flat合并成一个大的单词集合val pairs = words.map{ word => (word, 1)} //在单词拆分基础上对每个单词实例计数为1val wordCounts = pairs.reduceByKey(_+_)//相同的key,value累加wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))sc.stop()}}
在运行过程中会出现WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable。java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. 这个错误。但是在local模式下,这个是正常的。因为spark是和hadoop编译在一起的,我们在window 下开发,缺少hadoop的配置。这不是程序错误,也不影响我们的任何功能。
import org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.rdd.RDDobject WordCount_Cluster {def main(args: Array[String]){/**第1步,创建spark的配置对象sparkconf,设置spark运行时的配置信息,如通过setMaster设置程序连接的spark* 集群的master的URL,如果设置为local则在本地运行。* */val conf = new SparkConf() //创建SparkConf对象conf.setAppName("Wow,My First Spark App!")// conf.setMaster("spark://master:7077")/**第2步,创建SparkContext对象,SparkContext是spark程序所有功能的唯一入口,其作用是初始化spark应用程序的* 核心组件,包括DAGScheduler,TaskScheduler,SchedulerBackend* */val sc = new SparkContext(conf)/**第3步,根据数据源(HDFS,HBase,Local FS)通过SparkContext来创建RDD* 数据被RDD划分为一系列的Partitions,分配到每个partition的数据属于一个Task的处理范畴* *///val lines = sc.textFile("hdfs://Master:9000/library/wordcount/input/Data") //读取HDFS文件并切分成不同的Partionsval lines = sc.textFile("/library/wordcount/input/Data") //读取HDFS文件并切分成不同的Partions/**第4步,对初始的RDD进行Transformation级别的处理,如map、filter高阶函数编程,进行具体计算**/val words = lines.flatMap { line =>line.split(" ")} //对每一行的字符串进行单词拆分并把所有行的拆分结果通过flat合并成为一个大的单词集合val pairs = words.map { word => (word, 1) }val wordCounts = pairs.reduceByKey(_+_) //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)wordCounts.collect.foreach(wordNumberPair =>println(wordNumberPair._1 + " : " + wordNumberPair._2))//需要添加collect,表示集群sc.stop()}}
将程序达成jar 包
在项目名称上右击点击export选择java 下的jar file,点击next,选择输出目录,输入文件名,点击next,点击next,然后点击完成。导出jar 包。
在spark中执行wordcount方法。
将jar 放到linux系统某个目录中。执行
./spark-submit --class com.dt.spark.WordCount_Cluster --master spark://worker1:7077 ./wordcount.jar
也可以将以上命令保存到.sh文件中,直接执行sh文件即可。
由于spark1.6需要scala 2.10.X版本的。推荐 2.10.4,java版本最好是1.8。所以提前我们要需要安装好java和scala并在环境变量中配置好
安装完成以后启动IDEA,并进行配置,默认即可,然后点击ok以后,设置ui风格,然后点击next 会出现插件的选择页面,默认不需求修改,点击next,选择安装scala语言,点击install 按钮(非常重要,以为要开发spark程序所以必须安装),等安装完成以后点击start启动IDEA。
点击 create new project ,然后填写project name为“Wordcount”,选择项目的保存地址project location。
然后设置project sdk即java 的安装目录。点击右侧的new 按钮,选择jdk,然后选择java 的安装路径即可。
然后选择scalasdk。点击右侧的create ,默认出现时2.10.x 版本的scala,点击ok即可。然后点击finish。
点击file->project structure 来设置工程的libraries。核心是添加spark的jar依赖。选择Libraries ,点击右侧的加号,选择java,选择spark1.6.0 的spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar。点击ok。稍等片刻后然后点击ok(Libraries作用于WordCount),然后点击apply,点击ok。(这一步很重要,如果没有无法编写spark的代码)
在src上右击new ->package 填入package的name为com.dt.spark。
在包的名字上右击选择new ->scala class 。在弹出框中填写Name ,并制定kind为object ,点击ok。
package com.dt.sparkimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.rdd.RDDobject WordCount{def main(args: Array[String]): Unit ={/**第1步,创建spark的配置对象sparkconf,设置spark运行时的配置信息,如通过setMaster设置程序连接的spark* 集群的master的URL,如果设置为local则在本地运行。* */val conf = new SparkConf()conf.setAppName("The first spark app")conf.setMaster("local")/**第2步,创建SparkContext对象,SparkContext是spark程序所有功能的唯一入口,其作用是初始化spark应用程序的* 核心组件,包括DAGScheduler,TaskScheduler,SchedulerBackend* */val sc = new SparkContext(conf)/**第3步,根据数据源(HDFS,HBase,Local FS)通过SparkContext来创建RDD* 数据被RDD划分为一系列的Partitions,分配到每个partition的数据属于一个Task的处理范畴* */val lines = sc.textFile("G://datarguru spark//tool//spark-1.4.0-bin-hadoop2.6//README.md", 1) //读取本地文件并设置一个partition/**第4步,对初始的RDD进行Transformation级别的处理,如map、filter高阶函数编程,进行具体计算**/val words = lines.flatMap{ line => line.split(" ")}//对每行字符串进行单词拆分,并把所有拆分结果通过flat合并成一个大的单词集合val pairs = words.map{ word => (word, 1)} //在单词拆分基础上对每个单词实例计数为1val wordCounts = pairs.reduceByKey(_+_)//相同的key,value累加wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))sc.stop()}}
在代码去右击选择点击run”wordCount”来运行程序。在生成环境下肯定是写自动化shell 脚本自动提交程序的。
注意:如果val sc = new SparkContext(conf)报错,并且没有运行结果,需要将scala的module改成scala 2.10版本的。具体操作:File->project structure -> Dependencies ->删除scala 2.11.x的module.-> 左上角的“+” -> scala ->选中scala2.10.4 -> apply
package com.dtimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.rdd.RDDobject Word{def main(args: Array[String]): Unit ={/**第1步,创建spark的配置对象sparkconf,设置spark运行时的配置信息,如通过setMaster设置程序连接的spark* 集群的master的URL,如果设置为local则在本地运行。* */val conf = new SparkConf()conf.setAppName("The first spark app")//conf.setMaster("spark://master:7077")/**第2步,创建SparkContext对象,SparkContext是spark程序所有功能的唯一入口,其作用是初始化spark应用程序的* 核心组件,包括DAGScheduler,TaskScheduler,SchedulerBackend* */val sc = new SparkContext(conf)/**第3步,根据数据源(HDFS,HBase,Local FS)通过SparkContext来创建RDD* 数据被RDD划分为一系列的Partitions,分配到每个partition的数据属于一个Task的处理范畴* */val lines = sc.textFile("/library/wordcount/input/Data")/**第4步,对初始的RDD进行Transformation级别的处理,如map、filter高阶函数编程,进行具体计算**/val words = lines.flatMap{ line => line.split(" ")}//对每行字符串进行单词拆分,并把所有拆分结果通过flat合并成一个大的单词集合val pairs = words.map{ word => (word, 1)} //在单词拆分基础上对每个单词实例计数为1val wordCountsSorted = pairs.reduceByKey(_+_).map(pairs=>(pairs._2, pairs._1)).sortByKey(false).map(pair=>(pair._1, pair._2))//相同的key,value累加并且排名wordCountsSorted.collect.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))sc.stop()}}
将程序达成jar 包
点击file->project structure,在弹出的页面点击Artifacts,点击右侧的“+”,选择jar –> from modules with dependencies,在弹出的页面中,设置好main class 然后点击ok,在弹出页面修改Name(系统生成的name不规范)、导出位置并删除scala和spark的jar(因为集群环境中已经存在)点击ok 。然后在菜单栏中点击build –> Artifacts ,在弹出按钮中,点击bulid,会自动开始打包。
在spark中执行wordcount方法。
将jar 放到linux系统某个目录中。执行
./bin/spark-submit --class com.dt.spark.Word --master spark://master:7077 ./word.jar
注意事项:
为什么不能再ide开发环境中,直接发布spark程序到spark集群中?
1. 开发机器的内存和cores的限制,默认情况情况下,spark程序的dirver在提交spark程序的机器上,如果在idea中提交程序的话,那idea机器就必须非常强大。
2. Dirver要指挥workers的运行并频繁的发生同学,如果开发环境和spark集群不在同样一个网络下,就会出现任务丢失,运行缓慢等多种不必要的问题。
3. 这是不安全的。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.dt.spark</groupId><artifactId>SparkApps</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><name>SparkApps</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.10</artifactId><version>1.6.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.10</artifactId><version>1.6.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.10</artifactId><version>1.6.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId><version>1.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka_2.10</artifactId><version>1.6.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-graphx_2.10</artifactId><version>1.6.0</version></dependency></dependencies><build><sourceDirectory>src/main/java</sourceDirectory><testSourceDirectory>src/main/test</testSourceDirectory><plugins><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><maniClass></maniClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin><plugin><groupId>org.codehaus.mojo</groupId><artifactId>exec-maven-plugin</artifactId><version>1.3.1</version><executions><execution><goals><goal>exec</goal></goals></execution></executions><configuration><executable>java</executable><includeProjectDependencies>false</includeProjectDependencies><classpathScope>compile</classpathScope><mainClass>com.dt.spark.SparkApps.WordCount</mainClass></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.6</source><target>1.6</target></configuration></plugin></plugins></build></project>
package com.dt;import java.util.Arrays;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.VoidFunction;import scala.Function;import scala.Tuple2;public class WordCount {public static void main(String[] args){SparkConf conf = new SparkConf().setAppName("The first name of spark").setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);//其底层就是scala的SparkContextJavaRDD<String> lines = sc.textFile("G://datarguru spark//tool//spark-1.4.0-bin-hadoop2.6//README.md");JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>(){@Overridepublic Iterable<String> call(String line)throws Exception{return Arrays.asList(line.split(" "));}});JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>(){@Overridepublic Tuple2<String, Integer> call(String word)throws Exception{return new Tuple2<String, Integer>(word, 1);}});JavaPairRDD<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){ //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)@Overridepublic Integer call(Integer v1, Integer v2)throws Exception{return v1+v2;}});wordsCount.foreach(new VoidFunction<Tuple2<String, Integer>>(){@Overridepublic void call(Tuple2<String, Integer>pair)throws Exception{System.out.println(pair._1 + ":" + pair._2);}});sc.close();}}
在代码区右击run as -> java application 。来运行此程序并查看运行结果。
package com.dt;import java.util.Arrays;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.VoidFunction;import scala.Function;import scala.Tuple2;public class WordCount {public static void main(String[] args){SparkConf conf = new SparkConf().setAppName("The first name of spark");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> lines = sc.textFile("/library/wordcount/input/Data");JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>(){@Overridepublic Iterable<String> call(String line)throws Exception{return Arrays.asList(line.split(" "));}});JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>(){@Overridepublic Tuple2<String, Integer> call(String word)throws Exception{return new Tuple2<String, Integer>(word, 1);}});JavaPairRDD<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){@Overridepublic Integer call(Integer v1, Integer v2)throws Exception{return v1+v2;}});wordsCount.foreach(new VoidFunction<Tuple2<String, Integer>>(){@Overridepublic void call(Tuple2<String, Integer>pair)throws Exception{System.out.println(pair._1 + ":" + pair._2);}});sc.close();}}
jar打包在集群中submit
即用Spark作单词计数统计,数据到底是怎么流动的,参看一图:
从数据流动的视角分析数据到底是怎么被处理
sc.textFile("helloSpark.txt").flatMap(_.split(" ")).map(word=>(word,1)).reduceByKey(_+_).saveAsTextFile(outputPathwordcount)
(1)在IntelliJ IDEA中编写下面代码:
package com.dt.sparkimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextobject WordCount {def main(args: Array[String]){valconf = new SparkConf()conf.setAppName("Wow, My First Spark App!")conf.setMaster("local")valsc = new SparkContext(conf)val lines = sc.textFile("D://tmp//helloSpark.txt", 1)val words = lines.flatMap { line =>line.split(" ") }val pairs = words.map { word => (word,1) }valwordCounts = pairs.reduceByKey(_+_)wordCounts.foreach(wordNumberPair =>println(wordNumberPair._1 + " : " + wordNumberPair._2))sc.stop()}}(2)在D盘下地tmp文件夹下新建helloSpark.txt文件,内容如下:Hello Spark Hello ScalaHello HadoopHello FlinkSpark is awesome(3)在WordCount代码区域点击右键选择Run ‘WordCount‘。可以得到如下运行结果:Flink : 1Spark : 2is : 1Hello : 4awesome : 1Hadoop : 1Scala : 1
def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {assertNotStopped()hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],minPartitions).map(pair => pair._2.toString)}
可以看出在进行了hadoopFile之后又进行了map操作。
HadoopRDD从HDFS上读取分布式文件,并且以数据分片的方式存在于集群之中。
/*** Return a new RDD by applying a function to all elements of this RDD.*/def map[U: ClassTag](f: T => U): RDD[U] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))}
读取到的一行数据(key,value的方式),对行的索引位置不感兴趣,只对其value事情兴趣。pair时有个匿名函数,是个tuple,取第二个元素。
此处又产生了MapPartitionsRDD。MapPartitionsRDD基于hadoopRDD产生的Parition去掉行的KEY。
注:可以看出一个操作可能产生一个RDD也可能产生多个RDD。如sc.textFile就产生了两个RDD:hadoopRDD和MapParititionsRDD。
下一步:
val words = lines.flatMap { line =>line.split(" ") }
对每个Partition中的每行进行单词切分,并合并成一个大的单词实例的集合。
FlatMap做的一件事就是对RDD中的每个Partition中的每一行的内容进行单词切分。
这边有4个Partition,对单词切分就变成了一个一个单词,
/*** Return a new RDD by first applying a function to all elements of this* RDD, and then flattening the results.*/def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))}
可以看出flatMap又产生了一个MapPartitionsRDD,此时的各个Partition都是拆分后的单词。
下一步:
val pairs = words.map { word => (word,1) }
将每个单词实例变为形如word=>(word,1)
map操作就是把切分后的每个单词计数为1。
根据源码可知,map操作又会产生一个MapPartitonsRDD。此时的MapPartitionsRDD是把每个单词变成Array(""Hello",1),("Spark",1)等这样的形式。
下一步:
valwordCounts = pairs.reduceByKey(_+_)
reduceByKey是进行全局单词计数统计,对相同的key的value相加,包括local和reducer同时进行reduce。所以在map之后,本地又进行了一次统计,即local级别的reduce。
shuffle前的Local Reduce操作,主要负责本地局部统计,并且把统计后的结果按照分区策略放到不同的File。
下一Stage就叫Reducer了,下一阶段假设有3个并行度的话,每个Partition进行Local Reduce后都会把数据分成三种类型。最简单的方式就是用HashCode对其取模。
至此都是stage1。
Stage内部完全基于内存迭代,不需要每次操作都有读写磁盘,所以速度非常快。
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)}/*** Merge the values for each key using an associative and commutative reduce function. This will* also perform the merging locally on each mapper before sending results to a reducer, similarly* to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.*/def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {reduceByKey(new HashPartitioner(numPartitions), func)}/*** Merge the values for each key using an associative and commutative reduce function. This will* also perform the merging locally on each mapper before sending results to a reducer, similarly* to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/* parallelism level.*/def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {reduceByKey(defaultPartitioner(self), func)}
可以看到reduceByKey内部有combineByKeyWithClassTag。combineByKeyWithClassTag的源码如下:
def combineByKeyWithClassTag[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0if (keyClass.isArray) {if (mapSideCombine) {throw new SparkException("Cannot use map-side combining with array keys.")}if (partitioner.isInstanceOf[HashPartitioner]) {throw new SparkException("Default partitioner cannot partition array keys.")}}val aggregator = new Aggregator[K, V, C](self.context.clean(createCombiner),self.context.clean(mergeValue),self.context.clean(mergeCombiners))if (self.partitioner == Some(partitioner)) {self.mapPartitions(iter => {val context = TaskContext.get()new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))}, preservesPartitioning = true)} else {new ShuffledRDD[K, V, C](self, partitioner).setSerializer(serializer).setAggregator(aggregator).setMapSideCombine(mapSideCombine)}}
可以看出在combineByKeyWithClassTag内又new 了一个ShuffledRDD。
ReduceByKey有两个作用:
1. 进行Local级别的Reduce,减少网络传输。
2. 把当前阶段的内容放到本地磁盘上供shuffle使用。
产生Shuffle数据就需要进行分类,MapPartitionsRDD时其实已经分好类了,最简单的分类策略就是Hash分类。
ShuffledRDD需要从每台机上抓取同一单词。
reduceByKey发生在哪里?
Stage2全部都是reduceByKey
统计完的结果:(“Hello”,4)只是一个Value,而不是Key:"Hello",value:4。但输出到文件系统时需要KV的格式,现在只有Value,所以需要造个KEY。
defsaveAsTextFile(path: String){this.map(x => (NullWritable.get())),new Text(x.toStirng)).saveAsHadoopFile[TextOutputFormat[NullWritable,Text]](path)}
this.map把当前的值(x)变成tuple。tuple的Key是Null,Value是(“Hello”,4)。
为什么要为样?因为saveAsHadoopFile时要求以这样的格式输出。Hadoop需要KV的格式!!
map操作时把key舍去了,输出时就需要通过生成Key。
第一个Stage有哪些RDD?HadoopRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD
第二个Stage有哪些RDD?ShuffledRDD、MapPartitionsRDD
只有Collect 或saveAsTextFile会触发作业,其他的时候都没有触发作业(Lazy)
@refuil 2016-07-19
10:35 字数 13686 阅读 0spark快速入门与WordCount程序机制深度解析 spark研习第二季
标签:
原文地址:http://blog.csdn.net/refuil/article/details/51991616