码迷,mamicode.com
首页 > 其他好文 > 详细

大数据之数据存储 HBase

时间:2020-07-10 00:43:32      阅读:74      评论:0      收藏:0      [点我收藏+]

标签:actor   get   create   hsi   sources   textfile   exec   trim   test   

package com.sjw.flink

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.hadoop.conf
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes


object HBaseSinkTest {

  /**
   * Entry point: streams purchase records from a local text file into HBase.
   *
   * Each input line is expected to look like "id,count"; it is parsed into a
   * [[Buy]] record and handed to [[MyHBaseSink]], which performs the writes.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Single parallel task keeps the demo simple (one HBase connection).
    env.setParallelism(1)

    // Parse "id,count" lines into Buy records.
    val buys: DataStream[Buy] = env
      .readTextFile("src/main/resources/buy_log.txt")
      .map { line =>
        val fields: Array[String] = line.split(",")
        Buy(fields(0).trim, fields(1).trim.toDouble)
      }

    buys.addSink(new MyHBaseSink())

    env.execute()
  }

}

/**
 * One purchase record parsed from the input file.
 *
 * @param id    row key used when writing to HBase
 * @param count purchase amount, stored under column family "info", qualifier "count"
 */
final case class Buy(id: String, count: Double)

/**
 * Flink sink that writes each [[Buy]] record as one row into the HBase table "stu":
 * row key = `id`, column `info:count` = `count` rendered as a string.
 *
 * Lifecycle: the HBase [[Connection]] is created once in [[open]] and released
 * in [[close]]; a [[org.apache.hadoop.hbase.client.Table]] handle is obtained
 * per record in [[invoke]] (Table is a lightweight, non-thread-safe view over
 * the shared Connection and must be closed after each use).
 */
class MyHBaseSink() extends RichSinkFunction[Buy] {

  // Shared, heavyweight HBase connection; assigned in open(), released in close().
  var conn: Connection = _

  override def open(parameters: Configuration): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "sunjunwei1.com,sunjunwei2.com,sunjunwei3.com")
    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    conn = ConnectionFactory.createConnection(conf)
  }

  override def invoke(value: Buy, context: SinkFunction.Context[_]): Unit = {
    val table = conn.getTable(TableName.valueOf("stu"))
    // FIX: the original never closed the Table handle, leaking resources on
    // every record; close it even if the put throws.
    try {
      val put = new Put(Bytes.toBytes(value.id))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(value.count.toString))
      table.put(put)
    } finally {
      table.close()
    }
  }

  override def close(): Unit = {
    // FIX: guard against open() never having run (or failing before the
    // assignment) — the original would NPE on a null conn.
    if (conn != null) {
      conn.close()
      conn = null
    }
  }
}

大数据之数据存储 HBase

标签:actor   get   create   hsi   sources   textfile   exec   trim   test   

原文地址:https://www.cnblogs.com/whyuan/p/13276942.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!