码迷,mamicode.com
首页 > 编程语言 > 详细

spark2.x由浅入深深到底系列六之RDD java api调用scala api的原理

时间:2017-09-18 11:03:03      阅读:213      评论:0      收藏:0      [点我收藏+]

标签:spark   大数据   javaapi   老汤   rdd   

RDD java api其实底层是调用了scala的api来实现的,所以我们有必要对java api是怎么样去调用scala api,我们先自己简单的实现一个scala版本和java版本的RDD和SparkContext


一、简单实现scala版本的RDD和SparkContext

class RDD[T](value: Seq[T]) {
  //RDD的map操作
  def map[U](f: T => U): RDD[U] = {
    new RDD(value.map(f))
  }
  
  def iterator[T] = value.iterator
  
}

class SparkContext {
  //创建一个RDD
  def createRDD(): RDD[Integer] = new RDD[Integer](Seq(1, 2, 3))

}


二、简单实现java版本的RDD和SparkContext

//这个时java中的一个接口
//我们可以将scala中的map需要的函数其实就是对应着java中的一个接口
package com.twq.javaapi.java7.function;
public interface Function<T1, R> extends Serializable {
  R call(T1 v1) throws Exception;
}

//这边实现的java版的RDD和SparkContext其实还是用scala代码实现,只不过这些scala代码可以被java代码调用了
import java.util.{Iterator => JIterator}
import scala.collection.JavaConverters._
import com.twq.javaapi.java7.function.{Function => JFunction}
//每一个JavaRDD都会含有一个scala的RDD,用于调用该RDD的api
class JavaRDD[T](val rdd: RDD[T]) {

  def map[R](f: JFunction[T, R]): JavaRDD[R] =
    //这里是关键,调用scala RDD中的map方法
    //我们将java的接口构造成scala RDD的map需要的函数函数
    new JavaRDD(rdd.map(x => f.call(x)))
  //我们需要将scala的Iterator转成java版的Iterator
  def iterator: JIterator[T] = rdd.iterator.asJava

}

//每个JavaSparkContext含有一个scala版本的SparkContext
class JavaSparkContext(sc: SparkContext) {
  def this() = this(new SparkContext())
  //转调scala版本的SparkContext来实现JavaSparkContext的功能
  def createRDD(): JavaRDD[Integer] = new JavaRDD[Integer](sc.createRDD())
}

三、写java代码调用rdd java api

package com.twq.javaapi.java7;

import com.twq.javaapi.java7.function.Function;
import com.twq.rdd.api.JavaRDD;
import com.twq.rdd.api.JavaSparkContext;

import java.util.Iterator;

/**
 * Created by tangweiqun on 2017/9/16.
 */
public class SelfImplJavaRDDTest {
    public static void main(String[] args) {
        //初始化JavaSparkContext
        JavaSparkContext jsc = new JavaSparkContext();
        //调用JavaSparkContext的api创建一个RDD
        JavaRDD<Integer> firstRDD = jsc.createRDD();
        //对创建好的firstRDD应用JavaRDD中的map操作
        JavaRDD<String> strRDD = firstRDD.map(new Function<Integer, String>() {
            @Override
            public String call(Integer v1) throws Exception {
                return v1 + "test";
            }
        });
        //将得到的RDD的结果打印,结果为
        //1test
        //2test
        //3test
        Iterator<String> result = strRDD.iterator();
        while (result.hasNext()) {
            System.out.println(result.next());
        }
    }
}


以上就是RDD java api调用scala api的实现原理,虽然只举了map操作,但是其他的类似于flatMap操作的实现都是类似的


接下来可以详细了解RDD java的每一个api


我们可以spark core RDD api来详细理解scala中的每一个api。。。

spark2.x由浅入深深到底系列六之RDD java api调用scala api的原理

标签:spark   大数据   javaapi   老汤   rdd   

原文地址:http://7639240.blog.51cto.com/7629240/1966131

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!