码迷,mamicode.com
首页 > 其他好文 > 详细

spark 省份次数统计实例

时间:2019-06-25 13:15:18      阅读:90      评论:0      收藏:0      [点我收藏+]

标签:sip   nts   var   partition   使用   enc   char   park   conf   

//统计access.log文件里面IP地址对应的省份,并把结果存入到mysql



package access1

import java.sql.DriverManager

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Counts how many access-log hits map to each province and writes the
 * (province, count) pairs into the MySQL table `suibian`.
 *
 * The IP-range rules file is collected to the driver and broadcast. Each
 * access-log line's IP is then resolved to a province by binary search over
 * those rules.
 */
object AccessIp {

  private val DefaultAccessLogPath =
    "D:\\学习笔记\\资料汇总\\day02\\资料\\省份次数统计的数据\\access.log"
  private val DefaultIpRulesPath =
    "D:\\学习笔记\\资料汇总\\day02\\资料\\省份次数统计的数据\\ip.txt"

  private val JdbcUrl =
    "jdbc:mysql://localhost:3306/test1?characterEncoding=UTF-8&serverTimezone=GMT%2B8"
  private val JdbcUser = "root"
  private val JdbcPassword = "123456"
  private val InsertSql = "insert into suibian values(?,?)"

  /**
   * Entry point.
   *
   * @param args optional overrides: args(0) is the access-log path and
   *             args(1) is the IP-rules path. Missing entries fall back to
   *             the built-in defaults.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val accessLogPath = if (args.length > 0) args(0) else DefaultAccessLogPath
      val ipRulesPath = if (args.length > 1) args(1) else DefaultIpRulesPath

      val accessLines = sc.textFile(accessLogPath)
      val ipLines = sc.textFile(ipRulesPath)

      // Each rule line is '|'-separated: field 2 = range start, field 3 = range end,
      // field 6 = province. collect() keeps the file's line order.
      // NOTE(review): binarSearch assumes ip.txt is sorted ascending by range start -- confirm.
      val ipRules: Array[(Long, Long, String)] = ipLines.map { line =>
        val fields = line.split("[|]")
        (fields(2).toLong, fields(3).toLong, fields(6))
      }.collect()

      // Broadcasting ships the lookup table to each executor once instead of
      // serializing it into every task closure. Every task reads the whole
      // table, so this is the intended use of a broadcast variable.
      val rulesBroadcast: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(ipRules)

      // Field 1 of an access-log line is the client IP. IPs that match no rule
      // are counted under the empty-string province.
      val provinceCounts = accessLines.map { line =>
        val fields = line.split("[|]")
        val ipNum = MyUtils.ip2Long(fields(1))
        val rules = rulesBroadcast.value
        val index = MyUtils.binarSearch(rules, ipNum)
        val province = if (index != -1) rules(index)._3 else ""
        (province, 1)
      }.reduceByKey(_ + _).sortBy(-_._2) // highest counts first

      provinceCounts.foreachPartition(rows => writePartition(rows))

      // Must happen while the SparkContext is still alive; the original
      // unpersisted only after sc.stop().
      rulesBroadcast.unpersist(true)
    } finally {
      sc.stop()
    }
  }

  /**
   * Inserts one partition's (province, count) rows using a single connection
   * and a single prepared, batched statement. Both are always closed, even if
   * an insert fails.
   */
  private def writePartition(rows: Iterator[(String, Int)]): Unit = {
    val connection = DriverManager.getConnection(JdbcUrl, JdbcUser, JdbcPassword)
    try {
      val statement = connection.prepareStatement(InsertSql)
      try {
        rows.foreach { case (province, count) =>
          statement.setString(1, province)
          statement.setInt(2, count)
          statement.addBatch()
        }
        statement.executeBatch()
      } finally {
        statement.close()
      }
    } finally {
      connection.close()
    }
  }
}
package access1

object MyUtils {

  /**
   * Converts a dotted IPv4 address such as "1.2.3.4" into a Long.
   *
   * Each '.'-separated fragment is parsed with `toLong` and shifted in 8 bits
   * at a time, so "1.2.3.4" becomes 16909060. No validation is performed:
   * - A non-numeric fragment throws NumberFormatException.
   * - Out-of-range octets or extra fragments yield a value outside the 32-bit
   *   range rather than an error.
   *
   * @param ip dotted address text
   * @return the address packed into a Long
   */
  def ip2Long(ip: String): Long =
    ip.split("[.]").foldLeft(0L)((acc, fragment) => fragment.toLong | acc << 8L)

  /**
   * Binary search for the range that contains `target`.
   *
   * Each element is (rangeStart, rangeEnd, label), with both bounds inclusive.
   * The array must be sorted ascending by rangeStart, and the ranges must not
   * overlap, for the search to be correct.
   *
   * @param array  sorted, non-overlapping inclusive ranges
   * @param target value to locate
   * @return index of the containing range, or -1 if no range contains `target`
   */
  def binarSearch(array: Array[(Long, Long, String)], target: Long): Int = {
    @scala.annotation.tailrec
    def search(low: Int, high: Int): Int =
      if (low > high) -1
      else {
        // Written this way instead of (low + high) / 2 so the sum cannot overflow Int.
        val mid = low + (high - low) / 2
        val (start, end, _) = array(mid)
        if (start <= target && target <= end) mid
        else if (start > target) search(low, mid - 1)
        else search(mid + 1, high)
      }

    search(0, array.length - 1)
  }
}

 

spark 省份次数统计实例

标签:sip   nts   var   partition   使用   enc   char   park   conf   

原文地址:https://www.cnblogs.com/wangshuang123/p/11082113.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!