码迷,mamicode.com
首页 > 其他好文 > 详细

spark 数据分析 之数据清理

时间:2020-04-26 21:00:10      阅读:120      评论:0      收藏:0      [点我收藏+]

标签:rds   res   spl   input   output   double   lis   highlight   textfile   

//清理格式不匹配的数据

//此代码可以实现自动滤除掉无法转化为double类型的数据

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple7;

import java.util.ArrayList;
import java.util.List;

public class Filter {
    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setMaster("local").setAppName("filter");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        jsc.setLogLevel("Error");

        String inputPath = "./data/hqf.csv";
        JavaRDD<String> inputs = jsc.textFile(inputPath);
        //ambient,coolant,u_d,u_q,motor_speed,torque,i_d,i_q,pm,stator_yoke,stator_tooth,stator_winding,profile_id
        // 0        1       2   3         4      5     6   7  8    9            10           11          12          

        JavaRDD<ArrayList<Tuple7<String, String, String, String, String, String, String>>> mapRDD = inputs.map(new Function<String, ArrayList<Tuple7<String, String, String, String, String, String, String>>>() {
            @Override
            public ArrayList<Tuple7<String, String, String, String, String, String, String>> call(String one) throws Exception {
                String[] words = one.split(",");
                String others = "0.000000";
                int result = 0;
                int offset = 0;
                String[] tpres = new String[28];
                //ArrayList<Tuple7<String, String, String, String, String, String, String>> transList = new ArrayList<>();

                for (String word : words) {
                    result = isNumDouble(word);
                    if (result == -1) {
                        tpres[offset] = "error";
                        offset++;
                    }else {
                        tpres[offset] = word;
                        offset++;
                    }

                }
                ArrayList<Tuple7<String, String, String, String, String, String, String>> list = new ArrayList<>();
                Tuple7<String, String, String, String, String, String, String> tp1 = new Tuple7<>(tpres[0], tpres[1], tpres[2], tpres[3], tpres[4], tpres[5], tpres[6]);
//                Tuple7<String, String, String, String, String, String, String> tp2 = new Tuple7<>(tpres[7], tpres[8], tpres[9], tpres[10], tpres[11], tpres[12], tpres[13]);
//                Tuple7<String, String, String, String, String, String, String> tp3 = new Tuple7<>(tpres[14], tpres[15], tpres[16], tpres[17], tpres[18], tpres[20], tpres[21]);
//                Tuple7<String, String, String, String, String, String, String> tp4 = new Tuple7<>(tpres[22], tpres[23], tpres[24], tpres[25], others, others, others);

                list.add(tp1);
//                list.add(tp2);
//                list.add(tp3);
//                list.add(tp4);
                return list;
            }
        });
        JavaRDD<ArrayList<Tuple7<String, String, String, String, String, String, String>>> filterRDD = mapRDD.filter(new Function<ArrayList<Tuple7<String, String, String, String, String, String, String>>, Boolean>() {
            @Override
            public Boolean call(ArrayList<Tuple7<String, String, String, String, String, String, String>> lines) throws Exception {
                for (Tuple7<String, String, String, String, String, String, String> one : lines) {
                    if ("error".equals(one._1())) {
                        return false;
                    } else if ("error".equals(one._2())) {
                        return false;
                    } else if ("error".equals(one._3())) {
                        return false;
                    } else if ("error".equals(one._4())) {
                        return false;
                    } else if ("error".equals(one._5())) {
                        return false;
                    } else if ("error".equals(one._6())) {
                        return false;
                    } else if ("error".equals(one._7())) {
                        return false;
                    }
                }
                return true;
            }
        });
        String outputPath = "./data/result.csv";
        List<ArrayList<Tuple7<String, String, String, String, String, String, String>>> take = filterRDD.take(100);

        for (ArrayList<Tuple7<String, String, String, String, String, String, String>> ls:take){
            for (Tuple7<String, String, String, String, String, String, String> elem : ls){
                System.err.println(elem._1()+"\t" +
                                       elem._2()+"\t" +
                                       elem._3()+"\t" +
                                       elem._4()+"\t" +
                                       elem._5()+"\t" +
                                       elem._6()+"\t" +
                                       elem._7());
            }
        }

        filterRDD.foreach(new VoidFunction<ArrayList<Tuple7<String, String, String, String, String, String, String>>>() {
            @Override
            public void call(ArrayList<Tuple7<String, String, String, String, String, String, String>> lines) throws Exception {
                for (Tuple7<String, String, String, String, String, String, String> elem:lines){
                    System.err.println(elem._1()+"\t" +
                                       elem._2()+"\t" +
                                       elem._3()+"\t" +
                                       elem._4()+"\t" +
                                       elem._5()+"\t" +
                                       elem._6()+"\t" +
                                       elem._7());
                }
            }
        });
        filterRDD.saveAsTextFile(outputPath);

    }
    public static int isNumDouble(String word){
        try {
            Double.parseDouble(word);
        }catch (Exception e){
            return -1;
        }
        return 0;
    }
}

  

spark 数据分析 之数据清理

标签:rds   res   spl   input   output   double   lis   highlight   textfile   

原文地址:https://www.cnblogs.com/walxt/p/12781954.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!