码迷,mamicode.com
首页 > 数据库 > 详细

【sparkSQL】创建DataFrame

时间:2018-05-15 14:01:21      阅读:205      评论:0      收藏:0      [点我收藏+]

标签:表名   examples   path   builder   list   定义   AC   tin   .sql   

首先我们要创建SparkSession

val spark = SparkSession.builder()
                        .appName("test")
                        .master("local")
                        .getOrCreate()
import spark.implicits._ //将RDD转化成为DataFrame并支持SQL操作        

然后我们通过SparkSession来创建DataFrame

1.使用toDF函数创建DataFrame

 通过导入(importing)spark.implicits, 就可以将本地序列(seq), 数组或者RDD转为DataFrame。

 只要这些数据的内容能指定数据类型即可。

import spark.implicits._
val df = Seq(
  (1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")),
  (2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15"))
).toDF("id", "name", "created_time")

技术分享图片

注意:如果直接用toDF()而不指定列名字,那么默认列名为"_1", "_2"

我们可以通过df.withColumnRenamed("_1", "newName1").withColumnRenamed("_2", "newName2")进行修改列名

2.使用createDataFrame函数创建DataFrame

通过schema + row 来创建

import org.apache.spark.sql.types._
//定义dataframe的结构的schema
val schema = StructType(List(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = true),
    StructField("create_time", DateType, nullable = true)
))
//定义dataframe内容的rdd
val rdd = sc.parallelize(Seq(
  Row(1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")),
  Row(2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15"))
))
//创建dataframe
val df = spark.createDataFrame(rdd, schema)

技术分享图片

3.通过文件直接创建DataFrame

 (1)使用parquet文件创建  

val df = spark.read.parquet("hdfs:/path/to/file")

 (2)使用json文件创建

val df = spark.read.json("examples/src/main/resources/people.json")

 (3)使用csv文件创建

val df = spark.read
        .format("com.databricks.spark.csv")
        .option("header", "true") //reading the headers
        .option("mode", "DROPMALFORMED")
        .load("csv/file/path")

 (4)使用Hive表创建

spark.table("test.person") // 库名.表名 的格式
     .registerTempTable("person")  // 注册成临时表
spark.sql(
      """
        | select *
        | from person
        | limit 10
      """.stripMargin).show()

记得,最后我们要调用spark.stop()来关闭SparkSession。  

 

 

 

【sparkSQL】创建DataFrame

标签:表名   examples   path   builder   list   定义   AC   tin   .sql   

原文地址:https://www.cnblogs.com/zzhangyuhang/p/9040442.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!