标签:统计 接下来 双子座 table 字符 creat foreach 类型 group by
最近看到有个例子很有意思,特地重现下。
这一步,我们使用Spark SQL按照星座对2000W数据进行分组统计, 看看哪个星座的人最喜欢开房。
当然, 使用纯Spark也可以完成我们的分析, 因为实际Spark SQL最终是利用Spark来完成的。
实际测试中发现这些数据并不是完全遵守一个schema, 有些数据的格式是不对的, 有些数据的数据项也是错误的。 在代码中我们要剔除那么干扰数据。
反正我们用这个数据测试者玩, 并没有严格的要求去整理哪些错误数据。
先看代码:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)import sqlContext.createSchemaRDDcase class  Customer(name: String, gender: String, ctfId: String, birthday: String, address: String)val customer = sc.textFile("/mnt/share/2000W/*.csv").map(_.split(",")).filter(line => line.length > 7).map(p => Customer(p(0), p(5), p(4), p(6), p(7))).distinct()customer.registerTempTable("customer")def toInt(s: String):Int = {try {s.toInt} catch {case e:Exception => 9999}}def myfun(birthday: String) : String = {var rt = "未知"if (birthday.length == 8) {val md = toInt(birthday.substring(4))if (md >= 120 & md <= 219)rt = "水瓶座"else if  (md >= 220 & md <= 320)rt = "双鱼座"else if  (md >= 321 & md <= 420)rt = "白羊座"else if  (md >= 421 & md <= 521)rt = "金牛座"else if  (md >= 522 & md <= 621)rt = "双子座"else if  (md >= 622 & md <= 722)rt = "巨蟹座"else if  (md >= 723 & md <= 823)rt = "狮子座"else if  (md >= 824 & md <= 923)rt = "处女座"else if  (md >= 924 & md <= 1023)rt = "天秤座"else if  (md >= 1024 & md <= 1122)rt = "天蝎座"else if  (md >= 1123 & md <= 1222)rt = "射手座"else if  ((md >= 1223 & md <= 1231) | (md >= 101 & md <= 119))rt = "摩蝎座"elsert = "未知"}rt}sqlContext.registerFunction("constellation", (x:String) => myfun(x))var result = sqlContext.sql("SELECT constellation(birthday), count(constellation(birthday)) FROM customer group by constellation(birthday)")result.collect().foreach(println)为了使用spark sql,你需要引入 sqlContext.createSchemaRDD. Spark sql一个核心对象就是SchemaRDD。 上面的import可以隐式的将一个RDD转换成SchemaRDD。
接着定义了Customer类,用来映射每一行的数据, 我们只使用每一行很少的信息, 像地址,email等都没用到。
接下来从2000W文件夹中读取所有的csv文件, 创建一个RDD并注册表customer。
因为没有一个内建的函数可以将出生一起映射为星座, 所以我们需要定义一个映射函数myfun, 并把它注册到SparkContext中。 这样我们就可以在sql语句中使用这个函数。 类似地,字符串的length函数当前也不支持, 你可以增加一个这样的函数。 因为有的日期不正确,所有特别增加了一个”未知”的星座。 错误数据可能有两种, 一是日期出错, 而是此行格式不对,将其它字段映射成了出生日期。 我们在分析的时候忽略它们好了。
然后执行一个分组的sql语句。这个sql语句查询结果类型为SchemaRDD, 也继承了RDD所有的操作。
最后将结果打印出来。
| [双子座,1406018][双鱼座,1509839][摩蝎座,2404812][金牛座,1406225][水瓶座,1635358][巨蟹座,1498077][处女座,1666009][天秤座,1896544][白羊座,1409838][射手座,1614915][未知,160483][狮子座,1613529] | 
看起来魔蝎座的人最喜欢开房了, 明显比其它星座的人要多。
我们也可以分析一下开房的男女比例:
| 
1 
2 
3 | ......result = sqlContext.sql("SELECT gender, count(gender) FROM customer where gender = ‘F‘ or gender = ‘M‘ group by gender")result.collect().foreach(println) | 
结果显示男女开房的人数大约是2:1
| 
1 
2 | [F,6475461][M,12763926] | 
标签:统计 接下来 双子座 table 字符 creat foreach 类型 group by
原文地址:http://www.cnblogs.com/hd-zg/p/6187927.html