标签:
// Local-mode Spark context for the "analysis" application.
SparkConf conf = new SparkConf().setMaster("local").setAppName("analysis");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> rdd = sc.textFile("c:\\tests.txt");

// Accumulator that counts the empty lines encountered while splitting the input.
final Accumulator<Integer> blankLines = sc.accumulator(0);

// Split every line on single spaces; empty lines bump the accumulator as a side effect.
JavaRDD<String> callSigns = rdd.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String line) throws Exception {
        if (line.isEmpty()) {
            blankLines.add(1);
        }
        String[] tokens = line.split(" ");
        return Arrays.asList(tokens);
    }
});

// The accumulator is read only after the save has been requested, because the
// flatMap above is a lazy transformation and does not run on its own.
callSigns.saveAsTextFile("output.txt");
System.out.println("Blank Lines: " + blankLines.value());
//广播变量,每个的国家区号表final Broadcast<String[]> signPrefixes =sc.broadcast(loadCallSignTable());JavaPairRDD<String,Integer> countryContactCounts =contactCounts.mapToPair(new PairFunction<Tuple2<String,Integer>,String,Integer>(){public Tuple2<String,Integer> call<Tuple2<String,Integer>callSignCount){//获取区号前缀String sign=callSignCount._1();//获取该前缀,和广播变量比较,得到其对应的国家String country=lookupCountry(sign,signPrefixed.value());//将(电话,通信次数)转换为(国家,通信次数)PairRDDreturn new Tuple2(country,callSignCount._2());}}//针对同一个国家做归并,求每一个国家的通信次数之和).reduceByKey(new SumInts());//保存结果contryContactCounts.saveAsTextFile(outputDir+"/countrys.txt")
/*
 * We have an online amateur-radio call sign database that can be queried for the
 * list of contacts' call signs recorded in the logs. By using a partition-based
 * operation, one HTTP client (and its connection pool) is shared by all the
 * elements of a partition instead of being created per element.
 */
JavaPairRDD<String, CallLog[]> contactsContactLists = validCallSigns.mapPartitionsToPair(
    new PairFlatMapFunction<Iterator<String>, String, CallLog[]>() {
        @Override
        public Iterable<Tuple2<String, CallLog[]>> call(Iterator<String> input) {
            // Results for this partition.
            ArrayList<Tuple2<String, CallLog[]>> callSignLogs =
                new ArrayList<Tuple2<String, CallLog[]>>();
            // Requests issued for this partition, paired with their call sign.
            ArrayList<Tuple2<String, ContentExchange>> requests =
                new ArrayList<Tuple2<String, ContentExchange>>();
            ObjectMapper mapper = createMapper();
            HttpClient client = new HttpClient();
            try {
                client.start();
                // First create a request for every sign, then collect all the results.
                while (input.hasNext()) {
                    requests.add(createRequestForSign(input.next(), client));
                }
                for (Tuple2<String, ContentExchange> signExchange : requests) {
                    callSignLogs.add(fetchResultFromRequest(mapper, signExchange));
                }
            } catch (Exception e) {
                // Best effort: report the failure and return whatever was collected so far.
                e.printStackTrace();
            } finally {
                // Every result has been materialized into callSignLogs by now, so the
                // client started above can be shut down instead of being leaked.
                // NOTE(review): assumes HttpClient is Jetty's client (it is used with
                // start() and ContentExchange) and therefore offers stop() - confirm.
                try {
                    client.stop();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            return callSignLogs;
        }
    });
| 函数名 | 调用所提供的 | 返回的 | 对于RDD[T]的函数签名 |
| --- | --- | --- | --- |
| mapPartitions() | 该分区中元素的迭代器 | 返回的元素的迭代器 | f:(Iterator[T]) -> Iterator[U] |
| mapPartitionsWithIndex() | 分区序号,以及每个分区中的元素迭代器 | 返回的元素的迭代器 | f:(Int,Iterator[T]) -> Iterator[U] |
| foreachPartition() | 元素迭代器 | 无 | f:(Iterator[T]) -> Unit |
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));

// Collapse every partition into a single (sum, count) pair.
JavaRDD<Tuple2<Integer, Integer>> partials = rdd.mapPartitions(
    new FlatMapFunction<Iterator<Integer>, Tuple2<Integer, Integer>>() {
        @Override
        public Iterable<Tuple2<Integer, Integer>> call(Iterator<Integer> values) throws Exception {
            int total = 0;
            int count = 0;
            while (values.hasNext()) {
                total += values.next();
                count++;
            }
            // Only one Tuple2 object is created per partition.
            List<Tuple2<Integer, Integer>> result = new ArrayList<Tuple2<Integer, Integer>>();
            result.add(new Tuple2<Integer, Integer>(total, count));
            return result;
        }
    }
);

// Add the per-partition pairs component-wise to get the overall (sum, count).
Tuple2<Integer, Integer> totals = partials.reduce(
    new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> left,
                                             Tuple2<Integer, Integer> right) throws Exception {
            int sum = left._1() + right._1();
            int count = left._2() + right._2();
            return new Tuple2<Integer, Integer>(sum, count);
        }
    }
);

// Cast before dividing so the division is done in floating point.
double average = (double) totals._1() / totals._2();
System.out.println("the average of the numbers is " + average);
| 方法 | 含义 |
| --- | --- |
| count() | RDD中元素的个数 |
| mean() | 元素的平均值 |
| sum() | 总和 |
| max() | 最大值 |
| min() | 最小值 |
| variance() | 元素的方差 |
| sampleVariance() | 从采样中计算出的方差 |
| stdev() | 标准差 |
| sampleStdev() | 采样的标准差 |
// Sample data: the doubles 0.0 through 9.0.
List<Double> values = new ArrayList<Double>();
for (int i = 0; i < 10; i++) {
    values.add((double) i);
}

// Print the numeric summary operations offered by JavaDoubleRDD, one per line.
JavaDoubleRDD rdd = sc.parallelizeDoubles(values);
System.out.println("元素个数: " + rdd.count());
System.out.println("平均值: " + rdd.mean());
System.out.println("和: " + rdd.sum());
System.out.println("方差: " + rdd.variance());
System.out.println("标准差: " + rdd.stdev());
System.out.println("采样标准差: " + rdd.sampleStdev());
/**
 * StatCounter extension that adds the coefficient of variation
 * (standard deviation divided by mean).
 */
public class stat extends StatCounter {

    /**
     * Returns {@code stdev() / mean()}, or NaN when no values have been counted.
     */
    public double cv() {
        return count() == 0 ? Double.NaN : stdev() / mean();
    }

    /**
     * Returns the absolute value of {@link #cv()} expressed as a percentage.
     * When no values have been counted the result is positive infinity, which
     * marks the data as invalid: it is guaranteed to exceed a 5% threshold.
     */
    public double cv_format_percent() {
        if (count() == 0) {
            return Double.POSITIVE_INFINITY;
        }
        double ratio = cv();
        return Math.abs(100 * ratio);
    }
}
/** Add a value into this StatCounter, updating the internal statistics. */
def merge(value: Double): StatCounter = {
  // mu is the mean; m2 is the sum of the squared differences between each value and the mean.
  val delta = value - mu
  n += 1
  mu += delta / n
  // Uses the already-updated mu, so the product pairs the old and the new deviation.
  m2 += delta * (value - mu)
  maxValue = math.max(maxValue, value)
  minValue = math.min(minValue, value)
  this
}
标签:
原文地址:http://www.cnblogs.com/zhoudayang/p/5008018.html