标签:hadoop 云计算 spark hdfs 大数据 mapreduce
大家可能都知道很熟悉Spark的两种常见的数据读取方式(存放到RDD中):(1)、调用parallelize函数直接从集合中获取数据,并存入RDD中;Java版本如下:
1  | JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1,2,3));  | 
Scala版本如下:
1  | val myRDD= sc.parallelize(List(1,2,3))  | 
这种方式很简单,很容易就可以将一个集合中的数据变成RDD的初始化值;更常见的是(2)、从文本中读取数据到RDD中,这个文本可以是纯文本文件、可以是sequence文件;可以存放在本地(file://)、可以存放在HDFS(hdfs://)上,还可以存放在S3上。其实对文件来说,Spark支持Hadoop所支持的所有文件类型和文件存放位置。Java版如下:
01  | /////////////////////////////////////////////////////////////////////  | 
05  | http://www.yfteach.com  | 
06  | 
07  | 云帆大数据博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货  | 
08  | 微信公共帐号:yfteach  | 
09  | /////////////////////////////////////////////////////////////////////  | 
10  | import org.apache.spark.SparkConf;  | 
11  | import org.apache.spark.api.java.JavaRDD;  | 
12  | import org.apache.spark.api.java.JavaSparkContext;  | 
13  | 
14  | SparkConf conf = new SparkConf().setAppName("Simple Application");  | 
15  | JavaSparkContext sc = new JavaSparkContext(conf);  | 
16  | sc.addFile("wyp.data");  | 
17  | JavaRDD<String> lines = sc.textFile(SparkFiles.get("wyp.data"));  | 
Scala版本如下:
1  | import org.apache.spark.SparkContext  | 
2  | import org.apache.spark.SparkConf  | 
3  | 
4  | val conf = new SparkConf().setAppName("Simple Application")  | 
5  | val sc = new SparkContext(conf)  | 
6  | sc.addFile("spam.data")  | 
7  | val inFile = sc.textFile(SparkFiles.get("spam.data"))  | 
在实际情况下,我们需要的数据可能不是简单的存放在HDFS文本中,我们需要的数据可能就存放在Hbase中,那么我们如何用Spark来读取Hbase中的数据呢?本文的所有测试是基于Hadoop 2.2.0、Hbase 0.98.2、Spark 0.9.1,不同版本可能代码的编写有点不同。本文只是简单地用Spark来读取Hbase中的数据,如果需要对Hbase进行更强的操作,本文可能不能帮你。话不多说,Spark操作Hbase的核心的Java版本代码如下:
01  | import org.apache.hadoop.conf.Configuration;  | 
02  | import org.apache.hadoop.hbase.HBaseConfiguration;  | 
03  | import org.apache.hadoop.hbase.client.Result;  | 
04  | import org.apache.hadoop.hbase.client.Scan;  | 
05  | import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  | 
06  | import org.apache.hadoop.hbase.mapreduce.TableInputFormat;  | 
07  | import org.apache.hadoop.hbase.protobuf.ProtobufUtil;  | 
08  | import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;  | 
09  | import org.apache.hadoop.hbase.util.Base64;  | 
10  | import org.apache.hadoop.hbase.util.Bytes;  | 
11  | import org.apache.spark.api.java.JavaPairRDD;  | 
12  | import org.apache.spark.api.java.JavaSparkContext;  | 
13  | 
31  | 
23  | 
24  | JavaSparkContext sc = new JavaSparkContext(master, "hbaseTest",  | 
25  | System.getenv("SPARK_HOME"), System.getenv("JARS"));  | 
26  | 
27  | Configuration conf = HBaseConfiguration.create();  | 
28  | Scan scan = new Scan();  | 
29  | scan.addFamily(Bytes.toBytes("cf"));  | 
30  | scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("airName"));  | 
32  | try {  | 
33  | String tableName = "flight_wap_order_log";  | 
34  | conf.set(TableInputFormat.INPUT_TABLE, tableName);  | 
35  | ClientProtos.Scan proto = ProtobufUtil.toScan(scan);  | 
36  | String ScanToString = Base64.encodeBytes(proto.toByteArray());  | 
37  | conf.set(TableInputFormat.SCAN, ScanToString);  | 
38  | 
39  | JavaPairRDD<ImmutableBytesWritable, Result> myRDD =  | 
40  | sc.newAPIHadoopRDD(conf, TableInputFormat.class,  | 
41  | ImmutableBytesWritable.class, Result.class);  | 
42  | 
43  | catch (Exception e) {  | 
44  | e.printStackTrace();  | 
45  | }  | 
这样本段代码段是从Hbase表名为flight_wap_order_log的数据库中读取cf列簇上的airName一列的数据,这样我们就可以对myRDD进行相应的操作:
1  | System.out.println(myRDD.count());  | 
本段代码需要在pom.xml文件加入以下依赖:
01  | <dependency>  | 
02  | <groupId>org.apache.spark</groupId>  | 
03  | <artifactId>spark-core_2.10</artifactId>  | 
04  | <version>0.9.1</version>  | 
05  | </dependency>  | 
06  | 
07  | <dependency>  | 
08  | <groupId>org.apache.hbase</groupId>  | 
09  | <artifactId>hbase</artifactId>  | 
10  | <version>0.98.2-hadoop2</version>  | 
11  | </dependency>  | 
12  | 
13  | <dependency>  | 
14  | <groupId>org.apache.hbase</groupId>  | 
15  | <artifactId>hbase-client</artifactId>  | 
16  | <version>0.98.2-hadoop2</version>  | 
17  | </dependency>  | 
18  | 
19  | <dependency>  | 
20  | <groupId>org.apache.hbase</groupId>  | 
21  | <artifactId>hbase-common</artifactId>  | 
22  | <version>0.98.2-hadoop2</version>  | 
23  | </dependency>  | 
24  | 
25  | <dependency>  | 
26  | <groupId>org.apache.hbase</groupId>  | 
27  | <artifactId>hbase-server</artifactId>  | 
28  | <version>0.98.2-hadoop2</version>  | 
29  | </dependency>  | 
Scala版如下:
01  | import org.apache.spark._  | 
02  | import org.apache.spark.rdd.NewHadoopRDD  | 
03  | import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}  | 
04  | import org.apache.hadoop.hbase.client.HBaseAdmin  | 
05  | import org.apache.hadoop.hbase.mapreduce.TableInputFormat  | 
06  | 
22  | val conf = HBaseConfiguration.create()  | 
15  | /////////////////////////////////////////////////////////////////////  | 
16  | 
17  | object HBaseTest {  | 
18  | def main(args: Array[String]) {  | 
19  | val sc = new SparkContext(args(0), "HBaseTest",  | 
20  | System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))  | 
21  | 
23  | conf.set(TableInputFormat.INPUT_TABLE, args(1))  | 
24  | 
25  | val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],  | 
26  | classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],  | 
27  | classOf[org.apache.hadoop.hbase.client.Result])  | 
28  | 
29  | hBaseRDD.count()  | 
30  | 
31  | System.exit(0)  | 
32  | }  | 
33  | }  | 
我们需要在加入如下依赖:
1  | libraryDependencies ++= Seq(  | 
2  | "org.apache.spark" % "spark-core_2.10" % "0.9.1",  | 
3  | "org.apache.hbase" % "hbase" % "0.98.2-hadoop2",  | 
4  | "org.apache.hbase" % "hbase-client" % "0.98.2-hadoop2",  | 
5  | "org.apache.hbase" % "hbase-common" % "0.98.2-hadoop2",  | 
6  | "org.apache.hbase" % "hbase-server" % "0.98.2-hadoop2"  | 
7  | )  | 
在测试的时候,需要配置好Hbase、Hadoop环境,否则程序会出现问题,特别是让程序找到Hbase-site.xml配置文件
详情请加入QQ群:374152400 ,咨询课程顾问!
关注云帆教育微信公众号yfteach,第一时间获取公开课信息。
本文出自 “云帆大数据” 博客,请务必保留此出处http://yfteach01.blog.51cto.com/9428662/1629668
标签:hadoop 云计算 spark hdfs 大数据 mapreduce
原文地址:http://yfteach01.blog.51cto.com/9428662/1629668